1use async_trait::async_trait;
2use bollard::Docker;
3use bollard::container::{Config, CreateContainerOptions, LogsOptions, RemoveContainerOptions};
4use bollard::image::{BuildImageOptions, ListImagesOptions};
5use bollard::network::{CreateNetworkOptions, ListNetworksOptions};
6use futures_util::stream::{Stream, StreamExt};
7use std::collections::HashMap;
8
9#[async_trait]
10pub trait DockerClient: Send + Sync {
11 #[cfg(test)]
12 fn as_any(&self) -> &dyn std::any::Any;
13 async fn create_container(
14 &self,
15 options: Option<CreateContainerOptions<String>>,
16 config: Config<String>,
17 ) -> Result<String, String>;
18
19 async fn start_container(&self, id: &str) -> Result<(), String>;
20
21 async fn wait_container(&self, id: &str) -> Result<i64, String>;
22
23 #[allow(dead_code)]
24 async fn logs(&self, id: &str, options: Option<LogsOptions<String>>) -> Result<String, String>;
25
26 #[allow(dead_code)]
27 async fn logs_stream(
28 &self,
29 id: &str,
30 options: Option<LogsOptions<String>>,
31 ) -> Result<Box<dyn futures_util::Stream<Item = Result<String, String>> + Send + Unpin>, String>;
32
33 async fn remove_container(
34 &self,
35 id: &str,
36 options: Option<RemoveContainerOptions>,
37 ) -> Result<(), String>;
38
39 async fn create_network(&self, name: &str) -> Result<String, String>;
40
41 async fn network_exists(&self, name: &str) -> Result<bool, String>;
42
43 async fn build_image(
52 &self,
53 options: BuildImageOptions<String>,
54 tar_archive: Vec<u8>,
55 ) -> Result<Box<dyn futures_util::Stream<Item = Result<String, String>> + Send + Unpin>, String>;
56
57 async fn image_exists(&self, tag: &str) -> Result<bool, String>;
65}
66
67#[derive(Clone)]
68pub struct DefaultDockerClient {
69 docker: Docker,
70}
71
72impl DefaultDockerClient {
73 pub fn new() -> Self {
74 match Docker::connect_with_local_defaults() {
75 Ok(docker) => Self { docker },
76 Err(e) => panic!(
77 "Failed to connect to Docker: {e}\n\n\
78 Please ensure Docker is installed and running:\n\
79 - On macOS: Open Docker Desktop application\n\
80 - On Linux: Run 'sudo systemctl start docker' or 'sudo service docker start'\n\
81 - Check Docker status with: 'docker ps'\n\n\
82 If Docker is running, check permissions:\n\
83 - On Linux: Ensure your user is in the docker group: 'sudo usermod -aG docker $USER'\n\
84 - Then log out and back in for group changes to take effect"
85 ),
86 }
87 }
88}
89
90impl Default for DefaultDockerClient {
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
96#[async_trait]
97impl DockerClient for DefaultDockerClient {
98 #[cfg(test)]
99 fn as_any(&self) -> &dyn std::any::Any {
100 self
101 }
102 async fn create_container(
103 &self,
104 options: Option<CreateContainerOptions<String>>,
105 config: Config<String>,
106 ) -> Result<String, String> {
107 let response = self
108 .docker
109 .create_container(options, config)
110 .await
111 .map_err(|e| format!("Failed to create container: {e}"))?;
112 Ok(response.id)
113 }
114
115 async fn start_container(&self, id: &str) -> Result<(), String> {
116 self.docker
117 .start_container::<String>(id, None)
118 .await
119 .map_err(|e| format!("Failed to start container: {e}"))
120 }
121
122 async fn wait_container(&self, id: &str) -> Result<i64, String> {
123 use futures_util::stream::StreamExt;
124
125 let mut stream = self
126 .docker
127 .wait_container(id, None::<bollard::container::WaitContainerOptions<String>>);
128 if let Some(result) = stream.next().await {
129 match result {
130 Ok(wait_response) => Ok(wait_response.status_code),
131 Err(e) => Err(format!("Failed to wait for container: {e}")),
132 }
133 } else {
134 Err("Container wait stream ended unexpectedly".to_string())
135 }
136 }
137
138 async fn logs(&self, id: &str, options: Option<LogsOptions<String>>) -> Result<String, String> {
139 let mut stream = self.docker.logs(id, options);
140 let mut output = String::new();
141
142 while let Some(result) = stream.next().await {
143 match result {
144 Ok(log) => output.push_str(&log.to_string()),
145 Err(e) => return Err(format!("Failed to get logs: {e}")),
146 }
147 }
148
149 Ok(output)
150 }
151
152 async fn logs_stream(
153 &self,
154 id: &str,
155 options: Option<LogsOptions<String>>,
156 ) -> Result<Box<dyn Stream<Item = Result<String, String>> + Send + Unpin>, String> {
157 let stream = self.docker.logs(id, options);
158 let mapped_stream = stream.map(|result| match result {
159 Ok(log) => Ok(log.to_string()),
160 Err(e) => Err(format!("Failed to get logs: {e}")),
161 });
162 Ok(Box::new(Box::pin(mapped_stream)))
163 }
164
165 async fn remove_container(
166 &self,
167 id: &str,
168 options: Option<RemoveContainerOptions>,
169 ) -> Result<(), String> {
170 self.docker
171 .remove_container(id, options)
172 .await
173 .map_err(|e| format!("Failed to remove container: {e}"))
174 }
175
176 async fn create_network(&self, name: &str) -> Result<String, String> {
177 let options = CreateNetworkOptions {
178 name: name.to_string(),
179 ..Default::default()
180 };
181
182 let response = self
183 .docker
184 .create_network(options)
185 .await
186 .map_err(|e| format!("Failed to create network: {e}"))?;
187
188 Ok(response.id.unwrap_or_else(|| name.to_string()))
189 }
190
191 async fn network_exists(&self, name: &str) -> Result<bool, String> {
192 let mut filters = HashMap::new();
193 filters.insert("name", vec![name]);
194
195 let options = ListNetworksOptions { filters };
196
197 let networks = self
198 .docker
199 .list_networks(Some(options))
200 .await
201 .map_err(|e| format!("Failed to list networks: {e}"))?;
202
203 Ok(!networks.is_empty())
204 }
205
206 async fn build_image(
207 &self,
208 options: BuildImageOptions<String>,
209 tar_archive: Vec<u8>,
210 ) -> Result<Box<dyn futures_util::Stream<Item = Result<String, String>> + Send + Unpin>, String>
211 {
212 use futures_util::StreamExt;
213 use tokio::sync::mpsc;
214
215 let (tx, rx) = mpsc::unbounded_channel();
216 let docker = self.docker.clone();
217
218 tokio::spawn(async move {
220 let mut stream = docker.build_image(options, None, Some(tar_archive.into()));
221
222 while let Some(build_info) = stream.next().await {
223 match build_info {
224 Ok(info) => {
225 if let Some(error) = info.error {
226 let _ = tx.send(Err(format!("Docker build error: {error}")));
227 break;
228 } else if let Some(stream_msg) = info.stream {
229 if !stream_msg.is_empty() && tx.send(Ok(stream_msg)).is_err() {
230 break; }
232 }
233 }
234 Err(e) => {
235 let _ = tx.send(Err(format!("Failed to build image: {e}")));
236 break;
237 }
238 }
239 }
240 });
241
242 let receiver_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
244 Ok(Box::new(Box::pin(receiver_stream)))
245 }
246
247 async fn image_exists(&self, tag: &str) -> Result<bool, String> {
248 let mut filters = HashMap::new();
249 filters.insert("reference", vec![tag]);
250
251 let options = ListImagesOptions {
252 filters,
253 ..Default::default()
254 };
255
256 let images = self
257 .docker
258 .list_images(Some(options))
259 .await
260 .map_err(|e| format!("Failed to list images: {e}"))?;
261
262 Ok(!images.is_empty())
263 }
264}