1#![doc = include_str!("../README.md")]
2#![warn(
3 missing_debug_implementations,
4 missing_docs,
5 unreachable_pub,
6 rustdoc::all
7)]
8#![deny(unused_must_use, rust_2018_idioms)]
9#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
10
11use std::{collections::HashMap, fmt::Debug};
12
13use bollard::{
14 container::{
15 CreateContainerOptions, ListContainersOptions, LogOutput, RemoveContainerOptions,
16 StartContainerOptions, StopContainerOptions,
17 },
18 exec::{CreateExecOptions, StartExecResults},
19 image::BuildImageOptions,
20 service::{ContainerCreateResponse, ContainerSummary, Volume},
21 Docker,
22};
23use eyre::{bail, Result};
24use futures_util::{StreamExt, TryStreamExt};
25use serde::Serialize;
26
27pub use bollard::container::Config;
28pub use bollard::image::CreateImageOptions;
29pub use bollard::service::HostConfig;
30pub use bollard::volume::CreateVolumeOptions;
31pub use utils::bind_host_port;
32
33mod utils;
35
36#[derive(Debug)]
38pub struct Composer {
39 pub daemon: Docker,
41}
42
43impl Composer {
44 pub fn new() -> Result<Self> {
46 let daemon = Docker::connect_with_local_defaults()?;
47
48 tracing::debug!(target: "composer", "Successfully connected to Docker daemon");
49 Ok(Self { daemon })
50 }
51
52 pub async fn list_containers(&self, status: Option<&str>) -> Result<Vec<ContainerSummary>> {
59 let mut filters = HashMap::new();
60 filters.insert("label", vec!["com.docker.compose.project=op-up"]);
61
62 if let Some(status) = status {
63 filters.insert("status", vec![status]);
64 }
65
66 let list_options = ListContainersOptions {
67 all: true,
68 filters,
69 ..Default::default()
70 };
71
72 self.daemon
73 .list_containers(Some(list_options))
74 .await
75 .map_err(Into::into)
76 }
77
78 pub async fn create_image<T>(&self, opts: CreateImageOptions<'_, T>) -> Result<String>
82 where
83 T: Into<String> + Serialize + Clone + Debug,
84 {
85 let res = self
86 .daemon
87 .create_image(Some(opts), None, None)
88 .map(|res| {
89 res.map(|info| {
90 tracing::trace!(target: "composer", "image progress: {:?}", info);
91 info
92 })
93 })
94 .try_collect::<Vec<_>>()
95 .await?;
96
97 tracing::debug!(target: "composer", "Created docker image: {:?}", res);
98
99 match res.first() {
100 Some(info) => match info.id.as_ref() {
101 Some(id) => Ok(id.clone()),
102 None => bail!("No image ID found in response"),
103 },
104 None => bail!("No image info found in response"),
105 }
106 }
107
108 pub async fn build_image(
110 &self,
111 name: &str,
112 dockerfile: &str,
113 build_context_files: &[(&str, &[u8])],
114 ) -> Result<()> {
115 let build_options = BuildImageOptions {
116 t: name,
117 dockerfile: "Dockerfile",
118 pull: true,
119 ..Default::default()
120 };
121
122 let files = utils::create_dockerfile_build_context(dockerfile, build_context_files)?;
123 let mut image_build_stream =
124 self.daemon
125 .build_image(build_options, None, Some(files.into()));
126
127 while let Some(build_info) = image_build_stream.next().await {
128 let res = build_info?;
129 tracing::debug!(target: "composer", "Build info: {:?}", res);
130 }
131
132 Ok(())
133 }
134
135 pub async fn create_volume<T>(&self, config: CreateVolumeOptions<T>) -> Result<Volume>
137 where
138 T: Into<String> + Serialize + Eq + std::hash::Hash,
139 {
140 self.daemon.create_volume(config).await.map_err(Into::into)
141 }
142
143 pub async fn create_container(
145 &self,
146 name: &str,
147 mut config: Config<String>,
148 overwrite: bool,
149 ) -> Result<ContainerCreateResponse> {
150 let create_options = CreateContainerOptions {
151 name,
152 platform: None,
153 };
154
155 let labels = config.labels.get_or_insert_with(HashMap::new);
156 labels.insert(
157 "com.docker.compose.project".to_string(),
158 "op-up".to_string(),
159 );
160
161 let containers = self.list_containers(None).await?;
165 if let Some(container) = containers.iter().find(|container| {
166 container
167 .names
168 .as_ref()
169 .map(|names| {
170 names
171 .iter()
172 .any(|n| n == name || n == &format!("/{}", name))
173 })
174 .unwrap_or(false)
175 }) {
176 tracing::debug!(target: "composer", "Container {} already exists", name);
177 let id = container
178 .id
179 .clone()
180 .ok_or_else(|| eyre::eyre!("No container ID found"))?;
181
182 if overwrite {
183 self.daemon
184 .remove_container(&id, None::<RemoveContainerOptions>)
185 .await?;
186 tracing::debug!(target: "composer", "Removed existing docker container {}", name);
187 } else {
188 return Ok(ContainerCreateResponse {
189 id,
190 warnings: vec![],
191 });
192 }
193 }
194
195 let res = self
196 .daemon
197 .create_container(Some(create_options), config)
198 .await?;
199
200 tracing::debug!(target: "composer", "Created docker container {} with ID: {}", name, res.id);
201
202 Ok(res)
203 }
204
205 pub async fn start_container(&self, id: &str) -> Result<()> {
207 self.daemon
208 .start_container(id, None::<StartContainerOptions<&str>>)
209 .await?;
210
211 tracing::debug!(target: "composer", "Started docker container with ID: {}", id);
212 Ok(())
213 }
214
215 pub async fn stop_container(&self, id: &str) -> Result<()> {
217 self.daemon
218 .stop_container(id, None::<StopContainerOptions>)
219 .await?;
220
221 tracing::debug!(target: "composer", "Stopped docker container with ID: {}", id);
222 Ok(())
223 }
224
225 pub async fn remove_container(&self, id: &str) -> Result<()> {
227 self.daemon
228 .remove_container(id, None::<RemoveContainerOptions>)
229 .await?;
230
231 tracing::debug!(target: "composer", "Removed docker container with ID: {}", id);
232 Ok(())
233 }
234
235 pub async fn stop_all_containers(&self) -> Result<()> {
237 let running_containers = self.list_containers(Some("running")).await?;
238
239 let ids = running_containers
240 .iter()
241 .filter_map(|container| container.id.as_ref())
242 .map(|id| id.as_str())
243 .collect::<Vec<_>>();
244
245 tracing::info!(target: "composer", "Stopping docker containers: {:?}", ids);
246
247 for id in ids {
248 self.daemon
249 .stop_container(id, None::<StopContainerOptions>)
250 .await?;
251
252 tracing::debug!(target: "composer", "Successfully stopped docker container: {}", id);
253 }
254
255 Ok(())
256 }
257
258 pub async fn purge_all_containers(&self) -> Result<()> {
260 let containers = self.list_containers(None).await?;
261
262 let ids = containers
263 .iter()
264 .filter_map(|container| container.id.as_ref())
265 .map(|id| id.as_str())
266 .collect::<Vec<_>>();
267
268 for id in ids {
269 self.daemon
270 .remove_container(id, None::<RemoveContainerOptions>)
271 .await?;
272
273 tracing::debug!(target: "composer", "Successfully removed docker container: {}", id);
274 }
275
276 Ok(())
277 }
278
279 pub async fn remote_exec(&self, id: &str, cmd: Vec<&str>) -> Result<Vec<LogOutput>> {
281 let exec_options = CreateExecOptions {
282 attach_stdout: Some(true),
283 attach_stderr: Some(true),
284 cmd: Some(cmd),
285 ..Default::default()
286 };
287
288 let exec = self.daemon.create_exec(id, exec_options).await?;
289
290 match self.daemon.start_exec(&exec.id, None).await? {
291 StartExecResults::Attached { output, .. } => Ok(output
292 .filter_map(|res| async {
293 match res {
294 Ok(output) => Some(output),
295 Err(e) => {
296 tracing::error!(target: "composer", "Error executing remote command: {:?}", e);
297 None
298 },
299 }
300 })
301 .collect::<Vec<_>>()
302 .await),
303
304 StartExecResults::Detached => {
305 bail!("Detached exec is not supported")
306 }
307 }
308 }
309}