1use std::collections::HashMap;
4use std::path::Path;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant, SystemTime};
8
9use bollard::Docker;
10use bollard::container::LogOutput;
11use bollard::models::{
12 ContainerCreateBody, ContainerSummaryStateEnum, EndpointSettings, HealthConfig, HostConfig,
13 NetworkCreateRequest, NetworkingConfig, PortBinding as BollardPortBinding,
14};
15use bollard::query_parameters::{
16 BuildImageOptionsBuilder, BuilderVersion, CreateContainerOptionsBuilder,
17 CreateImageOptionsBuilder, ListContainersOptionsBuilder, LogsOptionsBuilder,
18 RemoveContainerOptionsBuilder, StartContainerOptions, StopContainerOptionsBuilder,
19};
20use bytes::Bytes;
21use futures::stream::{Stream, StreamExt};
22
23use crate::error::{Result, RuntimeError};
24use crate::runtime::{
25 ContainerId, ContainerRuntime, ContainerStatus, LogChunk, LogChunkStream, LogStream,
26};
27use lightshuttle_spec::{
28 ContainerSpec, HealthcheckSpec, ImageSource, PortBinding, VolumeBinding, VolumeSource,
29};
30
/// Pause between two consecutive container inspections while `wait_healthy` is polling.
const POLL_INTERVAL: Duration = Duration::from_millis(500);
32
33pub struct DockerRuntime {
38 client: Docker,
39}
40
41impl DockerRuntime {
42 pub fn connect() -> Result<Self> {
44 let client = Docker::connect_with_local_defaults().map_err(RuntimeError::Connect)?;
45 Ok(Self { client })
46 }
47
48 #[must_use]
51 pub fn from_client(client: Docker) -> Self {
52 Self { client }
53 }
54
55 async fn ensure_image(&self, image: &str) -> Result<()> {
56 let (from_image, tag) = split_image_ref(image);
57 let options = CreateImageOptionsBuilder::default()
58 .from_image(from_image)
59 .tag(tag)
60 .build();
61 let mut stream = self.client.create_image(Some(options), None, None);
62 while let Some(event) = stream.next().await {
63 event.map_err(|e| RuntimeError::ImagePull {
64 image: image.to_owned(),
65 source: e,
66 })?;
67 }
68 Ok(())
69 }
70
71 pub async fn list_managed(&self, project: &str) -> Result<Vec<ManagedContainer>> {
75 let label_filter = format!("{LABEL_PROJECT}={project}");
76 let mut filters: HashMap<String, Vec<String>> = HashMap::new();
77 filters.insert("label".to_owned(), vec![label_filter]);
78 let options = ListContainersOptionsBuilder::default()
79 .all(true)
80 .filters(&filters)
81 .build();
82 let summaries = self
83 .client
84 .list_containers(Some(options))
85 .await
86 .map_err(|source| RuntimeError::Inspect {
87 id: format!("project={project}"),
88 source,
89 })?;
90
91 let mut out = Vec::with_capacity(summaries.len());
92 for summary in summaries {
93 let Some(id) = summary.id else { continue };
94 let resource = summary
95 .labels
96 .as_ref()
97 .and_then(|labels| labels.get(LABEL_RESOURCE))
98 .cloned()
99 .unwrap_or_else(|| "<unknown>".to_owned());
100 let status = parse_summary_state(summary.state.as_ref());
101 out.push(ManagedContainer {
102 id: ContainerId::new(id),
103 resource,
104 status,
105 });
106 }
107 out.sort_by(|a, b| a.resource.cmp(&b.resource));
108 Ok(out)
109 }
110
111 async fn build_image(
112 &self,
113 context: &str,
114 dockerfile: &str,
115 build_args: &HashMap<String, String>,
116 target: Option<&str>,
117 tag: &str,
118 ) -> Result<()> {
119 static SESSION_COUNTER: AtomicU64 = AtomicU64::new(0);
122
123 let context_owned = context.to_owned();
124 let tar_bytes =
125 tokio::task::spawn_blocking(move || build_tar_archive(Path::new(&context_owned)))
126 .await
127 .map_err(|join_err| {
128 RuntimeError::InvalidSpec(format!("tar build task panicked: {join_err}"))
129 })?
130 .map_err(|io_err| {
131 RuntimeError::InvalidSpec(format!("failed to build tar archive: {io_err}"))
132 })?;
133
134 let session_id = format!(
135 "lightshuttle-build-{}",
136 SESSION_COUNTER.fetch_add(1, Ordering::Relaxed)
137 );
138
139 let options = BuildImageOptionsBuilder::default()
140 .dockerfile(dockerfile)
141 .t(tag)
142 .rm(true)
143 .buildargs(build_args)
144 .target(target.unwrap_or(""))
145 .version(BuilderVersion::BuilderBuildKit)
146 .session(&session_id)
147 .build();
148
149 let mut stream = self.client.build_image(
150 options,
151 None,
152 Some(bollard::body_full(Bytes::from(tar_bytes))),
153 );
154 while let Some(event) = stream.next().await {
155 let info = event.map_err(RuntimeError::Build)?;
156 if let Some(detail) = info.error_detail {
157 let message = detail
158 .message
159 .unwrap_or_else(|| "unknown build error".to_owned());
160 return Err(RuntimeError::BuildFailed(message));
161 }
162 }
163 Ok(())
164 }
165}
166
167fn build_tar_archive(context: &Path) -> std::io::Result<Vec<u8>> {
170 use ignore::WalkBuilder;
171
172 let mut buf: Vec<u8> = Vec::new();
173 {
174 let mut builder = tar::Builder::new(&mut buf);
175 builder.follow_symlinks(false);
176
177 let walker = WalkBuilder::new(context)
178 .add_custom_ignore_filename(".dockerignore")
179 .git_ignore(false)
180 .git_exclude(false)
181 .git_global(false)
182 .hidden(false)
183 .build();
184
185 for entry in walker {
186 let entry = entry.map_err(|e| std::io::Error::other(format!("walk error: {e}")))?;
187 let path = entry.path();
188 let relative = match path.strip_prefix(context) {
189 Ok(p) if !p.as_os_str().is_empty() => p,
190 _ => continue,
191 };
192 let Some(file_type) = entry.file_type() else {
193 continue;
194 };
195 if file_type.is_dir() {
196 builder.append_dir(relative, path)?;
197 } else if file_type.is_file() {
198 let mut file = std::fs::File::open(path)?;
199 builder.append_file(relative, &mut file)?;
200 }
201 }
202 builder.finish()?;
203 }
204 Ok(buf)
205}
206
/// Derives the Docker network name used for `project`.
///
/// Every character that is neither alphanumeric nor `-` is replaced by `-`,
/// the result is lowercased, and the `lightshuttle-` prefix is prepended.
fn network_name(project: &str) -> String {
    let mut cleaned = String::with_capacity(project.len());
    for ch in project.chars() {
        let keep = ch == '-' || ch.is_alphanumeric();
        cleaned.push(if keep { ch } else { '-' });
    }

    let mut name = String::from("lightshuttle-");
    name.push_str(&cleaned.to_lowercase());
    name
}
225
226impl ContainerRuntime for DockerRuntime {
227 async fn ensure_project_network(&self, project: &str) -> Result<()> {
228 let name = network_name(project);
229
230 match self.client.inspect_network(&name, None).await {
231 Ok(_) => return Ok(()),
232 Err(bollard::errors::Error::DockerResponseServerError {
233 status_code: 404, ..
234 }) => {}
235 Err(e) => return Err(RuntimeError::NetworkCreate { name, source: e }),
236 }
237
238 let mut labels = HashMap::new();
239 labels.insert(LABEL_PROJECT.to_owned(), project.to_owned());
240 let config = NetworkCreateRequest {
241 name: name.clone(),
242 driver: Some("bridge".to_owned()),
243 labels: Some(labels),
244 ..Default::default()
245 };
246 match self.client.create_network(config).await {
247 Ok(_)
249 | Err(bollard::errors::Error::DockerResponseServerError {
250 status_code: 409, ..
251 }) => Ok(()),
252 Err(e) => Err(RuntimeError::NetworkCreate { name, source: e }),
253 }
254 }
255
256 async fn teardown_project_network(&self, project: &str) -> Result<()> {
257 let name = network_name(project);
258 match self.client.remove_network(&name).await {
259 Ok(())
260 | Err(bollard::errors::Error::DockerResponseServerError {
261 status_code: 404, ..
262 }) => Ok(()),
263 Err(e) => Err(RuntimeError::NetworkRemove { name, source: e }),
264 }
265 }
266
267 async fn start(&self, spec: &ContainerSpec) -> Result<ContainerId> {
268 let image_ref = match &spec.image {
269 ImageSource::Pull(image) => {
270 self.ensure_image(image).await?;
271 image.clone()
272 }
273 ImageSource::Build {
274 context,
275 dockerfile,
276 build_args,
277 target,
278 tag,
279 } => {
280 self.build_image(context, dockerfile, build_args, target.as_deref(), tag)
281 .await?;
282 tag.clone()
283 }
284 };
285
286 self.ensure_project_network(&spec.project).await?;
287
288 let net = network_name(&spec.project);
289 let mut endpoints = HashMap::new();
290 endpoints.insert(net, EndpointSettings::default());
291 let networking_config = NetworkingConfig {
292 endpoints_config: Some(endpoints),
293 };
294
295 let host_config = build_host_config(&spec.ports, &spec.volumes);
296 let exposed_ports = build_exposed_ports(&spec.ports);
297 let env = build_env(&spec.env);
298 let healthcheck = spec.healthcheck.as_ref().map(build_healthcheck);
299 let labels = build_labels(&spec.project, &spec.resource);
300
301 let config = ContainerCreateBody {
302 image: Some(image_ref),
303 env: Some(env),
304 cmd: spec.command.clone(),
305 working_dir: spec.working_dir.clone(),
306 host_config: Some(host_config),
307 exposed_ports: Some(exposed_ports),
308 healthcheck,
309 labels: Some(labels),
310 networking_config: Some(networking_config),
311 ..Default::default()
312 };
313
314 let create_options = CreateContainerOptionsBuilder::default()
315 .name(&spec.name)
316 .build();
317
318 let created = self
319 .client
320 .create_container(Some(create_options), config)
321 .await
322 .map_err(RuntimeError::Start)?;
323
324 self.client
325 .start_container(&created.id, None::<StartContainerOptions>)
326 .await
327 .map_err(RuntimeError::Start)?;
328
329 Ok(ContainerId::new(created.id))
330 }
331
332 async fn stop(&self, id: &ContainerId, grace: Duration) -> Result<()> {
333 #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
334 let options = StopContainerOptionsBuilder::default()
335 .t(grace.as_secs() as i32)
336 .build();
337 match self.client.stop_container(id.as_str(), Some(options)).await {
338 Ok(())
339 | Err(bollard::errors::Error::DockerResponseServerError {
340 status_code: 304 | 404,
341 ..
342 }) => Ok(()),
343 Err(e) => Err(RuntimeError::Stop {
344 id: id.to_string(),
345 source: e,
346 }),
347 }
348 }
349
350 async fn remove(&self, name: &str) -> Result<()> {
351 let options = RemoveContainerOptionsBuilder::default().force(true).build();
352 match self.client.remove_container(name, Some(options)).await {
353 Ok(())
354 | Err(bollard::errors::Error::DockerResponseServerError {
355 status_code: 404, ..
356 }) => Ok(()),
357 Err(e) => Err(RuntimeError::Remove {
358 name: name.to_owned(),
359 source: e,
360 }),
361 }
362 }
363
364 async fn inspect(&self, id: &ContainerId) -> Result<ContainerStatus> {
365 let info = self
366 .client
367 .inspect_container(id.as_str(), None)
368 .await
369 .map_err(|e| match e {
370 bollard::errors::Error::DockerResponseServerError {
371 status_code: 404, ..
372 } => RuntimeError::NotFound(id.to_string()),
373 other => RuntimeError::Inspect {
374 id: id.to_string(),
375 source: other,
376 },
377 })?;
378
379 let state = info.state.as_ref();
380 let Some(state) = state else {
381 return Ok(ContainerStatus::Starting);
382 };
383
384 if matches!(state.running, Some(true)) {
385 if let Some(health) = &state.health {
386 return Ok(match health.status {
387 Some(bollard::models::HealthStatusEnum::HEALTHY) => ContainerStatus::Healthy,
388 Some(bollard::models::HealthStatusEnum::UNHEALTHY) => {
389 ContainerStatus::Unhealthy
390 }
391 _ => ContainerStatus::Running,
392 });
393 }
394 return Ok(ContainerStatus::Running);
395 }
396
397 if matches!(state.dead, Some(true))
398 || state.status == Some(bollard::models::ContainerStateStatusEnum::EXITED)
399 {
400 #[allow(clippy::cast_possible_truncation)]
401 let exit_code = state.exit_code.map(|c| c as i32);
402 return Ok(ContainerStatus::Stopped { exit_code });
403 }
404
405 Ok(ContainerStatus::Starting)
406 }
407
408 async fn wait_healthy(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
409 let deadline = Instant::now() + timeout;
410 loop {
411 match self.inspect(id).await? {
412 ContainerStatus::Healthy | ContainerStatus::Running => return Ok(()),
413 ContainerStatus::Unhealthy => {
414 if Instant::now() >= deadline {
415 return Err(RuntimeError::Timeout {
416 operation: "wait_healthy",
417 after: timeout,
418 });
419 }
420 }
421 ContainerStatus::Starting => {}
422 ContainerStatus::Stopped { exit_code } => {
423 return Err(RuntimeError::InvalidSpec(format!(
424 "container `{id}` exited with code {exit_code:?} before becoming healthy"
425 )));
426 }
427 }
428 if Instant::now() >= deadline {
429 return Err(RuntimeError::Timeout {
430 operation: "wait_healthy",
431 after: timeout,
432 });
433 }
434 tokio::time::sleep(POLL_INTERVAL).await;
435 }
436 }
437
438 async fn logs(&self, id: &ContainerId, follow: bool) -> Result<LogChunkStream> {
439 let options = LogsOptionsBuilder::default()
440 .follow(follow)
441 .stdout(true)
442 .stderr(true)
443 .timestamps(true)
444 .build();
445 let stream = self.client.logs(id.as_str(), Some(options));
446 let mapped: Pin<Box<dyn Stream<Item = Result<LogChunk>> + Send>> =
447 Box::pin(stream.map(map_log_item));
448 Ok(mapped)
449 }
450}
451
/// Splits an image reference into the `(from_image, tag)` pair sent to the
/// image-create endpoint.
///
/// * `name@digest` → `(name, digest)`; the digest travels in the tag slot.
/// * `name:tag` → `(name, tag)`, where only a `:` in the last path segment is
///   a tag separator, so a registry port such as `localhost:5000/app` is not
///   mistaken for a tag.
/// * anything else → `(image, "latest")`.
fn split_image_ref(image: &str) -> (&str, &str) {
    if let Some((name, digest)) = image.split_once('@') {
        return (name, digest);
    }

    // A tag can only appear after the final `/`; colons before it belong to
    // the registry host (`host:port/...`).
    let last_segment_start = image.rfind('/').map_or(0, |slash| slash + 1);
    match image[last_segment_start..].rfind(':') {
        Some(offset) => {
            let colon = last_segment_start + offset;
            (&image[..colon], &image[colon + 1..])
        }
        None => (image, "latest"),
    }
}
455
/// Renders the environment map as `KEY=value` strings.
///
/// Entries appear in the map's iteration order.
fn build_env(env: &HashMap<String, String>) -> Vec<String> {
    let mut rendered = Vec::with_capacity(env.len());
    for (key, value) in env {
        rendered.push(format!("{key}={value}"));
    }
    rendered
}
459
460fn build_labels(project: &str, resource: &str) -> HashMap<String, String> {
461 let mut labels = HashMap::with_capacity(2);
462 labels.insert(LABEL_PROJECT.to_owned(), project.to_owned());
463 labels.insert(LABEL_RESOURCE.to_owned(), resource.to_owned());
464 labels
465}
466
/// Label key holding the project name. It is set on managed containers and on
/// project networks, and is the filter used by `list_managed`.
pub const LABEL_PROJECT: &str = "lightshuttle.project";

