Skip to main content

lightshuttle_runtime/
docker.rs

1//! Docker container runtime backed by the `bollard` crate.
2
3use std::collections::HashMap;
4use std::path::Path;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant, SystemTime};
8
9use bollard::Docker;
10use bollard::container::LogOutput;
use bollard::models::{
    ContainerCreateBody, ContainerSummaryStateEnum, EndpointSettings, HealthConfig, HostConfig,
    Mount, MountTypeEnum, NetworkCreateRequest, NetworkingConfig,
    PortBinding as BollardPortBinding,
};
15use bollard::query_parameters::{
16    BuildImageOptionsBuilder, BuilderVersion, CreateContainerOptionsBuilder,
17    CreateImageOptionsBuilder, ListContainersOptionsBuilder, LogsOptionsBuilder,
18    RemoveContainerOptionsBuilder, StartContainerOptions, StopContainerOptionsBuilder,
19};
20use bytes::Bytes;
21use futures::stream::{Stream, StreamExt};
22
23use crate::error::{Result, RuntimeError};
24use crate::runtime::{
25    ContainerId, ContainerRuntime, ContainerStatus, LogChunk, LogChunkStream, LogStream,
26};
27use lightshuttle_spec::{
28    ContainerSpec, HealthcheckSpec, ImageSource, PortBinding, VolumeBinding, VolumeSource,
29};
30
/// Delay between two status polls while waiting for a container to become healthy.
const POLL_INTERVAL: Duration = Duration::new(0, 500_000_000);
32
33/// Docker container runtime backed by the `bollard` crate.
34///
35/// Connects to the local Docker daemon using the platform default
36/// transport (Unix socket on Linux and macOS, named pipe on Windows).
37pub struct DockerRuntime {
38    client: Docker,
39}
40
41impl DockerRuntime {
42    /// Connect to the local Docker daemon.
43    pub fn connect() -> Result<Self> {
44        let client = Docker::connect_with_local_defaults().map_err(RuntimeError::Connect)?;
45        Ok(Self { client })
46    }
47
48    /// Wrap an existing `bollard::Docker` client. Useful for tests that
49    /// supply a pre-configured client (custom transport, mock, etc.).
50    #[must_use]
51    pub fn from_client(client: Docker) -> Self {
52        Self { client }
53    }
54
55    async fn ensure_image(&self, image: &str) -> Result<()> {
56        let (from_image, tag) = split_image_ref(image);
57        let options = CreateImageOptionsBuilder::default()
58            .from_image(from_image)
59            .tag(tag)
60            .build();
61        let mut stream = self.client.create_image(Some(options), None, None);
62        while let Some(event) = stream.next().await {
63            event.map_err(|e| RuntimeError::ImagePull {
64                image: image.to_owned(),
65                source: e,
66            })?;
67        }
68        Ok(())
69    }
70
71    /// List every container labelled with `lightshuttle.project=<project>`,
72    /// including stopped ones. Used by the CLI to implement `ps` and
73    /// `down` without relying on in-memory state.
74    pub async fn list_managed(&self, project: &str) -> Result<Vec<ManagedContainer>> {
75        let label_filter = format!("{LABEL_PROJECT}={project}");
76        let mut filters: HashMap<String, Vec<String>> = HashMap::new();
77        filters.insert("label".to_owned(), vec![label_filter]);
78        let options = ListContainersOptionsBuilder::default()
79            .all(true)
80            .filters(&filters)
81            .build();
82        let summaries = self
83            .client
84            .list_containers(Some(options))
85            .await
86            .map_err(|source| RuntimeError::Inspect {
87                id: format!("project={project}"),
88                source,
89            })?;
90
91        let mut out = Vec::with_capacity(summaries.len());
92        for summary in summaries {
93            let Some(id) = summary.id else { continue };
94            let resource = summary
95                .labels
96                .as_ref()
97                .and_then(|labels| labels.get(LABEL_RESOURCE))
98                .cloned()
99                .unwrap_or_else(|| "<unknown>".to_owned());
100            let status = parse_summary_state(summary.state.as_ref());
101            out.push(ManagedContainer {
102                id: ContainerId::new(id),
103                resource,
104                status,
105            });
106        }
107        out.sort_by(|a, b| a.resource.cmp(&b.resource));
108        Ok(out)
109    }
110
111    async fn build_image(
112        &self,
113        context: &str,
114        dockerfile: &str,
115        build_args: &HashMap<String, String>,
116        target: Option<&str>,
117        tag: &str,
118    ) -> Result<()> {
119        // A process-unique id keeps concurrent BuildKit builds from
120        // colliding on the same gRPC session.
121        static SESSION_COUNTER: AtomicU64 = AtomicU64::new(0);
122
123        let context_owned = context.to_owned();
124        let tar_bytes =
125            tokio::task::spawn_blocking(move || build_tar_archive(Path::new(&context_owned)))
126                .await
127                .map_err(|join_err| {
128                    RuntimeError::InvalidSpec(format!("tar build task panicked: {join_err}"))
129                })?
130                .map_err(|io_err| {
131                    RuntimeError::InvalidSpec(format!("failed to build tar archive: {io_err}"))
132                })?;
133
134        let session_id = format!(
135            "lightshuttle-build-{}",
136            SESSION_COUNTER.fetch_add(1, Ordering::Relaxed)
137        );
138
139        let options = BuildImageOptionsBuilder::default()
140            .dockerfile(dockerfile)
141            .t(tag)
142            .rm(true)
143            .buildargs(build_args)
144            .target(target.unwrap_or(""))
145            .version(BuilderVersion::BuilderBuildKit)
146            .session(&session_id)
147            .build();
148
149        let mut stream = self.client.build_image(
150            options,
151            None,
152            Some(bollard::body_full(Bytes::from(tar_bytes))),
153        );
154        while let Some(event) = stream.next().await {
155            let info = event.map_err(RuntimeError::Build)?;
156            if let Some(detail) = info.error_detail {
157                let message = detail
158                    .message
159                    .unwrap_or_else(|| "unknown build error".to_owned());
160                return Err(RuntimeError::BuildFailed(message));
161            }
162        }
163        Ok(())
164    }
165}
166
167/// Build a tar archive from `context`, respecting `.dockerignore`
168/// patterns found within. Returns the raw tar bytes (uncompressed).
169fn build_tar_archive(context: &Path) -> std::io::Result<Vec<u8>> {
170    use ignore::WalkBuilder;
171
172    let mut buf: Vec<u8> = Vec::new();
173    {
174        let mut builder = tar::Builder::new(&mut buf);
175        builder.follow_symlinks(false);
176
177        let walker = WalkBuilder::new(context)
178            .add_custom_ignore_filename(".dockerignore")
179            .git_ignore(false)
180            .git_exclude(false)
181            .git_global(false)
182            .hidden(false)
183            .build();
184
185        for entry in walker {
186            let entry = entry.map_err(|e| std::io::Error::other(format!("walk error: {e}")))?;
187            let path = entry.path();
188            let relative = match path.strip_prefix(context) {
189                Ok(p) if !p.as_os_str().is_empty() => p,
190                _ => continue,
191            };
192            let Some(file_type) = entry.file_type() else {
193                continue;
194            };
195            if file_type.is_dir() {
196                builder.append_dir(relative, path)?;
197            } else if file_type.is_file() {
198                let mut file = std::fs::File::open(path)?;
199                builder.append_file(relative, &mut file)?;
200            }
201        }
202        builder.finish()?;
203    }
204    Ok(buf)
205}
206
/// Build the Docker network name for a project.
///
/// Every character other than an ASCII letter, ASCII digit or `-` is
/// replaced with `-`, and letters are lower-cased. The result is prefixed
/// with `lightshuttle-`, so it always starts with an ASCII letter.
///
/// Sanitising is ASCII-only on purpose. `char::is_alphanumeric` would let
/// letters such as `é` through, and `to_lowercase` can expand one character
/// into several. Either would produce names outside the conservative
/// `[a-z0-9-]` set this function promises.
fn network_name(project: &str) -> String {
    let sanitized: String = project
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() || c == '-' {
                c.to_ascii_lowercase()
            } else {
                '-'
            }
        })
        .collect();
    format!("lightshuttle-{sanitized}")
}
225
226impl ContainerRuntime for DockerRuntime {
227    async fn ensure_project_network(&self, project: &str) -> Result<()> {
228        let name = network_name(project);
229
230        match self.client.inspect_network(&name, None).await {
231            Ok(_) => return Ok(()),
232            Err(bollard::errors::Error::DockerResponseServerError {
233                status_code: 404, ..
234            }) => {}
235            Err(e) => return Err(RuntimeError::NetworkCreate { name, source: e }),
236        }
237
238        let mut labels = HashMap::new();
239        labels.insert(LABEL_PROJECT.to_owned(), project.to_owned());
240        let config = NetworkCreateRequest {
241            name: name.clone(),
242            driver: Some("bridge".to_owned()),
243            labels: Some(labels),
244            ..Default::default()
245        };
246        match self.client.create_network(config).await {
247            // 409 = another concurrent start already created the network.
248            Ok(_)
249            | Err(bollard::errors::Error::DockerResponseServerError {
250                status_code: 409, ..
251            }) => Ok(()),
252            Err(e) => Err(RuntimeError::NetworkCreate { name, source: e }),
253        }
254    }
255
256    async fn teardown_project_network(&self, project: &str) -> Result<()> {
257        let name = network_name(project);
258        match self.client.remove_network(&name).await {
259            Ok(())
260            | Err(bollard::errors::Error::DockerResponseServerError {
261                status_code: 404, ..
262            }) => Ok(()),
263            Err(e) => Err(RuntimeError::NetworkRemove { name, source: e }),
264        }
265    }
266
267    async fn start(&self, spec: &ContainerSpec) -> Result<ContainerId> {
268        let image_ref = match &spec.image {
269            ImageSource::Pull(image) => {
270                self.ensure_image(image).await?;
271                image.clone()
272            }
273            ImageSource::Build {
274                context,
275                dockerfile,
276                build_args,
277                target,
278                tag,
279            } => {
280                self.build_image(context, dockerfile, build_args, target.as_deref(), tag)
281                    .await?;
282                tag.clone()
283            }
284        };
285
286        self.ensure_project_network(&spec.project).await?;
287
288        let net = network_name(&spec.project);
289        let mut endpoints = HashMap::new();
290        endpoints.insert(net, EndpointSettings::default());
291        let networking_config = NetworkingConfig {
292            endpoints_config: Some(endpoints),
293        };
294
295        let host_config = build_host_config(&spec.ports, &spec.volumes);
296        let exposed_ports = build_exposed_ports(&spec.ports);
297        let env = build_env(&spec.env);
298        let healthcheck = spec.healthcheck.as_ref().map(build_healthcheck);
299        let labels = build_labels(&spec.project, &spec.resource);
300
301        let config = ContainerCreateBody {
302            image: Some(image_ref),
303            env: Some(env),
304            cmd: spec.command.clone(),
305            working_dir: spec.working_dir.clone(),
306            host_config: Some(host_config),
307            exposed_ports: Some(exposed_ports),
308            healthcheck,
309            labels: Some(labels),
310            networking_config: Some(networking_config),
311            ..Default::default()
312        };
313
314        let create_options = CreateContainerOptionsBuilder::default()
315            .name(&spec.name)
316            .build();
317
318        let created = self
319            .client
320            .create_container(Some(create_options), config)
321            .await
322            .map_err(RuntimeError::Start)?;
323
324        self.client
325            .start_container(&created.id, None::<StartContainerOptions>)
326            .await
327            .map_err(RuntimeError::Start)?;
328
329        Ok(ContainerId::new(created.id))
330    }
331
332    async fn stop(&self, id: &ContainerId, grace: Duration) -> Result<()> {
333        #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
334        let options = StopContainerOptionsBuilder::default()
335            .t(grace.as_secs() as i32)
336            .build();
337        match self.client.stop_container(id.as_str(), Some(options)).await {
338            Ok(())
339            | Err(bollard::errors::Error::DockerResponseServerError {
340                status_code: 304 | 404,
341                ..
342            }) => Ok(()),
343            Err(e) => Err(RuntimeError::Stop {
344                id: id.to_string(),
345                source: e,
346            }),
347        }
348    }
349
350    async fn remove(&self, name: &str) -> Result<()> {
351        let options = RemoveContainerOptionsBuilder::default().force(true).build();
352        match self.client.remove_container(name, Some(options)).await {
353            Ok(())
354            | Err(bollard::errors::Error::DockerResponseServerError {
355                status_code: 404, ..
356            }) => Ok(()),
357            Err(e) => Err(RuntimeError::Remove {
358                name: name.to_owned(),
359                source: e,
360            }),
361        }
362    }
363
364    async fn inspect(&self, id: &ContainerId) -> Result<ContainerStatus> {
365        let info = self
366            .client
367            .inspect_container(id.as_str(), None)
368            .await
369            .map_err(|e| match e {
370                bollard::errors::Error::DockerResponseServerError {
371                    status_code: 404, ..
372                } => RuntimeError::NotFound(id.to_string()),
373                other => RuntimeError::Inspect {
374                    id: id.to_string(),
375                    source: other,
376                },
377            })?;
378
379        let state = info.state.as_ref();
380        let Some(state) = state else {
381            return Ok(ContainerStatus::Starting);
382        };
383
384        if matches!(state.running, Some(true)) {
385            if let Some(health) = &state.health {
386                return Ok(match health.status {
387                    Some(bollard::models::HealthStatusEnum::HEALTHY) => ContainerStatus::Healthy,
388                    Some(bollard::models::HealthStatusEnum::UNHEALTHY) => {
389                        ContainerStatus::Unhealthy
390                    }
391                    _ => ContainerStatus::Running,
392                });
393            }
394            return Ok(ContainerStatus::Running);
395        }
396
397        if matches!(state.dead, Some(true))
398            || state.status == Some(bollard::models::ContainerStateStatusEnum::EXITED)
399        {
400            #[allow(clippy::cast_possible_truncation)]
401            let exit_code = state.exit_code.map(|c| c as i32);
402            return Ok(ContainerStatus::Stopped { exit_code });
403        }
404
405        Ok(ContainerStatus::Starting)
406    }
407
408    async fn wait_healthy(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
409        let deadline = Instant::now() + timeout;
410        loop {
411            match self.inspect(id).await? {
412                ContainerStatus::Healthy | ContainerStatus::Running => return Ok(()),
413                ContainerStatus::Unhealthy => {
414                    if Instant::now() >= deadline {
415                        return Err(RuntimeError::Timeout {
416                            operation: "wait_healthy",
417                            after: timeout,
418                        });
419                    }
420                }
421                ContainerStatus::Starting => {}
422                ContainerStatus::Stopped { exit_code } => {
423                    return Err(RuntimeError::InvalidSpec(format!(
424                        "container `{id}` exited with code {exit_code:?} before becoming healthy"
425                    )));
426                }
427            }
428            if Instant::now() >= deadline {
429                return Err(RuntimeError::Timeout {
430                    operation: "wait_healthy",
431                    after: timeout,
432                });
433            }
434            tokio::time::sleep(POLL_INTERVAL).await;
435        }
436    }
437
438    async fn logs(&self, id: &ContainerId, follow: bool) -> Result<LogChunkStream> {
439        let options = LogsOptionsBuilder::default()
440            .follow(follow)
441            .stdout(true)
442            .stderr(true)
443            .timestamps(true)
444            .build();
445        let stream = self.client.logs(id.as_str(), Some(options));
446        let mapped: Pin<Box<dyn Stream<Item = Result<LogChunk>> + Send>> =
447            Box::pin(stream.map(map_log_item));
448        Ok(mapped)
449    }
450}
451
/// Split an image reference into the `fromImage` and `tag` values expected
/// by Docker's image-create (pull) endpoint.
///
/// * `repo` and `repo:` use the tag `latest`. An empty tag would make Docker
///   pull every tag of the repository.
/// * `repo:tag` gives `("repo", "tag")`.
/// * A registry port is not a tag: `host:5000/repo` gives
///   `("host:5000/repo", "latest")`.
/// * A digest pins the image and is passed in the tag slot, as the Docker
///   CLI does: `repo[:tag]@sha256:…` gives `("repo", "sha256:…")`.
fn split_image_ref(image: &str) -> (&str, &str) {
    /// Split `reference` at its tag colon. Only a colon after the last `/`
    /// can start a tag; earlier ones belong to a registry `host:port`.
    fn split_tag(reference: &str) -> (&str, Option<&str>) {
        let name_start = reference.rfind('/').map_or(0, |slash| slash + 1);
        match reference[name_start..].rfind(':') {
            Some(offset) => {
                let colon = name_start + offset;
                (&reference[..colon], Some(&reference[colon + 1..]))
            }
            None => (reference, None),
        }
    }

    if let Some((name, digest)) = image.split_once('@') {
        // Docker ignores the tag when a digest is given; drop it from the name.
        return (split_tag(name).0, digest);
    }
    let (name, tag) = split_tag(image);
    (name, tag.filter(|t| !t.is_empty()).unwrap_or("latest"))
}
455
/// Render environment variables as the `KEY=value` strings Docker expects.
///
/// Order follows the map's iteration order.
fn build_env(env: &HashMap<String, String>) -> Vec<String> {
    let mut entries = Vec::with_capacity(env.len());
    for (key, value) in env {
        entries.push(format!("{key}={value}"));
    }
    entries
}
459
460fn build_labels(project: &str, resource: &str) -> HashMap<String, String> {
461    let mut labels = HashMap::with_capacity(2);
462    labels.insert(LABEL_PROJECT.to_owned(), project.to_owned());
463    labels.insert(LABEL_RESOURCE.to_owned(), resource.to_owned());
464    labels
465}
466
/// Label key holding the manifest project name.
///
/// It is set on every managed container and on the project network, and is
/// used to filter containers when listing them.
pub const LABEL_PROJECT: &str = "lightshuttle.project";

