Skip to main content

atlas_local/
docker.rs

1use bollard::{
2    Docker,
3    container::LogOutput,
4    errors::Error,
5    exec::{CreateExecOptions, StartExecOptions, StartExecResults},
6    query_parameters::{
7        CreateContainerOptions, CreateImageOptionsBuilder, InspectContainerOptions,
8        ListContainersOptions, LogsOptions, RemoveContainerOptions, StartContainerOptions,
9        StopContainerOptions,
10    },
11    secret::{
12        ContainerCreateBody, ContainerCreateResponse, ContainerInspectResponse, ContainerSummary,
13    },
14};
15use futures_util::{Stream, StreamExt, TryStreamExt};
16
17pub trait DockerInspectContainer {
18    fn inspect_container(
19        &self,
20        container_id: &str,
21        options: Option<InspectContainerOptions>,
22    ) -> impl Future<Output = Result<ContainerInspectResponse, Error>> + Send;
23}
24
25impl DockerInspectContainer for Docker {
26    async fn inspect_container(
27        &self,
28        container_id: &str,
29        options: Option<InspectContainerOptions>,
30    ) -> Result<ContainerInspectResponse, Error> {
31        self.inspect_container(container_id, options).await
32    }
33}
34
35pub trait DockerListContainers {
36    fn list_containers(
37        &self,
38        options: Option<ListContainersOptions>,
39    ) -> impl Future<Output = Result<Vec<ContainerSummary>, Error>> + Send;
40}
41
42impl DockerListContainers for Docker {
43    async fn list_containers(
44        &self,
45        options: Option<ListContainersOptions>,
46    ) -> Result<Vec<ContainerSummary>, Error> {
47        self.list_containers(options).await
48    }
49}
50
51pub trait DockerPullImage {
52    fn pull_image(&self, image: &str, tag: &str) -> impl Future<Output = Result<(), Error>> + Send;
53}
54
55impl DockerPullImage for Docker {
56    async fn pull_image(&self, image: &str, tag: &str) -> Result<(), Error> {
57        // Build the options for pulling the Atlas Local Docker image
58        let create_image_options = CreateImageOptionsBuilder::default()
59            .from_image(image)
60            .tag(tag)
61            .build();
62
63        // Start pulling the image, which returns a stream of progress events
64        let mut stream = self.create_image(Some(create_image_options), None, None);
65
66        // Iterate over the stream and check for errors
67        while let Some(result) = stream.next().await {
68            result?;
69        }
70
71        Ok(())
72    }
73}
74
75pub trait DockerStopContainer {
76    fn stop_container(
77        &self,
78        container_id: &str,
79        options: Option<StopContainerOptions>,
80    ) -> impl Future<Output = Result<(), Error>> + Send;
81}
82
83impl DockerStopContainer for Docker {
84    async fn stop_container(
85        &self,
86        container_id: &str,
87        options: Option<StopContainerOptions>,
88    ) -> Result<(), Error> {
89        self.stop_container(container_id, options).await
90    }
91}
92
93pub trait DockerRemoveContainer {
94    fn remove_container(
95        &self,
96        container_id: &str,
97        options: Option<RemoveContainerOptions>,
98    ) -> impl Future<Output = Result<(), Error>> + Send;
99}
100
101impl DockerRemoveContainer for Docker {
102    async fn remove_container(
103        &self,
104        container_id: &str,
105        options: Option<RemoveContainerOptions>,
106    ) -> Result<(), Error> {
107        self.remove_container(container_id, options).await
108    }
109}
110
111pub trait DockerCreateContainer {
112    fn create_container(
113        &self,
114        options: Option<CreateContainerOptions>,
115        config: ContainerCreateBody,
116    ) -> impl Future<Output = Result<ContainerCreateResponse, Error>> + Send;
117}
118
119impl DockerCreateContainer for Docker {
120    async fn create_container(
121        &self,
122        options: Option<CreateContainerOptions>,
123        config: ContainerCreateBody,
124    ) -> Result<ContainerCreateResponse, Error> {
125        self.create_container(options, config).await
126    }
127}
128
129pub trait DockerStartContainer {
130    fn start_container(
131        &self,
132        container_id: &str,
133        options: Option<StartContainerOptions>,
134    ) -> impl Future<Output = Result<(), Error>> + Send;
135}
136
137impl DockerStartContainer for Docker {
138    async fn start_container(
139        &self,
140        container_id: &str,
141        options: Option<StartContainerOptions>,
142    ) -> Result<(), Error> {
143        self.start_container(container_id, options).await
144    }
145}
146
147pub trait DockerPauseContainer {
148    fn pause_container(&self, container_id: &str)
149    -> impl Future<Output = Result<(), Error>> + Send;
150}
151
152impl DockerPauseContainer for Docker {
153    async fn pause_container(&self, container_id: &str) -> Result<(), Error> {
154        self.pause_container(container_id).await
155    }
156}
157
158pub trait DockerUnpauseContainer {
159    fn unpause_container(
160        &self,
161        container_id: &str,
162    ) -> impl Future<Output = Result<(), Error>> + Send;
163}
164
165impl DockerUnpauseContainer for Docker {
166    async fn unpause_container(&self, container_id: &str) -> Result<(), Error> {
167        self.unpause_container(container_id).await
168    }
169}
170
171pub trait RunCommandInContainer {
172    fn run_command_in_container(
173        &self,
174        container_id: &str,
175        command: Vec<String>,
176    ) -> impl Future<Output = Result<CommandOutput, RunCommandInContainerError>> + Send;
177}
178
179pub struct CommandOutput {
180    pub stdout: Vec<String>,
181    pub stderr: Vec<String>,
182}
183
184#[derive(Debug, thiserror::Error)]
185pub enum RunCommandInContainerError {
186    #[error("Failed to create exec: {0}")]
187    CreateExec(Error),
188    #[error("Failed to start exec: {0}")]
189    StartExec(Error),
190    #[error("Failed to get output, output was not attached")]
191    GetOutput,
192    #[error("Failed to get output: {0}")]
193    GetOutputError(Error),
194}
195
196impl RunCommandInContainer for Docker {
197    async fn run_command_in_container(
198        &self,
199        container_id: &str,
200        command: Vec<String>,
201    ) -> Result<CommandOutput, RunCommandInContainerError> {
202        let exec = self
203            .create_exec(
204                container_id,
205                CreateExecOptions {
206                    attach_stdout: Some(true),
207                    attach_stderr: Some(true),
208                    cmd: Some(command),
209                    ..Default::default()
210                },
211            )
212            .await
213            .map_err(RunCommandInContainerError::CreateExec)?;
214
215        let exec = self
216            .start_exec(
217                &exec.id,
218                Some(StartExecOptions {
219                    detach: false,
220                    tty: false,
221                    output_capacity: None,
222                }),
223            )
224            .await
225            .map_err(RunCommandInContainerError::StartExec)?;
226
227        let StartExecResults::Attached { mut output, .. } = exec else {
228            return Err(RunCommandInContainerError::GetOutput);
229        };
230
231        let mut stdout = String::new();
232        let mut stderr = String::new();
233
234        while let Some(result) = output.next().await {
235            let log_ouput = result.map_err(RunCommandInContainerError::GetOutputError)?;
236
237            match log_ouput {
238                LogOutput::StdOut { message } => {
239                    stdout.push_str(&String::from_utf8_lossy(message.as_ref()));
240                }
241                LogOutput::StdErr { message } => {
242                    stderr.push_str(&String::from_utf8_lossy(message.as_ref()));
243                }
244                _ => {}
245            }
246        }
247
248        Ok(CommandOutput {
249            stdout: stdout.lines().map(str::to_string).collect(),
250            stderr: stderr.lines().map(str::to_string).collect(),
251        })
252    }
253}
254
255pub trait DockerLogContainer {
256    fn logs<'a>(
257        &'a self,
258        container_id: &'a str,
259        options: Option<LogsOptions>,
260    ) -> impl Stream<Item = Result<LogOutput, String>> + 'a;
261}
262
263impl DockerLogContainer for Docker {
264    fn logs<'a>(
265        &'a self,
266        container_id: &'a str,
267        options: Option<LogsOptions>,
268    ) -> impl Stream<Item = Result<LogOutput, String>> + 'a {
269        self.logs(container_id, options).map_err(|e| e.to_string())
270    }
271}