Skip to main content

lightshuttle_runtime/
docker.rs

1//! Docker container runtime backed by the `bollard` crate.
2
3use std::collections::HashMap;
4use std::path::Path;
5use std::pin::Pin;
6use std::time::{Duration, Instant, SystemTime};
7
8use bollard::Docker;
9use bollard::container::{
10    Config, CreateContainerOptions, ListContainersOptions, LogOutput, LogsOptions,
11    StopContainerOptions,
12};
13use bollard::image::{BuildImageOptions, CreateImageOptions};
14use bollard::models::{HealthConfig, HostConfig, PortBinding as BollardPortBinding};
15use bytes::Bytes;
16use futures::stream::{Stream, StreamExt};
17
18use crate::error::{Result, RuntimeError};
19use crate::runtime::{
20    ContainerId, ContainerRuntime, ContainerStatus, LogChunk, LogChunkStream, LogStream,
21};
22use crate::spec::{
23    ContainerSpec, HealthcheckSpec, ImageSource, PortBinding, VolumeBinding, VolumeSource,
24};
25
/// Pause between two consecutive status inspections while `wait_healthy`
/// polls a container.
const POLL_INTERVAL: Duration = Duration::from_millis(500);
27
28/// Docker container runtime backed by the `bollard` crate.
29///
30/// Connects to the local Docker daemon using the platform default
31/// transport (Unix socket on Linux and macOS, named pipe on Windows).
32pub struct DockerRuntime {
33    client: Docker,
34}
35
36impl DockerRuntime {
37    /// Connect to the local Docker daemon.
38    pub fn connect() -> Result<Self> {
39        let client = Docker::connect_with_local_defaults().map_err(RuntimeError::Connect)?;
40        Ok(Self { client })
41    }
42
43    /// Wrap an existing `bollard::Docker` client. Useful for tests that
44    /// supply a pre-configured client (custom transport, mock, etc.).
45    #[must_use]
46    pub fn from_client(client: Docker) -> Self {
47        Self { client }
48    }
49
50    async fn ensure_image(&self, image: &str) -> Result<()> {
51        let (from_image, tag) = split_image_ref(image);
52        let options = CreateImageOptions {
53            from_image,
54            tag,
55            ..Default::default()
56        };
57        let mut stream = self.client.create_image(Some(options), None, None);
58        while let Some(event) = stream.next().await {
59            event.map_err(|e| RuntimeError::ImagePull {
60                image: image.to_owned(),
61                source: e,
62            })?;
63        }
64        Ok(())
65    }
66
67    /// List every container labelled with `lightshuttle.project=<project>`,
68    /// including stopped ones. Used by the CLI to implement `ps` and
69    /// `down` without relying on in-memory state.
70    pub async fn list_managed(&self, project: &str) -> Result<Vec<ManagedContainer>> {
71        let label_filter = format!("{LABEL_PROJECT}={project}");
72        let mut filters: HashMap<String, Vec<String>> = HashMap::new();
73        filters.insert("label".to_owned(), vec![label_filter]);
74        let options = ListContainersOptions {
75            all: true,
76            filters,
77            ..Default::default()
78        };
79        let summaries = self
80            .client
81            .list_containers(Some(options))
82            .await
83            .map_err(|source| RuntimeError::Inspect {
84                id: format!("project={project}"),
85                source,
86            })?;
87
88        let mut out = Vec::with_capacity(summaries.len());
89        for summary in summaries {
90            let Some(id) = summary.id else { continue };
91            let resource = summary
92                .labels
93                .as_ref()
94                .and_then(|labels| labels.get(LABEL_RESOURCE))
95                .cloned()
96                .unwrap_or_else(|| "<unknown>".to_owned());
97            let status = parse_summary_state(summary.state.as_deref());
98            out.push(ManagedContainer {
99                id: ContainerId::new(id),
100                resource,
101                status,
102            });
103        }
104        out.sort_by(|a, b| a.resource.cmp(&b.resource));
105        Ok(out)
106    }
107
108    async fn build_image(
109        &self,
110        context: &str,
111        dockerfile: &str,
112        build_args: &HashMap<String, String>,
113        target: Option<&str>,
114        tag: &str,
115    ) -> Result<()> {
116        let context_owned = context.to_owned();
117        let tar_bytes =
118            tokio::task::spawn_blocking(move || build_tar_archive(Path::new(&context_owned)))
119                .await
120                .map_err(|join_err| {
121                    RuntimeError::InvalidSpec(format!("tar build task panicked: {join_err}"))
122                })?
123                .map_err(|io_err| {
124                    RuntimeError::InvalidSpec(format!("failed to build tar archive: {io_err}"))
125                })?;
126
127        let options = BuildImageOptions::<String> {
128            dockerfile: dockerfile.to_owned(),
129            t: tag.to_owned(),
130            rm: true,
131            buildargs: build_args.clone(),
132            target: target.unwrap_or("").to_owned(),
133            ..Default::default()
134        };
135
136        let mut stream = self
137            .client
138            .build_image(options, None, Some(Bytes::from(tar_bytes)));
139        while let Some(event) = stream.next().await {
140            event.map_err(RuntimeError::Build)?;
141        }
142        Ok(())
143    }
144}
145
146/// Build a tar archive from `context`, respecting `.dockerignore`
147/// patterns found within. Returns the raw tar bytes (uncompressed).
148fn build_tar_archive(context: &Path) -> std::io::Result<Vec<u8>> {
149    use ignore::WalkBuilder;
150
151    let mut buf: Vec<u8> = Vec::new();
152    {
153        let mut builder = tar::Builder::new(&mut buf);
154        builder.follow_symlinks(false);
155
156        let walker = WalkBuilder::new(context)
157            .add_custom_ignore_filename(".dockerignore")
158            .git_ignore(false)
159            .git_exclude(false)
160            .git_global(false)
161            .hidden(false)
162            .build();
163
164        for entry in walker {
165            let entry = entry.map_err(|e| std::io::Error::other(format!("walk error: {e}")))?;
166            let path = entry.path();
167            let relative = match path.strip_prefix(context) {
168                Ok(p) if !p.as_os_str().is_empty() => p,
169                _ => continue,
170            };
171            let Some(file_type) = entry.file_type() else {
172                continue;
173            };
174            if file_type.is_dir() {
175                builder.append_dir(relative, path)?;
176            } else if file_type.is_file() {
177                let mut file = std::fs::File::open(path)?;
178                builder.append_file(relative, &mut file)?;
179            }
180        }
181        builder.finish()?;
182    }
183    Ok(buf)
184}
185
186impl ContainerRuntime for DockerRuntime {
187    async fn start(&self, spec: &ContainerSpec) -> Result<ContainerId> {
188        let image_ref = match &spec.image {
189            ImageSource::Pull(image) => {
190                self.ensure_image(image).await?;
191                image.clone()
192            }
193            ImageSource::Build {
194                context,
195                dockerfile,
196                build_args,
197                target,
198                tag,
199            } => {
200                self.build_image(context, dockerfile, build_args, target.as_deref(), tag)
201                    .await?;
202                tag.clone()
203            }
204        };
205
206        let host_config = build_host_config(&spec.ports, &spec.volumes);
207        let exposed_ports = build_exposed_ports(&spec.ports);
208        let env = build_env(&spec.env);
209        let healthcheck = spec.healthcheck.as_ref().map(build_healthcheck);
210        let labels = build_labels(&spec.project, &spec.resource);
211
212        let config = Config {
213            image: Some(image_ref),
214            env: Some(env),
215            cmd: spec.command.clone(),
216            host_config: Some(host_config),
217            exposed_ports: Some(exposed_ports),
218            healthcheck,
219            labels: Some(labels),
220            ..Default::default()
221        };
222
223        let create_options = CreateContainerOptions {
224            name: spec.name.clone(),
225            platform: None,
226        };
227
228        let created = self
229            .client
230            .create_container(Some(create_options), config)
231            .await
232            .map_err(RuntimeError::Start)?;
233
234        self.client
235            .start_container::<String>(&created.id, None)
236            .await
237            .map_err(RuntimeError::Start)?;
238
239        Ok(ContainerId::new(created.id))
240    }
241
242    async fn stop(&self, id: &ContainerId, grace: Duration) -> Result<()> {
243        #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
244        let options = StopContainerOptions {
245            t: grace.as_secs() as i64,
246        };
247        match self.client.stop_container(id.as_str(), Some(options)).await {
248            Ok(())
249            | Err(bollard::errors::Error::DockerResponseServerError {
250                status_code: 304 | 404,
251                ..
252            }) => Ok(()),
253            Err(e) => Err(RuntimeError::Stop {
254                id: id.to_string(),
255                source: e,
256            }),
257        }
258    }
259
260    async fn inspect(&self, id: &ContainerId) -> Result<ContainerStatus> {
261        let info = self
262            .client
263            .inspect_container(id.as_str(), None)
264            .await
265            .map_err(|e| match e {
266                bollard::errors::Error::DockerResponseServerError {
267                    status_code: 404, ..
268                } => RuntimeError::NotFound(id.to_string()),
269                other => RuntimeError::Inspect {
270                    id: id.to_string(),
271                    source: other,
272                },
273            })?;
274
275        let state = info.state.as_ref();
276        let Some(state) = state else {
277            return Ok(ContainerStatus::Starting);
278        };
279
280        if matches!(state.running, Some(true)) {
281            if let Some(health) = &state.health {
282                return Ok(match health.status {
283                    Some(bollard::models::HealthStatusEnum::HEALTHY) => ContainerStatus::Healthy,
284                    Some(bollard::models::HealthStatusEnum::UNHEALTHY) => {
285                        ContainerStatus::Unhealthy
286                    }
287                    _ => ContainerStatus::Running,
288                });
289            }
290            return Ok(ContainerStatus::Running);
291        }
292
293        if matches!(state.dead, Some(true))
294            || state.status == Some(bollard::models::ContainerStateStatusEnum::EXITED)
295        {
296            #[allow(clippy::cast_possible_truncation)]
297            let exit_code = state.exit_code.map(|c| c as i32);
298            return Ok(ContainerStatus::Stopped { exit_code });
299        }
300
301        Ok(ContainerStatus::Starting)
302    }
303
304    async fn wait_healthy(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
305        let deadline = Instant::now() + timeout;
306        loop {
307            match self.inspect(id).await? {
308                ContainerStatus::Healthy | ContainerStatus::Running => return Ok(()),
309                ContainerStatus::Unhealthy => {
310                    if Instant::now() >= deadline {
311                        return Err(RuntimeError::Timeout {
312                            operation: "wait_healthy",
313                            after: timeout,
314                        });
315                    }
316                }
317                ContainerStatus::Starting => {}
318                ContainerStatus::Stopped { exit_code } => {
319                    return Err(RuntimeError::InvalidSpec(format!(
320                        "container `{id}` exited with code {exit_code:?} before becoming healthy"
321                    )));
322                }
323            }
324            if Instant::now() >= deadline {
325                return Err(RuntimeError::Timeout {
326                    operation: "wait_healthy",
327                    after: timeout,
328                });
329            }
330            tokio::time::sleep(POLL_INTERVAL).await;
331        }
332    }
333
334    async fn logs(&self, id: &ContainerId, follow: bool) -> Result<LogChunkStream> {
335        let options = LogsOptions::<String> {
336            follow,
337            stdout: true,
338            stderr: true,
339            timestamps: true,
340            ..Default::default()
341        };
342        let stream = self.client.logs(id.as_str(), Some(options));
343        let mapped: Pin<Box<dyn Stream<Item = Result<LogChunk>> + Send>> =
344            Box::pin(stream.map(map_log_item));
345        Ok(mapped)
346    }
347}
348
/// Split an image reference into the `(from_image, tag)` pair expected by
/// the image-create request.
///
/// - `name@digest` yields `(name, digest)`; the digest travels in the tag
///   slot.
/// - `name:tag` yields `(name, tag)`. Only the last `:` is considered, and
///   only when nothing after it contains a `/`, so a registry port such as
///   `localhost:5000/app` is not mistaken for a tag.
/// - Anything else yields `(image, "latest")`.
fn split_image_ref(image: &str) -> (&str, &str) {
    // Digests contain a `:` themselves (`sha256:...`), so handle them first.
    if let Some((name, digest)) = image.split_once('@') {
        return (name, digest);
    }
    match image.rsplit_once(':') {
        // A `/` after the colon means the colon belongs to `host:port`.
        Some((name, tag)) if !tag.contains('/') => (name, tag),
        _ => (image, "latest"),
    }
}
352
/// Render every environment entry as a `KEY=value` string.
///
/// The output order follows the map's iteration order and is therefore
/// unspecified.
fn build_env(env: &HashMap<String, String>) -> Vec<String> {
    let mut entries = Vec::with_capacity(env.len());
    for (key, value) in env {
        entries.push([key.as_str(), value.as_str()].join("="));
    }
    entries
}
356
357fn build_labels(project: &str, resource: &str) -> HashMap<String, String> {
358    let mut labels = HashMap::with_capacity(2);
359    labels.insert(LABEL_PROJECT.to_owned(), project.to_owned());
360    labels.insert(LABEL_RESOURCE.to_owned(), resource.to_owned());
361    labels
362}
363
/// Key of the Docker label that records the manifest project name on
/// every container managed by LightShuttle.
pub const LABEL_PROJECT: &str = "lightshuttle.project";
367
/// Key of the Docker label that records the manifest resource name on
/// every container.
pub const LABEL_RESOURCE: &str = "lightshuttle.resource";
371
372/// One entry returned by [`DockerRuntime::list_managed`].
373#[derive(Debug, Clone)]
374pub struct ManagedContainer {
375    /// Container identifier.
376    pub id: ContainerId,
377    /// Resource name as declared in the manifest.
378    pub resource: String,
379    /// Current lifecycle status.
380    pub status: ContainerStatus,
381}
382
383fn parse_summary_state(state: Option<&str>) -> ContainerStatus {
384    match state {
385        Some("running") => ContainerStatus::Running,
386        Some("exited" | "dead") => ContainerStatus::Stopped { exit_code: None },
387        _ => ContainerStatus::Starting,
388    }
389}
390
391#[allow(clippy::zero_sized_map_values)]
392fn build_exposed_ports(ports: &[PortBinding]) -> HashMap<String, HashMap<(), ()>> {
393    ports
394        .iter()
395        .map(|p| (format!("{}/tcp", p.container_port), HashMap::new()))
396        .collect()
397}
398
399fn build_host_config(ports: &[PortBinding], volumes: &[VolumeBinding]) -> HostConfig {
400    let port_bindings = ports
401        .iter()
402        .map(|p| {
403            let bindings = vec![BollardPortBinding {
404                host_ip: p.host_address.clone(),
405                host_port: Some(p.host_port.to_string()),
406            }];
407            (format!("{}/tcp", p.container_port), Some(bindings))
408        })
409        .collect::<HashMap<_, _>>();
410
411    let binds: Vec<String> = volumes
412        .iter()
413        .filter_map(|v| match &v.source {
414            VolumeSource::HostPath(path) => Some(format!("{path}:{}", v.target)),
415            VolumeSource::Named(name) => Some(format!("{name}:{}", v.target)),
416            VolumeSource::Anonymous => None,
417        })
418        .collect();
419
420    HostConfig {
421        port_bindings: Some(port_bindings),
422        binds: if binds.is_empty() { None } else { Some(binds) },
423        ..Default::default()
424    }
425}
426
427fn build_healthcheck(hc: &HealthcheckSpec) -> HealthConfig {
428    #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
429    HealthConfig {
430        test: Some(hc.test.clone()),
431        interval: Some(hc.interval.as_nanos() as i64),
432        timeout: Some(hc.timeout.as_nanos() as i64),
433        retries: Some(i64::from(hc.retries)),
434        start_period: Some(hc.start_period.as_nanos() as i64),
435        ..Default::default()
436    }
437}
438
439fn map_log_item(item: std::result::Result<LogOutput, bollard::errors::Error>) -> Result<LogChunk> {
440    match item {
441        Ok(LogOutput::StdErr { message }) => Ok(LogChunk {
442            stream: LogStream::Stderr,
443            timestamp: SystemTime::now(),
444            bytes: message.to_vec(),
445        }),
446        Ok(
447            LogOutput::StdOut { message }
448            | LogOutput::Console { message }
449            | LogOutput::StdIn { message },
450        ) => Ok(LogChunk {
451            stream: LogStream::Stdout,
452            timestamp: SystemTime::now(),
453            bytes: message.to_vec(),
454        }),
455        Err(e) => Err(RuntimeError::LogStream(e)),
456    }
457}