Skip to main content

lightshuttle_runtime/
docker.rs

1//! Docker container runtime backed by the `bollard` crate.
2
3use std::collections::HashMap;
4use std::path::Path;
5use std::pin::Pin;
6use std::time::{Duration, Instant, SystemTime};
7
8use bollard::Docker;
9use bollard::container::LogOutput;
10use bollard::models::{
11    ContainerCreateBody, ContainerSummaryStateEnum, HealthConfig, HostConfig,
12    PortBinding as BollardPortBinding,
13};
14use bollard::query_parameters::{
15    BuildImageOptionsBuilder, BuilderVersion, CreateContainerOptionsBuilder,
16    CreateImageOptionsBuilder, ListContainersOptionsBuilder, LogsOptionsBuilder,
17    RemoveContainerOptionsBuilder, StartContainerOptions, StopContainerOptionsBuilder,
18};
19use bytes::Bytes;
20use futures::stream::{Stream, StreamExt};
21
22use crate::error::{Result, RuntimeError};
23use crate::runtime::{
24    ContainerId, ContainerRuntime, ContainerStatus, LogChunk, LogChunkStream, LogStream,
25};
26use lightshuttle_spec::{
27    ContainerSpec, HealthcheckSpec, ImageSource, PortBinding, VolumeBinding, VolumeSource,
28};
29
/// Pause between two consecutive container inspections while
/// `wait_healthy` polls for a container to become ready.
const POLL_INTERVAL: Duration = Duration::from_millis(500);
31
/// Docker container runtime backed by the `bollard` crate.
///
/// Connects to the local Docker daemon using the platform default
/// transport (Unix socket on Linux and macOS, named pipe on Windows).
///
/// Build one with [`DockerRuntime::connect`], or wrap an already
/// configured client with [`DockerRuntime::from_client`].
pub struct DockerRuntime {
    /// Docker Engine API client through which every daemon call in this
    /// module is issued.
    client: Docker,
}
39
40impl DockerRuntime {
41    /// Connect to the local Docker daemon.
42    pub fn connect() -> Result<Self> {
43        let client = Docker::connect_with_local_defaults().map_err(RuntimeError::Connect)?;
44        Ok(Self { client })
45    }
46
47    /// Wrap an existing `bollard::Docker` client. Useful for tests that
48    /// supply a pre-configured client (custom transport, mock, etc.).
49    #[must_use]
50    pub fn from_client(client: Docker) -> Self {
51        Self { client }
52    }
53
54    async fn ensure_image(&self, image: &str) -> Result<()> {
55        let (from_image, tag) = split_image_ref(image);
56        let options = CreateImageOptionsBuilder::default()
57            .from_image(from_image)
58            .tag(tag)
59            .build();
60        let mut stream = self.client.create_image(Some(options), None, None);
61        while let Some(event) = stream.next().await {
62            event.map_err(|e| RuntimeError::ImagePull {
63                image: image.to_owned(),
64                source: e,
65            })?;
66        }
67        Ok(())
68    }
69
70    /// List every container labelled with `lightshuttle.project=<project>`,
71    /// including stopped ones. Used by the CLI to implement `ps` and
72    /// `down` without relying on in-memory state.
73    pub async fn list_managed(&self, project: &str) -> Result<Vec<ManagedContainer>> {
74        let label_filter = format!("{LABEL_PROJECT}={project}");
75        let mut filters: HashMap<String, Vec<String>> = HashMap::new();
76        filters.insert("label".to_owned(), vec![label_filter]);
77        let options = ListContainersOptionsBuilder::default()
78            .all(true)
79            .filters(&filters)
80            .build();
81        let summaries = self
82            .client
83            .list_containers(Some(options))
84            .await
85            .map_err(|source| RuntimeError::Inspect {
86                id: format!("project={project}"),
87                source,
88            })?;
89
90        let mut out = Vec::with_capacity(summaries.len());
91        for summary in summaries {
92            let Some(id) = summary.id else { continue };
93            let resource = summary
94                .labels
95                .as_ref()
96                .and_then(|labels| labels.get(LABEL_RESOURCE))
97                .cloned()
98                .unwrap_or_else(|| "<unknown>".to_owned());
99            let status = parse_summary_state(summary.state.as_ref());
100            out.push(ManagedContainer {
101                id: ContainerId::new(id),
102                resource,
103                status,
104            });
105        }
106        out.sort_by(|a, b| a.resource.cmp(&b.resource));
107        Ok(out)
108    }
109
110    async fn build_image(
111        &self,
112        context: &str,
113        dockerfile: &str,
114        build_args: &HashMap<String, String>,
115        target: Option<&str>,
116        tag: &str,
117    ) -> Result<()> {
118        let context_owned = context.to_owned();
119        let tar_bytes =
120            tokio::task::spawn_blocking(move || build_tar_archive(Path::new(&context_owned)))
121                .await
122                .map_err(|join_err| {
123                    RuntimeError::InvalidSpec(format!("tar build task panicked: {join_err}"))
124                })?
125                .map_err(|io_err| {
126                    RuntimeError::InvalidSpec(format!("failed to build tar archive: {io_err}"))
127                })?;
128
129        let options = BuildImageOptionsBuilder::default()
130            .dockerfile(dockerfile)
131            .t(tag)
132            .rm(true)
133            .buildargs(build_args)
134            .target(target.unwrap_or(""))
135            .version(BuilderVersion::BuilderBuildKit)
136            .build();
137
138        let mut stream = self.client.build_image(
139            options,
140            None,
141            Some(bollard::body_full(Bytes::from(tar_bytes))),
142        );
143        while let Some(event) = stream.next().await {
144            event.map_err(RuntimeError::Build)?;
145        }
146        Ok(())
147    }
148}
149
150/// Build a tar archive from `context`, respecting `.dockerignore`
151/// patterns found within. Returns the raw tar bytes (uncompressed).
152fn build_tar_archive(context: &Path) -> std::io::Result<Vec<u8>> {
153    use ignore::WalkBuilder;
154
155    let mut buf: Vec<u8> = Vec::new();
156    {
157        let mut builder = tar::Builder::new(&mut buf);
158        builder.follow_symlinks(false);
159
160        let walker = WalkBuilder::new(context)
161            .add_custom_ignore_filename(".dockerignore")
162            .git_ignore(false)
163            .git_exclude(false)
164            .git_global(false)
165            .hidden(false)
166            .build();
167
168        for entry in walker {
169            let entry = entry.map_err(|e| std::io::Error::other(format!("walk error: {e}")))?;
170            let path = entry.path();
171            let relative = match path.strip_prefix(context) {
172                Ok(p) if !p.as_os_str().is_empty() => p,
173                _ => continue,
174            };
175            let Some(file_type) = entry.file_type() else {
176                continue;
177            };
178            if file_type.is_dir() {
179                builder.append_dir(relative, path)?;
180            } else if file_type.is_file() {
181                let mut file = std::fs::File::open(path)?;
182                builder.append_file(relative, &mut file)?;
183            }
184        }
185        builder.finish()?;
186    }
187    Ok(buf)
188}
189
190impl ContainerRuntime for DockerRuntime {
191    async fn start(&self, spec: &ContainerSpec) -> Result<ContainerId> {
192        let image_ref = match &spec.image {
193            ImageSource::Pull(image) => {
194                self.ensure_image(image).await?;
195                image.clone()
196            }
197            ImageSource::Build {
198                context,
199                dockerfile,
200                build_args,
201                target,
202                tag,
203            } => {
204                self.build_image(context, dockerfile, build_args, target.as_deref(), tag)
205                    .await?;
206                tag.clone()
207            }
208        };
209
210        let host_config = build_host_config(&spec.ports, &spec.volumes);
211        let exposed_ports = build_exposed_ports(&spec.ports);
212        let env = build_env(&spec.env);
213        let healthcheck = spec.healthcheck.as_ref().map(build_healthcheck);
214        let labels = build_labels(&spec.project, &spec.resource);
215
216        let config = ContainerCreateBody {
217            image: Some(image_ref),
218            env: Some(env),
219            cmd: spec.command.clone(),
220            working_dir: spec.working_dir.clone(),
221            host_config: Some(host_config),
222            exposed_ports: Some(exposed_ports),
223            healthcheck,
224            labels: Some(labels),
225            ..Default::default()
226        };
227
228        let create_options = CreateContainerOptionsBuilder::default()
229            .name(&spec.name)
230            .build();
231
232        let created = self
233            .client
234            .create_container(Some(create_options), config)
235            .await
236            .map_err(RuntimeError::Start)?;
237
238        self.client
239            .start_container(&created.id, None::<StartContainerOptions>)
240            .await
241            .map_err(RuntimeError::Start)?;
242
243        Ok(ContainerId::new(created.id))
244    }
245
246    async fn stop(&self, id: &ContainerId, grace: Duration) -> Result<()> {
247        #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
248        let options = StopContainerOptionsBuilder::default()
249            .t(grace.as_secs() as i32)
250            .build();
251        match self.client.stop_container(id.as_str(), Some(options)).await {
252            Ok(())
253            | Err(bollard::errors::Error::DockerResponseServerError {
254                status_code: 304 | 404,
255                ..
256            }) => Ok(()),
257            Err(e) => Err(RuntimeError::Stop {
258                id: id.to_string(),
259                source: e,
260            }),
261        }
262    }
263
264    async fn remove(&self, name: &str) -> Result<()> {
265        let options = RemoveContainerOptionsBuilder::default().force(true).build();
266        match self.client.remove_container(name, Some(options)).await {
267            Ok(())
268            | Err(bollard::errors::Error::DockerResponseServerError {
269                status_code: 404, ..
270            }) => Ok(()),
271            Err(e) => Err(RuntimeError::Remove {
272                name: name.to_owned(),
273                source: e,
274            }),
275        }
276    }
277
278    async fn inspect(&self, id: &ContainerId) -> Result<ContainerStatus> {
279        let info = self
280            .client
281            .inspect_container(id.as_str(), None)
282            .await
283            .map_err(|e| match e {
284                bollard::errors::Error::DockerResponseServerError {
285                    status_code: 404, ..
286                } => RuntimeError::NotFound(id.to_string()),
287                other => RuntimeError::Inspect {
288                    id: id.to_string(),
289                    source: other,
290                },
291            })?;
292
293        let state = info.state.as_ref();
294        let Some(state) = state else {
295            return Ok(ContainerStatus::Starting);
296        };
297
298        if matches!(state.running, Some(true)) {
299            if let Some(health) = &state.health {
300                return Ok(match health.status {
301                    Some(bollard::models::HealthStatusEnum::HEALTHY) => ContainerStatus::Healthy,
302                    Some(bollard::models::HealthStatusEnum::UNHEALTHY) => {
303                        ContainerStatus::Unhealthy
304                    }
305                    _ => ContainerStatus::Running,
306                });
307            }
308            return Ok(ContainerStatus::Running);
309        }
310
311        if matches!(state.dead, Some(true))
312            || state.status == Some(bollard::models::ContainerStateStatusEnum::EXITED)
313        {
314            #[allow(clippy::cast_possible_truncation)]
315            let exit_code = state.exit_code.map(|c| c as i32);
316            return Ok(ContainerStatus::Stopped { exit_code });
317        }
318
319        Ok(ContainerStatus::Starting)
320    }
321
322    async fn wait_healthy(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
323        let deadline = Instant::now() + timeout;
324        loop {
325            match self.inspect(id).await? {
326                ContainerStatus::Healthy | ContainerStatus::Running => return Ok(()),
327                ContainerStatus::Unhealthy => {
328                    if Instant::now() >= deadline {
329                        return Err(RuntimeError::Timeout {
330                            operation: "wait_healthy",
331                            after: timeout,
332                        });
333                    }
334                }
335                ContainerStatus::Starting => {}
336                ContainerStatus::Stopped { exit_code } => {
337                    return Err(RuntimeError::InvalidSpec(format!(
338                        "container `{id}` exited with code {exit_code:?} before becoming healthy"
339                    )));
340                }
341            }
342            if Instant::now() >= deadline {
343                return Err(RuntimeError::Timeout {
344                    operation: "wait_healthy",
345                    after: timeout,
346                });
347            }
348            tokio::time::sleep(POLL_INTERVAL).await;
349        }
350    }
351
352    async fn logs(&self, id: &ContainerId, follow: bool) -> Result<LogChunkStream> {
353        let options = LogsOptionsBuilder::default()
354            .follow(follow)
355            .stdout(true)
356            .stderr(true)
357            .timestamps(true)
358            .build();
359        let stream = self.client.logs(id.as_str(), Some(options));
360        let mapped: Pin<Box<dyn Stream<Item = Result<LogChunk>> + Send>> =
361            Box::pin(stream.map(map_log_item));
362        Ok(mapped)
363    }
364}
365
/// Split an image reference into the repository name and the tag (or
/// digest) to hand to the daemon when pulling.
///
/// * `name` -> `(name, "latest")`
/// * `name:tag` -> `(name, tag)`
/// * `name@digest` and `name:tag@digest` -> `(name, digest)`
///
/// A colon that belongs to a registry `host:port` prefix (that is, one
/// followed by a `/`) is never mistaken for the tag separator, so
/// `localhost:5000/app` keeps its full name and gets the `latest` tag.
fn split_image_ref(image: &str) -> (&str, &str) {
    /// Split off a trailing `:tag`, ignoring a `host:port/` colon.
    fn split_tag(reference: &str) -> Option<(&str, &str)> {
        reference
            .rsplit_once(':')
            .filter(|(_, tag)| !tag.contains('/'))
    }

    if let Some((name, digest)) = image.split_once('@') {
        // A digest pins the image; any tag in front of it is redundant.
        let repository = split_tag(name).map_or(name, |(repository, _)| repository);
        return (repository, digest);
    }
    split_tag(image).unwrap_or((image, "latest"))
}
369
/// Render an environment map as the `KEY=value` strings expected in a
/// container configuration. Entries come out in the map's iteration
/// order.
fn build_env(env: &HashMap<String, String>) -> Vec<String> {
    let mut entries = Vec::with_capacity(env.len());
    for (k, v) in env {
        entries.push(format!("{k}={v}"));
    }
    entries
}
373
374fn build_labels(project: &str, resource: &str) -> HashMap<String, String> {
375    let mut labels = HashMap::with_capacity(2);
376    labels.insert(LABEL_PROJECT.to_owned(), project.to_owned());
377    labels.insert(LABEL_RESOURCE.to_owned(), resource.to_owned());
378    labels
379}
380
/// Docker label key set on every container managed by LightShuttle to
/// carry the manifest project name.
///
/// `DockerRuntime::list_managed` filters on this label to find the
/// containers that belong to a project.
pub const LABEL_PROJECT: &str = "lightshuttle.project";