/// Label key holding the manifest resource name of a managed container.
pub const LABEL_RESOURCE: &str = "lightshuttle.resource";
474
475/// One entry returned by [`DockerRuntime::list_managed`].
476#[derive(Debug, Clone)]
477pub struct ManagedContainer {
478    /// Container identifier.
479    pub id: ContainerId,
480    /// Resource name as declared in the manifest.
481    pub resource: String,
482    /// Current lifecycle status.
483    pub status: ContainerStatus,
484}
485
486fn parse_summary_state(state: Option<&ContainerSummaryStateEnum>) -> ContainerStatus {
487    match state {
488        Some(ContainerSummaryStateEnum::RUNNING) => ContainerStatus::Running,
489        Some(ContainerSummaryStateEnum::EXITED | ContainerSummaryStateEnum::DEAD) => {
490            ContainerStatus::Stopped { exit_code: None }
491        }
492        _ => ContainerStatus::Starting,
493    }
494}
495
496fn build_exposed_ports(ports: &[PortBinding]) -> Vec<String> {
497    ports
498        .iter()
499        .map(|p| format!("{}/tcp", p.container_port))
500        .collect()
501}
502
/// Host address used for a published port when the manifest gives none.
///
/// It is the loopback address, so managed services (databases, caches,
/// application ports) on a dev machine are not reachable from the wider
/// network. Broader exposure has to be requested explicitly with the
/// `address:host:container` port mapping form.
const DEFAULT_HOST_BIND_ADDRESS: &str = "127.0.0.1";
510
511fn build_host_config(ports: &[PortBinding], volumes: &[VolumeBinding]) -> HostConfig {
512    let port_bindings = ports
513        .iter()
514        .map(|p| {
515            let host_ip = p
516                .host_address
517                .clone()
518                .unwrap_or_else(|| DEFAULT_HOST_BIND_ADDRESS.to_owned());
519            let bindings = vec![BollardPortBinding {
520                host_ip: Some(host_ip),
521                host_port: Some(p.host_port.to_string()),
522            }];
523            (format!("{}/tcp", p.container_port), Some(bindings))
524        })
525        .collect::<HashMap<_, _>>();
526
527    let binds: Vec<String> = volumes
528        .iter()
529        .filter_map(|v| match &v.source {
530            VolumeSource::HostPath(path) => Some(format!("{path}:{}", v.target)),
531            VolumeSource::Named(name) => Some(format!("{name}:{}", v.target)),
532            VolumeSource::Anonymous => None,
533        })
534        .collect();
535
536    HostConfig {
537        port_bindings: Some(port_bindings),
538        binds: if binds.is_empty() { None } else { Some(binds) },
539        ..Default::default()
540    }
541}
542
543fn build_healthcheck(hc: &HealthcheckSpec) -> HealthConfig {
544    #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
545    HealthConfig {
546        test: Some(hc.test.clone()),
547        interval: Some(hc.interval.as_nanos() as i64),
548        timeout: Some(hc.timeout.as_nanos() as i64),
549        retries: Some(i64::from(hc.retries)),
550        start_period: Some(hc.start_period.as_nanos() as i64),
551        ..Default::default()
552    }
553}
554
555fn map_log_item(item: std::result::Result<LogOutput, bollard::errors::Error>) -> Result<LogChunk> {
556    match item {
557        Ok(LogOutput::StdErr { message }) => Ok(log_chunk(LogStream::Stderr, &message)),
558        Ok(
559            LogOutput::StdOut { message }
560            | LogOutput::Console { message }
561            | LogOutput::StdIn { message },
562        ) => Ok(log_chunk(LogStream::Stdout, &message)),
563        Err(e) => Err(RuntimeError::LogStream(e)),
564    }
565}
566
567/// Build a [`LogChunk`], extracting the Docker emission timestamp from
568/// the line prefix when present.
569fn log_chunk(stream: LogStream, message: &[u8]) -> LogChunk {
570    let (timestamp, bytes) = split_docker_timestamp(message);
571    LogChunk {
572        stream,
573        timestamp,
574        bytes,
575    }
576}
577
578/// Split a Docker log line into its emission timestamp and payload.
579///
580/// With `timestamps: true`, Docker prepends each line with an RFC3339
581/// nanosecond timestamp and a single space. When that prefix parses,
582/// the real emission time is returned and the prefix is stripped from
583/// the forwarded bytes. Otherwise the read time is used and the line is
584/// forwarded verbatim.
585fn split_docker_timestamp(message: &[u8]) -> (SystemTime, Vec<u8>) {
586    if let Some(space) = message.iter().position(|&b| b == b' ')
587        && let Ok(prefix) = std::str::from_utf8(&message[..space])
588        && let Ok(ts) = prefix.parse::<jiff::Timestamp>()
589        && let Some(system_time) = timestamp_to_system_time(ts)
590    {
591        let payload = message.get(space + 1..).unwrap_or(&[]).to_vec();
592        return (system_time, payload);
593    }
594    (SystemTime::now(), message.to_vec())
595}
596
597/// Convert a `jiff` timestamp to a `SystemTime`, returning `None` for
598/// pre-epoch instants (never produced by container logs).
599fn timestamp_to_system_time(ts: jiff::Timestamp) -> Option<SystemTime> {
600    let nanos = ts.as_nanosecond();
601    if nanos < 0 {
602        return None;
603    }
604    let secs = u64::try_from(nanos / 1_000_000_000).ok()?;
605    let subsec = u32::try_from(nanos % 1_000_000_000).ok()?;
606    Some(SystemTime::UNIX_EPOCH + Duration::new(secs, subsec))
607}
608
#[cfg(test)]
mod tests {
    use std::time::SystemTime;

