switchboard_container_utils/container/
mod.rs1use crate::*;
2
3use async_trait::async_trait;
4use bollard::{
5 container::{
6 AttachContainerOptions, AttachContainerResults, DownloadFromContainerOptions,
7 InspectContainerOptions, KillContainerOptions, LogOutput, RemoveContainerOptions,
8 RestartContainerOptions,
9 },
10 exec::{CreateExecOptions, StartExecOptions},
11 Docker,
12};
13use futures_util::StreamExt;
14use std::marker::Send;
15use std::sync::Arc;
16use std::time::Duration;
17use switchboard_common::FunctionResult;
18use tokio::io::AsyncReadExt;
19use tokio::select;
20use tokio::time::interval;
21use tokio_tar::Archive;
22use tokio_util::io::StreamReader;
23use std::future::Future;
24
25mod docker;
26pub use docker::*;
27
28mod qvn;
29pub use qvn::*;
30
31#[derive(Clone, Debug, Default)]
32pub struct DockerContainerOverrides {
33 pub entrypoint: Option<Vec<String>>,
34}
35
36#[async_trait]
41pub trait Container {
42 fn docker(&self) -> &std::sync::Arc<Docker>;
43
44 fn id(&self) -> &String;
45
46 fn image_name(&self) -> &String;
47
48 async fn remove(&self) -> ContainerResult<()> {
49 self.docker()
50 .kill_container::<&str>(self.id(), Some(KillContainerOptions { signal: "SIGKILL" }))
51 .await
52 .map_err(handle_bollard_error)?;
53
54 self.docker()
55 .remove_container(
56 self.id(),
57 Some(RemoveContainerOptions {
58 force: true,
59 ..Default::default()
60 }),
61 )
62 .await
63 .map_err(handle_bollard_error)?;
64
65 Ok(())
66 }
67
68 async fn restart(&self) -> ContainerResult<()> {
69 self.docker()
70 .restart_container(self.id(), None::<RestartContainerOptions>)
71 .await
72 .map_err(handle_bollard_error)?;
73
74 Ok(())
75 }
76
77 async fn kill(&self, remove: bool) -> ContainerResult<()> {
78 self.docker()
79 .kill_container::<&str>(self.id(), Some(KillContainerOptions { signal: "SIGKILL" }))
80 .await
81 .map_err(handle_bollard_error)?;
82
83 if remove {
84 self.docker()
85 .remove_container(
86 self.id(),
87 Some(RemoveContainerOptions {
88 force: true,
89 ..Default::default()
90 }),
91 )
92 .await
93 .map_err(handle_bollard_error)?;
94 }
95
96 Ok(())
97 }
98
99 async fn start_container(&self) -> ContainerResult<()> {
100 self.docker()
101 .start_container::<String>(self.id().clone().as_str(), None)
102 .await
103 .map_err(|e| SbError::ContainerStartError(Arc::new(e)))?;
104
105 info!("Started container {}", self.image_name(), { id: self.id() });
106
107 Ok(())
108 }
109
110 async fn run_and_decode<F, Fut>(
111 &self,
112 timeout_secs: Option<u64>,
113 on_log: F,
114 ) -> ContainerResult<FunctionResult>
115 where
116 F: Fn(String) -> Fut + Send + Sync,
117 Fut: Future<Output = ()> + Send,
118 {
119 self.start_container().await?;
120
121 let logs = self
122 .attach_and_collect_logs(timeout_secs, on_log)
123 .await
124 .unwrap_or_default();
125
126 find_fn_result(&logs)
127 }
128
129 async fn run<F, Fut>(&self, timeout_secs: Option<u64>, on_log: F) -> ContainerResult<String>
130 where
131 F: Fn(String) -> Fut + Send + Sync,
132 Fut: Future<Output = ()> + Send,
133 {
134 self.start_container().await?;
135 self.attach_and_collect_logs(timeout_secs, on_log).await
136 }
137
138 async fn run_command<F, Fut>(
139 &self,
140 cmd: Vec<String>,
141 timeout_secs: Option<u64>,
142 on_log: F,
143 ) -> ContainerResult<String>
144 where
145 F: Fn(String) -> Fut + Send + Sync,
146 Fut: Future<Output = ()> + Send,
147 {
148 let exec = self
149 .docker()
150 .create_exec(
151 self.id(),
152 CreateExecOptions {
153 cmd: Some(cmd),
154 attach_stdout: Some(true),
155 ..Default::default()
156 },
157 )
158 .await
159 .map_err(|e| SbError::ContainerCreateError(Arc::new(e)))?;
160
161 let _ = self
162 .docker()
163 .start_exec(
164 &exec.id,
165 Some(StartExecOptions {
166 detach: true,
167 output_capacity: Some(usize::MAX),
168 }),
169 )
170 .await
171 .map_err(|e| SbError::ContainerStartError(Arc::new(e)))?;
172
173 self.attach_and_collect_logs(timeout_secs, on_log).await
174 }
175
176 async fn attach_and_collect_logs<F, Fut>(
177 &self,
178 timeout_secs: Option<u64>,
179 on_log: F,
180 ) -> ContainerResult<String>
181 where
182 F: Fn(String) -> Fut + Send + Sync,
183 Fut: Future<Output = ()> + Send,
184 {
185 let mut timeout = interval(Duration::from_secs(timeout_secs.unwrap_or(30)));
187 timeout.tick().await;
188
189 let AttachContainerResults {
190 mut output,
191 input: _,
192 } = self
193 .docker()
194 .attach_container(
195 self.id(),
196 Some(AttachContainerOptions {
197 stdin: Some(true),
198 stdout: Some(true),
199 stderr: Some(true),
200 stream: Some(true),
201 logs: Some(true),
202 detach_keys: Some("ctrl-c".to_string()),
203 }),
204 )
205 .await
206 .map_err(|e| SbError::ContainerStartError(Arc::new(e)))?;
207
208 let mut container_logs = String::new();
209
210 let mut line_buffer = String::new();
211
212 loop {
213 select! {
214 _ = timeout.tick() => {
215 debug!("Exec {} stopped", self.image_name(), { id: self.id() });
216 return Err(SbError::ContainerTimeout);
217 },
218 new_log = output.next() => {
219 if new_log.is_none() {
220 debug!("Container {} completed", self.image_name(), { id: self.id() });
221 break;
222 }
223 let mut fd = "";
224 let mut msg = Default::default();
225 match new_log.unwrap() {
226 Ok(LogOutput::StdOut {message}) => (fd, msg) = ("stdout", message),
227 Ok(LogOutput::StdErr {message}) => (fd, msg) = ("stderr", message),
228 _ => {
229 warn!("unexpected", { id: self.id() });
230 },
231 }
232 if fd.is_empty() {
233 warn!("unexpected", { id: self.id() });
234 continue;
235 }
236 for byte in &msg {
237 if *byte == b'\n' {
238 on_log(line_buffer.clone()).await;
239 line_buffer.clear();
240 } else {
241 line_buffer.push(*byte as char);
242 }
243 }
244 let msg_str = String::from_utf8_lossy(&msg).to_string();
245 container_logs += msg_str.as_str();
246 },
247 }
248 }
249 if !line_buffer.is_empty() {
250 on_log(line_buffer.clone()).await;
251 }
252
253 Ok(container_logs.trim().to_string())
254 }
255
256 async fn fetch_file(&self, path: &str) -> ContainerResult<String> {
257 let stream = self
260 .docker()
261 .download_from_container(self.id(), Some(DownloadFromContainerOptions { path }));
262
263 let async_read = StreamReader::new(
265 stream.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))),
266 );
267
268 let mut archive = Archive::new(async_read);
270
271 let mut file_contents: String = String::new();
273 while let Some(entry) = archive.entries()?.next().await {
274 let mut entry = entry?;
275 entry.read_to_string(&mut file_contents).await?;
276 }
278
279 Ok(file_contents.trim().to_string())
280 }
281
282 async fn fetch_measurement(&self) -> ContainerResult<String> {
284 let stream = self.docker().download_from_container(
285 self.id(),
286 Some(DownloadFromContainerOptions {
287 path: "/measurement.txt",
288 }),
289 );
290
291 let async_read = StreamReader::new(
293 stream.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))),
294 );
295
296 let mut archive = Archive::new(async_read);
298
299 let mut file_contents: String = String::new();
301 while let Some(entry) = archive.entries()?.next().await {
302 let mut entry = entry?;
303 if entry.path()?.ends_with("measurement.txt") {
304 entry.read_to_string(&mut file_contents).await?;
305 break;
306 }
307 }
308
309 Ok(file_contents.trim().to_string())
310 }
311
312 async fn run_get_measurement(&self) -> ContainerResult<String> {
313 self.run_command(
314 vec!["/bin/bash".to_string(), "/get_measurement.sh".to_string()],
315 None,
316 |_| async move {},
317 )
318 .await
319 .unwrap_or_default();
320
321 let measurement = self.fetch_measurement().await.unwrap_or_default();
322
323 Ok(measurement)
324 }
325
326 async fn get_ip_address(&self) -> ContainerResult<String> {
327 let options = Some(InspectContainerOptions { size: false });
328 let container = self
329 .docker()
330 .inspect_container(self.id(), options)
331 .await
332 .map_err(handle_bollard_error)?;
333 let network_settings = container
334 .network_settings
335 .ok_or("No network settings found for container")?;
336 let networks = network_settings
337 .networks
338 .ok_or("No networks found for container")?;
339 let network = networks
340 .values()
341 .next()
342 .ok_or("No networks found for container")?;
343 let ip_address = network
344 .ip_address
345 .clone()
346 .ok_or("No IP address found for container")?;
347 Ok(ip_address)
348 }
349}