1use std::{
2 io::BufRead,
3 process::{Command, Output, Stdio},
4 thread,
5 time::Duration,
6};
7
8use regex::Regex;
9use tracing::*;
10
11use crate::{
12 container::{Container, Volume, WaitStrategy},
13 error::{ContainerResult, ContainersError},
14 rt::{ContainerStatus, DetailedContainerInfo},
15};
16
17use super::{Client, Log};
18
19pub fn run_and_wait_for_command(command: &mut Command) -> ContainerResult<String> {
20 let output = try_run_and_wait_for_command(command)?;
21
22 if let Some(0) = output.status.code() {
23 Ok(String::from_utf8(output.stdout).unwrap())
24 } else {
25 Err(ContainersError::CommandError(output))
26 }
27}
28
29#[instrument(skip_all)]
30pub fn try_run_and_wait_for_command(command: &mut Command) -> ContainerResult<Output> {
31 debug!(?command, "Running command");
32
33 let child = command
34 .stdout(Stdio::piped()) .stderr(Stdio::piped())
36 .spawn()
37 .unwrap();
38
39 Ok(child.wait_with_output()?)
40}
41
42pub fn build_log_command<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
43 command.arg("logs").arg("-f").arg(&container.name)
44}
45
46pub fn build_rm_command<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
47 command.arg("rm").arg("-f").arg(&container.name)
48}
49
50pub fn build_stop_command<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
51 command.arg("stop").arg(&container.name)
52}
53
54pub fn build_run_command<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
55 add_run_args(command);
56 add_name_arg(command, container);
57 add_env_var_args(command, container);
58 add_volume_args(command, container);
59 add_export_ports_args(command, container);
60 add_health_check_args(command, container);
61 add_image_arg(command, container);
62 add_command_arg(command, container);
63
64 command
65}
66
67fn add_volume_args<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
68 let folded = container
69 .volumes
70 .iter()
71 .fold(command, |c: &mut Command, volume| match volume {
72 Volume::Mount {
73 host_path,
74 mount_point,
75 } => c.arg("-v").arg(format!("{host_path}:{mount_point}")),
76 Volume::Named { name, mount_point } => c.arg("-v").arg(format!("{name}:{mount_point}")),
77 });
78
79 folded
80}
81
82fn add_command_arg<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
83 let folded = container
84 .command
85 .iter()
86 .fold(command, |c: &mut Command, arg| c.arg(arg));
87
88 folded
89}
90
91fn add_name_arg<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
92 command.arg("--name").arg(&container.name)
93}
94
95pub fn build_inspect_command<'a>(command: &'a mut Command, container: &Container) -> &'a Command {
96 command.arg("inspect").arg(&container.name)
97}
98
99fn add_run_args(command: &mut Command) {
100 command.arg("run").arg("-d");
101}
102
103fn add_env_var_args(command: &mut Command, container: &Container) {
104 container.env_vars.iter().for_each(|env_var| {
105 command
106 .arg("-e")
107 .arg(format!("{}={}", env_var.key, env_var.value));
108 });
109}
110
111fn add_health_check_args(command: &mut Command, container: &Container) {
112 if let Some(check) = &container.health_check {
113 command.arg("--health-cmd").arg(&check.command);
114
115 if let Some(start_period) = check.start_period {
116 command.arg(format!("--health-start-period={}s", start_period.as_secs()));
117 }
118
119 if let Some(interval) = check.interval {
120 command.arg(format!("--health-interval={}s", interval.as_secs()));
121 }
122
123 if let Some(timeout) = check.timeout {
124 command.arg(format!("--health-timeout={}s", timeout.as_secs()));
125 }
126
127 if let Some(retries) = check.retries {
128 command.arg(format!("--health-retries={}", retries));
129 }
130 }
131}
132
133fn add_image_arg(command: &mut Command, container: &Container) {
134 command.arg(String::from(&container.image));
135}
136
137fn add_export_ports_args(command: &mut Command, container: &Container) {
138 container.port_mappings.iter().for_each(|port_mapping| {
139 command.arg(format!(
140 "-p{}:{}",
141 port_mapping.source.number, port_mapping.target.number
142 ));
143 })
144}
145
146#[instrument(skip_all)]
147pub fn inspect<C: Client>(
148 client: &C,
149 container: &Container,
150) -> ContainerResult<Option<DetailedContainerInfo>> {
151 let mut cmd = client.command();
152
153 build_inspect_command(&mut cmd, container);
154
155 let output = try_run_and_wait_for_command(&mut cmd)?;
156
157 let stdout = String::from_utf8(output.stdout.clone()).unwrap();
158 let stderr = String::from_utf8(output.stderr.clone()).unwrap();
159
160 match output.status.code() {
161 Some(0) => {
162 let container_infos: Vec<DetailedContainerInfo> = serde_json::from_str(&stdout)?;
163
164 debug!(?container_infos, "Inspect container");
165
166 match container_infos.get(0) {
167 Some(info) => Ok(Some(info.to_owned())),
168 None => Ok(None),
169 }
170 }
171 _ => {
172 if stderr.to_uppercase().contains("NO SUCH OBJECT") {
173 Ok(None)
174 } else {
175 Err(ContainersError::CommandError(output))
176 }
177 }
178 }
179}
180
181#[instrument(skip_all)]
182pub fn do_log<C: Client>(client: &C, container: &Container) -> ContainerResult<Log> {
183 let mut cmd = client.command();
184
185 build_log_command(&mut cmd, container);
186
187 let (reader, writer) = os_pipe::pipe()?;
188
189 let cmd = cmd
191 .stdout(writer.try_clone().unwrap())
192 .stderr(writer)
193 .spawn()?;
194
195 debug!(?cmd, "Reading log");
196
197 Ok(Log { reader })
198}
199
200pub fn wait_for<C: Client>(client: &C, container: &Container) -> ContainerResult<()> {
201 let result = match &container.wait_strategy {
202 Some(strategy) => match strategy {
203 WaitStrategy::LogMessage { pattern } => {
204 wait_for_log(client, container, strategy, pattern)
205 }
206 WaitStrategy::HealthCheck => Ok(wait_for_health_check(client, container)?),
207 WaitStrategy::WaitTime { duration } => Ok(wait_for_time(duration.to_owned())?),
208 },
209 None => Ok(()),
210 };
211
212 thread::sleep(container.additional_wait_period);
213
214 result
215}
216
217fn wait_for_time(duration: Duration) -> ContainerResult<()> {
218 thread::sleep(duration);
219 Ok(())
220}
221
222fn wait_for_log<C: Client>(
223 client: &C,
224 container: &Container,
225 wait_strategy: &WaitStrategy,
226 pattern: &Regex,
227) -> ContainerResult<()> {
228 let log = do_log(client, container)?;
229
230 do_wait_for_log(pattern, container, wait_strategy, log)?;
231
232 Ok(())
233}
234
235#[instrument(skip_all)]
236fn do_wait_for_log(
237 pattern: &Regex,
238 container: &Container,
239 wait_strategy: &WaitStrategy,
240 mut log: Log,
241) -> ContainerResult<()> {
242 debug!(?pattern, "Searching log");
243 for result in log.stream().lines() {
244 let line = result?;
245 debug!(?pattern, ?line, "Searching for pattern");
246 if pattern.is_match(&line) {
247 debug!(?line, ?pattern, "Found pattern");
248 return Ok(());
249 }
250 }
251
252 Err(ContainersError::ContainerWaitFailed {
253 container_name: container.name.clone(),
254 wait_strategy: wait_strategy.clone(),
255 })
256}
257
258#[instrument(skip_all)]
259fn wait_for_health_check<C: Client>(client: &C, container: &Container) -> ContainerResult<()> {
260 loop {
261 debug!("Checking health for {}", &container.name);
262
263 match inspect(client, container)? {
264 Some(info) => {
265 if let Some(health) = info.state.health {
266 match health.status {
267 ContainerStatus::Healthy => return Ok(()),
268 ContainerStatus::Starting => thread::sleep(Duration::from_millis(200)),
269 _ => {
270 return Err(ContainersError::ContainerStatusError {
271 status: health.status,
272 })
273 }
274 }
275 }
276 }
277 None => {
278 return Err(ContainersError::ContainerNotExists {
279 container_name: container.name.clone(),
280 })
281 }
282 }
283 }
284}