tsk/context/
docker_client.rs

1use async_trait::async_trait;
2use bollard::Docker;
3use bollard::container::{Config, CreateContainerOptions, LogsOptions, RemoveContainerOptions};
4use bollard::image::{BuildImageOptions, ListImagesOptions};
5use bollard::network::{CreateNetworkOptions, ListNetworksOptions};
6use futures_util::stream::{Stream, StreamExt};
7use std::collections::HashMap;
8
9#[async_trait]
10pub trait DockerClient: Send + Sync {
11    #[cfg(test)]
12    fn as_any(&self) -> &dyn std::any::Any;
13    async fn create_container(
14        &self,
15        options: Option<CreateContainerOptions<String>>,
16        config: Config<String>,
17    ) -> Result<String, String>;
18
19    async fn start_container(&self, id: &str) -> Result<(), String>;
20
21    async fn wait_container(&self, id: &str) -> Result<i64, String>;
22
23    #[allow(dead_code)]
24    async fn logs(&self, id: &str, options: Option<LogsOptions<String>>) -> Result<String, String>;
25
26    #[allow(dead_code)]
27    async fn logs_stream(
28        &self,
29        id: &str,
30        options: Option<LogsOptions<String>>,
31    ) -> Result<Box<dyn futures_util::Stream<Item = Result<String, String>> + Send + Unpin>, String>;
32
33    async fn remove_container(
34        &self,
35        id: &str,
36        options: Option<RemoveContainerOptions>,
37    ) -> Result<(), String>;
38
39    async fn create_network(&self, name: &str) -> Result<String, String>;
40
41    async fn network_exists(&self, name: &str) -> Result<bool, String>;
42
43    /// Build a Docker image from a tar archive containing a Dockerfile and associated files with streaming output
44    ///
45    /// # Arguments
46    /// * `options` - Build options including image tag, build args, and cache settings
47    /// * `tar_archive` - Tar archive containing Dockerfile and any additional files
48    ///
49    /// # Returns
50    /// A stream of build output messages, or an error if the build fails to start
51    async fn build_image(
52        &self,
53        options: BuildImageOptions<String>,
54        tar_archive: Vec<u8>,
55    ) -> Result<Box<dyn futures_util::Stream<Item = Result<String, String>> + Send + Unpin>, String>;
56
57    /// Check if a Docker image exists locally
58    ///
59    /// # Arguments
60    /// * `tag` - The image tag to check (e.g., "tsk/rust/claude/web-api")
61    ///
62    /// # Returns
63    /// True if the image exists, false otherwise
64    async fn image_exists(&self, tag: &str) -> Result<bool, String>;
65}
66
67#[derive(Clone)]
68pub struct DefaultDockerClient {
69    docker: Docker,
70}
71
72impl DefaultDockerClient {
73    pub fn new() -> Self {
74        match Docker::connect_with_local_defaults() {
75            Ok(docker) => Self { docker },
76            Err(e) => panic!(
77                "Failed to connect to Docker: {e}\n\n\
78                Please ensure Docker is installed and running:\n\
79                  - On macOS: Open Docker Desktop application\n\
80                  - On Linux: Run 'sudo systemctl start docker' or 'sudo service docker start'\n\
81                  - Check Docker status with: 'docker ps'\n\n\
82                If Docker is running, check permissions:\n\
83                  - On Linux: Ensure your user is in the docker group: 'sudo usermod -aG docker $USER'\n\
84                    - Then log out and back in for group changes to take effect"
85            ),
86        }
87    }
88}
89
90impl Default for DefaultDockerClient {
91    fn default() -> Self {
92        Self::new()
93    }
94}
95
96#[async_trait]
97impl DockerClient for DefaultDockerClient {
98    #[cfg(test)]
99    fn as_any(&self) -> &dyn std::any::Any {
100        self
101    }
102    async fn create_container(
103        &self,
104        options: Option<CreateContainerOptions<String>>,
105        config: Config<String>,
106    ) -> Result<String, String> {
107        let response = self
108            .docker
109            .create_container(options, config)
110            .await
111            .map_err(|e| format!("Failed to create container: {e}"))?;
112        Ok(response.id)
113    }
114
115    async fn start_container(&self, id: &str) -> Result<(), String> {
116        self.docker
117            .start_container::<String>(id, None)
118            .await
119            .map_err(|e| format!("Failed to start container: {e}"))
120    }
121
122    async fn wait_container(&self, id: &str) -> Result<i64, String> {
123        use futures_util::stream::StreamExt;
124
125        let mut stream = self
126            .docker
127            .wait_container(id, None::<bollard::container::WaitContainerOptions<String>>);
128        if let Some(result) = stream.next().await {
129            match result {
130                Ok(wait_response) => Ok(wait_response.status_code),
131                Err(e) => Err(format!("Failed to wait for container: {e}")),
132            }
133        } else {
134            Err("Container wait stream ended unexpectedly".to_string())
135        }
136    }
137
138    async fn logs(&self, id: &str, options: Option<LogsOptions<String>>) -> Result<String, String> {
139        let mut stream = self.docker.logs(id, options);
140        let mut output = String::new();
141
142        while let Some(result) = stream.next().await {
143            match result {
144                Ok(log) => output.push_str(&log.to_string()),
145                Err(e) => return Err(format!("Failed to get logs: {e}")),
146            }
147        }
148
149        Ok(output)
150    }
151
152    async fn logs_stream(
153        &self,
154        id: &str,
155        options: Option<LogsOptions<String>>,
156    ) -> Result<Box<dyn Stream<Item = Result<String, String>> + Send + Unpin>, String> {
157        let stream = self.docker.logs(id, options);
158        let mapped_stream = stream.map(|result| match result {
159            Ok(log) => Ok(log.to_string()),
160            Err(e) => Err(format!("Failed to get logs: {e}")),
161        });
162        Ok(Box::new(Box::pin(mapped_stream)))
163    }
164
165    async fn remove_container(
166        &self,
167        id: &str,
168        options: Option<RemoveContainerOptions>,
169    ) -> Result<(), String> {
170        self.docker
171            .remove_container(id, options)
172            .await
173            .map_err(|e| format!("Failed to remove container: {e}"))
174    }
175
176    async fn create_network(&self, name: &str) -> Result<String, String> {
177        let options = CreateNetworkOptions {
178            name: name.to_string(),
179            ..Default::default()
180        };
181
182        let response = self
183            .docker
184            .create_network(options)
185            .await
186            .map_err(|e| format!("Failed to create network: {e}"))?;
187
188        Ok(response.id.unwrap_or_else(|| name.to_string()))
189    }
190
191    async fn network_exists(&self, name: &str) -> Result<bool, String> {
192        let mut filters = HashMap::new();
193        filters.insert("name", vec![name]);
194
195        let options = ListNetworksOptions { filters };
196
197        let networks = self
198            .docker
199            .list_networks(Some(options))
200            .await
201            .map_err(|e| format!("Failed to list networks: {e}"))?;
202
203        Ok(!networks.is_empty())
204    }
205
206    async fn build_image(
207        &self,
208        options: BuildImageOptions<String>,
209        tar_archive: Vec<u8>,
210    ) -> Result<Box<dyn futures_util::Stream<Item = Result<String, String>> + Send + Unpin>, String>
211    {
212        use futures_util::StreamExt;
213        use tokio::sync::mpsc;
214
215        let (tx, rx) = mpsc::unbounded_channel();
216        let docker = self.docker.clone();
217
218        // Spawn a task to handle the streaming
219        tokio::spawn(async move {
220            let mut stream = docker.build_image(options, None, Some(tar_archive.into()));
221
222            while let Some(build_info) = stream.next().await {
223                match build_info {
224                    Ok(info) => {
225                        if let Some(error) = info.error {
226                            let _ = tx.send(Err(format!("Docker build error: {error}")));
227                            break;
228                        } else if let Some(stream_msg) = info.stream {
229                            if !stream_msg.is_empty() && tx.send(Ok(stream_msg)).is_err() {
230                                break; // Receiver dropped
231                            }
232                        }
233                    }
234                    Err(e) => {
235                        let _ = tx.send(Err(format!("Failed to build image: {e}")));
236                        break;
237                    }
238                }
239            }
240        });
241
242        // Convert receiver to stream
243        let receiver_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);
244        Ok(Box::new(Box::pin(receiver_stream)))
245    }
246
247    async fn image_exists(&self, tag: &str) -> Result<bool, String> {
248        let mut filters = HashMap::new();
249        filters.insert("reference", vec![tag]);
250
251        let options = ListImagesOptions {
252            filters,
253            ..Default::default()
254        };
255
256        let images = self
257            .docker
258            .list_images(Some(options))
259            .await
260            .map_err(|e| format!("Failed to list images: {e}"))?;
261
262        Ok(!images.is_empty())
263    }
264}