docker_compose_runner/
lib.rs

1use anyhow::{anyhow, Result};
2use regex::Regex;
3use serde_yaml::Value;
4use std::collections::HashMap;
5use std::fmt::Write;
6use std::io::ErrorKind;
7use std::process::Command;
8use std::time::{self, Duration};
9use subprocess::{Exec, Redirection};
10use tracing::trace;
11
12/// Runs a command and returns the output as a string.
13///
14/// Both stderr and stdout are returned in the result.
15///
16/// # Arguments
17/// * `command` - The system command to run
18/// * `args` - An array of command line arguments for the command
19pub(crate) fn run_command(command: &str, args: &[&str]) -> Result<String> {
20    trace!("executing {}", command);
21    let data = Exec::cmd(command)
22        .args(args)
23        .stdout(Redirection::Pipe)
24        .stderr(Redirection::Merge)
25        .capture()?;
26
27    if data.exit_status.success() {
28        Ok(data.stdout_str())
29    } else {
30        Err(anyhow!(
31            "command {} {:?} exited with {:?} and output:\n{}",
32            command,
33            args,
34            data.exit_status,
35            data.stdout_str()
36        ))
37    }
38}
39
40/// Launch and manage a docker compose instance
41#[must_use]
42pub struct DockerCompose {
43    file_path: String,
44    services: Vec<Service>,
45}
46
47impl DockerCompose {
48    /// Runs docker compose on the provided docker-compose.yaml file.
49    /// Dropping the returned object will stop and destroy the launched docker compose services.
50    ///
51    /// image_waiters gives DockerCompose a way to know when a container has finished starting up.
52    /// Each entry defines an image name and a regex such that if the regex matches on a log line output by a container running that image the container is considered started up.
53    ///
54    /// image_builder is a callback allowing the user to build a docker image if the docker-compose.yaml depends on it.
55    /// The argument is an iterator over all the image names docker compose is going to use.
56    pub fn new(
57        image_waiters: &'static [Image],
58        image_builder: impl FnOnce(&[&str]),
59        yaml_path: &str,
60    ) -> Self {
61        match Command::new("docker")
62            .arg("compose")
63            .output()
64            .map_err(|e| e.kind())
65        {
66            Err(ErrorKind::NotFound) => panic!("Could not find docker. Have you installed docker?"),
67            Err(err) => panic!("error running docker {:?}", err),
68            Ok(output) => {
69                if !output.status.success() {
70                    panic!("Could not find docker compose. Have you installed docker compose?");
71                }
72            }
73        }
74
75        // It is critical that clean_up is run before everything else as the internal `docker compose` commands act as validation
76        // for the docker-compose.yaml file that we later manually parse with poor error handling
77        DockerCompose::clean_up(yaml_path).unwrap();
78
79        let service_to_image = DockerCompose::get_service_to_image(yaml_path);
80
81        let images: Vec<&str> = service_to_image.values().map(|x| x.as_ref()).collect();
82        image_builder(&images);
83
84        run_command("docker", &["compose", "-f", yaml_path, "up", "-d"]).unwrap();
85
86        let mut services = DockerCompose::get_services(image_waiters, service_to_image);
87        let mut services_arg: Vec<&mut Service> = services.iter_mut().collect();
88        DockerCompose::wait_for_logs(yaml_path, &mut services_arg);
89
90        DockerCompose {
91            file_path: yaml_path.to_string(),
92            services,
93        }
94    }
95
96    /// Stops the container with the provided service name
97    pub fn stop_service(&self, service_name: &str) {
98        run_command(
99            "docker",
100            &["compose", "-f", &self.file_path, "stop", service_name],
101        )
102        .unwrap();
103    }
104
105    /// Kills the container with the provided service name
106    pub fn kill_service(&self, service_name: &str) {
107        run_command(
108            "docker",
109            &["compose", "-f", &self.file_path, "kill", service_name],
110        )
111        .unwrap();
112    }
113
114    /// Restarts the container with the provided service name
115    pub fn start_service(&mut self, service_name: &str) {
116        run_command(
117            "docker",
118            &["compose", "-f", &self.file_path, "start", service_name],
119        )
120        .unwrap();
121
122        // service must exist because previous command succeeded
123        let service = self
124            .services
125            .iter_mut()
126            .find(|x| x.name == service_name)
127            .unwrap();
128        DockerCompose::wait_for_logs(&self.file_path, &mut [service]);
129    }
130
131    /// constructs one service per service_to_image, the waiting regex is taken from the corresponding image entry in image_waiters.
132    fn get_services(
133        image_waiters: &[Image],
134        service_to_image: HashMap<String, String>,
135    ) -> Vec<Service> {
136        service_to_image
137            .into_iter()
138            .map(
139                |(service_name, image_name)| match image_waiters.iter().find(|image| image.name == image_name) {
140                    Some(image) => Service::new(service_name, image),
141                    None => panic!("The image_waiters list given to DockerCompose::new does not include the image {image_name}, please add it to the list."),
142                },
143            )
144            .collect()
145    }
146
147    fn get_service_to_image(file_path: &str) -> HashMap<String, String> {
148        let compose_yaml: Value =
149            serde_yaml::from_str(&std::fs::read_to_string(file_path).unwrap()).unwrap();
150        let mut result = HashMap::new();
151        match compose_yaml {
152            Value::Mapping(root) => match root.get("services").unwrap() {
153                Value::Mapping(services) => {
154                    for (service_name, service) in services {
155                        let service_name = match service_name {
156                            Value::String(service_name) => service_name,
157                            service_name => panic!("Unexpected service_name {service_name:?}"),
158                        };
159                        match service {
160                            Value::Mapping(service) => {
161                                let image = match service.get("image").unwrap() {
162                                    Value::String(image) => image,
163                                    image => panic!("Unexpected image {image:?}"),
164                                };
165                                result.insert(service_name.clone(), image.clone());
166                            }
167                            service => panic!("Unexpected service {service:?}"),
168                        }
169                    }
170                }
171                services => panic!("Unexpected services {services:?}"),
172            },
173            root => panic!("Unexpected root {root:?}"),
174        }
175        result
176    }
177
178    /// Wait until the requirements in every Service is met.
179    /// Will panic if a timeout occurs.
180    fn wait_for_logs(file_path: &str, services: &mut [&mut Service]) {
181        // Find the service with the maximum timeout and use that
182        let timeout = services
183            .iter()
184            .map(|service| service.timeout)
185            .max_by_key(|x| x.as_nanos())
186            .unwrap();
187
188        // TODO: remove this check once CI docker compose is updated (probably ubuntu 22.04)
189        let can_use_status_flag =
190            run_command("docker", &["compose", "-f", file_path, "ps", "--help"])
191                .unwrap()
192                .contains("--status");
193
194        let instant = time::Instant::now();
195        loop {
196            // check if every service is completely ready
197            if services.iter().all(|service| {
198                let log = run_command(
199                    "docker",
200                    &["compose", "-f", file_path, "logs", &service.name],
201                )
202                .unwrap();
203                service.log_to_wait_for.find_iter(&log).count() > service.logs_seen
204            }) {
205                for service in services.iter_mut() {
206                    service.logs_seen += 1;
207                }
208                let time_to_complete = instant.elapsed();
209                trace!("All services ready in {}", time_to_complete.as_secs());
210                return;
211            }
212
213            let all_logs = run_command("docker", &["compose", "-f", file_path, "logs"]).unwrap();
214
215            // check if the service has failed in some way
216            // this allows us to report the failure to the developer a lot sooner than just relying on the timeout
217            if can_use_status_flag {
218                DockerCompose::assert_no_containers_in_service_with_status(
219                    file_path, "exited", &all_logs,
220                );
221                DockerCompose::assert_no_containers_in_service_with_status(
222                    file_path, "dead", &all_logs,
223                );
224                DockerCompose::assert_no_containers_in_service_with_status(
225                    file_path, "removing", &all_logs,
226                );
227            }
228
229            // if all else fails timeout the wait
230            if instant.elapsed() > timeout {
231                let mut results = "".to_owned();
232                for service in services {
233                    let log = run_command(
234                        "docker",
235                        &["compose", "-f", file_path, "logs", &service.name],
236                    )
237                    .unwrap();
238                    let found = if service.log_to_wait_for.is_match(&log) {
239                        "Found"
240                    } else {
241                        "Missing"
242                    };
243
244                    writeln!(
245                        results,
246                        "*    Service {}, searched for '{}', was {}",
247                        service.name, service.log_to_wait_for, found
248                    )
249                    .unwrap();
250                }
251
252                panic!("wait_for_log {timeout:?} timer expired. Results:\n{results}\nLogs:\n{all_logs}");
253            }
254        }
255    }
256
257    fn assert_no_containers_in_service_with_status(file_path: &str, status: &str, full_log: &str) {
258        let containers = run_command(
259            "docker",
260            &["compose", "-f", file_path, "ps", "--status", status],
261        )
262        .unwrap();
263        // One line for the table heading. If there are more lines then there is some data indicating that containers exist with this status
264        if containers.matches('\n').count() > 1 {
265            panic!(
266                "At least one container failed to initialize\n{containers}\nFull log\n{full_log}"
267            );
268        }
269    }
270
271    /// Cleans up docker compose by shutting down the running system and removing the images.
272    ///
273    /// # Arguments
274    /// * `file_path` - The path to the docker-compose yaml file that was used to start docker.
275    fn clean_up(file_path: &str) -> Result<()> {
276        trace!("bringing down docker compose {}", file_path);
277
278        run_command("docker", &["compose", "-f", file_path, "kill"])?;
279        run_command("docker", &["compose", "-f", file_path, "down", "-v"])?;
280
281        Ok(())
282    }
283}
284
285pub struct Image {
286    pub name: &'static str,
287    pub log_regex_to_wait_for: &'static str,
288    pub timeout: Duration,
289}
290
291/// Holds the state for a running service
292struct Service {
293    name: String,
294    log_to_wait_for: Regex,
295    logs_seen: usize,
296    timeout: Duration,
297}
298
299impl Service {
300    fn new(name: String, image: &Image) -> Service {
301        Service {
302            name,
303            log_to_wait_for: Regex::new(image.log_regex_to_wait_for).unwrap(),
304            logs_seen: 0,
305            timeout: image.timeout,
306        }
307    }
308}
309
310impl Drop for DockerCompose {
311    fn drop(&mut self) {
312        if std::thread::panicking() {
313            if let Err(err) = DockerCompose::clean_up(&self.file_path) {
314                // We need to use println! here instead of error! because error! does not
315                // get output when panicking
316                println!(
317                    "ERROR: docker compose failed to bring down while already panicking: {err:?}",
318                );
319            }
320        } else {
321            DockerCompose::clean_up(&self.file_path).unwrap();
322        }
323    }
324}