contain_rs_core/client/
shared.rs

1use std::{
2    io::BufRead,
3    process::{Command, Output, Stdio},
4    thread,
5    time::Duration,
6};
7
8use regex::Regex;
9use tracing::*;
10
11use crate::{
12    container::{Container, Volume, WaitStrategy},
13    error::{ContainerResult, ContainersError},
14    rt::{ContainerStatus, DetailedContainerInfo},
15};
16
17use super::{Client, Log};
18
19pub fn run_and_wait_for_command(command: &mut Command) -> ContainerResult<String> {
20    let output = try_run_and_wait_for_command(command)?;
21
22    if let Some(0) = output.status.code() {
23        Ok(String::from_utf8(output.stdout).unwrap())
24    } else {
25        Err(ContainersError::CommandError(output))
26    }
27}
28
29#[instrument(skip_all)]
30pub fn try_run_and_wait_for_command(command: &mut Command) -> ContainerResult<Output> {
31    debug!(?command, "Running command");
32
33    let child = command
34        .stdout(Stdio::piped()) // TODO fm - Sometimes podman asks the user for which repo to use. This is currently ignored.
35        .stderr(Stdio::piped())
36        .spawn()
37        .unwrap();
38
39    Ok(child.wait_with_output()?)
40}
41
42pub fn build_log_command<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
43    command.arg("logs").arg("-f").arg(&container.name)
44}
45
46pub fn build_rm_command<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
47    command.arg("rm").arg("-f").arg(&container.name)
48}
49
50pub fn build_stop_command<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
51    command.arg("stop").arg(&container.name)
52}
53
54pub fn build_run_command<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
55    add_run_args(command);
56    add_name_arg(command, container);
57    add_env_var_args(command, container);
58    add_volume_args(command, container);
59    add_export_ports_args(command, container);
60    add_health_check_args(command, container);
61    add_image_arg(command, container);
62    add_command_arg(command, container);
63
64    command
65}
66
67fn add_volume_args<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
68    let folded = container
69        .volumes
70        .iter()
71        .fold(command, |c: &mut Command, volume| match volume {
72            Volume::Mount {
73                host_path,
74                mount_point,
75            } => c.arg("-v").arg(format!("{host_path}:{mount_point}")),
76            Volume::Named { name, mount_point } => c.arg("-v").arg(format!("{name}:{mount_point}")),
77        });
78
79    folded
80}
81
82fn add_command_arg<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
83    let folded = container
84        .command
85        .iter()
86        .fold(command, |c: &mut Command, arg| c.arg(arg));
87
88    folded
89}
90
91fn add_name_arg<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
92    command.arg("--name").arg(&container.name)
93}
94
95pub fn build_inspect_command<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
96    command.arg("inspect").arg(&container.name)
97}
98
/// Appends the `run` sub-command together with the `-d` flag.
fn add_run_args(command: &mut Command) {
    command.args(["run", "-d"]);
}
102
103fn add_env_var_args(command: &mut Command, container: &Container) {
104    container.env_vars.iter().for_each(|env_var| {
105        command
106            .arg("-e")
107            .arg(format!("{}={}", env_var.key, env_var.value));
108    });
109}
110
111fn add_health_check_args(command: &mut Command, container: &Container) {
112    if let Some(check) = &container.health_check {
113        command.arg("--health-cmd").arg(&check.command);
114
115        if let Some(start_period) = check.start_period {
116            command.arg(format!("--health-start-period={}s", start_period.as_secs()));
117        }
118
119        if let Some(interval) = check.interval {
120            command.arg(format!("--health-interval={}s", interval.as_secs()));
121        }
122
123        if let Some(timeout) = check.timeout {
124            command.arg(format!("--health-timeout={}s", timeout.as_secs()));
125        }
126
127        if let Some(retries) = check.retries {
128            command.arg(format!("--health-retries={}", retries));
129        }
130    }
131}
132
133fn add_image_arg(command: &mut Command, container: &Container) {
134    command.arg(String::from(&container.image));
135}
136
137fn add_export_ports_args(command: &mut Command, container: &Container) {
138    container.port_mappings.iter().for_each(|port_mapping| {
139        command.arg(format!(
140            "-p{}:{}",
141            port_mapping.source.number, port_mapping.target.number
142        ));
143    })
144}
145
146#[instrument(skip_all)]
147pub fn inspect<C: Client>(
148    client: &C,
149    container: &Container,
150) -> ContainerResult<Option<DetailedContainerInfo>> {
151    let mut cmd = client.command();
152
153    build_inspect_command(&mut cmd, container);
154
155    let output = try_run_and_wait_for_command(&mut cmd)?;
156
157    let stdout = String::from_utf8(output.stdout.clone()).unwrap();
158    let stderr = String::from_utf8(output.stderr.clone()).unwrap();
159
160    match output.status.code() {
161        Some(0) => {
162            let container_infos: Vec<DetailedContainerInfo> = serde_json::from_str(&stdout)?;
163
164            debug!(?container_infos, "Inspect container");
165
166            match container_infos.get(0) {
167                Some(info) => Ok(Some(info.to_owned())),
168                None => Ok(None),
169            }
170        }
171        _ => {
172            if stderr.to_uppercase().contains("NO SUCH OBJECT") {
173                Ok(None)
174            } else {
175                Err(ContainersError::CommandError(output))
176            }
177        }
178    }
179}
180
181#[instrument(skip_all)]
182pub fn do_log<C: Client>(client: &C, container: &Container) -> ContainerResult<Log> {
183    let mut cmd = client.command();
184
185    build_log_command(&mut cmd, container);
186
187    let (reader, writer) = os_pipe::pipe()?;
188
189    // redirect all process out to that single pipe
190    let cmd = cmd
191        .stdout(writer.try_clone().unwrap())
192        .stderr(writer)
193        .spawn()?;
194
195    debug!(?cmd, "Reading log");
196
197    Ok(Log { reader })
198}
199
200pub fn wait_for<C: Client>(client: &C, container: &Container) -> ContainerResult<()> {
201    let result = match &container.wait_strategy {
202        Some(strategy) => match strategy {
203            WaitStrategy::LogMessage { pattern } => {
204                wait_for_log(client, container, strategy, pattern)
205            }
206            WaitStrategy::HealthCheck => Ok(wait_for_health_check(client, container)?),
207            WaitStrategy::WaitTime { duration } => Ok(wait_for_time(duration.to_owned())?),
208        },
209        None => Ok(()),
210    };
211
212    thread::sleep(container.additional_wait_period);
213
214    result
215}
216
217fn wait_for_time(duration: Duration) -> ContainerResult<()> {
218    thread::sleep(duration);
219    Ok(())
220}
221
222fn wait_for_log<C: Client>(
223    client: &C,
224    container: &Container,
225    wait_strategy: &WaitStrategy,
226    pattern: &Regex,
227) -> ContainerResult<()> {
228    let log = do_log(client, container)?;
229
230    do_wait_for_log(pattern, container, wait_strategy, log)?;
231
232    Ok(())
233}
234
235#[instrument(skip_all)]
236fn do_wait_for_log(
237    pattern: &Regex,
238    container: &Container,
239    wait_strategy: &WaitStrategy,
240    mut log: Log,
241) -> ContainerResult<()> {
242    debug!(?pattern, "Searching log");
243    for result in log.stream().lines() {
244        let line = result?;
245        debug!(?pattern, ?line, "Searching for pattern");
246        if pattern.is_match(&line) {
247            debug!(?line, ?pattern, "Found pattern");
248            return Ok(());
249        }
250    }
251
252    Err(ContainersError::ContainerWaitFailed {
253        container_name: container.name.clone(),
254        wait_strategy: wait_strategy.clone(),
255    })
256}
257
258#[instrument(skip_all)]
259fn wait_for_health_check<C: Client>(client: &C, container: &Container) -> ContainerResult<()> {
260    loop {
261        debug!("Checking health for {}", &container.name);
262
263        match inspect(client, container)? {
264            Some(info) => {
265                if let Some(health) = info.state.health {
266                    match health.status {
267                        ContainerStatus::Healthy => return Ok(()),
268                        ContainerStatus::Starting => thread::sleep(Duration::from_millis(200)),
269                        _ => {
270                            return Err(ContainersError::ContainerStatusError {
271                                status: health.status,
272                            })
273                        }
274                    }
275                }
276            }
277            None => {
278                return Err(ContainersError::ContainerNotExists {
279                    container_name: container.name.clone(),
280                })
281            }
282        }
283    }
284}