1use std::collections::HashMap;
4use std::path::Path;
5use std::pin::Pin;
6use std::time::{Duration, Instant, SystemTime};
7
8use bollard::Docker;
9use bollard::container::LogOutput;
10use bollard::models::{
11 ContainerCreateBody, ContainerSummaryStateEnum, HealthConfig, HostConfig,
12 PortBinding as BollardPortBinding,
13};
14use bollard::query_parameters::{
15 BuildImageOptionsBuilder, BuilderVersion, CreateContainerOptionsBuilder,
16 CreateImageOptionsBuilder, ListContainersOptionsBuilder, LogsOptionsBuilder,
17 RemoveContainerOptionsBuilder, StartContainerOptions, StopContainerOptionsBuilder,
18};
19use bytes::Bytes;
20use futures::stream::{Stream, StreamExt};
21
22use crate::error::{Result, RuntimeError};
23use crate::runtime::{
24 ContainerId, ContainerRuntime, ContainerStatus, LogChunk, LogChunkStream, LogStream,
25};
26use lightshuttle_spec::{
27 ContainerSpec, HealthcheckSpec, ImageSource, PortBinding, VolumeBinding, VolumeSource,
28};
29
/// Pause between two consecutive status polls in `wait_healthy`.
const POLL_INTERVAL: Duration = Duration::from_millis(500);
31
/// Container runtime that talks to a Docker daemon through a `bollard` client.
pub struct DockerRuntime {
    /// Client handle used for every Docker API call issued by this runtime.
    client: Docker,
}
39
40impl DockerRuntime {
41 pub fn connect() -> Result<Self> {
43 let client = Docker::connect_with_local_defaults().map_err(RuntimeError::Connect)?;
44 Ok(Self { client })
45 }
46
47 #[must_use]
50 pub fn from_client(client: Docker) -> Self {
51 Self { client }
52 }
53
54 async fn ensure_image(&self, image: &str) -> Result<()> {
55 let (from_image, tag) = split_image_ref(image);
56 let options = CreateImageOptionsBuilder::default()
57 .from_image(from_image)
58 .tag(tag)
59 .build();
60 let mut stream = self.client.create_image(Some(options), None, None);
61 while let Some(event) = stream.next().await {
62 event.map_err(|e| RuntimeError::ImagePull {
63 image: image.to_owned(),
64 source: e,
65 })?;
66 }
67 Ok(())
68 }
69
70 pub async fn list_managed(&self, project: &str) -> Result<Vec<ManagedContainer>> {
74 let label_filter = format!("{LABEL_PROJECT}={project}");
75 let mut filters: HashMap<String, Vec<String>> = HashMap::new();
76 filters.insert("label".to_owned(), vec![label_filter]);
77 let options = ListContainersOptionsBuilder::default()
78 .all(true)
79 .filters(&filters)
80 .build();
81 let summaries = self
82 .client
83 .list_containers(Some(options))
84 .await
85 .map_err(|source| RuntimeError::Inspect {
86 id: format!("project={project}"),
87 source,
88 })?;
89
90 let mut out = Vec::with_capacity(summaries.len());
91 for summary in summaries {
92 let Some(id) = summary.id else { continue };
93 let resource = summary
94 .labels
95 .as_ref()
96 .and_then(|labels| labels.get(LABEL_RESOURCE))
97 .cloned()
98 .unwrap_or_else(|| "<unknown>".to_owned());
99 let status = parse_summary_state(summary.state.as_ref());
100 out.push(ManagedContainer {
101 id: ContainerId::new(id),
102 resource,
103 status,
104 });
105 }
106 out.sort_by(|a, b| a.resource.cmp(&b.resource));
107 Ok(out)
108 }
109
110 async fn build_image(
111 &self,
112 context: &str,
113 dockerfile: &str,
114 build_args: &HashMap<String, String>,
115 target: Option<&str>,
116 tag: &str,
117 ) -> Result<()> {
118 let context_owned = context.to_owned();
119 let tar_bytes =
120 tokio::task::spawn_blocking(move || build_tar_archive(Path::new(&context_owned)))
121 .await
122 .map_err(|join_err| {
123 RuntimeError::InvalidSpec(format!("tar build task panicked: {join_err}"))
124 })?
125 .map_err(|io_err| {
126 RuntimeError::InvalidSpec(format!("failed to build tar archive: {io_err}"))
127 })?;
128
129 let options = BuildImageOptionsBuilder::default()
130 .dockerfile(dockerfile)
131 .t(tag)
132 .rm(true)
133 .buildargs(build_args)
134 .target(target.unwrap_or(""))
135 .version(BuilderVersion::BuilderBuildKit)
136 .build();
137
138 let mut stream = self.client.build_image(
139 options,
140 None,
141 Some(bollard::body_full(Bytes::from(tar_bytes))),
142 );
143 while let Some(event) = stream.next().await {
144 event.map_err(RuntimeError::Build)?;
145 }
146 Ok(())
147 }
148}
149
150fn build_tar_archive(context: &Path) -> std::io::Result<Vec<u8>> {
153 use ignore::WalkBuilder;
154
155 let mut buf: Vec<u8> = Vec::new();
156 {
157 let mut builder = tar::Builder::new(&mut buf);
158 builder.follow_symlinks(false);
159
160 let walker = WalkBuilder::new(context)
161 .add_custom_ignore_filename(".dockerignore")
162 .git_ignore(false)
163 .git_exclude(false)
164 .git_global(false)
165 .hidden(false)
166 .build();
167
168 for entry in walker {
169 let entry = entry.map_err(|e| std::io::Error::other(format!("walk error: {e}")))?;
170 let path = entry.path();
171 let relative = match path.strip_prefix(context) {
172 Ok(p) if !p.as_os_str().is_empty() => p,
173 _ => continue,
174 };
175 let Some(file_type) = entry.file_type() else {
176 continue;
177 };
178 if file_type.is_dir() {
179 builder.append_dir(relative, path)?;
180 } else if file_type.is_file() {
181 let mut file = std::fs::File::open(path)?;
182 builder.append_file(relative, &mut file)?;
183 }
184 }
185 builder.finish()?;
186 }
187 Ok(buf)
188}
189
190impl ContainerRuntime for DockerRuntime {
191 async fn start(&self, spec: &ContainerSpec) -> Result<ContainerId> {
192 let image_ref = match &spec.image {
193 ImageSource::Pull(image) => {
194 self.ensure_image(image).await?;
195 image.clone()
196 }
197 ImageSource::Build {
198 context,
199 dockerfile,
200 build_args,
201 target,
202 tag,
203 } => {
204 self.build_image(context, dockerfile, build_args, target.as_deref(), tag)
205 .await?;
206 tag.clone()
207 }
208 };
209
210 let host_config = build_host_config(&spec.ports, &spec.volumes);
211 let exposed_ports = build_exposed_ports(&spec.ports);
212 let env = build_env(&spec.env);
213 let healthcheck = spec.healthcheck.as_ref().map(build_healthcheck);
214 let labels = build_labels(&spec.project, &spec.resource);
215
216 let config = ContainerCreateBody {
217 image: Some(image_ref),
218 env: Some(env),
219 cmd: spec.command.clone(),
220 working_dir: spec.working_dir.clone(),
221 host_config: Some(host_config),
222 exposed_ports: Some(exposed_ports),
223 healthcheck,
224 labels: Some(labels),
225 ..Default::default()
226 };
227
228 let create_options = CreateContainerOptionsBuilder::default()
229 .name(&spec.name)
230 .build();
231
232 let created = self
233 .client
234 .create_container(Some(create_options), config)
235 .await
236 .map_err(RuntimeError::Start)?;
237
238 self.client
239 .start_container(&created.id, None::<StartContainerOptions>)
240 .await
241 .map_err(RuntimeError::Start)?;
242
243 Ok(ContainerId::new(created.id))
244 }
245
246 async fn stop(&self, id: &ContainerId, grace: Duration) -> Result<()> {
247 #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
248 let options = StopContainerOptionsBuilder::default()
249 .t(grace.as_secs() as i32)
250 .build();
251 match self.client.stop_container(id.as_str(), Some(options)).await {
252 Ok(())
253 | Err(bollard::errors::Error::DockerResponseServerError {
254 status_code: 304 | 404,
255 ..
256 }) => Ok(()),
257 Err(e) => Err(RuntimeError::Stop {
258 id: id.to_string(),
259 source: e,
260 }),
261 }
262 }
263
264 async fn remove(&self, name: &str) -> Result<()> {
265 let options = RemoveContainerOptionsBuilder::default().force(true).build();
266 match self.client.remove_container(name, Some(options)).await {
267 Ok(())
268 | Err(bollard::errors::Error::DockerResponseServerError {
269 status_code: 404, ..
270 }) => Ok(()),
271 Err(e) => Err(RuntimeError::Remove {
272 name: name.to_owned(),
273 source: e,
274 }),
275 }
276 }
277
278 async fn inspect(&self, id: &ContainerId) -> Result<ContainerStatus> {
279 let info = self
280 .client
281 .inspect_container(id.as_str(), None)
282 .await
283 .map_err(|e| match e {
284 bollard::errors::Error::DockerResponseServerError {
285 status_code: 404, ..
286 } => RuntimeError::NotFound(id.to_string()),
287 other => RuntimeError::Inspect {
288 id: id.to_string(),
289 source: other,
290 },
291 })?;
292
293 let state = info.state.as_ref();
294 let Some(state) = state else {
295 return Ok(ContainerStatus::Starting);
296 };
297
298 if matches!(state.running, Some(true)) {
299 if let Some(health) = &state.health {
300 return Ok(match health.status {
301 Some(bollard::models::HealthStatusEnum::HEALTHY) => ContainerStatus::Healthy,
302 Some(bollard::models::HealthStatusEnum::UNHEALTHY) => {
303 ContainerStatus::Unhealthy
304 }
305 _ => ContainerStatus::Running,
306 });
307 }
308 return Ok(ContainerStatus::Running);
309 }
310
311 if matches!(state.dead, Some(true))
312 || state.status == Some(bollard::models::ContainerStateStatusEnum::EXITED)
313 {
314 #[allow(clippy::cast_possible_truncation)]
315 let exit_code = state.exit_code.map(|c| c as i32);
316 return Ok(ContainerStatus::Stopped { exit_code });
317 }
318
319 Ok(ContainerStatus::Starting)
320 }
321
322 async fn wait_healthy(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
323 let deadline = Instant::now() + timeout;
324 loop {
325 match self.inspect(id).await? {
326 ContainerStatus::Healthy | ContainerStatus::Running => return Ok(()),
327 ContainerStatus::Unhealthy => {
328 if Instant::now() >= deadline {
329 return Err(RuntimeError::Timeout {
330 operation: "wait_healthy",
331 after: timeout,
332 });
333 }
334 }
335 ContainerStatus::Starting => {}
336 ContainerStatus::Stopped { exit_code } => {
337 return Err(RuntimeError::InvalidSpec(format!(
338 "container `{id}` exited with code {exit_code:?} before becoming healthy"
339 )));
340 }
341 }
342 if Instant::now() >= deadline {
343 return Err(RuntimeError::Timeout {
344 operation: "wait_healthy",
345 after: timeout,
346 });
347 }
348 tokio::time::sleep(POLL_INTERVAL).await;
349 }
350 }
351
352 async fn logs(&self, id: &ContainerId, follow: bool) -> Result<LogChunkStream> {
353 let options = LogsOptionsBuilder::default()
354 .follow(follow)
355 .stdout(true)
356 .stderr(true)
357 .timestamps(true)
358 .build();
359 let stream = self.client.logs(id.as_str(), Some(options));
360 let mapped: Pin<Box<dyn Stream<Item = Result<LogChunk>> + Send>> =
361 Box::pin(stream.map(map_log_item));
362 Ok(mapped)
363 }
364}
365
/// Splits an image reference into `(repository, tag_or_digest)`.
///
/// * `name:tag` yields `(name, tag)`.
/// * `name` yields `(name, "latest")`.
/// * `name@digest` and `name:tag@digest` yield `(name, digest)`.
///
/// A colon that belongs to a registry `host:port` prefix (one followed by a
/// `/` later in the string) is not mistaken for a tag separator, so
/// `localhost:5000/app` keeps its full repository name.
fn split_image_ref(image: &str) -> (&str, &str) {
    /// Splits off a trailing `:tag`, ignoring colons that precede a `/`.
    fn split_tag(name: &str) -> Option<(&str, &str)> {
        name.rsplit_once(':').filter(|(_, tag)| !tag.contains('/'))
    }

    if let Some((name, digest)) = image.split_once('@') {
        // The digest pins the image; any tag in front of it is dropped.
        let repository = split_tag(name).map_or(name, |(repo, _)| repo);
        return (repository, digest);
    }
    split_tag(image).unwrap_or((image, "latest"))
}
369
/// Renders every entry of `env` as a `KEY=value` string.
///
/// The output order follows the map's iteration order.
fn build_env(env: &HashMap<String, String>) -> Vec<String> {
    let mut pairs = Vec::with_capacity(env.len());
    for (key, value) in env {
        pairs.push(format!("{key}={value}"));
    }
    pairs
}
373
374fn build_labels(project: &str, resource: &str) -> HashMap<String, String> {
375 let mut labels = HashMap::with_capacity(2);
376 labels.insert(LABEL_PROJECT.to_owned(), project.to_owned());
377 labels.insert(LABEL_RESOURCE.to_owned(), resource.to_owned());
378 labels
379}
380
/// Label key holding the project name; set by `build_labels` and used as the
/// filter in `DockerRuntime::list_managed`.
pub const LABEL_PROJECT: &str = "lightshuttle.project";

