switchboard_container_utils/container/
mod.rs

1use crate::*;
2
3use async_trait::async_trait;
4use bollard::{
5    container::{
6        AttachContainerOptions, AttachContainerResults, DownloadFromContainerOptions,
7        InspectContainerOptions, KillContainerOptions, LogOutput, RemoveContainerOptions,
8        RestartContainerOptions,
9    },
10    exec::{CreateExecOptions, StartExecOptions},
11    Docker,
12};
13use futures_util::StreamExt;
14use std::marker::Send;
15use std::sync::Arc;
16use std::time::Duration;
17use switchboard_common::FunctionResult;
18use tokio::io::AsyncReadExt;
19use tokio::select;
20use tokio::time::interval;
21use tokio_tar::Archive;
22use tokio_util::io::StreamReader;
23use std::future::Future;
24
25mod docker;
26pub use docker::*;
27
28mod qvn;
29pub use qvn::*;
30
31#[derive(Clone, Debug, Default)]
32pub struct DockerContainerOverrides {
33    pub entrypoint: Option<Vec<String>>,
34}
35
36/////////////////////////////////////////////////////////////////////////////////////////////
37// TRAIT DEFINITION
38/////////////////////////////////////////////////////////////////////////////////////////////
39
40#[async_trait]
41pub trait Container {
42    fn docker(&self) -> &std::sync::Arc<Docker>;
43
44    fn id(&self) -> &String;
45
46    fn image_name(&self) -> &String;
47
48    async fn remove(&self) -> ContainerResult<()> {
49        self.docker()
50            .kill_container::<&str>(self.id(), Some(KillContainerOptions { signal: "SIGKILL" }))
51            .await
52            .map_err(handle_bollard_error)?;
53
54        self.docker()
55            .remove_container(
56                self.id(),
57                Some(RemoveContainerOptions {
58                    force: true,
59                    ..Default::default()
60                }),
61            )
62            .await
63            .map_err(handle_bollard_error)?;
64
65        Ok(())
66    }
67
68    async fn restart(&self) -> ContainerResult<()> {
69        self.docker()
70            .restart_container(self.id(), None::<RestartContainerOptions>)
71            .await
72            .map_err(handle_bollard_error)?;
73
74        Ok(())
75    }
76
77    async fn kill(&self, remove: bool) -> ContainerResult<()> {
78        self.docker()
79            .kill_container::<&str>(self.id(), Some(KillContainerOptions { signal: "SIGKILL" }))
80            .await
81            .map_err(handle_bollard_error)?;
82
83        if remove {
84            self.docker()
85                .remove_container(
86                    self.id(),
87                    Some(RemoveContainerOptions {
88                        force: true,
89                        ..Default::default()
90                    }),
91                )
92                .await
93                .map_err(handle_bollard_error)?;
94        }
95
96        Ok(())
97    }
98
99    async fn start_container(&self) -> ContainerResult<()> {
100        self.docker()
101            .start_container::<String>(self.id().clone().as_str(), None)
102            .await
103            .map_err(|e| SbError::ContainerStartError(Arc::new(e)))?;
104
105        info!("Started container {}", self.image_name(), { id: self.id() });
106
107        Ok(())
108    }
109
110    async fn run_and_decode<F, Fut>(
111        &self,
112        timeout_secs: Option<u64>,
113        on_log: F,
114    ) -> ContainerResult<FunctionResult>
115    where
116        F: Fn(String) -> Fut + Send + Sync,
117        Fut: Future<Output = ()> + Send,
118    {
119        self.start_container().await?;
120
121        let logs = self
122            .attach_and_collect_logs(timeout_secs, on_log)
123            .await
124            .unwrap_or_default();
125
126        find_fn_result(&logs)
127    }
128
129    async fn run<F, Fut>(&self, timeout_secs: Option<u64>, on_log: F) -> ContainerResult<String>
130    where
131        F: Fn(String) -> Fut + Send + Sync,
132        Fut: Future<Output = ()> + Send,
133    {
134        self.start_container().await?;
135        self.attach_and_collect_logs(timeout_secs, on_log).await
136    }
137
138    async fn run_command<F, Fut>(
139        &self,
140        cmd: Vec<String>,
141        timeout_secs: Option<u64>,
142        on_log: F,
143    ) -> ContainerResult<String>
144    where
145        F: Fn(String) -> Fut + Send + Sync,
146        Fut: Future<Output = ()> + Send,
147    {
148        let exec = self
149            .docker()
150            .create_exec(
151                self.id(),
152                CreateExecOptions {
153                    cmd: Some(cmd),
154                    attach_stdout: Some(true),
155                    ..Default::default()
156                },
157            )
158            .await
159            .map_err(|e| SbError::ContainerCreateError(Arc::new(e)))?;
160
161        let _ = self
162            .docker()
163            .start_exec(
164                &exec.id,
165                Some(StartExecOptions {
166                    detach: true,
167                    output_capacity: Some(usize::MAX),
168                }),
169            )
170            .await
171            .map_err(|e| SbError::ContainerStartError(Arc::new(e)))?;
172
173        self.attach_and_collect_logs(timeout_secs, on_log).await
174    }
175
176    async fn attach_and_collect_logs<F, Fut>(
177        &self,
178        timeout_secs: Option<u64>,
179        on_log: F,
180    ) -> ContainerResult<String>
181    where
182        F: Fn(String) -> Fut + Send + Sync,
183        Fut: Future<Output = ()> + Send,
184    {
185        // let start = unix_timestamp();
186        let mut timeout = interval(Duration::from_secs(timeout_secs.unwrap_or(30)));
187        timeout.tick().await;
188
189        let AttachContainerResults {
190            mut output,
191            input: _,
192        } = self
193            .docker()
194            .attach_container(
195                self.id(),
196                Some(AttachContainerOptions {
197                    stdin: Some(true),
198                    stdout: Some(true),
199                    stderr: Some(true),
200                    stream: Some(true),
201                    logs: Some(true),
202                    detach_keys: Some("ctrl-c".to_string()),
203                }),
204            )
205            .await
206            .map_err(|e| SbError::ContainerStartError(Arc::new(e)))?;
207
208        let mut container_logs = String::new();
209
210        let mut line_buffer = String::new();
211
212        loop {
213            select! {
214                _ = timeout.tick() => {
215                    debug!("Exec {} stopped", self.image_name(), { id: self.id() });
216                    return Err(SbError::ContainerTimeout);
217                },
218                new_log = output.next() => {
219                    if new_log.is_none() {
220                        debug!("Container {} completed", self.image_name(),  { id: self.id() });
221                        break;
222                    }
223                    let mut fd = "";
224                    let mut msg = Default::default();
225                    match new_log.unwrap() {
226                        Ok(LogOutput::StdOut {message}) => (fd, msg) = ("stdout", message),
227                        Ok(LogOutput::StdErr {message}) => (fd, msg) = ("stderr", message),
228                        _ => {
229                            warn!("unexpected", { id: self.id() });
230                        },
231                    }
232                    if fd.is_empty() {
233                        warn!("unexpected",  { id: self.id() });
234                        continue;
235                    }
236                    for byte in &msg {
237                        if *byte == b'\n' {
238                            on_log(line_buffer.clone()).await;
239                            line_buffer.clear();
240                        } else {
241                            line_buffer.push(*byte as char);
242                        }
243                    }
244                    let msg_str = String::from_utf8_lossy(&msg).to_string();
245                    container_logs += msg_str.as_str();
246                },
247            }
248        }
249        if !line_buffer.is_empty() {
250            on_log(line_buffer.clone()).await;
251        }
252
253        Ok(container_logs.trim().to_string())
254    }
255
256    async fn fetch_file(&self, path: &str) -> ContainerResult<String> {
257        // TODO: parse path and verify it is a file and not a directory
258
259        let stream = self
260            .docker()
261            .download_from_container(self.id(), Some(DownloadFromContainerOptions { path }));
262
263        // Convert the stream into an AsyncRead
264        let async_read = StreamReader::new(
265            stream.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))),
266        );
267
268        // Unarchive the tar
269        let mut archive = Archive::new(async_read);
270
271        // Look for the file in the stream of entries
272        let mut file_contents: String = String::new();
273        while let Some(entry) = archive.entries()?.next().await {
274            let mut entry = entry?;
275            entry.read_to_string(&mut file_contents).await?;
276            // break;
277        }
278
279        Ok(file_contents.trim().to_string())
280    }
281
282    // Gets the file at /measurement.txt. DOES NOT GET THE ACTUAL MEASUREMENT
283    async fn fetch_measurement(&self) -> ContainerResult<String> {
284        let stream = self.docker().download_from_container(
285            self.id(),
286            Some(DownloadFromContainerOptions {
287                path: "/measurement.txt",
288            }),
289        );
290
291        // Convert the stream into an AsyncRead
292        let async_read = StreamReader::new(
293            stream.map(|res| res.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))),
294        );
295
296        // Unarchive the tar
297        let mut archive = Archive::new(async_read);
298
299        // Look for the 'measurement.txt' file in the stream of entries
300        let mut file_contents: String = String::new();
301        while let Some(entry) = archive.entries()?.next().await {
302            let mut entry = entry?;
303            if entry.path()?.ends_with("measurement.txt") {
304                entry.read_to_string(&mut file_contents).await?;
305                break;
306            }
307        }
308
309        Ok(file_contents.trim().to_string())
310    }
311
312    async fn run_get_measurement(&self) -> ContainerResult<String> {
313        self.run_command(
314            vec!["/bin/bash".to_string(), "/get_measurement.sh".to_string()],
315            None,
316            |_| async move {},
317        )
318        .await
319        .unwrap_or_default();
320
321        let measurement = self.fetch_measurement().await.unwrap_or_default();
322
323        Ok(measurement)
324    }
325
326    async fn get_ip_address(&self) -> ContainerResult<String> {
327        let options = Some(InspectContainerOptions { size: false });
328        let container = self
329            .docker()
330            .inspect_container(self.id(), options)
331            .await
332            .map_err(handle_bollard_error)?;
333        let network_settings = container
334            .network_settings
335            .ok_or("No network settings found for container")?;
336        let networks = network_settings
337            .networks
338            .ok_or("No networks found for container")?;
339        let network = networks
340            .values()
341            .next()
342            .ok_or("No networks found for container")?;
343        let ip_address = network
344            .ip_address
345            .clone()
346            .ok_or("No IP address found for container")?;
347        Ok(ip_address)
348    }
349}