1use std::collections::HashMap;
4use std::path::Path;
5use std::pin::Pin;
6use std::time::{Duration, Instant, SystemTime};
7
8use bollard::Docker;
9use bollard::container::{
10 Config, CreateContainerOptions, ListContainersOptions, LogOutput, LogsOptions,
11 StopContainerOptions,
12};
13use bollard::image::{BuildImageOptions, CreateImageOptions};
14use bollard::models::{HealthConfig, HostConfig, PortBinding as BollardPortBinding};
15use bytes::Bytes;
16use futures::stream::{Stream, StreamExt};
17
18use crate::error::{Result, RuntimeError};
19use crate::runtime::{
20 ContainerId, ContainerRuntime, ContainerStatus, LogChunk, LogChunkStream, LogStream,
21};
22use crate::spec::{
23 ContainerSpec, HealthcheckSpec, ImageSource, PortBinding, VolumeBinding, VolumeSource,
24};
25
/// Pause between two consecutive status polls in `wait_healthy`.
const POLL_INTERVAL: Duration = Duration::from_millis(500);
27
/// [`ContainerRuntime`] implementation that drives a Docker daemon through a
/// bollard [`Docker`] client.
pub struct DockerRuntime {
    /// Handle used for every daemon request issued by this runtime.
    client: Docker,
}
35
36impl DockerRuntime {
37 pub fn connect() -> Result<Self> {
39 let client = Docker::connect_with_local_defaults().map_err(RuntimeError::Connect)?;
40 Ok(Self { client })
41 }
42
43 #[must_use]
46 pub fn from_client(client: Docker) -> Self {
47 Self { client }
48 }
49
50 async fn ensure_image(&self, image: &str) -> Result<()> {
51 let (from_image, tag) = split_image_ref(image);
52 let options = CreateImageOptions {
53 from_image,
54 tag,
55 ..Default::default()
56 };
57 let mut stream = self.client.create_image(Some(options), None, None);
58 while let Some(event) = stream.next().await {
59 event.map_err(|e| RuntimeError::ImagePull {
60 image: image.to_owned(),
61 source: e,
62 })?;
63 }
64 Ok(())
65 }
66
67 pub async fn list_managed(&self, project: &str) -> Result<Vec<ManagedContainer>> {
71 let label_filter = format!("{LABEL_PROJECT}={project}");
72 let mut filters: HashMap<String, Vec<String>> = HashMap::new();
73 filters.insert("label".to_owned(), vec![label_filter]);
74 let options = ListContainersOptions {
75 all: true,
76 filters,
77 ..Default::default()
78 };
79 let summaries = self
80 .client
81 .list_containers(Some(options))
82 .await
83 .map_err(|source| RuntimeError::Inspect {
84 id: format!("project={project}"),
85 source,
86 })?;
87
88 let mut out = Vec::with_capacity(summaries.len());
89 for summary in summaries {
90 let Some(id) = summary.id else { continue };
91 let resource = summary
92 .labels
93 .as_ref()
94 .and_then(|labels| labels.get(LABEL_RESOURCE))
95 .cloned()
96 .unwrap_or_else(|| "<unknown>".to_owned());
97 let status = parse_summary_state(summary.state.as_deref());
98 out.push(ManagedContainer {
99 id: ContainerId::new(id),
100 resource,
101 status,
102 });
103 }
104 out.sort_by(|a, b| a.resource.cmp(&b.resource));
105 Ok(out)
106 }
107
108 async fn build_image(
109 &self,
110 context: &str,
111 dockerfile: &str,
112 build_args: &HashMap<String, String>,
113 target: Option<&str>,
114 tag: &str,
115 ) -> Result<()> {
116 let context_owned = context.to_owned();
117 let tar_bytes =
118 tokio::task::spawn_blocking(move || build_tar_archive(Path::new(&context_owned)))
119 .await
120 .map_err(|join_err| {
121 RuntimeError::InvalidSpec(format!("tar build task panicked: {join_err}"))
122 })?
123 .map_err(|io_err| {
124 RuntimeError::InvalidSpec(format!("failed to build tar archive: {io_err}"))
125 })?;
126
127 let options = BuildImageOptions::<String> {
128 dockerfile: dockerfile.to_owned(),
129 t: tag.to_owned(),
130 rm: true,
131 buildargs: build_args.clone(),
132 target: target.unwrap_or("").to_owned(),
133 ..Default::default()
134 };
135
136 let mut stream = self
137 .client
138 .build_image(options, None, Some(Bytes::from(tar_bytes)));
139 while let Some(event) = stream.next().await {
140 event.map_err(RuntimeError::Build)?;
141 }
142 Ok(())
143 }
144}
145
146fn build_tar_archive(context: &Path) -> std::io::Result<Vec<u8>> {
149 use ignore::WalkBuilder;
150
151 let mut buf: Vec<u8> = Vec::new();
152 {
153 let mut builder = tar::Builder::new(&mut buf);
154 builder.follow_symlinks(false);
155
156 let walker = WalkBuilder::new(context)
157 .add_custom_ignore_filename(".dockerignore")
158 .git_ignore(false)
159 .git_exclude(false)
160 .git_global(false)
161 .hidden(false)
162 .build();
163
164 for entry in walker {
165 let entry = entry.map_err(|e| std::io::Error::other(format!("walk error: {e}")))?;
166 let path = entry.path();
167 let relative = match path.strip_prefix(context) {
168 Ok(p) if !p.as_os_str().is_empty() => p,
169 _ => continue,
170 };
171 let Some(file_type) = entry.file_type() else {
172 continue;
173 };
174 if file_type.is_dir() {
175 builder.append_dir(relative, path)?;
176 } else if file_type.is_file() {
177 let mut file = std::fs::File::open(path)?;
178 builder.append_file(relative, &mut file)?;
179 }
180 }
181 builder.finish()?;
182 }
183 Ok(buf)
184}
185
186impl ContainerRuntime for DockerRuntime {
187 async fn start(&self, spec: &ContainerSpec) -> Result<ContainerId> {
188 let image_ref = match &spec.image {
189 ImageSource::Pull(image) => {
190 self.ensure_image(image).await?;
191 image.clone()
192 }
193 ImageSource::Build {
194 context,
195 dockerfile,
196 build_args,
197 target,
198 tag,
199 } => {
200 self.build_image(context, dockerfile, build_args, target.as_deref(), tag)
201 .await?;
202 tag.clone()
203 }
204 };
205
206 let host_config = build_host_config(&spec.ports, &spec.volumes);
207 let exposed_ports = build_exposed_ports(&spec.ports);
208 let env = build_env(&spec.env);
209 let healthcheck = spec.healthcheck.as_ref().map(build_healthcheck);
210 let labels = build_labels(&spec.project, &spec.resource);
211
212 let config = Config {
213 image: Some(image_ref),
214 env: Some(env),
215 cmd: spec.command.clone(),
216 host_config: Some(host_config),
217 exposed_ports: Some(exposed_ports),
218 healthcheck,
219 labels: Some(labels),
220 ..Default::default()
221 };
222
223 let create_options = CreateContainerOptions {
224 name: spec.name.clone(),
225 platform: None,
226 };
227
228 let created = self
229 .client
230 .create_container(Some(create_options), config)
231 .await
232 .map_err(RuntimeError::Start)?;
233
234 self.client
235 .start_container::<String>(&created.id, None)
236 .await
237 .map_err(RuntimeError::Start)?;
238
239 Ok(ContainerId::new(created.id))
240 }
241
242 async fn stop(&self, id: &ContainerId, grace: Duration) -> Result<()> {
243 #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
244 let options = StopContainerOptions {
245 t: grace.as_secs() as i64,
246 };
247 match self.client.stop_container(id.as_str(), Some(options)).await {
248 Ok(())
249 | Err(bollard::errors::Error::DockerResponseServerError {
250 status_code: 304 | 404,
251 ..
252 }) => Ok(()),
253 Err(e) => Err(RuntimeError::Stop {
254 id: id.to_string(),
255 source: e,
256 }),
257 }
258 }
259
260 async fn inspect(&self, id: &ContainerId) -> Result<ContainerStatus> {
261 let info = self
262 .client
263 .inspect_container(id.as_str(), None)
264 .await
265 .map_err(|e| match e {
266 bollard::errors::Error::DockerResponseServerError {
267 status_code: 404, ..
268 } => RuntimeError::NotFound(id.to_string()),
269 other => RuntimeError::Inspect {
270 id: id.to_string(),
271 source: other,
272 },
273 })?;
274
275 let state = info.state.as_ref();
276 let Some(state) = state else {
277 return Ok(ContainerStatus::Starting);
278 };
279
280 if matches!(state.running, Some(true)) {
281 if let Some(health) = &state.health {
282 return Ok(match health.status {
283 Some(bollard::models::HealthStatusEnum::HEALTHY) => ContainerStatus::Healthy,
284 Some(bollard::models::HealthStatusEnum::UNHEALTHY) => {
285 ContainerStatus::Unhealthy
286 }
287 _ => ContainerStatus::Running,
288 });
289 }
290 return Ok(ContainerStatus::Running);
291 }
292
293 if matches!(state.dead, Some(true))
294 || state.status == Some(bollard::models::ContainerStateStatusEnum::EXITED)
295 {
296 #[allow(clippy::cast_possible_truncation)]
297 let exit_code = state.exit_code.map(|c| c as i32);
298 return Ok(ContainerStatus::Stopped { exit_code });
299 }
300
301 Ok(ContainerStatus::Starting)
302 }
303
304 async fn wait_healthy(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
305 let deadline = Instant::now() + timeout;
306 loop {
307 match self.inspect(id).await? {
308 ContainerStatus::Healthy | ContainerStatus::Running => return Ok(()),
309 ContainerStatus::Unhealthy => {
310 if Instant::now() >= deadline {
311 return Err(RuntimeError::Timeout {
312 operation: "wait_healthy",
313 after: timeout,
314 });
315 }
316 }
317 ContainerStatus::Starting => {}
318 ContainerStatus::Stopped { exit_code } => {
319 return Err(RuntimeError::InvalidSpec(format!(
320 "container `{id}` exited with code {exit_code:?} before becoming healthy"
321 )));
322 }
323 }
324 if Instant::now() >= deadline {
325 return Err(RuntimeError::Timeout {
326 operation: "wait_healthy",
327 after: timeout,
328 });
329 }
330 tokio::time::sleep(POLL_INTERVAL).await;
331 }
332 }
333
334 async fn logs(&self, id: &ContainerId, follow: bool) -> Result<LogChunkStream> {
335 let options = LogsOptions::<String> {
336 follow,
337 stdout: true,
338 stderr: true,
339 timestamps: true,
340 ..Default::default()
341 };
342 let stream = self.client.logs(id.as_str(), Some(options));
343 let mapped: Pin<Box<dyn Stream<Item = Result<LogChunk>> + Send>> =
344 Box::pin(stream.map(map_log_item));
345 Ok(mapped)
346 }
347}
348
/// Splits off a trailing `:tag` from `reference`, if it has one.
///
/// A colon only introduces a tag when nothing after it contains `/`;
/// otherwise it belongs to a registry `host:port` prefix.
fn split_trailing_tag(reference: &str) -> Option<(&str, &str)> {
    reference
        .rsplit_once(':')
        .filter(|(_, tag)| !tag.contains('/'))
}