/// Label key holding the resource name of a managed container.
pub const LABEL_RESOURCE: &str = "lightshuttle.resource";
474
475#[derive(Debug, Clone)]
477pub struct ManagedContainer {
478 pub id: ContainerId,
480 pub resource: String,
482 pub status: ContainerStatus,
484}
485
486fn parse_summary_state(state: Option<&ContainerSummaryStateEnum>) -> ContainerStatus {
487 match state {
488 Some(ContainerSummaryStateEnum::RUNNING) => ContainerStatus::Running,
489 Some(ContainerSummaryStateEnum::EXITED | ContainerSummaryStateEnum::DEAD) => {
490 ContainerStatus::Stopped { exit_code: None }
491 }
492 _ => ContainerStatus::Starting,
493 }
494}
495
496fn build_exposed_ports(ports: &[PortBinding]) -> Vec<String> {
497 ports
498 .iter()
499 .map(|p| format!("{}/tcp", p.container_port))
500 .collect()
501}
502
/// Host IP a published port is bound to when the port binding does not specify
/// a host address (loopback, so ports are not exposed on other interfaces by
/// default).
const DEFAULT_HOST_BIND_ADDRESS: &str = "127.0.0.1";
510
511fn build_host_config(ports: &[PortBinding], volumes: &[VolumeBinding]) -> HostConfig {
512 let port_bindings = ports
513 .iter()
514 .map(|p| {
515 let host_ip = p
516 .host_address
517 .clone()
518 .unwrap_or_else(|| DEFAULT_HOST_BIND_ADDRESS.to_owned());
519 let bindings = vec![BollardPortBinding {
520 host_ip: Some(host_ip),
521 host_port: Some(p.host_port.to_string()),
522 }];
523 (format!("{}/tcp", p.container_port), Some(bindings))
524 })
525 .collect::<HashMap<_, _>>();
526
527 let binds: Vec<String> = volumes
528 .iter()
529 .filter_map(|v| match &v.source {
530 VolumeSource::HostPath(path) => Some(format!("{path}:{}", v.target)),
531 VolumeSource::Named(name) => Some(format!("{name}:{}", v.target)),
532 VolumeSource::Anonymous => None,
533 })
534 .collect();
535
536 HostConfig {
537 port_bindings: Some(port_bindings),
538 binds: if binds.is_empty() { None } else { Some(binds) },
539 ..Default::default()
540 }
541}
542
543fn build_healthcheck(hc: &HealthcheckSpec) -> HealthConfig {
544 #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
545 HealthConfig {
546 test: Some(hc.test.clone()),
547 interval: Some(hc.interval.as_nanos() as i64),
548 timeout: Some(hc.timeout.as_nanos() as i64),
549 retries: Some(i64::from(hc.retries)),
550 start_period: Some(hc.start_period.as_nanos() as i64),
551 ..Default::default()
552 }
553}
554
555fn map_log_item(item: std::result::Result<LogOutput, bollard::errors::Error>) -> Result<LogChunk> {
556 match item {
557 Ok(LogOutput::StdErr { message }) => Ok(log_chunk(LogStream::Stderr, &message)),
558 Ok(
559 LogOutput::StdOut { message }
560 | LogOutput::Console { message }
561 | LogOutput::StdIn { message },
562 ) => Ok(log_chunk(LogStream::Stdout, &message)),
563 Err(e) => Err(RuntimeError::LogStream(e)),
564 }
565}
566
567fn log_chunk(stream: LogStream, message: &[u8]) -> LogChunk {
570 let (timestamp, bytes) = split_docker_timestamp(message);
571 LogChunk {
572 stream,
573 timestamp,
574 bytes,
575 }
576}
577
578fn split_docker_timestamp(message: &[u8]) -> (SystemTime, Vec<u8>) {
586 if let Some(space) = message.iter().position(|&b| b == b' ')
587 && let Ok(prefix) = std::str::from_utf8(&message[..space])
588 && let Ok(ts) = prefix.parse::<jiff::Timestamp>()
589 && let Some(system_time) = timestamp_to_system_time(ts)
590 {
591 let payload = message.get(space + 1..).unwrap_or(&[]).to_vec();
592 return (system_time, payload);
593 }
594 (SystemTime::now(), message.to_vec())
595}
596
597fn timestamp_to_system_time(ts: jiff::Timestamp) -> Option<SystemTime> {
600 let nanos = ts.as_nanosecond();
601 if nanos < 0 {
602 return None;
603 }
604 let secs = u64::try_from(nanos / 1_000_000_000).ok()?;
605 let subsec = u32::try_from(nanos % 1_000_000_000).ok()?;
606 Some(SystemTime::UNIX_EPOCH + Duration::new(secs, subsec))
607}
608
#[cfg(test)]
mod tests {
    use super::{PortBinding, build_host_config, split_docker_timestamp};

