harmont_cli/orchestrator/
docker_client.rs1use std::collections::HashMap;
8use std::sync::Arc;
9
10use anyhow::{Context, Result};
11use bollard::Docker;
12use bollard::container::{
13 Config, CreateContainerOptions, RemoveContainerOptions, StartContainerOptions,
14 StopContainerOptions,
15};
16use bollard::exec::{CreateExecOptions, StartExecResults};
17use bollard::image::{
18 CommitContainerOptions, CreateImageOptions, ListImagesOptions, RemoveImageOptions,
19};
20use futures_util::StreamExt;
21use tokio::io::AsyncWrite;
22
23use crate::error::HmError;
24
25#[derive(Debug, Clone)]
26pub struct DockerClient {
27 inner: Arc<Docker>,
28}
29
30impl DockerClient {
31 pub fn connect() -> Result<Self> {
40 let d = Docker::connect_with_local_defaults()
41 .map_err(|e| HmError::Docker(format!("connect: {e}")))?;
42 Ok(Self { inner: Arc::new(d) })
43 }
44
45 pub async fn ping(&self) -> Result<()> {
52 self.inner
53 .ping()
54 .await
55 .map_err(|e| HmError::Docker(format!("ping failed: {e}")))?;
56 Ok(())
57 }
58
59 pub async fn image_exists(&self, tag: &str) -> Result<bool> {
66 let mut filters = HashMap::new();
67 filters.insert("reference".to_string(), vec![tag.to_string()]);
68 let images = self
69 .inner
70 .list_images(Some(ListImagesOptions {
71 filters,
72 ..Default::default()
73 }))
74 .await
75 .map_err(|e| HmError::Docker(format!("list_images: {e}")))?;
76 Ok(!images.is_empty())
77 }
78
79 pub async fn pull_image(&self, tag: &str) -> Result<()> {
88 let mut s = self.inner.create_image(
89 Some(CreateImageOptions {
90 from_image: tag,
91 ..Default::default()
92 }),
93 None,
94 None,
95 );
96 while let Some(item) = s.next().await {
97 item.map_err(|e| HmError::Docker(format!("pull {tag}: {e}")))?;
98 }
99 Ok(())
100 }
101
102 pub async fn start_long_lived(
111 &self,
112 image: &str,
113 env: &[String],
114 workdir: &str,
115 name: &str,
116 ) -> Result<String> {
117 let cfg = Config {
118 image: Some(image.to_string()),
119 cmd: Some(vec!["sh".into(), "-c".into(), "sleep infinity".into()]),
120 env: Some(env.to_vec()),
121 working_dir: Some(workdir.to_string()),
122 ..Default::default()
123 };
124 let create = self
125 .inner
126 .create_container(
127 Some(CreateContainerOptions {
128 name,
129 ..Default::default()
130 }),
131 cfg,
132 )
133 .await
134 .map_err(|e| HmError::Docker(format!("create_container: {e}")))?;
135 self.inner
136 .start_container(&create.id, None::<StartContainerOptions<String>>)
137 .await
138 .map_err(|e| HmError::Docker(format!("start_container: {e}")))?;
139 Ok(create.id)
140 }
141
142 pub async fn exec_streaming(
151 &self,
152 container_id: &str,
153 cmd: &[String],
154 env: &[String],
155 workdir: &str,
156 out: &mut (impl AsyncWrite + Send + Unpin),
157 ) -> Result<i64> {
158 use bollard::container::LogOutput;
159 use tokio::io::AsyncWriteExt;
160
161 let exec = self
162 .inner
163 .create_exec(
164 container_id,
165 CreateExecOptions {
166 cmd: Some(cmd.iter().map(std::string::String::as_str).collect()),
167 env: Some(env.iter().map(std::string::String::as_str).collect()),
168 working_dir: Some(workdir),
169 attach_stdout: Some(true),
170 attach_stderr: Some(true),
171 ..Default::default()
172 },
173 )
174 .await
175 .map_err(|e| HmError::Docker(format!("create_exec: {e}")))?;
176 match self
177 .inner
178 .start_exec(&exec.id, None)
179 .await
180 .map_err(|e| HmError::Docker(format!("start_exec: {e}")))?
181 {
182 StartExecResults::Attached { mut output, .. } => {
183 while let Some(item) = output.next().await {
184 let chunk = item.map_err(|e| HmError::Docker(format!("exec stream: {e}")))?;
185 let (LogOutput::StdOut { message: bytes }
186 | LogOutput::StdErr { message: bytes }
187 | LogOutput::Console { message: bytes }) = chunk
188 else {
189 continue;
191 };
192 out.write_all(&bytes).await.context("write exec output")?;
193 }
194 }
195 StartExecResults::Detached => {}
196 }
197 let inspect = self
198 .inner
199 .inspect_exec(&exec.id)
200 .await
201 .map_err(|e| HmError::Docker(format!("inspect_exec: {e}")))?;
202 Ok(inspect.exit_code.unwrap_or(0))
203 }
204
205 pub async fn exec_streaming_stdin(
216 &self,
217 container_id: &str,
218 cmd: &[String],
219 env: &[String],
220 workdir: &str,
221 stdin_bytes: &[u8],
222 out: &mut (impl AsyncWrite + Send + Unpin),
223 ) -> Result<i64> {
224 use bollard::container::LogOutput;
225 use tokio::io::AsyncWriteExt;
226
227 let exec = self
228 .inner
229 .create_exec(
230 container_id,
231 CreateExecOptions {
232 cmd: Some(cmd.iter().map(std::string::String::as_str).collect()),
233 env: Some(env.iter().map(std::string::String::as_str).collect()),
234 working_dir: Some(workdir),
235 attach_stdin: Some(true),
236 attach_stdout: Some(true),
237 attach_stderr: Some(true),
238 ..Default::default()
239 },
240 )
241 .await
242 .map_err(|e| HmError::Docker(format!("create_exec: {e}")))?;
243 match self
244 .inner
245 .start_exec(&exec.id, None)
246 .await
247 .map_err(|e| HmError::Docker(format!("start_exec: {e}")))?
248 {
249 StartExecResults::Attached {
250 mut output,
251 mut input,
252 } => {
253 input
254 .write_all(stdin_bytes)
255 .await
256 .context("write exec stdin")?;
257 input.shutdown().await.context("close exec stdin")?;
258 drop(input);
260 while let Some(item) = output.next().await {
261 let chunk = item.map_err(|e| HmError::Docker(format!("exec stream: {e}")))?;
262 let (LogOutput::StdOut { message: bytes }
263 | LogOutput::StdErr { message: bytes }
264 | LogOutput::Console { message: bytes }) = chunk
265 else {
266 continue;
268 };
269 out.write_all(&bytes).await.context("write exec output")?;
270 }
271 }
272 StartExecResults::Detached => {}
273 }
274 let inspect = self
275 .inner
276 .inspect_exec(&exec.id)
277 .await
278 .map_err(|e| HmError::Docker(format!("inspect_exec: {e}")))?;
279 Ok(inspect.exit_code.unwrap_or(0))
280 }
281
282 pub async fn commit_container(&self, container_id: &str, tag: &str) -> Result<String> {
302 let parts: Vec<&str> = tag.splitn(2, ':').collect();
303 let (repo, ver) = match parts.as_slice() {
304 [r, v] => (*r, *v),
305 [r] => (*r, "latest"),
306 _ => unreachable!("splitn(2) yields one or two parts for non-empty input"),
307 };
308 let opts = CommitContainerOptions {
309 container: container_id,
310 repo,
311 tag: ver,
312 ..Default::default()
313 };
314 self.inner
315 .commit_container(opts, Config::<String>::default())
316 .await
317 .map_err(|e| HmError::Docker(format!("commit_container: {e}")))?;
318 Ok(tag.to_string())
319 }
320
321 pub async fn remove_image(&self, image: &str) -> Result<()> {
332 self.inner
333 .remove_image(
334 image,
335 Some(RemoveImageOptions {
336 force: true,
337 noprune: false,
338 }),
339 None,
340 )
341 .await
342 .map_err(|e| HmError::Docker(format!("remove_image '{image}': {e}")))?;
343 Ok(())
344 }
345
346 pub async fn stop_remove(&self, container_id: &str) {
347 let _ = self
348 .inner
349 .stop_container(container_id, Some(StopContainerOptions { t: 0 }))
350 .await;
351 let _ = self
352 .inner
353 .remove_container(
354 container_id,
355 Some(RemoveContainerOptions {
356 force: true,
357 v: true,
358 ..Default::default()
359 }),
360 )
361 .await;
362 }
363}
364
// Smoke test against a real daemon; panicking on failure is the point of the
// test, hence the relaxed lints.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod smoke {
    use super::DockerClient;

    /// Connects with the local defaults and checks that the daemon answers a ping.
    #[tokio::test]
    #[ignore = "requires a running Docker daemon; opt in with `cargo test -- --ignored`"]
    async fn docker_ping() {
        DockerClient::connect().unwrap().ping().await.unwrap();
    }
}