/// Splits an image reference into `(repository, tag_or_digest)`.
///
/// - `name` yields `(name, "latest")`
/// - `name:tag` yields `(name, tag)`
/// - `host:port/name` yields `(host:port/name, "latest")`; the colon is part
///   of the registry address, not a tag separator
/// - `name@digest` or `name:tag@digest` yields `(name, digest)`
fn split_image_ref(image: &str) -> (&str, &str) {
    if let Some((name, digest)) = image.split_once('@') {
        // A digest pins the image; any tag in front of it is redundant.
        let repository = split_trailing_tag(name).map_or(name, |(repo, _)| repo);
        return (repository, digest);
    }
    split_trailing_tag(image).unwrap_or((image, "latest"))
}
352
/// Renders each environment entry as a `KEY=value` string.
///
/// The output order follows the map's iteration order and is therefore
/// unspecified.
fn build_env(env: &HashMap<String, String>) -> Vec<String> {
    let mut entries = Vec::with_capacity(env.len());
    for (name, value) in env {
        entries.push(format!("{name}={value}"));
    }
    entries
}
356
357fn build_labels(project: &str, resource: &str) -> HashMap<String, String> {
358 let mut labels = HashMap::with_capacity(2);
359 labels.insert(LABEL_PROJECT.to_owned(), project.to_owned());
360 labels.insert(LABEL_RESOURCE.to_owned(), resource.to_owned());
361 labels
362}
363
/// Label key holding the project name; written by `build_labels` and used
/// by `DockerRuntime::list_managed` as its container filter.
pub const LABEL_PROJECT: &str = "lightshuttle.project";