/// Docker label key set on every container to carry the manifest
/// resource name.
///
/// `DockerRuntime::list_managed` reads it back to name each entry it
/// returns.
pub const LABEL_RESOURCE: &str = "lightshuttle.resource";
388
/// One entry returned by [`DockerRuntime::list_managed`].
#[derive(Debug, Clone)]
pub struct ManagedContainer {
    /// Container identifier.
    pub id: ContainerId,
    /// Resource name as declared in the manifest, read from the
    /// container's resource label; `"<unknown>"` when the label is
    /// missing.
    pub resource: String,
    /// Current lifecycle status, derived from the state reported by the
    /// container listing.
    pub status: ContainerStatus,
}
399
400fn parse_summary_state(state: Option<&ContainerSummaryStateEnum>) -> ContainerStatus {
401    match state {
402        Some(ContainerSummaryStateEnum::RUNNING) => ContainerStatus::Running,
403        Some(ContainerSummaryStateEnum::EXITED | ContainerSummaryStateEnum::DEAD) => {
404            ContainerStatus::Stopped { exit_code: None }
405        }
406        _ => ContainerStatus::Starting,
407    }
408}
409
410fn build_exposed_ports(ports: &[PortBinding]) -> Vec<String> {
411    ports
412        .iter()
413        .map(|p| format!("{}/tcp", p.container_port))
414        .collect()
415}
416
/// Default host bind address for published ports.
///
/// Loopback by default so a dev machine never exposes managed services
/// (PostgreSQL, Redis, application ports) to the wider network. A
/// manifest that needs a broader bind must request it explicitly via
/// the `address:host:container` port mapping form.
///
/// `build_host_config` falls back to this value for every port binding
/// whose `host_address` is `None`.
const DEFAULT_HOST_BIND_ADDRESS: &str = "127.0.0.1";
424
425fn build_host_config(ports: &[PortBinding], volumes: &[VolumeBinding]) -> HostConfig {
426    let port_bindings = ports
427        .iter()
428        .map(|p| {
429            let host_ip = p
430                .host_address
431                .clone()
432                .unwrap_or_else(|| DEFAULT_HOST_BIND_ADDRESS.to_owned());
433            let bindings = vec![BollardPortBinding {
434                host_ip: Some(host_ip),
435                host_port: Some(p.host_port.to_string()),
436            }];
437            (format!("{}/tcp", p.container_port), Some(bindings))
438        })
439        .collect::<HashMap<_, _>>();
440
441    let binds: Vec<String> = volumes
442        .iter()
443        .filter_map(|v| match &v.source {
444            VolumeSource::HostPath(path) => Some(format!("{path}:{}", v.target)),
445            VolumeSource::Named(name) => Some(format!("{name}:{}", v.target)),
446            VolumeSource::Anonymous => None,
447        })
448        .collect();
449
450    HostConfig {
451        port_bindings: Some(port_bindings),
452        binds: if binds.is_empty() { None } else { Some(binds) },
453        ..Default::default()
454    }
455}
456
457fn build_healthcheck(hc: &HealthcheckSpec) -> HealthConfig {
458    #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
459    HealthConfig {
460        test: Some(hc.test.clone()),
461        interval: Some(hc.interval.as_nanos() as i64),
462        timeout: Some(hc.timeout.as_nanos() as i64),
463        retries: Some(i64::from(hc.retries)),
464        start_period: Some(hc.start_period.as_nanos() as i64),
465        ..Default::default()
466    }
467}
468
469fn map_log_item(item: std::result::Result<LogOutput, bollard::errors::Error>) -> Result<LogChunk> {
470    match item {
471        Ok(LogOutput::StdErr { message }) => Ok(log_chunk(LogStream::Stderr, &message)),
472        Ok(
473            LogOutput::StdOut { message }
474            | LogOutput::Console { message }
475            | LogOutput::StdIn { message },
476        ) => Ok(log_chunk(LogStream::Stdout, &message)),
477        Err(e) => Err(RuntimeError::LogStream(e)),
478    }
479}
480
481/// Build a [`LogChunk`], extracting the Docker emission timestamp from
482/// the line prefix when present.
483fn log_chunk(stream: LogStream, message: &[u8]) -> LogChunk {
484    let (timestamp, bytes) = split_docker_timestamp(message);
485    LogChunk {
486        stream,
487        timestamp,
488        bytes,
489    }
490}
491
492/// Split a Docker log line into its emission timestamp and payload.
493///
494/// With `timestamps: true`, Docker prepends each line with an RFC3339
495/// nanosecond timestamp and a single space. When that prefix parses,
496/// the real emission time is returned and the prefix is stripped from
497/// the forwarded bytes. Otherwise the read time is used and the line is
498/// forwarded verbatim.
499fn split_docker_timestamp(message: &[u8]) -> (SystemTime, Vec<u8>) {
500    if let Some(space) = message.iter().position(|&b| b == b' ')
501        && let Ok(prefix) = std::str::from_utf8(&message[..space])
502        && let Ok(ts) = prefix.parse::<jiff::Timestamp>()
503        && let Some(system_time) = timestamp_to_system_time(ts)
504    {
505        let payload = message.get(space + 1..).unwrap_or(&[]).to_vec();
506        return (system_time, payload);
507    }
508    (SystemTime::now(), message.to_vec())
509}
510
511/// Convert a `jiff` timestamp to a `SystemTime`, returning `None` for
512/// pre-epoch instants (never produced by container logs).
513fn timestamp_to_system_time(ts: jiff::Timestamp) -> Option<SystemTime> {
514    let nanos = ts.as_nanosecond();
515    if nanos < 0 {
516        return None;
517    }
518    let secs = u64::try_from(nanos / 1_000_000_000).ok()?;
519    let subsec = u32::try_from(nanos % 1_000_000_000).ok()?;
520    Some(SystemTime::UNIX_EPOCH + Duration::new(secs, subsec))
521}
522
#[cfg(test)]
mod tests {
    use std::time::SystemTime;