/// Label key holding the resource name; set by `build_labels` and read back
/// by `DockerRuntime::list_managed`.
pub const LABEL_RESOURCE: &str = "lightshuttle.resource";
388
/// One container returned by `DockerRuntime::list_managed`.
#[derive(Debug, Clone)]
pub struct ManagedContainer {
    /// Container id as reported in the Docker container summary.
    pub id: ContainerId,
    /// Value of the `LABEL_RESOURCE` label, or `"<unknown>"` when it is absent.
    pub resource: String,
    /// Status derived from the summary state by `parse_summary_state`.
    pub status: ContainerStatus,
}
399
400fn parse_summary_state(state: Option<&ContainerSummaryStateEnum>) -> ContainerStatus {
401 match state {
402 Some(ContainerSummaryStateEnum::RUNNING) => ContainerStatus::Running,
403 Some(ContainerSummaryStateEnum::EXITED | ContainerSummaryStateEnum::DEAD) => {
404 ContainerStatus::Stopped { exit_code: None }
405 }
406 _ => ContainerStatus::Starting,
407 }
408}
409
410fn build_exposed_ports(ports: &[PortBinding]) -> Vec<String> {
411 ports
412 .iter()
413 .map(|p| format!("{}/tcp", p.container_port))
414 .collect()
415}
416
/// Host address (IPv4 loopback) a port is published on when its
/// `PortBinding` does not specify a `host_address`.
const DEFAULT_HOST_BIND_ADDRESS: &str = "127.0.0.1";
424
425fn build_host_config(ports: &[PortBinding], volumes: &[VolumeBinding]) -> HostConfig {
426 let port_bindings = ports
427 .iter()
428 .map(|p| {
429 let host_ip = p
430 .host_address
431 .clone()
432 .unwrap_or_else(|| DEFAULT_HOST_BIND_ADDRESS.to_owned());
433 let bindings = vec![BollardPortBinding {
434 host_ip: Some(host_ip),
435 host_port: Some(p.host_port.to_string()),
436 }];
437 (format!("{}/tcp", p.container_port), Some(bindings))
438 })
439 .collect::<HashMap<_, _>>();
440
441 let binds: Vec<String> = volumes
442 .iter()
443 .filter_map(|v| match &v.source {
444 VolumeSource::HostPath(path) => Some(format!("{path}:{}", v.target)),
445 VolumeSource::Named(name) => Some(format!("{name}:{}", v.target)),
446 VolumeSource::Anonymous => None,
447 })
448 .collect();
449
450 HostConfig {
451 port_bindings: Some(port_bindings),
452 binds: if binds.is_empty() { None } else { Some(binds) },
453 ..Default::default()
454 }
455}
456
457fn build_healthcheck(hc: &HealthcheckSpec) -> HealthConfig {
458 #[allow(clippy::cast_possible_wrap, clippy::cast_possible_truncation)]
459 HealthConfig {
460 test: Some(hc.test.clone()),
461 interval: Some(hc.interval.as_nanos() as i64),
462 timeout: Some(hc.timeout.as_nanos() as i64),
463 retries: Some(i64::from(hc.retries)),
464 start_period: Some(hc.start_period.as_nanos() as i64),
465 ..Default::default()
466 }
467}
468
469fn map_log_item(item: std::result::Result<LogOutput, bollard::errors::Error>) -> Result<LogChunk> {
470 match item {
471 Ok(LogOutput::StdErr { message }) => Ok(log_chunk(LogStream::Stderr, &message)),
472 Ok(
473 LogOutput::StdOut { message }
474 | LogOutput::Console { message }
475 | LogOutput::StdIn { message },
476 ) => Ok(log_chunk(LogStream::Stdout, &message)),
477 Err(e) => Err(RuntimeError::LogStream(e)),
478 }
479}
480
481fn log_chunk(stream: LogStream, message: &[u8]) -> LogChunk {
484 let (timestamp, bytes) = split_docker_timestamp(message);
485 LogChunk {
486 stream,
487 timestamp,
488 bytes,
489 }
490}
491
492fn split_docker_timestamp(message: &[u8]) -> (SystemTime, Vec<u8>) {
500 if let Some(space) = message.iter().position(|&b| b == b' ')
501 && let Ok(prefix) = std::str::from_utf8(&message[..space])
502 && let Ok(ts) = prefix.parse::<jiff::Timestamp>()
503 && let Some(system_time) = timestamp_to_system_time(ts)
504 {
505 let payload = message.get(space + 1..).unwrap_or(&[]).to_vec();
506 return (system_time, payload);
507 }
508 (SystemTime::now(), message.to_vec())
509}
510
511fn timestamp_to_system_time(ts: jiff::Timestamp) -> Option<SystemTime> {
514 let nanos = ts.as_nanosecond();
515 if nanos < 0 {
516 return None;
517 }
518 let secs = u64::try_from(nanos / 1_000_000_000).ok()?;
519 let subsec = u32::try_from(nanos % 1_000_000_000).ok()?;
520 Some(SystemTime::UNIX_EPOCH + Duration::new(secs, subsec))
521}
522
#[cfg(test)]
mod tests {
    use super::{PortBinding, build_host_config};