    use super::{PortBinding, build_host_config, split_docker_timestamp};

    /// Host IP of the first binding published for `port_key` (for example
    /// `"5432/tcp"`), or `None` when that key has no binding.
    fn first_host_ip(ports: &[PortBinding], port_key: &str) -> Option<String> {
        let port_map = build_host_config(ports, &[]).port_bindings?;
        let bindings = port_map.get(port_key)?.as_ref()?;
        bindings.first()?.host_ip.clone()
    }

    #[test]
    fn unspecified_address_binds_to_loopback() {
        let ports = [PortBinding {
            container_port: 5432,
            host_address: None,
            host_port: 5432,
        }];
        assert_eq!(
            first_host_ip(&ports, "5432/tcp").as_deref(),
            Some("127.0.0.1")
        );
    }

    #[test]
    fn explicit_address_is_preserved() {
        let ports = [PortBinding {
            container_port: 80,
            host_address: Some("0.0.0.0".to_owned()),
            host_port: 8080,
        }];
        assert_eq!(first_host_ip(&ports, "80/tcp").as_deref(), Some("0.0.0.0"));
    }

    #[test]
    fn timestamped_line_parses_emission_time_and_strips_prefix() {
        let (emitted_at, payload) =
            split_docker_timestamp(b"2024-01-01T12:34:56.789012345Z hello world");

        let since_epoch = emitted_at
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("post-epoch");
        assert_eq!(since_epoch.as_secs(), 1_704_112_496);
        // SystemTime resolution is platform dependent (100ns on Windows), so
        // the sub-second part is only compared down to the microsecond.
        assert_eq!(since_epoch.subsec_micros(), 789_012);
        assert_eq!(payload, b"hello world");
    }

    #[test]
    fn timestamped_line_without_payload_yields_empty_bytes() {
        // Docker still emits the separating space, then the (empty) line.
        let (_emitted_at, payload) = split_docker_timestamp(b"2024-01-01T00:00:00Z \n");
        assert_eq!(payload, b"\n");
    }

    #[test]
    fn untimestamped_line_is_forwarded_verbatim() {
        // A leading token that is not an RFC 3339 timestamp falls back to the
        // read time, and every byte (token included) is forwarded.
        let line = b"not-a-timestamp hello world";
        let (_emitted_at, payload) = split_docker_timestamp(line);
        assert_eq!(payload, line);
    }

    #[test]
    fn line_without_space_is_forwarded_verbatim() {
        let line = b"singletoken";
        let (_emitted_at, payload) = split_docker_timestamp(line);
        assert_eq!(payload, line);
    }
}