    use super::{PortBinding, build_host_config, split_docker_timestamp};

    /// Host IP of the first binding published under `key`, if any.
    fn host_ip_for(ports: &[PortBinding], key: &str) -> Option<String> {
        let mut bindings_by_port = build_host_config(ports, &[]).port_bindings?;
        let bindings = bindings_by_port.remove(key)??;
        bindings.into_iter().next()?.host_ip
    }

    #[test]
    fn unspecified_address_binds_to_loopback() {
        let postgres = PortBinding {
            container_port: 5432,
            host_address: None,
            host_port: 5432,
        };
        let host_ip = host_ip_for(&[postgres], "5432/tcp");
        assert_eq!(host_ip.as_deref(), Some("127.0.0.1"));
    }

    #[test]
    fn explicit_address_is_preserved() {
        let web = PortBinding {
            container_port: 80,
            host_address: Some("0.0.0.0".to_owned()),
            host_port: 8080,
        };
        let host_ip = host_ip_for(&[web], "80/tcp");
        assert_eq!(host_ip.as_deref(), Some("0.0.0.0"));
    }

    #[test]
    fn timestamped_line_parses_emission_time_and_strips_prefix() {
        let line = b"2024-01-01T12:34:56.789012345Z hello world";
        let (emitted_at, payload) = split_docker_timestamp(line);

        let since_epoch = emitted_at
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("post-epoch");
        assert_eq!(since_epoch.as_secs(), 1_704_112_496);
        // `SystemTime` resolution varies by platform (100ns on Windows),
        // so the sub-second part is only checked down to the microsecond.
        assert_eq!(since_epoch.subsec_micros(), 789_012);
        assert_eq!(payload, b"hello world");
    }

    #[test]
    fn timestamped_line_without_payload_yields_empty_bytes() {
        // For an empty line Docker still writes the separating space,
        // followed by the line terminator alone.
        let (_emitted_at, payload) = split_docker_timestamp(b"2024-01-01T00:00:00Z \n");
        assert_eq!(payload, b"\n");
    }

    #[test]
    fn untimestamped_line_is_forwarded_verbatim() {
        // When the first token is not an RFC3339 timestamp the read time
        // is used and nothing is stripped, the token included.
        let line = b"not-a-timestamp hello world";
        let (_emitted_at, payload) = split_docker_timestamp(line);
        assert_eq!(payload, line);
    }

    #[test]
    fn line_without_space_is_forwarded_verbatim() {
        let line = b"singletoken";
        let (_emitted_at, payload) = split_docker_timestamp(line);
        assert_eq!(payload, line);
    }
}