switchboard_container_utils/manager/
mod.rs

1use crate::*;
2
3use async_trait::async_trait;
4use bollard::service::{ContainerSummary, ImageInspect};
5use futures_util::StreamExt;
6use hyper::body::Body;
7use std::sync::Arc;
8use tokio::fs::File;
9use tokio_util::codec::{BytesCodec, FramedRead};
10
11mod capacity;
12pub use capacity::*;
13
14mod docker;
15pub use docker::*;
16
17mod backoff;
18pub use backoff::*;
19
20// mod qvn_manager;
21// pub use qvn_manager::*;
22
23pub use bollard::{
24    auth::DockerCredentials,
25    container::{
26        Config, CreateContainerOptions, DownloadFromContainerOptions, KillContainerOptions,
27        ListContainersOptions, LogOutput, LogsOptions, PruneContainersOptions,
28        RemoveContainerOptions, StartContainerOptions,
29    },
30    image::{CreateImageOptions, ImportImageOptions, ListImagesOptions, PruneImagesOptions},
31    service::{
32        AuthConfig, CreateImageInfo, HostConfig, Mount, RestartPolicy, RestartPolicyNameEnum,
33    },
34    system::Version,
35    Docker,
36};
37
38/////////////////////////////////////////////////////////////////////////////////////////////
39// TRAIT DEFINITION
40/////////////////////////////////////////////////////////////////////////////////////////////
41
/// The `ContainerManager` trait provides utilities for working with Docker images and
/// containers: pulling, importing and inspecting images, creating and pruning
/// containers, and listing the containers that are currently active.
45///
46/// # Examples
47///
48/// ```
49/// use switchboard_container_utils::manager::{ContainerManager, DockerManager};
50/// use bollard::Docker;
51///
52/// let manager = Arc::new(DockerManager::new(
53///     Arc::new(Docker::connect_with_unix_defaults().unwrap()),
54///     Some(Config {
55///         ..get_default_docker_config()
56///     }),
57/// ));
58/// ```
59#[async_trait]
60pub trait ContainerManager {
    /// Returns the shared `bollard` Docker client that the provided methods of this
    /// trait issue their API calls through.
    fn docker(&self) -> &std::sync::Arc<Docker>;
62
    /// Returns the registry credentials that are cloned and passed along whenever a
    /// provided method pulls an image (`fetch_image`, `create_docker_container`).
    fn docker_credentials(&self) -> &DockerCredentials;
64
    /// Returns the base container configuration that `get_container_config` clones
    /// and then specializes for an individual container.
    fn docker_default_config(&self) -> &Config<String>;
66
67    /// Asynchronously gets the version of Docker being used by the manager.
68    ///
69    /// # Errors
70    ///
71    /// Returns a `ContainerError` if there was an issue fetching the Docker version.
72    ///
73    /// # Returns
74    ///
75    /// Returns a `Version` struct containing information about the Docker version.
76    async fn get_version(&self) -> ContainerResult<Version> {
77        let result = self
78            .docker()
79            .version()
80            .await
81            .map_err(handle_bollard_error)?;
82        Ok(result)
83    }
84
85    fn get_container_config(
86        &self,
87        image_name: &str,
88        env: Option<Vec<String>>,
89        overrides: DockerContainerOverrides,
90    ) -> Config<String> {
91        let default_config = self.docker_default_config().clone();
92        let default_host_config = default_config.host_config.unwrap_or_default();
93        let restart_policy = default_host_config
94            .clone()
95            .restart_policy
96            .unwrap_or(RestartPolicy {
97                name: Some(RestartPolicyNameEnum::NO),
98                maximum_retry_count: None,
99            });
100
101        Config {
102            image: Some(image_name.to_string()),
103            entrypoint: overrides.entrypoint,
104            env,
105            host_config: Some(HostConfig {
106                restart_policy: Some(restart_policy),
107                ..default_host_config
108            }),
109            ..default_config
110        }
111    }
112
113    async fn get_image_size(&self, image: &str, tag: Option<&str>) -> ContainerResult<u64> {
114        let image_name = format!("{}:{}", image, tag.unwrap_or("latest"));
115        let image_info = self
116            .docker()
117            .inspect_image(&image_name)
118            .await
119            .map_err(handle_bollard_error)?;
120
121        if let Some(size) = image_info.size {
122            let size_result = size.try_into();
123            if let Ok(size) = size_result {
124                return Ok(size);
125            }
126        }
127
128        Err(SbError::ContainerErrorMessage(format!(
129            "Failed to get size for image {}",
130            image_name
131        )))
132    }
133
134    async fn load_image_from_archive(&self, filepath: &str, quiet: bool) -> ContainerResult<()> {
135        let archive = File::open(filepath)
136            .await
137            .map_err(|e| ContainerError::Message(format!("could not open archive, {}", e)))
138            .unwrap();
139        let stream = FramedRead::new(archive, BytesCodec::new());
140        let body = Body::wrap_stream(stream);
141
142        let mut import_image_stream =
143            self.docker()
144                .import_image(ImportImageOptions { quiet }, body, None);
145
146        while let Some(msg) = import_image_stream.next().await {
147            println!("[import]: {msg:?}");
148        }
149
150        Ok(())
151    }
152
153    async fn fetch_image(&self, image_name: &str) -> ContainerResult<bool> {
154        let (_image_name_without_version, mut image_version) = match image_name.split_once(':') {
155            Some((left, right)) => (left, right),
156            None => (image_name, ""),
157        };
158
159        if image_version.is_empty() {
160            image_version = "latest";
161        }
162
163        let mut was_downloaded = false;
164
165        let mut create_img_stream = self.docker().create_image(
166            Some(CreateImageOptions {
167                from_image: image_name.to_string(),
168                platform: "linux/amd64".to_string(),
169                tag: image_version.to_string(),
170                ..Default::default()
171            }),
172            None,
173            Some(self.docker_credentials().clone()),
174        );
175        while let Some(Ok(progress)) = create_img_stream.next().await {
176            was_downloaded = true;
177            trace!(
178                "{:?} {:?} {:?} {:?}",
179                image_name,
180                progress.id,
181                progress.status,
182                progress.progress,
183                { id: image_name }
184            );
185        }
186
187        Ok(was_downloaded)
188    }
189
190    async fn inspect_image(&self, image_name: &str) -> ContainerResult<ImageInspect> {
191        self.fetch_image(image_name).await?;
192
193        self.docker()
194            .inspect_image(image_name)
195            .await
196            .map_err(handle_bollard_error)
197    }
198
199    async fn create_docker_container(
200        &self,
201        key: &str,
202        image_name: &str,
203        env: Option<Vec<String>>,
204        overrides: Option<DockerContainerOverrides>,
205    ) -> ContainerResult<DockerContainer> {
206        let env = env.unwrap_or_default();
207
208        let config =
209            self.get_container_config(image_name, Some(env.clone()), overrides.unwrap_or_default());
210
211        let mut create_img_stream = self.docker().create_image(
212            Some(CreateImageOptions {
213                from_image: image_name.to_string(),
214                platform: "linux/amd64".to_string(),
215                ..Default::default()
216            }),
217            None,
218            Some(self.docker_credentials().clone()),
219        );
220        while let Some(Ok(progress)) = create_img_stream.next().await {
221            trace!(
222                "{:?} {:?} {:?} {:?}",
223                image_name,
224                progress.id,
225                progress.status,
226                progress.progress,
227                { id: key }
228            );
229        }
230
231        match self
232            .docker()
233            .create_container::<String, _>(
234                Some(CreateContainerOptions {
235                    ..Default::default()
236                }),
237                config.clone(),
238            )
239            .await
240        {
241            Ok(result) => {
242                info!("Created container for image {} ({})", image_name, result.id, { id: key });
243                // println!("[DOCKER ENV] {:#?}", env.clone());
244
245                Ok(DockerContainer {
246                    id: result.id,
247                    image_name: image_name.to_string(),
248                    env: env.clone(),
249                    docker: self.docker().clone(),
250                    config: config.clone(),
251                })
252            }
253            Err(error) => {
254                info!("Failed to create container for image {}, {}", image_name, error, { id: key });
255
256                Err(SbError::ContainerCreateError(Arc::new(error)))
257            }
258        }
259    }
260
261    async fn prune_container(&self, id: String) -> ContainerResult<String> {
262        self.docker()
263            .kill_container::<&str>(&id, Some(KillContainerOptions { signal: "SIGKILL" }))
264            .await
265            .map_err(handle_bollard_error)?;
266
267        self.docker()
268            .remove_container(
269                &id,
270                Some(RemoveContainerOptions {
271                    force: true,
272                    ..Default::default()
273                }),
274            )
275            .await
276            .map_err(handle_bollard_error)?;
277
278        Ok(id)
279    }
280
281    async fn prune_containers(&self, until: &str) {
282        let mut filters = std::collections::HashMap::new();
283        filters.insert("until", vec![until]);
284
285        let result = self
286            .docker()
287            .prune_containers(Some(PruneContainersOptions { filters }))
288            .await;
289
290        if let Ok(result) = result {
291            for container in result.containers_deleted.unwrap_or_default() {
292                info!("deleted container {}", container, { id: "prune" });
293            }
294
295            if let Some(space_reclaimed) = result.space_reclaimed {
296                if space_reclaimed > 0 {
297                    info!("[CONTAINERS] space_reclaimed: {:?}", format_bytes(space_reclaimed as f64), { id: "prune" });
298                }
299            }
300        }
301    }
302
303    async fn prune_images(&self, until: &str) {
304        let mut filters = std::collections::HashMap::new();
305        filters.insert("until", vec![until]);
306        filters.insert(
307            "label!",
308            vec![
309                "name=switchboardlabs/sgx-function",
310                "name=switchboardlabs/qvn",
311                "name=gramine",
312            ],
313        );
314
315        let result = self
316            .docker()
317            .prune_images(Some(PruneImagesOptions { filters }))
318            .await;
319
320        if let Ok(result) = result {
321            if let Some(space_reclaimed) = result.space_reclaimed {
322                if space_reclaimed > 0 {
323                    info!("[IMAGES] space_reclaimed: {:?}", format_bytes(space_reclaimed as f64), { id: "prune" });
324                }
325            }
326        }
327    }
328
329    /// Asynchronously retrieves a vector of active container summaries.
330    ///
331    /// # Errors
332    ///
333    /// Returns a `ContainerError` if there was an error retrieving the active containers.
334    ///
335    /// # Examples
336    ///
337    /// ```rust
338    /// # use switchboard_container_utils::manager::Manager;
339    /// # use std::error::Error;
340    /// #
341    /// # #[tokio::main]
342    /// # async fn main() -> Result<(), Box<dyn Error>> {
343    /// #     let manager = Manager::new()?;
344    /// #
345    /// #     let active_containers = manager.get_active_containers().await?;
346    /// #
347    /// #     Ok(())
348    /// # }
349    /// ```
350    async fn get_active_containers(&self) -> ContainerResult<Vec<ContainerSummary>> {
351        let mut filters = std::collections::HashMap::new();
352        filters.insert("status", vec!["running", "restarting"]);
353
354        let options = Some(ListContainersOptions {
355            all: true,
356            filters,
357            ..Default::default()
358        });
359
360        self.docker()
361            .list_containers(options)
362            .await
363            .map_err(handle_bollard_error)
364    }
365}