    /// Builds a host config from `ports` (no volumes) and returns the host IP
    /// of the first binding published under `key`, if any.
    fn host_ip_for(ports: &[PortBinding], key: &str) -> Option<String> {
        let published = build_host_config(ports, &[]).port_bindings?;
        let first = published.get(key)?.as_ref()?.first()?;
        first.host_ip.clone()
    }

    /// A binding without a host address falls back to loopback.
    #[test]
    fn unspecified_address_binds_to_loopback() {
        let ports = [PortBinding {
            container_port: 5432,
            host_address: None,
            host_port: 5432,
        }];
        let host_ip = host_ip_for(&ports, "5432/tcp");
        assert_eq!(host_ip.as_deref(), Some("127.0.0.1"));
    }

    /// An explicitly requested host address is passed through untouched.
    #[test]
    fn explicit_address_is_preserved() {
        let ports = [PortBinding {
            container_port: 80,
            host_address: Some("0.0.0.0".to_owned()),
            host_port: 8080,
        }];
        let host_ip = host_ip_for(&ports, "80/tcp");
        assert_eq!(host_ip.as_deref(), Some("0.0.0.0"));
    }

    /// A well-formed timestamp prefix is parsed and removed from the payload.
    #[test]
    fn timestamped_line_parses_emission_time_and_strips_prefix() {
        let (ts, payload) =
            split_docker_timestamp(b"2024-01-01T12:34:56.789012345Z hello world");

        let since_epoch = ts
            .duration_since(std::time::SystemTime::UNIX_EPOCH)
            .expect("post-epoch");
        assert_eq!(since_epoch.as_secs(), 1_704_112_496);
        // Sub-second part is checked at microsecond granularity.
        assert_eq!(since_epoch.subsec_micros(), 789_012);
        assert_eq!(payload, b"hello world");
    }

    /// Only the timestamp and its separating space are stripped; the trailing
    /// newline stays in the payload.
    #[test]
    fn timestamped_line_without_payload_yields_empty_bytes() {
        let (_ts, payload) = split_docker_timestamp(b"2024-01-01T00:00:00Z \n");
        assert_eq!(payload, b"\n");
    }

    /// A first token that is not a timestamp leaves the message untouched.
    #[test]
    fn untimestamped_line_is_forwarded_verbatim() {
        let input = b"not-a-timestamp hello world";
        let (_ts, payload) = split_docker_timestamp(input);
        assert_eq!(payload, input);
    }

    /// A message with no space at all is returned as-is.
    #[test]
    fn line_without_space_is_forwarded_verbatim() {
        let input = b"singletoken";
        let (_ts, payload) = split_docker_timestamp(input);
        assert_eq!(payload, input);
    }
}