    /// Returns the host IP of the first binding published under `key`, if any.
    fn host_ip_for(ports: &[PortBinding], key: &str) -> Option<String> {
        let mut published = build_host_config(ports, &[]).port_bindings?;
        let bindings = published.remove(key)??;
        let first = bindings.into_iter().next()?;
        first.host_ip
    }

    /// A port without an explicit host address is published on loopback.
    #[test]
    fn unspecified_address_binds_to_loopback() {
        let postgres = PortBinding {
            container_port: 5432,
            host_address: None,
            host_port: 5432,
        };
        let host_ip = host_ip_for(&[postgres], "5432/tcp");
        assert_eq!(host_ip.as_deref(), Some("127.0.0.1"));
    }

    /// An explicit host address is passed through unchanged.
    #[test]
    fn explicit_address_is_preserved() {
        let web = PortBinding {
            container_port: 80,
            host_address: Some("0.0.0.0".to_owned()),
            host_port: 8080,
        };
        let host_ip = host_ip_for(&[web], "80/tcp");
        assert_eq!(host_ip.as_deref(), Some("0.0.0.0"));
    }

    /// The timestamp prefix is parsed and removed from the payload.
    #[test]
    fn timestamped_line_parses_emission_time_and_strips_prefix() {
        use std::time::UNIX_EPOCH;

        let line = b"2024-01-01T12:34:56.789012345Z hello world";
        let (ts, payload) = super::split_docker_timestamp(line);

        let elapsed = ts.duration_since(UNIX_EPOCH).expect("post-epoch");
        assert_eq!(elapsed.as_secs(), 1_704_112_496);
        assert_eq!(elapsed.subsec_micros(), 789_012);
        assert_eq!(payload, b"hello world");
    }

    /// Only the timestamp and its separating space are stripped; the
    /// trailing newline stays in the payload.
    #[test]
    fn timestamped_line_without_payload_yields_empty_bytes() {
        let line = b"2024-01-01T00:00:00Z \n";
        let (_ts, payload) = super::split_docker_timestamp(line);
        assert_eq!(payload, b"\n");
    }

    /// A first token that is not a timestamp leaves the line untouched.
    #[test]
    fn untimestamped_line_is_forwarded_verbatim() {
        let line = b"not-a-timestamp hello world";
        let (_ts, payload) = super::split_docker_timestamp(line);
        assert_eq!(payload, line);
    }

    /// A line with no space at all is returned untouched.
    #[test]
    fn line_without_space_is_forwarded_verbatim() {
        let line = b"singletoken";
        let (_ts, payload) = super::split_docker_timestamp(line);
        assert_eq!(payload, line);
    }
}