/// Label key holding the resource name; written by `build_labels` and read
/// back by `DockerRuntime::list_managed`.
pub const LABEL_RESOURCE: &str = "lightshuttle.resource";
371
/// One container reported by `DockerRuntime::list_managed`.
#[derive(Debug, Clone)]
pub struct ManagedContainer {
    /// Identifier of the container as reported in the list summary.
    pub id: ContainerId,
    /// Value of the `LABEL_RESOURCE` label, or `"<unknown>"` when it is absent.
    pub resource: String,
    /// Coarse status derived from the summary's state string.
    pub status: ContainerStatus,
}
382
383fn parse_summary_state(state: Option<&str>) -> ContainerStatus {
384 match state {
385 Some("running") => ContainerStatus::Running,
386 Some("exited" | "dead") => ContainerStatus::Stopped { exit_code: None },
387 _ => ContainerStatus::Starting,
388 }
389}
390
391#[allow(clippy::zero_sized_map_values)]
392fn build_exposed_ports(ports: &[PortBinding]) -> HashMap<String, HashMap<(), ()>> {
393 ports
394 .iter()
395 .map(|p| (format!("{}/tcp", p.container_port), HashMap::new()))
396 .collect()
397}
398
399fn build_host_config(ports: &[PortBinding], volumes: &[VolumeBinding]) -> HostConfig {
400 let port_bindings = ports
401 .iter()
402 .map(|p| {
403 let bindings = vec![BollardPortBinding {
404 host_ip: p.host_address.clone(),
405 host_port: Some(p.host_port.to_string()),
406 }];
407 (format!("{}/tcp", p.container_port), Some(bindings))
408 })
409 .collect::<HashMap<_, _>>();
410
411 let binds: Vec<String> = volumes
412 .iter()
413 .filter_map(|v| match &v.source {
414 VolumeSource::HostPath(path) => Some(format!("{path}:{}", v.target)),
415 VolumeSource::Named(name) => Some(format!("{name}:{}", v.target)),
416 VolumeSource::Anonymous => None,
417 })
418 .collect();
419
420 HostConfig {
421 port_bindings: Some(port_bindings),
422 binds: if binds.is_empty() { None } else { Some(binds) },
423 ..Default::default()
424 }
425}
426
427fn build_healthcheck(hc: &HealthcheckSpec) -> HealthConfig {
428 #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
429 HealthConfig {
430 test: Some(hc.test.clone()),
431 interval: Some(hc.interval.as_nanos() as i64),
432 timeout: Some(hc.timeout.as_nanos() as i64),
433 retries: Some(i64::from(hc.retries)),
434 start_period: Some(hc.start_period.as_nanos() as i64),
435 ..Default::default()
436 }
437}
438
439fn map_log_item(item: std::result::Result<LogOutput, bollard::errors::Error>) -> Result<LogChunk> {
440 match item {
441 Ok(LogOutput::StdErr { message }) => Ok(LogChunk {
442 stream: LogStream::Stderr,
443 timestamp: SystemTime::now(),
444 bytes: message.to_vec(),
445 }),
446 Ok(
447 LogOutput::StdOut { message }
448 | LogOutput::Console { message }
449 | LogOutput::StdIn { message },
450 ) => Ok(LogChunk {
451 stream: LogStream::Stdout,
452 timestamp: SystemTime::now(),
453 bytes: message.to_vec(),
454 }),
455 Err(e) => Err(RuntimeError::LogStream(e)),
456 }
457}