op_composer/
lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(
3    missing_debug_implementations,
4    missing_docs,
5    unreachable_pub,
6    rustdoc::all
7)]
8#![deny(unused_must_use, rust_2018_idioms)]
9#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
10
11use std::{collections::HashMap, fmt::Debug};
12
13use bollard::{
14    container::{
15        CreateContainerOptions, ListContainersOptions, LogOutput, RemoveContainerOptions,
16        StartContainerOptions, StopContainerOptions,
17    },
18    exec::{CreateExecOptions, StartExecResults},
19    image::BuildImageOptions,
20    service::{ContainerCreateResponse, ContainerSummary, Volume},
21    Docker,
22};
23use eyre::{bail, Result};
24use futures_util::{StreamExt, TryStreamExt};
25use serde::Serialize;
26
27pub use bollard::container::Config;
28pub use bollard::image::CreateImageOptions;
29pub use bollard::service::HostConfig;
30pub use bollard::volume::CreateVolumeOptions;
31pub use utils::bind_host_port;
32
33/// Utilities for Docker operations
34mod utils;
35
36/// The Composer is responsible for managing the OP-UP docker containers.
37#[derive(Debug)]
38pub struct Composer {
39    /// The Docker daemon client.
40    pub daemon: Docker,
41}
42
43impl Composer {
44    /// Create a new instance of the Composer.
45    pub fn new() -> Result<Self> {
46        let daemon = Docker::connect_with_local_defaults()?;
47
48        tracing::debug!(target: "composer", "Successfully connected to Docker daemon");
49        Ok(Self { daemon })
50    }
51
52    /// List all the OP-UP docker containers existing on the host.
53    ///
54    /// The containers are filtered by the label `com.docker.compose.project=op-up`.
55    ///
56    /// This method allows optional filtering by container status:
57    /// `created`|`restarting`|`running`|`removing`|`paused`|`exited`|`dead`
58    pub async fn list_containers(&self, status: Option<&str>) -> Result<Vec<ContainerSummary>> {
59        let mut filters = HashMap::new();
60        filters.insert("label", vec!["com.docker.compose.project=op-up"]);
61
62        if let Some(status) = status {
63            filters.insert("status", vec![status]);
64        }
65
66        let list_options = ListContainersOptions {
67            all: true,
68            filters,
69            ..Default::default()
70        };
71
72        self.daemon
73            .list_containers(Some(list_options))
74            .await
75            .map_err(Into::into)
76    }
77
78    /// Create a Docker image from the specified options.
79    ///
80    /// Returns the ID of the created image.
81    pub async fn create_image<T>(&self, opts: CreateImageOptions<'_, T>) -> Result<String>
82    where
83        T: Into<String> + Serialize + Clone + Debug,
84    {
85        let res = self
86            .daemon
87            .create_image(Some(opts), None, None)
88            .map(|res| {
89                res.map(|info| {
90                    tracing::trace!(target: "composer", "image progress: {:?}", info);
91                    info
92                })
93            })
94            .try_collect::<Vec<_>>()
95            .await?;
96
97        tracing::debug!(target: "composer", "Created docker image: {:?}", res);
98
99        match res.first() {
100            Some(info) => match info.id.as_ref() {
101                Some(id) => Ok(id.clone()),
102                None => bail!("No image ID found in response"),
103            },
104            None => bail!("No image info found in response"),
105        }
106    }
107
108    /// Build a Docker image from the specified Dockerfile and build context files.
109    pub async fn build_image(
110        &self,
111        name: &str,
112        dockerfile: &str,
113        build_context_files: &[(&str, &[u8])],
114    ) -> Result<()> {
115        let build_options = BuildImageOptions {
116            t: name,
117            dockerfile: "Dockerfile",
118            pull: true,
119            ..Default::default()
120        };
121
122        let files = utils::create_dockerfile_build_context(dockerfile, build_context_files)?;
123        let mut image_build_stream =
124            self.daemon
125                .build_image(build_options, None, Some(files.into()));
126
127        while let Some(build_info) = image_build_stream.next().await {
128            let res = build_info?;
129            tracing::debug!(target: "composer", "Build info: {:?}", res);
130        }
131
132        Ok(())
133    }
134
135    /// Creates a Docker volume with the specified options.
136    pub async fn create_volume<T>(&self, config: CreateVolumeOptions<T>) -> Result<Volume>
137    where
138        T: Into<String> + Serialize + Eq + std::hash::Hash,
139    {
140        self.daemon.create_volume(config).await.map_err(Into::into)
141    }
142
143    /// Create a Docker container for the specified OP Stack component
144    pub async fn create_container(
145        &self,
146        name: &str,
147        mut config: Config<String>,
148        overwrite: bool,
149    ) -> Result<ContainerCreateResponse> {
150        let create_options = CreateContainerOptions {
151            name,
152            platform: None,
153        };
154
155        let labels = config.labels.get_or_insert_with(HashMap::new);
156        labels.insert(
157            "com.docker.compose.project".to_string(),
158            "op-up".to_string(),
159        );
160
161        // Check if a container already exists with the specified name. If it does:
162        // - If overwrite is true, remove the existing container and create a new one.
163        // - If overwrite is false, return the existing container ID.
164        let containers = self.list_containers(None).await?;
165        if let Some(container) = containers.iter().find(|container| {
166            container
167                .names
168                .as_ref()
169                .map(|names| {
170                    names
171                        .iter()
172                        .any(|n| n == name || n == &format!("/{}", name))
173                })
174                .unwrap_or(false)
175        }) {
176            tracing::debug!(target: "composer", "Container {} already exists", name);
177            let id = container
178                .id
179                .clone()
180                .ok_or_else(|| eyre::eyre!("No container ID found"))?;
181
182            if overwrite {
183                self.daemon
184                    .remove_container(&id, None::<RemoveContainerOptions>)
185                    .await?;
186                tracing::debug!(target: "composer", "Removed existing docker container {}", name);
187            } else {
188                return Ok(ContainerCreateResponse {
189                    id,
190                    warnings: vec![],
191                });
192            }
193        }
194
195        let res = self
196            .daemon
197            .create_container(Some(create_options), config)
198            .await?;
199
200        tracing::debug!(target: "composer", "Created docker container {} with ID: {}", name, res.id);
201
202        Ok(res)
203    }
204
205    /// Start the specified OP Stack component container by ID.
206    pub async fn start_container(&self, id: &str) -> Result<()> {
207        self.daemon
208            .start_container(id, None::<StartContainerOptions<&str>>)
209            .await?;
210
211        tracing::debug!(target: "composer", "Started docker container with ID: {}", id);
212        Ok(())
213    }
214
215    /// Stop the specified OP Stack component container by ID.
216    pub async fn stop_container(&self, id: &str) -> Result<()> {
217        self.daemon
218            .stop_container(id, None::<StopContainerOptions>)
219            .await?;
220
221        tracing::debug!(target: "composer", "Stopped docker container with ID: {}", id);
222        Ok(())
223    }
224
225    /// Remove the specified OP Stack component container by ID.
226    pub async fn remove_container(&self, id: &str) -> Result<()> {
227        self.daemon
228            .remove_container(id, None::<RemoveContainerOptions>)
229            .await?;
230
231        tracing::debug!(target: "composer", "Removed docker container with ID: {}", id);
232        Ok(())
233    }
234
235    /// Stop all OP-UP docker containers at once.
236    pub async fn stop_all_containers(&self) -> Result<()> {
237        let running_containers = self.list_containers(Some("running")).await?;
238
239        let ids = running_containers
240            .iter()
241            .filter_map(|container| container.id.as_ref())
242            .map(|id| id.as_str())
243            .collect::<Vec<_>>();
244
245        tracing::info!(target: "composer", "Stopping docker containers: {:?}", ids);
246
247        for id in ids {
248            self.daemon
249                .stop_container(id, None::<StopContainerOptions>)
250                .await?;
251
252            tracing::debug!(target: "composer", "Successfully stopped docker container: {}", id);
253        }
254
255        Ok(())
256    }
257
258    /// Remove all OP-UP docker containers at once
259    pub async fn purge_all_containers(&self) -> Result<()> {
260        let containers = self.list_containers(None).await?;
261
262        let ids = containers
263            .iter()
264            .filter_map(|container| container.id.as_ref())
265            .map(|id| id.as_str())
266            .collect::<Vec<_>>();
267
268        for id in ids {
269            self.daemon
270                .remove_container(id, None::<RemoveContainerOptions>)
271                .await?;
272
273            tracing::debug!(target: "composer", "Successfully removed docker container: {}", id);
274        }
275
276        Ok(())
277    }
278
279    /// Execute a command on a running container by its ID and return the output.
280    pub async fn remote_exec(&self, id: &str, cmd: Vec<&str>) -> Result<Vec<LogOutput>> {
281        let exec_options = CreateExecOptions {
282            attach_stdout: Some(true),
283            attach_stderr: Some(true),
284            cmd: Some(cmd),
285            ..Default::default()
286        };
287
288        let exec = self.daemon.create_exec(id, exec_options).await?;
289
290        match self.daemon.start_exec(&exec.id, None).await? {
291            StartExecResults::Attached { output, .. } => Ok(output
292                .filter_map(|res| async {
293                    match res {
294                        Ok(output) => Some(output),
295                        Err(e) => {
296                            tracing::error!(target: "composer", "Error executing remote command: {:?}", e);
297                            None
298                        },
299                    }
300                })
301                .collect::<Vec<_>>()
302                .await),
303
304            StartExecResults::Detached => {
305                bail!("Detached exec is not supported")
306            }
307        }
308    }
309}