harmont_cli/orchestrator/
docker_client.rs1use std::collections::HashMap;
8use std::sync::Arc;
9
10use anyhow::{Context, Result};
11use bollard::Docker;
12use bollard::container::{
13 Config, CreateContainerOptions, RemoveContainerOptions, StartContainerOptions,
14 StopContainerOptions,
15};
16use bollard::exec::{CreateExecOptions, StartExecResults};
17use bollard::image::{
18 CommitContainerOptions, CreateImageOptions, ImportImageOptions, ListImagesOptions,
19 RemoveImageOptions, TagImageOptions,
20};
21use bollard::models::HostConfig;
22use futures_util::StreamExt;
23use tokio::io::AsyncWrite;
24
25use crate::error::HmError;
26
27fn build_host_config(binds: &[String], cap_add: &[String]) -> HostConfig {
31 HostConfig {
32 binds: if binds.is_empty() {
33 None
34 } else {
35 Some(binds.to_vec())
36 },
37 cap_add: if cap_add.is_empty() {
38 None
39 } else {
40 Some(cap_add.to_vec())
41 },
42 ..Default::default()
43 }
44}
45
46#[derive(Debug, Clone)]
47pub struct DockerClient {
48 inner: Arc<Docker>,
49}
50
51impl DockerClient {
52 pub fn connect() -> Result<Self> {
61 let d = Docker::connect_with_local_defaults()
62 .map_err(|e| HmError::Docker(format!("connect: {e}")))?;
63 Ok(Self { inner: Arc::new(d) })
64 }
65
66 pub async fn ping(&self) -> Result<()> {
73 self.inner
74 .ping()
75 .await
76 .map_err(|e| HmError::Docker(format!("ping failed: {e}")))?;
77 Ok(())
78 }
79
80 pub async fn image_exists(&self, tag: &str) -> Result<bool> {
87 let mut filters = HashMap::new();
88 filters.insert("reference".to_string(), vec![tag.to_string()]);
89 let images = self
90 .inner
91 .list_images(Some(ListImagesOptions {
92 filters,
93 ..Default::default()
94 }))
95 .await
96 .map_err(|e| HmError::Docker(format!("list_images: {e}")))?;
97 Ok(!images.is_empty())
98 }
99
100 pub async fn list_images_by_reference(&self, reference: &str) -> Result<Vec<String>> {
108 let mut filters = HashMap::new();
109 filters.insert("reference".to_string(), vec![format!("{reference}:*")]);
110 let images = self
111 .inner
112 .list_images(Some(ListImagesOptions {
113 filters,
114 ..Default::default()
115 }))
116 .await
117 .map_err(|e| HmError::Docker(format!("list_images: {e}")))?;
118 Ok(images.into_iter().flat_map(|img| img.repo_tags).collect())
119 }
120
121 pub async fn pull_image(&self, tag: &str) -> Result<()> {
130 let mut s = self.inner.create_image(
131 Some(CreateImageOptions {
132 from_image: tag,
133 ..Default::default()
134 }),
135 None,
136 None,
137 );
138 while let Some(item) = s.next().await {
139 item.map_err(|e| HmError::Docker(format!("pull {tag}: {e}")))?;
140 }
141 Ok(())
142 }
143
144 pub async fn start_long_lived(
153 &self,
154 image: &str,
155 env: &[String],
156 workdir: &str,
157 name: &str,
158 ) -> Result<String> {
159 self.start_long_lived_with_mounts(image, env, workdir, name, &[])
160 .await
161 }
162
163 pub async fn start_long_lived_with_mounts(
174 &self,
175 image: &str,
176 env: &[String],
177 workdir: &str,
178 name: &str,
179 binds: &[String],
180 ) -> Result<String> {
181 let cfg = Config {
182 image: Some(image.to_string()),
183 cmd: Some(vec!["sh".into(), "-c".into(), "sleep infinity".into()]),
184 env: Some(env.to_vec()),
185 working_dir: Some(workdir.to_string()),
186 host_config: Some(build_host_config(binds, &[])),
187 ..Default::default()
188 };
189 let create = self
190 .inner
191 .create_container(
192 Some(CreateContainerOptions {
193 name,
194 ..Default::default()
195 }),
196 cfg,
197 )
198 .await
199 .map_err(|e| HmError::Docker(format!("create_container: {e}")))?;
200 self.inner
201 .start_container(&create.id, None::<StartContainerOptions<String>>)
202 .await
203 .map_err(|e| HmError::Docker(format!("start_container: {e}")))?;
204 Ok(create.id)
205 }
206
207 pub async fn exec_streaming(
216 &self,
217 container_id: &str,
218 cmd: &[String],
219 env: &[String],
220 workdir: &str,
221 out: &mut (impl AsyncWrite + Send + Unpin),
222 ) -> Result<i64> {
223 use bollard::container::LogOutput;
224 use tokio::io::AsyncWriteExt;
225
226 let exec = self
227 .inner
228 .create_exec(
229 container_id,
230 CreateExecOptions {
231 cmd: Some(cmd.iter().map(std::string::String::as_str).collect()),
232 env: Some(env.iter().map(std::string::String::as_str).collect()),
233 working_dir: Some(workdir),
234 attach_stdout: Some(true),
235 attach_stderr: Some(true),
236 ..Default::default()
237 },
238 )
239 .await
240 .map_err(|e| HmError::Docker(format!("create_exec: {e}")))?;
241 match self
242 .inner
243 .start_exec(&exec.id, None)
244 .await
245 .map_err(|e| HmError::Docker(format!("start_exec: {e}")))?
246 {
247 StartExecResults::Attached { mut output, .. } => {
248 while let Some(item) = output.next().await {
249 let chunk = item.map_err(|e| HmError::Docker(format!("exec stream: {e}")))?;
250 let (LogOutput::StdOut { message: bytes }
251 | LogOutput::StdErr { message: bytes }
252 | LogOutput::Console { message: bytes }) = chunk
253 else {
254 continue;
256 };
257 out.write_all(&bytes).await.context("write exec output")?;
258 }
259 }
260 StartExecResults::Detached => {}
261 }
262 let inspect = self
263 .inner
264 .inspect_exec(&exec.id)
265 .await
266 .map_err(|e| HmError::Docker(format!("inspect_exec: {e}")))?;
267 Ok(inspect.exit_code.unwrap_or(0))
268 }
269
270 pub async fn exec_streaming_stdin(
281 &self,
282 container_id: &str,
283 cmd: &[String],
284 env: &[String],
285 workdir: &str,
286 stdin_bytes: &[u8],
287 out: &mut (impl AsyncWrite + Send + Unpin),
288 ) -> Result<i64> {
289 use bollard::container::LogOutput;
290 use tokio::io::AsyncWriteExt;
291
292 let exec = self
293 .inner
294 .create_exec(
295 container_id,
296 CreateExecOptions {
297 cmd: Some(cmd.iter().map(std::string::String::as_str).collect()),
298 env: Some(env.iter().map(std::string::String::as_str).collect()),
299 working_dir: Some(workdir),
300 attach_stdin: Some(true),
301 attach_stdout: Some(true),
302 attach_stderr: Some(true),
303 ..Default::default()
304 },
305 )
306 .await
307 .map_err(|e| HmError::Docker(format!("create_exec: {e}")))?;
308 match self
309 .inner
310 .start_exec(&exec.id, None)
311 .await
312 .map_err(|e| HmError::Docker(format!("start_exec: {e}")))?
313 {
314 StartExecResults::Attached {
315 mut output,
316 mut input,
317 } => {
318 input
319 .write_all(stdin_bytes)
320 .await
321 .context("write exec stdin")?;
322 input.shutdown().await.context("close exec stdin")?;
323 drop(input);
325 while let Some(item) = output.next().await {
326 let chunk = item.map_err(|e| HmError::Docker(format!("exec stream: {e}")))?;
327 let (LogOutput::StdOut { message: bytes }
328 | LogOutput::StdErr { message: bytes }
329 | LogOutput::Console { message: bytes }) = chunk
330 else {
331 continue;
333 };
334 out.write_all(&bytes).await.context("write exec output")?;
335 }
336 }
337 StartExecResults::Detached => {}
338 }
339 let inspect = self
340 .inner
341 .inspect_exec(&exec.id)
342 .await
343 .map_err(|e| HmError::Docker(format!("inspect_exec: {e}")))?;
344 Ok(inspect.exit_code.unwrap_or(0))
345 }
346
347 pub async fn commit_container(&self, container_id: &str, tag: &str) -> Result<String> {
367 let parts: Vec<&str> = tag.splitn(2, ':').collect();
368 let (repo, ver) = match parts.as_slice() {
369 [r, v] => (*r, *v),
370 [r] => (*r, "latest"),
371 _ => unreachable!("splitn(2) yields one or two parts for non-empty input"),
372 };
373 let opts = CommitContainerOptions {
374 container: container_id,
375 repo,
376 tag: ver,
377 ..Default::default()
378 };
379 self.inner
380 .commit_container(opts, Config::<String>::default())
381 .await
382 .map_err(|e| HmError::Docker(format!("commit_container: {e}")))?;
383 Ok(tag.to_string())
384 }
385
386 pub async fn remove_image(&self, image: &str) -> Result<()> {
397 self.inner
398 .remove_image(
399 image,
400 Some(RemoveImageOptions {
401 force: true,
402 noprune: false,
403 }),
404 None,
405 )
406 .await
407 .map_err(|e| HmError::Docker(format!("remove_image '{image}': {e}")))?;
408 Ok(())
409 }
410
411 pub async fn tag_image(&self, source: &str, new_tag: &str) -> Result<()> {
422 let parts: Vec<&str> = new_tag.splitn(2, ':').collect();
423 let (repo, tag) = match parts.as_slice() {
424 [r, v] => (*r, *v),
425 [r] => (*r, "latest"),
426 _ => unreachable!("splitn(2) yields one or two parts for non-empty input"),
427 };
428 self.inner
429 .tag_image(source, Some(TagImageOptions { repo, tag }))
430 .await
431 .map_err(|e| HmError::Docker(format!("tag_image '{source}' -> '{new_tag}': {e}")))?;
432 Ok(())
433 }
434
435 pub async fn export_image(&self, image: &str, dest: &std::path::Path) -> Result<()> {
445 use tokio::io::AsyncWriteExt;
446
447 let mut stream = self.inner.export_image(image);
448 let file = tokio::fs::File::create(dest)
449 .await
450 .with_context(|| format!("create export file '{}'", dest.display()))?;
451 let mut writer = tokio::io::BufWriter::new(file);
452 while let Some(chunk) = stream.next().await {
453 let bytes =
454 chunk.map_err(|e| HmError::Docker(format!("export_image '{image}': {e}")))?;
455 writer
456 .write_all(&bytes)
457 .await
458 .with_context(|| format!("write export data to '{}'", dest.display()))?;
459 }
460 writer
461 .flush()
462 .await
463 .with_context(|| format!("flush export file '{}'", dest.display()))?;
464 Ok(())
465 }
466
467 pub async fn import_image(&self, src: &std::path::Path) -> Result<()> {
477 let body = tokio::fs::read(src)
478 .await
479 .with_context(|| format!("read import file '{}'", src.display()))?;
480 let mut stream =
481 self.inner
482 .import_image(ImportImageOptions { quiet: true }, body.into(), None);
483 while let Some(item) = stream.next().await {
484 item.map_err(|e| HmError::Docker(format!("import_image '{}': {e}", src.display())))?;
485 }
486 Ok(())
487 }
488
489 pub async fn list_images_by_prefix(&self, prefix: &str) -> Result<Vec<String>> {
500 let mut filters = HashMap::new();
501 filters.insert("reference".to_string(), vec![format!("{prefix}*")]);
502 let images = self
503 .inner
504 .list_images(Some(ListImagesOptions {
505 filters,
506 ..Default::default()
507 }))
508 .await
509 .map_err(|e| HmError::Docker(format!("list_images: {e}")))?;
510 let mut tags: Vec<String> = images
511 .iter()
512 .flat_map(|img| &img.repo_tags)
513 .filter(|tag| tag.starts_with(prefix))
514 .cloned()
515 .collect();
516 tags.sort();
517 Ok(tags)
518 }
519
520 pub async fn stop_remove(&self, container_id: &str) {
521 let _ = self
522 .inner
523 .stop_container(container_id, Some(StopContainerOptions { t: 0 }))
524 .await;
525 let _ = self
526 .inner
527 .remove_container(
528 container_id,
529 Some(RemoveContainerOptions {
530 force: true,
531 v: true,
532 ..Default::default()
533 }),
534 )
535 .await;
536 }
537
538 #[doc(hidden)]
546 #[must_use]
547 pub fn inner_for_logs(&self) -> &bollard::Docker {
548 &self.inner
549 }
550
551 pub async fn list_containers_by_label(
557 &self,
558 k: &str,
559 v: &str,
560 ) -> Result<Vec<bollard::secret::ContainerSummary>> {
561 use bollard::container::ListContainersOptions;
562 use std::collections::HashMap;
563 let mut filters: HashMap<String, Vec<String>> = HashMap::new();
564 filters.insert("label".to_string(), vec![format!("{k}={v}")]);
565 let out = self
566 .inner
567 .list_containers(Some(ListContainersOptions {
568 all: true,
569 filters,
570 ..Default::default()
571 }))
572 .await
573 .map_err(|e| HmError::Docker(format!("list_containers: {e}")))?;
574 Ok(out)
575 }
576}
577
578#[cfg(test)]
579#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
580mod smoke {
581 use super::*;
582
583 #[tokio::test]
584 #[ignore = "requires a running Docker daemon; opt in with `cargo test -- --ignored`"]
585 async fn docker_ping() {
586 let c = DockerClient::connect().unwrap();
587 c.ping().await.unwrap();
588 }
589
590 #[tokio::test]
591 #[ignore = "requires a running Docker daemon; opt in with `cargo test -- --ignored`"]
592 async fn list_images_by_reference_returns_empty_for_nonexistent() {
593 let c = DockerClient::connect().unwrap();
594 let tags = c
595 .list_images_by_reference("harmont-test-nonexistent")
596 .await
597 .unwrap();
598 assert!(tags.is_empty());
599 }
600
601 #[test]
602 fn build_host_config_with_binds_and_no_caps() {
603 let hc = super::build_host_config(&["/host/path:/container/path".to_string()], &[]);
604 assert_eq!(
605 hc.binds.as_ref().unwrap(),
606 &["/host/path:/container/path".to_string()]
607 );
608 assert!(hc.cap_add.is_none());
609 }
610
611 #[test]
612 fn build_host_config_empty_binds_is_none() {
613 let hc = super::build_host_config(&[], &[]);
614 assert!(hc.binds.is_none());
615 assert!(hc.cap_add.is_none());
616 }
617}