1use anyhow::{anyhow, Result};
2use regex::Regex;
3use serde_yaml::Value;
4use std::collections::HashMap;
5use std::fmt::Write;
6use std::io::ErrorKind;
7use std::process::Command;
8use std::time::{self, Duration};
9use subprocess::{Exec, Redirection};
10use tracing::trace;
11
12pub(crate) fn run_command(command: &str, args: &[&str]) -> Result<String> {
20 trace!("executing {}", command);
21 let data = Exec::cmd(command)
22 .args(args)
23 .stdout(Redirection::Pipe)
24 .stderr(Redirection::Merge)
25 .capture()?;
26
27 if data.exit_status.success() {
28 Ok(data.stdout_str())
29 } else {
30 Err(anyhow!(
31 "command {} {:?} exited with {:?} and output:\n{}",
32 command,
33 args,
34 data.exit_status,
35 data.stdout_str()
36 ))
37 }
38}
39
40#[must_use]
42pub struct DockerCompose {
43 file_path: String,
44 services: Vec<Service>,
45}
46
47impl DockerCompose {
48 pub fn new(
57 image_waiters: &'static [Image],
58 image_builder: impl FnOnce(&[&str]),
59 yaml_path: &str,
60 ) -> Self {
61 match Command::new("docker")
62 .arg("compose")
63 .output()
64 .map_err(|e| e.kind())
65 {
66 Err(ErrorKind::NotFound) => panic!("Could not find docker. Have you installed docker?"),
67 Err(err) => panic!("error running docker {:?}", err),
68 Ok(output) => {
69 if !output.status.success() {
70 panic!("Could not find docker compose. Have you installed docker compose?");
71 }
72 }
73 }
74
75 DockerCompose::clean_up(yaml_path).unwrap();
78
79 let service_to_image = DockerCompose::get_service_to_image(yaml_path);
80
81 let images: Vec<&str> = service_to_image.values().map(|x| x.as_ref()).collect();
82 image_builder(&images);
83
84 run_command("docker", &["compose", "-f", yaml_path, "up", "-d"]).unwrap();
85
86 let mut services = DockerCompose::get_services(image_waiters, service_to_image);
87 let mut services_arg: Vec<&mut Service> = services.iter_mut().collect();
88 DockerCompose::wait_for_logs(yaml_path, &mut services_arg);
89
90 DockerCompose {
91 file_path: yaml_path.to_string(),
92 services,
93 }
94 }
95
96 pub fn stop_service(&self, service_name: &str) {
98 run_command(
99 "docker",
100 &["compose", "-f", &self.file_path, "stop", service_name],
101 )
102 .unwrap();
103 }
104
105 pub fn kill_service(&self, service_name: &str) {
107 run_command(
108 "docker",
109 &["compose", "-f", &self.file_path, "kill", service_name],
110 )
111 .unwrap();
112 }
113
114 pub fn start_service(&mut self, service_name: &str) {
116 run_command(
117 "docker",
118 &["compose", "-f", &self.file_path, "start", service_name],
119 )
120 .unwrap();
121
122 let service = self
124 .services
125 .iter_mut()
126 .find(|x| x.name == service_name)
127 .unwrap();
128 DockerCompose::wait_for_logs(&self.file_path, &mut [service]);
129 }
130
131 fn get_services(
133 image_waiters: &[Image],
134 service_to_image: HashMap<String, String>,
135 ) -> Vec<Service> {
136 service_to_image
137 .into_iter()
138 .map(
139 |(service_name, image_name)| match image_waiters.iter().find(|image| image.name == image_name) {
140 Some(image) => Service::new(service_name, image),
141 None => panic!("The image_waiters list given to DockerCompose::new does not include the image {image_name}, please add it to the list."),
142 },
143 )
144 .collect()
145 }
146
147 fn get_service_to_image(file_path: &str) -> HashMap<String, String> {
148 let compose_yaml: Value =
149 serde_yaml::from_str(&std::fs::read_to_string(file_path).unwrap()).unwrap();
150 let mut result = HashMap::new();
151 match compose_yaml {
152 Value::Mapping(root) => match root.get("services").unwrap() {
153 Value::Mapping(services) => {
154 for (service_name, service) in services {
155 let service_name = match service_name {
156 Value::String(service_name) => service_name,
157 service_name => panic!("Unexpected service_name {service_name:?}"),
158 };
159 match service {
160 Value::Mapping(service) => {
161 let image = match service.get("image").unwrap() {
162 Value::String(image) => image,
163 image => panic!("Unexpected image {image:?}"),
164 };
165 result.insert(service_name.clone(), image.clone());
166 }
167 service => panic!("Unexpected service {service:?}"),
168 }
169 }
170 }
171 services => panic!("Unexpected services {services:?}"),
172 },
173 root => panic!("Unexpected root {root:?}"),
174 }
175 result
176 }
177
178 fn wait_for_logs(file_path: &str, services: &mut [&mut Service]) {
181 let timeout = services
183 .iter()
184 .map(|service| service.timeout)
185 .max_by_key(|x| x.as_nanos())
186 .unwrap();
187
188 let can_use_status_flag =
190 run_command("docker", &["compose", "-f", file_path, "ps", "--help"])
191 .unwrap()
192 .contains("--status");
193
194 let instant = time::Instant::now();
195 loop {
196 if services.iter().all(|service| {
198 let log = run_command(
199 "docker",
200 &["compose", "-f", file_path, "logs", &service.name],
201 )
202 .unwrap();
203 service.log_to_wait_for.find_iter(&log).count() > service.logs_seen
204 }) {
205 for service in services.iter_mut() {
206 service.logs_seen += 1;
207 }
208 let time_to_complete = instant.elapsed();
209 trace!("All services ready in {}", time_to_complete.as_secs());
210 return;
211 }
212
213 let all_logs = run_command("docker", &["compose", "-f", file_path, "logs"]).unwrap();
214
215 if can_use_status_flag {
218 DockerCompose::assert_no_containers_in_service_with_status(
219 file_path, "exited", &all_logs,
220 );
221 DockerCompose::assert_no_containers_in_service_with_status(
222 file_path, "dead", &all_logs,
223 );
224 DockerCompose::assert_no_containers_in_service_with_status(
225 file_path, "removing", &all_logs,
226 );
227 }
228
229 if instant.elapsed() > timeout {
231 let mut results = "".to_owned();
232 for service in services {
233 let log = run_command(
234 "docker",
235 &["compose", "-f", file_path, "logs", &service.name],
236 )
237 .unwrap();
238 let found = if service.log_to_wait_for.is_match(&log) {
239 "Found"
240 } else {
241 "Missing"
242 };
243
244 writeln!(
245 results,
246 "* Service {}, searched for '{}', was {}",
247 service.name, service.log_to_wait_for, found
248 )
249 .unwrap();
250 }
251
252 panic!("wait_for_log {timeout:?} timer expired. Results:\n{results}\nLogs:\n{all_logs}");
253 }
254 }
255 }
256
257 fn assert_no_containers_in_service_with_status(file_path: &str, status: &str, full_log: &str) {
258 let containers = run_command(
259 "docker",
260 &["compose", "-f", file_path, "ps", "--status", status],
261 )
262 .unwrap();
263 if containers.matches('\n').count() > 1 {
265 panic!(
266 "At least one container failed to initialize\n{containers}\nFull log\n{full_log}"
267 );
268 }
269 }
270
271 fn clean_up(file_path: &str) -> Result<()> {
276 trace!("bringing down docker compose {}", file_path);
277
278 run_command("docker", &["compose", "-f", file_path, "kill"])?;
279 run_command("docker", &["compose", "-f", file_path, "down", "-v"])?;
280
281 Ok(())
282 }
283}
284
/// Describes how to wait for services that run a particular docker image.
#[derive(Debug, Clone)]
pub struct Image {
    /// Image name; compared for equality against the `image` value of each service in
    /// the compose file.
    pub name: &'static str,
    /// Regular expression that must match in the service's log for it to count as ready.
    pub log_regex_to_wait_for: &'static str,
    /// How long to wait for the regex to match before giving up.
    pub timeout: Duration,
}
290
291struct Service {
293 name: String,
294 log_to_wait_for: Regex,
295 logs_seen: usize,
296 timeout: Duration,
297}
298
299impl Service {
300 fn new(name: String, image: &Image) -> Service {
301 Service {
302 name,
303 log_to_wait_for: Regex::new(image.log_regex_to_wait_for).unwrap(),
304 logs_seen: 0,
305 timeout: image.timeout,
306 }
307 }
308}
309
310impl Drop for DockerCompose {
311 fn drop(&mut self) {
312 if std::thread::panicking() {
313 if let Err(err) = DockerCompose::clean_up(&self.file_path) {
314 println!(
317 "ERROR: docker compose failed to bring down while already panicking: {err:?}",
318 );
319 }
320 } else {
321 DockerCompose::clean_up(&self.file_path).unwrap();
322 }
323 }
324}