switchboard_container_utils/manager/
mod.rs1use crate::*;
2
3use async_trait::async_trait;
4use bollard::service::{ContainerSummary, ImageInspect};
5use futures_util::StreamExt;
6use hyper::body::Body;
7use std::sync::Arc;
8use tokio::fs::File;
9use tokio_util::codec::{BytesCodec, FramedRead};
10
11mod capacity;
12pub use capacity::*;
13
14mod docker;
15pub use docker::*;
16
17mod backoff;
18pub use backoff::*;
19
20pub use bollard::{
24 auth::DockerCredentials,
25 container::{
26 Config, CreateContainerOptions, DownloadFromContainerOptions, KillContainerOptions,
27 ListContainersOptions, LogOutput, LogsOptions, PruneContainersOptions,
28 RemoveContainerOptions, StartContainerOptions,
29 },
30 image::{CreateImageOptions, ImportImageOptions, ListImagesOptions, PruneImagesOptions},
31 service::{
32 AuthConfig, CreateImageInfo, HostConfig, Mount, RestartPolicy, RestartPolicyNameEnum,
33 },
34 system::Version,
35 Docker,
36};
37
38#[async_trait]
60pub trait ContainerManager {
61 fn docker(&self) -> &std::sync::Arc<Docker>;
62
63 fn docker_credentials(&self) -> &DockerCredentials;
64
65 fn docker_default_config(&self) -> &Config<String>;
66
67 async fn get_version(&self) -> ContainerResult<Version> {
77 let result = self
78 .docker()
79 .version()
80 .await
81 .map_err(handle_bollard_error)?;
82 Ok(result)
83 }
84
85 fn get_container_config(
86 &self,
87 image_name: &str,
88 env: Option<Vec<String>>,
89 overrides: DockerContainerOverrides,
90 ) -> Config<String> {
91 let default_config = self.docker_default_config().clone();
92 let default_host_config = default_config.host_config.unwrap_or_default();
93 let restart_policy = default_host_config
94 .clone()
95 .restart_policy
96 .unwrap_or(RestartPolicy {
97 name: Some(RestartPolicyNameEnum::NO),
98 maximum_retry_count: None,
99 });
100
101 Config {
102 image: Some(image_name.to_string()),
103 entrypoint: overrides.entrypoint,
104 env,
105 host_config: Some(HostConfig {
106 restart_policy: Some(restart_policy),
107 ..default_host_config
108 }),
109 ..default_config
110 }
111 }
112
113 async fn get_image_size(&self, image: &str, tag: Option<&str>) -> ContainerResult<u64> {
114 let image_name = format!("{}:{}", image, tag.unwrap_or("latest"));
115 let image_info = self
116 .docker()
117 .inspect_image(&image_name)
118 .await
119 .map_err(handle_bollard_error)?;
120
121 if let Some(size) = image_info.size {
122 let size_result = size.try_into();
123 if let Ok(size) = size_result {
124 return Ok(size);
125 }
126 }
127
128 Err(SbError::ContainerErrorMessage(format!(
129 "Failed to get size for image {}",
130 image_name
131 )))
132 }
133
134 async fn load_image_from_archive(&self, filepath: &str, quiet: bool) -> ContainerResult<()> {
135 let archive = File::open(filepath)
136 .await
137 .map_err(|e| ContainerError::Message(format!("could not open archive, {}", e)))
138 .unwrap();
139 let stream = FramedRead::new(archive, BytesCodec::new());
140 let body = Body::wrap_stream(stream);
141
142 let mut import_image_stream =
143 self.docker()
144 .import_image(ImportImageOptions { quiet }, body, None);
145
146 while let Some(msg) = import_image_stream.next().await {
147 println!("[import]: {msg:?}");
148 }
149
150 Ok(())
151 }
152
153 async fn fetch_image(&self, image_name: &str) -> ContainerResult<bool> {
154 let (_image_name_without_version, mut image_version) = match image_name.split_once(':') {
155 Some((left, right)) => (left, right),
156 None => (image_name, ""),
157 };
158
159 if image_version.is_empty() {
160 image_version = "latest";
161 }
162
163 let mut was_downloaded = false;
164
165 let mut create_img_stream = self.docker().create_image(
166 Some(CreateImageOptions {
167 from_image: image_name.to_string(),
168 platform: "linux/amd64".to_string(),
169 tag: image_version.to_string(),
170 ..Default::default()
171 }),
172 None,
173 Some(self.docker_credentials().clone()),
174 );
175 while let Some(Ok(progress)) = create_img_stream.next().await {
176 was_downloaded = true;
177 trace!(
178 "{:?} {:?} {:?} {:?}",
179 image_name,
180 progress.id,
181 progress.status,
182 progress.progress,
183 { id: image_name }
184 );
185 }
186
187 Ok(was_downloaded)
188 }
189
190 async fn inspect_image(&self, image_name: &str) -> ContainerResult<ImageInspect> {
191 self.fetch_image(image_name).await?;
192
193 self.docker()
194 .inspect_image(image_name)
195 .await
196 .map_err(handle_bollard_error)
197 }
198
199 async fn create_docker_container(
200 &self,
201 key: &str,
202 image_name: &str,
203 env: Option<Vec<String>>,
204 overrides: Option<DockerContainerOverrides>,
205 ) -> ContainerResult<DockerContainer> {
206 let env = env.unwrap_or_default();
207
208 let config =
209 self.get_container_config(image_name, Some(env.clone()), overrides.unwrap_or_default());
210
211 let mut create_img_stream = self.docker().create_image(
212 Some(CreateImageOptions {
213 from_image: image_name.to_string(),
214 platform: "linux/amd64".to_string(),
215 ..Default::default()
216 }),
217 None,
218 Some(self.docker_credentials().clone()),
219 );
220 while let Some(Ok(progress)) = create_img_stream.next().await {
221 trace!(
222 "{:?} {:?} {:?} {:?}",
223 image_name,
224 progress.id,
225 progress.status,
226 progress.progress,
227 { id: key }
228 );
229 }
230
231 match self
232 .docker()
233 .create_container::<String, _>(
234 Some(CreateContainerOptions {
235 ..Default::default()
236 }),
237 config.clone(),
238 )
239 .await
240 {
241 Ok(result) => {
242 info!("Created container for image {} ({})", image_name, result.id, { id: key });
243 Ok(DockerContainer {
246 id: result.id,
247 image_name: image_name.to_string(),
248 env: env.clone(),
249 docker: self.docker().clone(),
250 config: config.clone(),
251 })
252 }
253 Err(error) => {
254 info!("Failed to create container for image {}, {}", image_name, error, { id: key });
255
256 Err(SbError::ContainerCreateError(Arc::new(error)))
257 }
258 }
259 }
260
261 async fn prune_container(&self, id: String) -> ContainerResult<String> {
262 self.docker()
263 .kill_container::<&str>(&id, Some(KillContainerOptions { signal: "SIGKILL" }))
264 .await
265 .map_err(handle_bollard_error)?;
266
267 self.docker()
268 .remove_container(
269 &id,
270 Some(RemoveContainerOptions {
271 force: true,
272 ..Default::default()
273 }),
274 )
275 .await
276 .map_err(handle_bollard_error)?;
277
278 Ok(id)
279 }
280
281 async fn prune_containers(&self, until: &str) {
282 let mut filters = std::collections::HashMap::new();
283 filters.insert("until", vec![until]);
284
285 let result = self
286 .docker()
287 .prune_containers(Some(PruneContainersOptions { filters }))
288 .await;
289
290 if let Ok(result) = result {
291 for container in result.containers_deleted.unwrap_or_default() {
292 info!("deleted container {}", container, { id: "prune" });
293 }
294
295 if let Some(space_reclaimed) = result.space_reclaimed {
296 if space_reclaimed > 0 {
297 info!("[CONTAINERS] space_reclaimed: {:?}", format_bytes(space_reclaimed as f64), { id: "prune" });
298 }
299 }
300 }
301 }
302
303 async fn prune_images(&self, until: &str) {
304 let mut filters = std::collections::HashMap::new();
305 filters.insert("until", vec![until]);
306 filters.insert(
307 "label!",
308 vec![
309 "name=switchboardlabs/sgx-function",
310 "name=switchboardlabs/qvn",
311 "name=gramine",
312 ],
313 );
314
315 let result = self
316 .docker()
317 .prune_images(Some(PruneImagesOptions { filters }))
318 .await;
319
320 if let Ok(result) = result {
321 if let Some(space_reclaimed) = result.space_reclaimed {
322 if space_reclaimed > 0 {
323 info!("[IMAGES] space_reclaimed: {:?}", format_bytes(space_reclaimed as f64), { id: "prune" });
324 }
325 }
326 }
327 }
328
329 async fn get_active_containers(&self) -> ContainerResult<Vec<ContainerSummary>> {
351 let mut filters = std::collections::HashMap::new();
352 filters.insert("status", vec!["running", "restarting"]);
353
354 let options = Some(ListContainersOptions {
355 all: true,
356 filters,
357 ..Default::default()
358 });
359
360 self.docker()
361 .list_containers(options)
362 .await
363 .map_err(handle_bollard_error)
364 }
365}