zlayer_agent/runtime.rs
1//! Abstract container runtime interface
2//!
3//! Defines the Runtime trait that can be implemented for different container runtimes
4//! (containerd, CRI-O, etc.)
5
6use crate::cgroups_stats::ContainerStats;
7use crate::error::{AgentError, Result};
8use futures_util::Stream;
9use std::collections::VecDeque;
10use std::future::Future;
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::io::{AsyncRead, AsyncWrite};
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
19use zlayer_spec::{PullPolicy, RegistryAuth, ServiceSpec};
20
/// Lifecycle state of a container managed by the agent.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ContainerState {
    /// The image is being pulled or the container is being created.
    Pending,
    /// Init actions are executing.
    Initializing,
    /// The container is up and running.
    Running,
    /// The container is in the process of stopping.
    Stopping,
    /// The container has exited with exit status `code`.
    Exited { code: i32 },
    /// The container failed; `reason` describes why.
    Failed { reason: String },
}

impl ContainerState {
    /// Returns a stable, lowercase label for this state.
    ///
    /// This label is what the API and `ps` output show for a container; the
    /// payloads of `Exited` / `Failed` are not part of it. `Running` maps to
    /// `"running"`, which the raft e2e harness matches case-insensitively
    /// when counting healthy replicas.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        use ContainerState::{Exited, Failed, Initializing, Pending, Running, Stopping};

        match self {
            Pending => "pending",
            Initializing => "initializing",
            Running => "running",
            Stopping => "stopping",
            Exited { .. } => "exited",
            Failed { .. } => "failed",
        }
    }
}

impl std::fmt::Display for ContainerState {
    /// Writes the same label as [`ContainerState::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
62
63/// Container identifier.
64///
65/// Identifies a container by `(service, replica)` for the legacy single-group
66/// case, and extends with `role` + `node_id` for cluster-aware
67/// multi-group services and cross-node identification.
68///
69/// Defaults: `role = "default"`, `node_id = 0`. Existing constructors
70/// (`ContainerId::new(service, replica)`) produce these defaults. Use
71/// `ContainerId::with_role_and_node(...)` when the new fields matter.
72///
73/// Display:
74/// - With defaults: `{service}-rep-{replica}` (backward compat).
75/// - Otherwise: `{service}-{role}-{replica}-on-{node_id}`.
76#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
77pub struct ContainerId {
78 pub service: String,
79 pub replica: u32,
80 /// Role within `replica_groups`. `"default"` for services without groups.
81 #[serde(default = "default_container_role")]
82 pub role: String,
83 /// Cluster node that owns this container. `0` in single-node deployments
84 /// or before the cluster is initialized.
85 #[serde(default)]
86 pub node_id: u64,
87}
88
/// Role given to containers that do not belong to a named replica group.
///
/// Also serves as the serde default for `ContainerId::role`.
fn default_container_role() -> String {
    String::from("default")
}
92
93impl ContainerId {
94 /// Build a legacy `{service, replica}` `ContainerId` with default `role`
95 /// and `node_id`. Used by all existing callsites — behavior is unchanged.
96 #[must_use]
97 pub fn new(service: impl Into<String>, replica: u32) -> Self {
98 Self {
99 service: service.into(),
100 replica,
101 role: default_container_role(),
102 node_id: 0,
103 }
104 }
105
106 /// Build a cluster-aware `ContainerId` with explicit `role` and `node_id`.
107 /// Used by `ServiceManager` when a service has `replica_groups` or when
108 /// the daemon participates in a multi-node cluster.
109 #[must_use]
110 pub fn with_role_and_node(
111 service: impl Into<String>,
112 replica: u32,
113 role: impl Into<String>,
114 node_id: u64,
115 ) -> Self {
116 Self {
117 service: service.into(),
118 replica,
119 role: role.into(),
120 node_id,
121 }
122 }
123
124 /// True when both `role` and `node_id` are at their defaults — i.e.
125 /// this is a legacy-shape `ContainerId`.
126 #[must_use]
127 pub fn is_legacy_shape(&self) -> bool {
128 self.role == "default" && self.node_id == 0
129 }
130
131 /// Parse a `ContainerId` back from its [`Display`](std::fmt::Display) form.
132 ///
133 /// This is the exact inverse of `Display`:
134 /// - `"{service}-rep-{replica}"` (legacy shape) → `ContainerId::new`.
135 /// - `"{service}-{role}-{replica}-on-{node_id}"` (cluster shape) →
136 /// `ContainerId::with_role_and_node`.
137 ///
138 /// The service name may itself contain `-`, so parsing anchors on the
139 /// rightmost structural markers (`-on-` then the trailing `-rep-`/`-{role}-`
140 /// segment) rather than splitting left-to-right. Returns `None` for any
141 /// string that does not match either shape (e.g. a hex id or a bare name).
142 #[must_use]
143 pub fn parse_display(s: &str) -> Option<Self> {
144 // Cluster shape: `{service}-{role}-{replica}-on-{node_id}`.
145 if let Some((head, node_str)) = s.rsplit_once("-on-") {
146 let node_id: u64 = node_str.parse().ok()?;
147 // `head` = `{service}-{role}-{replica}`. The replica is the last
148 // `-`-segment; the role is the segment before it; everything left
149 // of that is the (possibly hyphenated) service.
150 let (service_role, replica_str) = head.rsplit_once('-')?;
151 let replica: u32 = replica_str.parse().ok()?;
152 let (service, role) = service_role.rsplit_once('-')?;
153 if service.is_empty() || role.is_empty() {
154 return None;
155 }
156 return Some(Self::with_role_and_node(service, replica, role, node_id));
157 }
158 // Legacy shape: `{service}-rep-{replica}`.
159 if let Some((service, replica_str)) = s.rsplit_once("-rep-") {
160 let replica: u32 = replica_str.parse().ok()?;
161 if service.is_empty() {
162 return None;
163 }
164 return Some(Self::new(service, replica));
165 }
166 None
167 }
168}
169
170impl std::fmt::Display for ContainerId {
171 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172 if self.is_legacy_shape() {
173 write!(f, "{}-rep-{}", self.service, self.replica)
174 } else {
175 write!(
176 f,
177 "{}-{}-{}-on-{}",
178 self.service, self.role, self.replica, self.node_id
179 )
180 }
181 }
182}
183
184/// Container handle
185pub struct Container {
186 pub id: ContainerId,
187 /// Image reference this container was created from (canonical form, e.g.
188 /// `docker.io/library/nginx:1.29-alpine`). Surfaced through the API/`ps`.
189 pub image: String,
190 pub state: ContainerState,
191 pub pid: Option<u32>,
192 pub task: Option<JoinHandle<std::io::Result<()>>>,
193 /// Overlay network IP address assigned to this container
194 pub overlay_ip: Option<IpAddr>,
195 /// Health monitor task handle for this container
196 pub health_monitor: Option<JoinHandle<()>>,
197 /// Runtime-assigned port override (used by macOS sandbox where all
198 /// containers share the host network and need unique ports).
199 /// When `Some(port)`, the proxy should use this port instead of the
200 /// spec-declared endpoint port for this specific container's backend address.
201 pub port_override: Option<u16>,
202}
203
204/// Summary information about a cached image on the host runtime.
205#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
206pub struct ImageInfo {
207 /// Canonical image reference (e.g. `zachhandley/zlayer-manager:latest`).
208 pub reference: String,
209 /// Content-addressed digest if known (`sha256:...`). `None` when the
210 /// backend only tracks images by tag.
211 pub digest: Option<String>,
212 /// Total on-disk / in-cache size in bytes, when available.
213 pub size_bytes: Option<u64>,
214}
215
216/// Result of a prune operation.
217#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
218pub struct PruneResult {
219 /// Image references that were removed.
220 pub deleted: Vec<String>,
221 /// Bytes reclaimed from the cache. `0` when the backend cannot report.
222 pub space_reclaimed: u64,
223}
224
225/// Reason a container stopped running, as reported by [`Runtime::wait_outcome`].
226///
227/// Serialized as `snake_case` strings on the wire (`exited`, `signal`,
228/// `oom_killed`, `runtime_error`) so the API DTO can emit the reason as-is
229/// without a second translation layer.
230#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
231#[serde(rename_all = "snake_case")]
232pub enum WaitReason {
233 /// Container exited normally.
234 Exited,
235 /// Container was killed by a signal (e.g. `SIGKILL`, `SIGTERM`).
236 Signal,
237 /// Container was killed by the OOM killer.
238 OomKilled,
239 /// Runtime-side failure (pre-start error, runtime crash, etc.).
240 RuntimeError,
241}
242
243/// Wait condition mirroring Docker's `POST /containers/{id}/wait?condition=`.
244///
245/// Maps 1:1 to the wire form (`not-running`, `next-exit`, `removed`) via
246/// kebab-case serde so the daemon and the Docker compat shim can deserialize
247/// the query parameter in a single step. The default condition is
248/// [`WaitCondition::NotRunning`], matching Docker's behaviour when the
249/// `condition` query param is omitted.
250#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
251#[serde(rename_all = "kebab-case")]
252pub enum WaitCondition {
253 /// Wait until the container is not running. This is the default when
254 /// the caller doesn't specify a condition. Returns immediately if the
255 /// container has already exited; otherwise blocks until it does.
256 #[default]
257 NotRunning,
258 /// Wait for the next container exit, even if the container is already
259 /// stopped at the time of the call. Restarts the wait loop on each
260 /// observed exit.
261 NextExit,
262 /// Wait until the container is removed. Useful in conjunction with
263 /// `--rm` / `AutoRemove` lifecycle wiring.
264 Removed,
265}
266
267impl WaitCondition {
268 /// Return the wire string this variant serializes to (`"not-running"`,
269 /// `"next-exit"`, `"removed"`), matching Docker's `condition=` query
270 /// parameter spelling.
271 #[must_use]
272 pub const fn as_wire_str(self) -> &'static str {
273 match self {
274 Self::NotRunning => "not-running",
275 Self::NextExit => "next-exit",
276 Self::Removed => "removed",
277 }
278 }
279
280 /// Parse a Docker-style condition string (`"not-running"`, `"next-exit"`,
281 /// `"removed"`). Returns `None` for unknown values so callers can
282 /// distinguish "default" (omitted) from "rejected".
283 #[must_use]
284 pub fn from_wire_str(s: &str) -> Option<Self> {
285 match s {
286 "not-running" | "" => Some(Self::NotRunning),
287 "next-exit" => Some(Self::NextExit),
288 "removed" => Some(Self::Removed),
289 _ => None,
290 }
291 }
292}
293
294/// Richer wait result returned by [`Runtime::wait_outcome`].
295///
296/// Backwards-compatible with [`Runtime::wait_container`] (which returns just
297/// `exit_code`). The API handler uses this to populate the extended
298/// `ContainerWaitResponse` fields (`reason`, `signal`, `finished_at`) while
299/// existing callers that only need the exit code can keep using
300/// `wait_container`.
301#[derive(Debug, Clone)]
302pub struct WaitOutcome {
303 /// Process exit code (0 = success). When the container was killed by
304 /// signal `N`, this is typically `128 + N`.
305 pub exit_code: i32,
306 /// Classification of the exit.
307 pub reason: WaitReason,
308 /// Signal name when `reason == WaitReason::Signal`, e.g. `"SIGKILL"`.
309 /// Derived from `exit_code - 128` on a best-effort basis.
310 pub signal: Option<String>,
311 /// Time the container exited, if the runtime reports it.
312 pub finished_at: Option<chrono::DateTime<chrono::Utc>>,
313}
314
315impl WaitOutcome {
316 /// Build a plain `Exited` outcome with no signal/timestamp metadata — the
317 /// default that matches the pre-§3.12 behaviour.
318 #[must_use]
319 pub fn exited(exit_code: i32) -> Self {
320 Self {
321 exit_code,
322 reason: WaitReason::Exited,
323 signal: None,
324 finished_at: None,
325 }
326 }
327}
328
/// Map a signal-style exit code (`128 + N`) to a canonical signal name.
///
/// A process killed by signal `N` conventionally reports exit code `128 + N`.
/// Signal numbers follow the Linux numbering (as on x86/ARM) used by Linux
/// container runtimes. The table is kept consistently Linux: e.g. 17 is
/// `SIGCHLD` and 19 is `SIGSTOP`, not the BSD/macOS assignments. Numbers
/// outside the standard 1–31 range (e.g. real-time signals) fall back to
/// `signal_<n>`, so the caller always gets *something* readable.
///
/// Returns `None` when `exit_code <= 128`, i.e. when the code does not encode
/// a signal.
#[must_use]
pub fn signal_name_from_exit_code(exit_code: i32) -> Option<String> {
    if exit_code <= 128 {
        return None;
    }
    // Cannot overflow: `exit_code > 128` here.
    let n = exit_code - 128;
    let name = match n {
        1 => "SIGHUP",
        2 => "SIGINT",
        3 => "SIGQUIT",
        4 => "SIGILL",
        5 => "SIGTRAP",
        6 => "SIGABRT",
        7 => "SIGBUS",
        8 => "SIGFPE",
        9 => "SIGKILL",
        10 => "SIGUSR1",
        11 => "SIGSEGV",
        12 => "SIGUSR2",
        13 => "SIGPIPE",
        14 => "SIGALRM",
        15 => "SIGTERM",
        16 => "SIGSTKFLT",
        17 => "SIGCHLD",
        18 => "SIGCONT",
        19 => "SIGSTOP",
        20 => "SIGTSTP",
        21 => "SIGTTIN",
        22 => "SIGTTOU",
        23 => "SIGURG",
        24 => "SIGXCPU",
        25 => "SIGXFSZ",
        26 => "SIGVTALRM",
        27 => "SIGPROF",
        28 => "SIGWINCH",
        29 => "SIGIO",
        30 => "SIGPWR",
        31 => "SIGSYS",
        _ => return Some(format!("signal_{n}")),
    };
    Some(name.to_string())
}
360
/// A single event produced by [`Runtime::exec_stream`].
///
/// Runtimes push events while the exec'd command is producing output. A
/// successful stream always ends with an [`ExecEvent::Exit`] that carries the
/// process's exit code.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ExecEvent {
    /// Stdout output. Docker emits one event per line; other runtimes may
    /// deliver the whole buffered output in a single event.
    Stdout(String),
    /// Stderr output.
    Stderr(String),
    /// Exit code of the finished command; always the last event.
    Exit(i32),
}
376
377/// Boxed async stream of [`ExecEvent`]s returned by [`Runtime::exec_stream`].
378pub type ExecEventStream = Pin<Box<dyn Stream<Item = ExecEvent> + Send>>;
379
/// Options for [`Runtime::exec_pty`].
///
/// Mirrors the union of fields in Docker's `POST /containers/{id}/exec`
/// (`ExecConfig`), so the daemon can forward them with little translation.
///
/// [`Runtime::exec`] and [`Runtime::exec_stream`] capture stdout and stderr
/// separately and return only once the process has exited. `exec_pty` is
/// the interactive entry point instead: it allocates a PTY when `tty` is
/// set, moves I/O both ways over one duplex byte stream, and returns an
/// [`ExecHandle`] that the caller drives while the process runs.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
#[allow(clippy::struct_excessive_bools)] // mirrors Docker's `ExecConfig` 1:1
pub struct ExecOptions {
    /// Argument vector of the process to run; `command[0]` is the binary.
    pub command: Vec<String>,
    /// Additional `KEY=VALUE` environment entries, merged by the runtime into
    /// the container's existing environment.
    pub env: Vec<String>,
    /// Working directory inside the container. `None` keeps the container's
    /// default `WORKDIR`.
    pub working_dir: Option<String>,
    /// `user[:group]` override. `None` keeps the user configured for the
    /// container.
    pub user: Option<String>,
    /// Run with privileged capabilities (Docker `Privileged`).
    pub privileged: bool,
    /// Allocate a TTY for the process (Docker `Tty`). When `true`, the
    /// runtime should create a pseudo-terminal pair and the duplex stream of
    /// the returned [`ExecHandle`] carries multiplexed PTY traffic. When
    /// `false`, the stream carries raw stdout/stderr without PTY framing.
    pub tty: bool,
    /// Attach stdin so the caller can write to the process (Docker
    /// `AttachStdin`). When `false`, writes to the duplex stream are
    /// effectively ignored.
    pub attach_stdin: bool,
    /// Deliver the process's stdout on the readable half of the duplex
    /// stream (Docker `AttachStdout`).
    pub attach_stdout: bool,
    /// Deliver the process's stderr on the readable half of the duplex
    /// stream (Docker `AttachStderr`).
    pub attach_stderr: bool,
}
422
423/// Marker supertrait combining [`AsyncRead`] + [`AsyncWrite`] so they can be
424/// used together as a single trait object. Rust forbids stacking two
425/// non-auto traits directly in `dyn`, so [`ExecPtyStream`] is built on top
426/// of this helper instead.
427///
428/// A blanket impl below covers every type that already satisfies the four
429/// component bounds, so callers never need to implement `ExecDuplex`
430/// manually — they just hand the runtime any concrete duplex stream that's
431/// `AsyncRead + AsyncWrite + Send + Unpin`.
432pub trait ExecDuplex: AsyncRead + AsyncWrite + Send + Unpin {}
433
434impl<T> ExecDuplex for T where T: AsyncRead + AsyncWrite + Send + Unpin + ?Sized {}
435
436/// Duplex byte stream used by [`ExecHandle`] to shuttle stdin/stdout (and,
437/// when [`ExecOptions::tty`] is set, multiplexed PTY traffic) between the
438/// caller and the exec'd process.
439///
440/// `Unpin` is required (via [`ExecDuplex`]) so callers can poll the trait
441/// object directly via the usual `tokio::io::AsyncReadExt` /
442/// `AsyncWriteExt` extension methods without having to pin the box
443/// themselves.
444pub type ExecPtyStream = Box<dyn ExecDuplex + 'static>;
445
446/// Future returned by [`ExecHandle`] that resolves with the exec'd process's
447/// exit code once the runtime observes it has terminated.
448pub type ExecExitFuture = Pin<Box<dyn Future<Output = Result<i32>> + Send>>;
449
450/// Runtime-side handle returned by [`Runtime::exec_pty`].
451///
452/// Bundles everything a long-lived interactive exec session needs:
453///
454/// 1. A duplex [`ExecPtyStream`] for shuttling stdin/stdout (or full PTY
455/// traffic when `tty` is set) between the caller and the running process.
456/// 2. A `tokio::sync::mpsc::Sender<(rows, cols)>` so the caller can resize
457/// the allocated PTY in response to terminal-size changes (mirrors
458/// Docker's `POST /exec/{id}/resize` endpoint). Runtimes that don't allocate
459/// a PTY should still accept the channel and treat resize messages as
460/// no-ops; the channel is dropped on the runtime side once the process
461/// exits.
462/// 3. A boxed [`ExecExitFuture`] that resolves with the exit code once the
463/// runtime detects the process has terminated.
464///
465/// `ExecHandle` deliberately does not implement `Debug` / `Clone` because
466/// every field holds a trait object (or, in the exit future's case, an opaque
467/// boxed future). Consumers move the fields out by destructuring and then
468/// drive the I/O stream, the resize channel, and the exit future
469/// independently.
470pub struct ExecHandle {
471 /// Bidirectional byte channel between the caller and the exec'd process.
472 pub stream: ExecPtyStream,
473 /// Channel for sending `(rows, cols)` resize requests for the PTY
474 /// allocated to the exec session. Bounded so a stuck runtime can't make
475 /// the caller buffer unbounded resize events; senders should drop the
476 /// most recent excess size on backpressure.
477 pub resize: tokio::sync::mpsc::Sender<(u16, u16)>,
478 /// Future that resolves with the process's exit code once the runtime
479 /// observes the exec has terminated. Consumers typically `await` this on
480 /// a dedicated task while pumping the duplex stream on another.
481 pub exit: ExecExitFuture,
482}
483
484/// Which standard stream a [`LogChunk`] originated from.
485///
486/// Mirrors the three POSIX file descriptors. `Stdin` is included for
487/// completeness — Docker's multiplexed log header carries a `stdin` channel
488/// for attached interactive containers — but most container runtimes only
489/// emit `Stdout` / `Stderr` chunks.
490#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
491#[serde(rename_all = "snake_case")]
492pub enum LogChannel {
493 /// Standard input (rarely emitted; included for completeness with
494 /// Docker's stdcopy framing).
495 Stdin,
496 /// Standard output.
497 Stdout,
498 /// Standard error.
499 Stderr,
500}
501
502/// One chunk of container log output emitted by [`Runtime::logs_stream`].
503///
504/// Streams emit one `LogChunk` per line (or per Docker stdcopy frame) as
505/// data is produced by the container. Backends that expose timestamps
506/// populate `timestamp`; ones that don't leave it `None`.
507#[derive(Debug, Clone)]
508pub struct LogChunk {
509 /// Which standard stream produced this chunk.
510 pub stream: LogChannel,
511 /// Raw bytes of the chunk. Not necessarily UTF-8 — container output is
512 /// arbitrary binary data and consumers must handle invalid UTF-8.
513 pub bytes: bytes::Bytes,
514 /// When the runtime reported this chunk, when known.
515 pub timestamp: Option<chrono::DateTime<chrono::Utc>>,
516}
517
/// Options for [`Runtime::logs_stream`].
///
/// Mirrors the query parameters of the Docker Engine API's
/// `GET /containers/{id}/logs`, so backends can forward them with little
/// translation.
#[derive(Clone, Debug, Default)]
#[allow(clippy::struct_excessive_bools)] // mirrors Docker's logs query params 1:1
pub struct LogsStreamOptions {
    /// Keep streaming past the current end of the log. When `false`, the
    /// stream ends after the runtime has emitted every log line buffered so
    /// far.
    pub follow: bool,
    /// Start with only the last N lines. `None` means every available line
    /// from the beginning.
    pub tail: Option<u64>,
    /// Earliest timestamp to include, in Unix seconds; `None` for no lower
    /// bound.
    pub since: Option<i64>,
    /// Latest timestamp to include, in Unix seconds; `None` for no upper
    /// bound.
    pub until: Option<i64>,
    /// Ask the runtime to set [`LogChunk::timestamp`] on every chunk.
    /// Backends that always provide timestamps may ignore this.
    pub timestamps: bool,
    /// Include stdout output.
    pub stdout: bool,
    /// Include stderr output.
    pub stderr: bool,
}
547
548/// One periodic resource-usage sample emitted by [`Runtime::stats_stream`].
549///
550/// Mirrors the union of fields exposed by Docker's `/containers/{id}/stats`
551/// endpoint and Linux cgroup stat files so downstream consumers (autoscaler,
552/// `docker stats`-compat HTTP shim) can read a single shape regardless of
553/// backend. Counters that the runtime cannot report are left at zero —
554/// missing data is signalled separately via the surrounding stream
555/// metadata.
556#[derive(Debug, Clone)]
557pub struct StatsSample {
558 /// Cumulative container CPU time consumed, in nanoseconds.
559 pub cpu_total_ns: u64,
560 /// Cumulative system CPU time observed at the same moment, in
561 /// nanoseconds. Used to compute relative CPU percentage between
562 /// successive samples (Docker's classic `cpu_delta / system_delta`
563 /// formula).
564 pub cpu_system_ns: u64,
565 /// Number of CPUs currently online for this container. Used as the
566 /// final scaling factor in the CPU-percentage calculation.
567 pub online_cpus: u32,
568 /// Resident memory currently in use, in bytes (cgroup
569 /// `memory.usage_in_bytes` minus inactive page cache for v1, or
570 /// `memory.current` for v2).
571 pub mem_used_bytes: u64,
572 /// Memory limit configured on the container, in bytes. `0` when no
573 /// limit is set (cgroup reports its sentinel value).
574 pub mem_limit_bytes: u64,
575 /// Cumulative bytes received across all attached network interfaces.
576 pub net_rx_bytes: u64,
577 /// Cumulative bytes transmitted across all attached network interfaces.
578 pub net_tx_bytes: u64,
579 /// Cumulative bytes read from block devices.
580 pub blkio_read_bytes: u64,
581 /// Cumulative bytes written to block devices.
582 pub blkio_write_bytes: u64,
583 /// Number of process IDs currently running inside the container's pid
584 /// namespace.
585 pub pids_current: u64,
586 /// Configured pids limit, if any. `None` means unlimited.
587 pub pids_limit: Option<u64>,
588 /// Wallclock time the sample was taken.
589 pub timestamp: chrono::DateTime<chrono::Utc>,
590}
591
/// One progress event produced by [`Runtime::pull_image_stream`].
///
/// While layers are downloaded and extracted, backends emit a sequence of
/// `Status` events. A successful pull then ends with exactly one `Done`
/// event. A failure part-way through is reported as the stream's `Err` item
/// and ends the stream.
#[derive(Clone, Debug)]
pub enum PullProgress {
    /// Progress update for a layer or stage that is still in flight.
    Status {
        /// Layer ID or other backend-specific identifier, if available.
        id: Option<String>,
        /// Human-readable status such as `"Pulling fs layer"`,
        /// `"Downloading"` or `"Extracting"`. Always present, but may be
        /// empty when the backend has nothing to say this tick.
        status: String,
        /// Preformatted progress bar (Docker sends something like
        /// `"[========>        ] 12.3MB/45.6MB"`). `None` when the backend
        /// only reports raw `current`/`total`.
        progress: Option<String>,
        /// Bytes transferred so far for this layer, if reported.
        current: Option<u64>,
        /// Expected total size of this layer in bytes, if reported.
        total: Option<u64>,
    },
    /// The pull finished successfully.
    Done {
        /// Resolved canonical image reference. Usually equal to the requested
        /// reference, but it may include a digest the backend resolved.
        reference: String,
        /// Content-addressed digest, if the backend reports one.
        digest: Option<String>,
    },
}
627
628/// Boxed async stream of `Result<LogChunk, AgentError>` items returned by
629/// [`Runtime::logs_stream`].
630///
631/// `'static` lifetime so handlers can hold the stream past the trait method's
632/// borrow of `self`.
633pub type LogsStream = Pin<Box<dyn Stream<Item = Result<LogChunk>> + Send + 'static>>;
634
635/// Boxed async stream of `Result<StatsSample, AgentError>` items returned by
636/// [`Runtime::stats_stream`].
637pub type StatsStream = Pin<Box<dyn Stream<Item = Result<StatsSample>> + Send + 'static>>;
638
639/// Boxed async stream of `Result<PullProgress, AgentError>` items returned by
640/// [`Runtime::pull_image_stream`].
641pub type PullProgressStream = Pin<Box<dyn Stream<Item = Result<PullProgress>> + Send + 'static>>;
642
643/// Boxed async stream of TAR-archive byte chunks returned by
644/// [`Runtime::archive_get`].
645///
646/// Each yielded `Bytes` is a contiguous slice of the TAR archive that the
647/// runtime is producing for the requested container path. The stream ends
648/// once the archive is fully written. Mid-stream errors map to
649/// [`AgentError`] variants.
650pub type ArchiveStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send + 'static>>;
651
652/// Stat metadata for a single path inside a container, returned by
653/// [`Runtime::archive_head`].
654///
655/// Mirrors Docker's `X-Docker-Container-Path-Stat` header payload (a
656/// base64-encoded JSON object with `name`, `size`, `mode`, `mtime`, and
657/// `linkTarget` fields). The API layer serializes this back into the same
658/// header for the `HEAD /containers/{id}/archive` response.
659#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
660pub struct PathStat {
661 /// Base name of the path (e.g. `"foo.txt"` for `/etc/foo.txt`).
662 pub name: String,
663 /// File size in bytes. For directories this is the size reported by the
664 /// runtime (typically the directory entry size, not the recursive sum).
665 pub size: i64,
666 /// Unix file mode bits (`S_IFMT | S_IRWXU | ...`). Encoded as a `u32` so
667 /// it round-trips through the Docker JSON header losslessly.
668 pub mode: u32,
669 /// Last-modification time as an RFC 3339 string. `None` when the runtime
670 /// cannot report it.
671 pub mtime: Option<String>,
672 /// Target of a symbolic link, when the path is a symlink. Empty string
673 /// for non-symlink paths (matching Docker's wire shape).
674 pub link_target: String,
675}
676
/// Options for [`Runtime::archive_put`].
///
/// Mirrors the query parameters Docker accepts on
/// `PUT /containers/{id}/archive`:
///
/// * `noOverwriteDirNonDir=1` — refuse to replace a non-directory with a
///   directory, or vice versa. Defaults to `false`.
/// * `copyUIDGID=1` — keep the UID/GID recorded in the archive instead of
///   chown'ing files to the container's user. Defaults to `false`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ArchivePutOptions {
    /// If `true`, the runtime must reject a put that would replace a
    /// non-directory with a directory (or the reverse) at the destination.
    pub no_overwrite_dir_non_dir: bool,
    /// If `true`, keep the archive's UID/GID on extracted files unchanged.
    pub copy_uid_gid: bool,
}
694
/// One network attachment reported by [`Runtime::inspect_detailed`].
///
/// Mirrors the part of bollard's `EndpointSettings` the API needs to fill
/// `ContainerInfo.networks` for standalone containers. It lives in
/// `zlayer-agent` rather than `zlayer-spec` because it is a runtime inspect
/// result, not part of a deployment specification.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct NetworkAttachmentDetail {
    /// Network name as the runtime reports it (Docker's key in
    /// `NetworkSettings.Networks`, e.g. `"bridge"` or a user-defined name).
    pub network: String,
    /// DNS aliases the container answers to on this network; empty when the
    /// runtime does not expose aliases.
    pub aliases: Vec<String>,
    /// IPv4 address assigned on this network, if any; empty strings are
    /// normalised to `None`.
    pub ipv4: Option<String>,
}
714
/// Health information for one container, reported by
/// [`Runtime::inspect_detailed`].
///
/// Taken directly from bollard's `ContainerState.health` (Docker's own
/// healthcheck tracking). The internal `HealthMonitor` in
/// `crates/zlayer-agent/src/health.rs` drives service-level health events.
/// For standalone containers the API reports this runtime-native status
/// instead, so images with a built-in `HEALTHCHECK` are shown correctly.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct HealthDetail {
    /// One of `"none"`, `"starting"`, `"healthy"` or `"unhealthy"` (Docker's
    /// `HealthStatusEnum`). An empty string is normalised to `"none"`
    /// upstream.
    pub status: String,
    /// Number of consecutive failing probes, if the runtime reports it.
    pub failing_streak: Option<u32>,
    /// Output of the most recent failing probe, when available.
    pub last_output: Option<String>,
}
732
/// Rich inspect details for a single container, returned by
/// [`Runtime::inspect_detailed`].
///
/// Carries the fields `ContainerInfo` needs on top of the bare
/// [`ContainerState`] reported by [`Runtime::container_state`]:
/// published ports, attached networks, first IPv4, health, and `exit_code`.
///
/// `Default` is an all-empty record: empty vectors and every `Option` set to
/// `None`. The default trait method returns this for runtimes that don't
/// (yet) implement rich inspect. The API layer treats every field as purely
/// additive, so a default record still produces a backwards-compatible
/// `ContainerInfo`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerInspectDetails {
    /// Published port mappings (container → host), translated back from the
    /// runtime's internal port-binding map. Empty when nothing is published.
    pub ports: Vec<zlayer_spec::PortMapping>,
    /// Networks the container is attached to, plus the aliases + IPv4 for each.
    pub networks: Vec<NetworkAttachmentDetail>,
    /// First non-empty IPv4 address found across the container's networks.
    /// Useful as a "primary" IP for simple clients that don't want to iterate
    /// `networks`. `None` when the container isn't on any network with an IP.
    pub ipv4: Option<String>,
    /// Health status, when the container has a Docker-native `HEALTHCHECK` or
    /// the runtime otherwise reports a health state. `None` otherwise.
    pub health: Option<HealthDetail>,
    /// Most recent exit code, when the runtime reports one. `None` for
    /// containers that are still running and have never exited.
    pub exit_code: Option<i32>,
}
762
/// Lightweight summary of a single container reported by
/// [`Runtime::list_containers`].
///
/// Reconciliation only needs to match runtime containers against
/// `ZLayer`'s own metadata. That metadata lives in the `com.zlayer.container_id`
/// label (see `zlayer-api::handlers::container_id_map::ZLAYER_CONTAINER_ID_LABEL`).
/// Carrying that label value plus the runtime-native id is sufficient for
/// the standalone-container reconcile pass. Richer fields can be added
/// when concrete callers need them.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RuntimeContainerSummary {
    /// Backend-native container handle (Docker's 64-char hex, the youki
    /// state-dir name, etc.). Opaque to the reconciler; only used for
    /// logging and to disambiguate listings.
    pub runtime_id: String,
    /// Value of the `com.zlayer.container_id` label, if the runtime reports
    /// one for this container. `None` marks a foreign (not `ZLayer`-managed)
    /// container that reconcile should ignore.
    pub zlayer_container_id_label: Option<String>,
}
783
/// Process table returned by [`Runtime::top_container`].
///
/// Holds the whole table, not a single row. It mirrors Docker's
/// `GET /containers/{id}/top` response shape: a `Titles` vector that names
/// each column, plus a `Processes` matrix with one row of column values per
/// process. The runtime decides which `ps` fields to emit; the API/Docker
/// shim forwards them verbatim.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerTopOutput {
    /// Column titles (e.g. `["UID", "PID", "PPID", "C", "STIME", "TTY", "TIME", "CMD"]`).
    pub titles: Vec<String>,
    /// One row per process. Each row is expected to have the same length as
    /// `titles`. The type does not enforce this; producers must uphold it.
    pub processes: Vec<Vec<String>>,
}
797
/// Filesystem change kind reported by [`Runtime::changes_container`].
///
/// The variants correspond to Docker's numeric change kinds
/// (`0 = Modified`, `1 = Added`, `2 = Deleted`). Use
/// [`FilesystemChangeKind::as_docker_kind`] to obtain that number.
///
/// The serde representation is *not* numeric. Because of
/// `rename_all = "snake_case"`, the variants (de)serialize as the strings
/// `"modified"`, `"added"`, and `"deleted"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FilesystemChangeKind {
    /// File or directory was modified in the container's writable layer.
    Modified,
    /// File or directory was added.
    Added,
    /// File or directory was deleted.
    Deleted,
}
811
812impl FilesystemChangeKind {
813 /// Numeric wire value used by Docker's `/containers/{id}/changes`.
814 #[must_use]
815 pub const fn as_docker_kind(self) -> u8 {
816 match self {
817 Self::Modified => 0,
818 Self::Added => 1,
819 Self::Deleted => 2,
820 }
821 }
822}
823
/// One filesystem change reported by [`Runtime::changes_container`].
///
/// Serde encodes this as an object with `path` and `kind` keys. `kind` is
/// written as a snake_case string (see [`FilesystemChangeKind`]).
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct FilesystemChangeEntry {
    /// Path inside the container that changed (absolute, e.g. `/etc/hosts`).
    pub path: String,
    /// Kind of change.
    pub kind: FilesystemChangeKind,
}
832
/// One published port mapping entry returned by [`Runtime::port_mappings_container`].
///
/// Mirrors one entry of Docker's `/containers/{id}/port` map. A single
/// container port may bind multiple host endpoints (e.g. IPv4 + IPv6). Each
/// such binding yields its own [`PortMappingEntry`], so the same
/// `container_port` + `protocol` pair can appear in several entries.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PortMappingEntry {
    /// Container port number that's published.
    pub container_port: u16,
    /// Protocol (`"tcp"`, `"udp"`, `"sctp"`), carried as a free-form string.
    pub protocol: String,
    /// Host IP address that the container's port is mapped to, when known.
    pub host_ip: Option<String>,
    /// Host port number that the container's port is mapped to, when known.
    pub host_port: Option<u16>,
}
849
/// Result of a [`Runtime::prune_containers`] call.
///
/// The derived `Default` describes a prune that removed nothing: an empty
/// `deleted` list and `space_reclaimed == 0`.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ContainerPruneResult {
    /// Container IDs that were removed (runtime-native or hex form).
    pub deleted: Vec<String>,
    /// Bytes reclaimed from the runtime's container storage. `0` when the
    /// backend cannot report it, so `0` does not prove nothing was freed.
    pub space_reclaimed: u64,
}
859
/// Runtime-level container restart policy attached to a
/// [`ContainerResourceUpdate`].
///
/// Mirrors Docker's `HostConfig.RestartPolicy`. `name` is one of `""`,
/// `"no"`, `"always"`, `"unless-stopped"`, or `"on-failure"`. Backends that
/// can persist a restart policy (Docker via bollard, Youki via the
/// supervisor's stored spec) honour it. Backends that cannot (WASM, Mock)
/// leave the policy unmodified.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerRestartPolicyUpdate {
    /// Restart policy name (one of the values listed above).
    /// NOTE(review): how a backend treats `None` here is not visible in this
    /// file; confirm against the backend implementations.
    pub name: Option<String>,
    /// Maximum retry count (only honoured when `name == "on-failure"`).
    pub maximum_retry_count: Option<i64>,
}
875
/// Resource-update payload passed to [`Runtime::update_container_resources`].
///
/// Mirrors the resource subset of Docker's `POST /containers/{id}/update`
/// body. Every field is `Option<...>`, and backends apply only the fields
/// that are `Some`. A request with all fields `None` (the derived `Default`)
/// is a no-op, consistent with Docker's behaviour.
///
/// Field semantics match Docker's wire shape. See `ContainerUpdateRequest`
/// in `zlayer-types::api::containers` for the JSON encoding.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerResourceUpdate {
    /// CPU shares (cgroup `cpu.weight` / `cpu.shares`).
    pub cpu_shares: Option<i64>,
    /// Memory limit in bytes.
    pub memory: Option<i64>,
    /// CPU CFS period in microseconds.
    pub cpu_period: Option<i64>,
    /// CPU CFS quota in microseconds.
    pub cpu_quota: Option<i64>,
    /// CPU real-time period in microseconds.
    pub cpu_realtime_period: Option<i64>,
    /// CPU real-time runtime in microseconds.
    pub cpu_realtime_runtime: Option<i64>,
    /// CPUs allowed for execution (e.g. `"0-3"`).
    pub cpuset_cpus: Option<String>,
    /// Memory nodes (NUMA) allowed for execution.
    pub cpuset_mems: Option<String>,
    /// Soft memory limit in bytes.
    pub memory_reservation: Option<i64>,
    /// Total memory limit (memory + swap) in bytes. Following Docker's
    /// semantics, `-1` means *unlimited* swap; it does not disable swap.
    pub memory_swap: Option<i64>,
    /// Kernel memory limit in bytes (deprecated upstream).
    pub kernel_memory: Option<i64>,
    /// Block IO weight (10-1000).
    pub blkio_weight: Option<u16>,
    /// PIDs limit. `0` or `-1` for unlimited.
    pub pids_limit: Option<i64>,
    /// Replacement restart policy. `None` leaves the policy unchanged.
    pub restart_policy: Option<ContainerRestartPolicyUpdate>,
}
916
917impl ContainerResourceUpdate {
918 /// Returns `true` when this update would not change anything — every
919 /// field is `None`. Backends short-circuit no-op updates rather than
920 /// touching the cgroup hierarchy.
921 #[must_use]
922 pub fn is_empty(&self) -> bool {
923 self.cpu_shares.is_none()
924 && self.memory.is_none()
925 && self.cpu_period.is_none()
926 && self.cpu_quota.is_none()
927 && self.cpu_realtime_period.is_none()
928 && self.cpu_realtime_runtime.is_none()
929 && self.cpuset_cpus.is_none()
930 && self.cpuset_mems.is_none()
931 && self.memory_reservation.is_none()
932 && self.memory_swap.is_none()
933 && self.kernel_memory.is_none()
934 && self.blkio_weight.is_none()
935 && self.pids_limit.is_none()
936 && self.restart_policy.is_none()
937 }
938}
939
/// Result of a [`Runtime::update_container_resources`] call.
///
/// Mirrors Docker's `{"Warnings": [...]}` response shape. Backends append one
/// string per "we accepted this but did not apply it" or "field deprecated"
/// warning. The derived `Default` has no warnings.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerUpdateOutcome {
    /// Human-readable warnings emitted while applying the update.
    pub warnings: Vec<String>,
}
949
/// Detailed image inspect record returned by [`Runtime::inspect_image_native`].
///
/// Mirrors the union of fields exposed by Docker's `GET /images/{name}/json`
/// (bollard's `ImageInspect`). This lets the API / Docker compat shim surface
/// a Docker-shaped JSON response without translating again later. Optional
/// fields stay `None`, and list/map fields stay empty, when the backend
/// cannot provide them.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ImageInspectInfo {
    /// Content-addressed image id (`sha256:...`), when known.
    pub id: Option<String>,
    /// All tags (`repo:tag`) currently pointing at this image.
    pub repo_tags: Vec<String>,
    /// Manifest digests this image is known under (`repo@sha256:...`).
    pub repo_digests: Vec<String>,
    /// Parent image id (`sha256:...`), when the image was built locally.
    pub parent: Option<String>,
    /// Human-readable comment recorded at commit/import time.
    pub comment: Option<String>,
    /// Creation timestamp in RFC 3339 form.
    pub created: Option<String>,
    /// Container id this image was committed from, when applicable.
    pub container: Option<String>,
    /// Daemon version that built / imported this image.
    pub docker_version: Option<String>,
    /// Author recorded on the image (e.g. `MAINTAINER` instruction).
    pub author: Option<String>,
    /// Hardware architecture the image targets (`amd64`, `arm64`, ...).
    pub architecture: Option<String>,
    /// Operating system the image targets (`linux`, `windows`, ...).
    pub os: Option<String>,
    /// Total on-disk size in bytes, when known.
    pub size: Option<u64>,
    /// Layer order: list of `sha256:...` digests, root-most first.
    pub layers: Vec<String>,
    /// Container env (`KEY=VALUE`).
    pub env: Vec<String>,
    /// Default command vector.
    pub cmd: Vec<String>,
    /// Default entrypoint vector.
    pub entrypoint: Vec<String>,
    /// Working directory inside the image.
    pub working_dir: Option<String>,
    /// User the image runs as by default.
    pub user: Option<String>,
    /// Image labels. A `BTreeMap`, so iteration and serialization are
    /// ordered by key.
    pub labels: std::collections::BTreeMap<String, String>,
}
997
/// One row of an image's history, returned by [`Runtime::image_history`].
///
/// Mirrors Docker's `GET /images/{name}/history` response.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ImageHistoryEntry {
    /// Layer / image id (`sha256:...`). May be `<missing>` for layers that
    /// were dropped during a squash.
    pub id: String,
    /// Unix-seconds timestamp when this layer was created.
    pub created: i64,
    /// Dockerfile-style instruction that produced this layer.
    pub created_by: String,
    /// Tags that point at this specific layer.
    pub tags: Vec<String>,
    /// Layer size in bytes.
    pub size: u64,
    /// Comment recorded with the layer. This is a plain `String`, not an
    /// `Option`, so "no comment" is presumably the empty string.
    pub comment: String,
}
1017
/// One result returned by [`Runtime::search_images`].
///
/// Mirrors Docker's `GET /images/search` response items.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ImageSearchResult {
    /// Image name (e.g. `library/nginx`).
    pub name: String,
    /// Free-text description of the image.
    pub description: String,
    /// Number of stars on the source registry. This is not an `Option`, so a
    /// registry that doesn't report stars presumably yields `0`.
    pub star_count: u64,
    /// Whether the image is officially curated.
    pub official: bool,
    /// Whether the image was produced by an automated build. Docker has
    /// deprecated this field but it is still surfaced for compatibility.
    pub automated: bool,
}
1035
/// Result of a [`Runtime::commit_container`] call.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CommitOutcome {
    /// Content-addressed image id of the freshly created image.
    pub id: String,
}
1042
/// Options accepted by [`Runtime::commit_container`].
///
/// Mirrors Docker's `POST /commit` query parameters. All fields are optional.
/// When `repo`/`tag` are both empty, the runtime creates an untagged image.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CommitOptions {
    /// Repository name to apply to the committed image (e.g. `myapp`).
    pub repo: Option<String>,
    /// Tag to apply (defaults to `latest` when `repo` is set and tag is empty).
    pub tag: Option<String>,
    /// Free-form comment to record on the committed image.
    pub comment: Option<String>,
    /// Author to record on the committed image.
    pub author: Option<String>,
    /// Whether to pause the container before committing.
    ///
    /// Docker's own `POST /commit` API defaults `pause` to `true`. However,
    /// the derived `Default` for this struct sets it to `false`, so callers
    /// that want Docker's behaviour must set it explicitly.
    pub pause: bool,
    /// Dockerfile-style instructions to apply during commit.
    pub changes: Option<String>,
}
1062
/// Boxed async stream of TAR-archive byte chunks returned by image save /
/// container export endpoints.
///
/// Each successful item is a contiguous slice of an uncompressed TAR archive.
/// Concatenating the `Ok` items in order reproduces the archive. Items are
/// `Result`s, so a failure mid-export surfaces as an `Err` item.
pub type ImageExportStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send + 'static>>;
1067
/// One progress event emitted by [`Runtime::load_images`].
///
/// `Status` events carry per-line progress reported by the daemon while
/// the tar is being unpacked. `Done` is emitted exactly once, when the load
/// completes successfully.
///
/// Serde uses internal tagging: each event is an object whose `kind` field
/// holds the snake_case variant name, e.g.
/// `{"kind":"status","status":"Loading layer"}` or
/// `{"kind":"done","references":["app:latest"]}`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum LoadProgress {
    /// Mid-load progress entry.
    Status {
        /// Layer or image id when reported. Omitted from the serialized form
        /// when `None`, and treated as `None` when absent on input.
        #[serde(default, skip_serializing_if = "Option::is_none")]
        id: Option<String>,
        /// Human-readable status text.
        status: String,
    },
    /// Load completed. `references` lists the image references that were
    /// loaded into the cache.
    Done {
        /// Loaded image references (`repo:tag` or `sha256:...`).
        references: Vec<String>,
    },
}
1091
/// Boxed async stream of [`LoadProgress`] events returned by
/// [`Runtime::load_images`].
///
/// Items are `Result`s, so a load failure surfaces as an `Err` item on the
/// stream.
pub type LoadProgressStream = Pin<Box<dyn Stream<Item = Result<LoadProgress>> + Send + 'static>>;
1095
/// How a runtime joins a container to the cross-node `WireGuard` overlay.
///
/// There are three attachment strategies, one per variant below. Which one
/// applies depends on whether the container is a host process, a Windows
/// compute system, or a full VM. Runtimes report theirs via
/// [`Runtime::overlay_attach_kind`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverlayAttachKind {
    /// Linux host process: overlayd enters `/proc/<pid>/ns/net` and plumbs a
    /// veth into the container's network namespace (the default).
    HostNetns,
    /// Windows: the HCN endpoint + namespace were created at container-create
    /// time, so the agent only registers the assigned IP.
    HostIp,
    /// A VM guest with no host-visible netns/PID (macOS VZ-Linux). overlayd
    /// allocates the overlay identity, and the agent pushes it into the guest
    /// over vsock, where a kernel `WireGuard` device is brought up. See
    /// [`Runtime::push_overlay_config`].
    InGuestVsock,
}
1114
1115/// Abstract container runtime trait
1116///
1117/// This trait abstracts over different container runtimes (containerd, CRI-O, etc.)
1118#[async_trait::async_trait]
1119pub trait Runtime: Send + Sync {
1120 /// Pull an image to local storage
1121 async fn pull_image(&self, image: &str) -> Result<()>;
1122
1123 /// Pull an image to local storage with a specific policy.
1124 ///
1125 /// When `auth` is `Some`, the runtime uses those inline credentials for
1126 /// the pull (§3.10 of `ZLAYER_SDK_FIXES.md`). When `auth` is `None`, the
1127 /// runtime falls back to its existing credential-store lookup keyed by
1128 /// registry hostname (or anonymous access when no match exists).
1129 ///
1130 /// Non-Docker runtimes may accept but ignore the `auth` argument — their
1131 /// OCI puller (`zlayer-registry`) already resolves credentials from the
1132 /// store by hostname, and inline auth is primarily a Docker-backend
1133 /// concern. Ignoring it is safe: callers that need inline auth should use
1134 /// the Docker runtime.
1135 ///
1136 /// `source` is the per-image [`zlayer_spec::SourcePolicy`] selecting which
1137 /// tiers (local store, S3, remote registry) the pull may consult and in
1138 /// what order. Runtimes that build the `zlayer-registry` `ImagePuller`
1139 /// chain (youki, macOS VZ-Linux) honor it; runtimes that delegate to an
1140 /// external daemon (Docker/WSL/HCS) accept but ignore it.
1141 async fn pull_image_with_policy(
1142 &self,
1143 image: &str,
1144 policy: PullPolicy,
1145 auth: Option<&RegistryAuth>,
1146 source: zlayer_spec::SourcePolicy,
1147 ) -> Result<()>;
1148
1149 /// Create a container
1150 async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()>;
1151
1152 /// Start a container
1153 async fn start_container(&self, id: &ContainerId) -> Result<()>;
1154
1155 /// Stop a container
1156 async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()>;
1157
1158 /// Remove a container
1159 async fn remove_container(&self, id: &ContainerId) -> Result<()>;
1160
1161 /// Get container state
1162 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState>;
1163
1164 /// Get container logs as structured entries
1165 async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>>;
1166
1167 /// Execute command in container
1168 async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)>;
1169
1170 /// Execute a command in a container with Docker `exec` options (`--user`,
1171 /// `-w`/`--workdir`, `-e`/`--env`) applied.
1172 ///
1173 /// The default implementation ignores the user/cwd/env overrides and
1174 /// delegates to [`Runtime::exec`], so runtimes that don't (yet) plumb them
1175 /// keep working unchanged. Runtimes that can honour them — notably the macOS
1176 /// VZ-Linux runtime, which drives the in-guest agent over vsock — override
1177 /// this to drop to the requested uid/gid, chdir, and inject env before exec.
1178 ///
1179 /// `opts.command` is the argv (`command[0]` is the binary); `opts.user`,
1180 /// `opts.working_dir`, and `opts.env` carry the Docker overrides. The
1181 /// `tty`/`attach_*`/`privileged` fields of [`ExecOptions`] are not consulted
1182 /// by this buffered entry point.
1183 async fn exec_with_opts(
1184 &self,
1185 id: &ContainerId,
1186 opts: &ExecOptions,
1187 ) -> Result<(i32, String, String)> {
1188 self.exec(id, &opts.command).await
1189 }
1190
1191 /// Execute a command in a container and stream stdout / stderr / exit
1192 /// events as they are produced.
1193 ///
1194 /// The default implementation calls the buffered [`Runtime::exec`] and
1195 /// emits everything as a single `Stdout` event, a single `Stderr` event,
1196 /// and a final `Exit` event. Runtimes that support true streaming (e.g.
1197 /// Docker via bollard) override this to produce line-by-line events as
1198 /// the command runs.
1199 ///
1200 /// The returned stream always terminates with exactly one
1201 /// [`ExecEvent::Exit`] as the final item on success. Errors that occur
1202 /// before the stream is returned (e.g. container not found, failure to
1203 /// create the exec) are surfaced via the outer `Result`; errors that
1204 /// occur mid-stream are logged by the runtime and the stream closes.
1205 async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
1206 let (exit, stdout, stderr) = self.exec(id, cmd).await?;
1207 let mut events: Vec<ExecEvent> = Vec::with_capacity(3);
1208 if !stdout.is_empty() {
1209 events.push(ExecEvent::Stdout(stdout));
1210 }
1211 if !stderr.is_empty() {
1212 events.push(ExecEvent::Stderr(stderr));
1213 }
1214 events.push(ExecEvent::Exit(exit));
1215 Ok(Box::pin(futures_util::stream::iter(events)))
1216 }
1217
1218 /// Start an interactive exec session against a container, returning an
1219 /// [`ExecHandle`] the caller drives concurrently with the running process.
1220 ///
1221 /// Unlike [`Runtime::exec`] (which buffers stdout/stderr and returns only
1222 /// after the process exits) and [`Runtime::exec_stream`] (which streams
1223 /// line-by-line events one-way), `exec_pty` is the long-lived bidirectional
1224 /// entry point: when [`ExecOptions::tty`] is set the runtime allocates a
1225 /// pseudo-terminal pair and the returned [`ExecHandle::stream`] shuttles
1226 /// raw PTY bytes; when `tty` is false the stream still carries
1227 /// stdin/stdout/stderr but without PTY framing. The handle's
1228 /// [`ExecHandle::resize`] channel mirrors Docker's
1229 /// `POST /exec/{id}/resize` and the [`ExecHandle::exit`] future resolves
1230 /// with the process exit code once the runtime detects termination.
1231 ///
1232 /// The default implementation returns [`AgentError::Unsupported`].
1233 /// Backends that can host interactive execs (Docker via bollard's
1234 /// `start_exec` with hijacked stream, Youki via libcontainer's exec API,
1235 /// HCS via the Windows console) override this. Runtimes that have no
1236 /// notion of an interactive exec (WASM in-process, mocks that don't need
1237 /// PTY traffic) should leave the default in place — callers then surface
1238 /// a clear error rather than degrade silently to a buffered exec.
1239 async fn exec_pty(&self, _id: &ContainerId, _opts: ExecOptions) -> Result<ExecHandle> {
1240 Err(AgentError::Unsupported(
1241 "exec_pty is not supported by this runtime".into(),
1242 ))
1243 }
1244
1245 /// Get container resource statistics from cgroups
1246 ///
1247 /// Returns CPU and memory statistics for the specified container.
1248 /// Used for metrics collection and autoscaling decisions.
1249 async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats>;
1250
1251 /// Wait for a container to exit and return its exit code
1252 ///
1253 /// This method blocks until the container exits or an error occurs.
1254 /// Used primarily for job execution to implement run-to-completion semantics.
1255 async fn wait_container(&self, id: &ContainerId) -> Result<i32>;
1256
1257 /// Wait for a container to exit and return a [`WaitOutcome`] with richer
1258 /// classification (exit code + reason + signal + `finished_at` timestamp).
1259 ///
1260 /// The default implementation delegates to [`Runtime::wait_container`] and
1261 /// synthesizes a [`WaitReason::Exited`] result with no signal/timestamp.
1262 /// Runtimes that can distinguish OOM kills, signal deaths, or report a
1263 /// finished-at time (e.g. the Docker runtime, which has
1264 /// `ContainerInspectResponse.state.oom_killed` / `.finished_at`) should
1265 /// override this.
1266 ///
1267 /// This is the legacy entry point that always uses
1268 /// [`WaitCondition::NotRunning`]. Callers that need to honour Docker's
1269 /// `condition=` query parameter should use
1270 /// [`Runtime::wait_outcome_with_condition`] instead.
1271 async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
1272 let exit_code = self.wait_container(id).await?;
1273 Ok(WaitOutcome::exited(exit_code))
1274 }
1275
1276 /// Wait for a container to reach a [`WaitCondition`] and return a
1277 /// [`WaitOutcome`].
1278 ///
1279 /// Mirrors Docker's `POST /containers/{id}/wait?condition=<...>` semantics:
1280 ///
1281 /// * [`WaitCondition::NotRunning`] (the default) — block until the
1282 /// container is no longer running. Returns the exit-code outcome.
1283 /// * [`WaitCondition::NextExit`] — wait for the next observed exit, even
1284 /// if the container is already stopped at call time. The default
1285 /// implementation cannot distinguish "already stopped" from "next exit",
1286 /// so it falls back to the same wait as `NotRunning`. Backends that can
1287 /// subscribe to runtime events (Docker via bollard's wait stream)
1288 /// override this to honour the semantic.
1289 /// * [`WaitCondition::Removed`] — block until the container has been
1290 /// removed. The default implementation again falls back to a normal
1291 /// wait; the Docker runtime overrides it via bollard's `condition`
1292 /// parameter.
1293 ///
1294 /// Default implementation delegates to [`Runtime::wait_outcome`] for all
1295 /// conditions, ignoring the condition argument. This keeps existing
1296 /// runtimes (Youki, WASM, mocks) working without code changes.
1297 async fn wait_outcome_with_condition(
1298 &self,
1299 id: &ContainerId,
1300 _condition: WaitCondition,
1301 ) -> Result<WaitOutcome> {
1302 self.wait_outcome(id).await
1303 }
1304
1305 /// Rename a container. Mirrors Docker's
1306 /// `POST /containers/{id}/rename?name=<new>` endpoint.
1307 ///
1308 /// `new_name` is the requested human-readable name (without any leading
1309 /// `/`). Backends are expected to validate the name against their own
1310 /// constraints (e.g. uniqueness, allowed characters) and return an
1311 /// appropriate [`AgentError`] on rejection.
1312 ///
1313 /// The default implementation returns [`AgentError::Unsupported`].
1314 /// Runtimes that can rename a live container override this:
1315 ///
1316 /// * Docker — calls bollard's `rename_container` with
1317 /// `RenameContainerOptions { name }`.
1318 /// * Youki — currently returns `Unsupported` because the libcontainer
1319 /// state-dir is keyed off the immutable `ContainerId` and renaming the
1320 /// on-disk layout safely while a container is alive would require
1321 /// coordination with the supervisor that owns the bundle path.
1322 /// * Other backends (WASM, HCS, mocks) inherit the `Unsupported` default.
1323 async fn rename_container(&self, _id: &ContainerId, _new_name: &str) -> Result<()> {
1324 Err(AgentError::Unsupported(
1325 "rename_container is not supported by this runtime".into(),
1326 ))
1327 }
1328
1329 /// Get container logs (stdout/stderr combined)
1330 ///
1331 /// Returns logs as structured entries.
1332 /// Used to capture job output after completion.
1333 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>>;
1334
1335 /// Get the PID of a container's main process
1336 ///
1337 /// Returns:
1338 /// - `Ok(Some(pid))` for runtimes with real processes (Youki, Docker)
1339 /// - `Ok(None)` for runtimes without separate PIDs (WASM in-process)
1340 /// - `Err` if the container doesn't exist or there's an error
1341 ///
1342 /// Used for overlay network attachment and process management.
1343 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>>;
1344
1345 /// How this runtime joins a container to the cross-node overlay network.
1346 ///
1347 /// Defaults to [`OverlayAttachKind::HostNetns`] (the Linux veth-by-PID path).
1348 /// VM runtimes with no host netns (macOS VZ-Linux) override this to
1349 /// [`OverlayAttachKind::InGuestVsock`], which makes the service layer push a
1350 /// host-allocated overlay config into the guest via [`push_overlay_config`]
1351 /// instead of attaching a veth by PID.
1352 ///
1353 /// [`push_overlay_config`]: Runtime::push_overlay_config
1354 fn overlay_attach_kind(&self) -> OverlayAttachKind {
1355 OverlayAttachKind::HostNetns
1356 }
1357
1358 /// Push a host-allocated overlay (`WireGuard`) config into a guest that manages
1359 /// its own overlay interface ([`OverlayAttachKind::InGuestVsock`]).
1360 ///
1361 /// The service layer obtains `config` from overlayd (which allocated the
1362 /// address + keypair and registered the public key in the mesh) and calls
1363 /// this so the runtime can deliver it to the guest (over vsock) and bring up
1364 /// the in-guest interface. Runtimes that attach by netns/PID never call this
1365 /// and use the default, which errors.
1366 async fn push_overlay_config(
1367 &self,
1368 _id: &ContainerId,
1369 _config: &zlayer_types::overlayd::GuestOverlayConfig,
1370 ) -> Result<()> {
1371 Err(AgentError::Unsupported(
1372 "push_overlay_config is not supported by this runtime".to_string(),
1373 ))
1374 }
1375
1376 /// Get the IP address of a container
1377 ///
1378 /// Returns:
1379 /// - `Ok(Some(ip))` if the container has a known IP address
1380 /// - `Ok(None)` if the container exists but has no IP assigned yet
1381 /// - `Err` if the container doesn't exist or there's an error
1382 ///
1383 /// Used for proxy backend registration when overlay networking is unavailable.
1384 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>>;
1385
1386 /// Get a runtime-assigned port override for a container.
1387 ///
1388 /// Returns:
1389 /// - `Ok(Some(port))` if the runtime assigned a dynamic port to this container
1390 /// - `Ok(None)` if the container should use the spec-declared endpoint port
1391 ///
1392 /// This exists for runtimes where all containers share the host network stack
1393 /// (e.g., macOS sandbox). Without network namespaces, multiple replicas of
1394 /// the same service would conflict on the same port. The runtime assigns
1395 /// each replica a unique port and passes it via the `PORT` environment variable.
1396 /// The proxy then routes to `container_ip:override_port` instead of
1397 /// `container_ip:spec_port`.
1398 ///
1399 /// Runtimes with per-container networking (overlay, VMs, Docker) return `None`.
1400 async fn get_container_port_override(&self, _id: &ContainerId) -> Result<Option<u16>> {
1401 Ok(None)
1402 }
1403
1404 /// Get the HCN namespace GUID of a Windows container.
1405 ///
1406 /// Windows-only. Linux/macOS runtimes have no HCN namespace concept and
1407 /// return `Ok(None)`. The `HcsRuntime` overrides this to return the
1408 /// namespace GUID attached during `create_container`; `OverlayManager`
1409 /// then uses the GUID to register the container's assigned overlay IP
1410 /// against the right HCN compartment (analogous to how Linux uses PID
1411 /// to enter the netns via `/proc/{pid}/ns/net`).
1412 #[cfg(target_os = "windows")]
1413 async fn get_container_namespace_id(
1414 &self,
1415 _id: &ContainerId,
1416 ) -> Result<Option<windows::core::GUID>> {
1417 Ok(None)
1418 }
1419
1420 /// Sync all named volumes associated with this container to S3.
1421 ///
1422 /// Called after a container is stopped but before it is removed, giving
1423 /// the runtime a chance to flush persistent volume data to remote storage.
1424 ///
1425 /// The default implementation is a no-op. Runtimes that support S3-backed
1426 /// volume sync (e.g., Youki with the `s3` feature) override this.
1427 async fn sync_container_volumes(&self, _id: &ContainerId) -> Result<()> {
1428 Ok(())
1429 }
1430
1431 /// Stream container logs as raw byte chunks tagged with their channel.
1432 ///
1433 /// Mirrors the `GET /containers/{id}/logs` endpoint of the Docker Engine
1434 /// API: callers can request `follow`, `tail`, `since`/`until` time
1435 /// windows, per-channel filtering (`stdout` / `stderr`), and inline
1436 /// timestamps via [`LogsStreamOptions`]. Backends that demultiplex
1437 /// Docker's stdcopy framing emit one [`LogChunk`] per frame; line-based
1438 /// runtimes emit one chunk per line.
1439 ///
1440 /// The default implementation returns [`AgentError::Unsupported`].
1441 /// Concrete runtimes override this with backend-specific streaming
1442 /// (bollard's `logs` for Docker, log-file tailing for Youki/HCS, etc.).
1443 /// The stream is `'static` so HTTP handlers can drive it independently
1444 /// of the trait-method borrow.
1445 async fn logs_stream(&self, _id: &ContainerId, _opts: LogsStreamOptions) -> Result<LogsStream> {
1446 Err(AgentError::Unsupported(
1447 "logs_stream is not supported by this runtime".into(),
1448 ))
1449 }
1450
1451 /// Stream periodic resource-usage samples for a container.
1452 ///
1453 /// Mirrors the streaming form of `GET /containers/{id}/stats` in the
1454 /// Docker Engine API: each yielded [`StatsSample`] is one full snapshot
1455 /// of CPU / memory / network / block-IO / pids counters at the moment
1456 /// it was taken. Sampling cadence is backend-defined (Docker emits one
1457 /// sample per second by default).
1458 ///
1459 /// The default implementation returns [`AgentError::Unsupported`].
1460 /// Backends that can produce this data implement it directly: Docker
1461 /// via bollard's `stats`, Youki by polling cgroup stat files,
1462 /// `MockRuntime` by emitting a deterministic single sample.
1463 async fn stats_stream(&self, _id: &ContainerId) -> Result<StatsStream> {
1464 Err(AgentError::Unsupported(
1465 "stats_stream is not supported by this runtime".into(),
1466 ))
1467 }
1468
1469 /// Pull an image, streaming progress events as layers are downloaded.
1470 ///
1471 /// Mirrors the streaming form of `POST /images/create` in the Docker
1472 /// Engine API. Backends emit a series of [`PullProgress::Status`]
1473 /// events for in-flight layers, followed by exactly one
1474 /// [`PullProgress::Done`] event on success. Errors that occur mid-pull
1475 /// surface as `Err` items on the stream and terminate it.
1476 ///
1477 /// `auth` carries inline credentials for this pull. When `None`, the
1478 /// runtime falls back to its credential-store lookup keyed by registry
1479 /// hostname (matching the semantics of [`Runtime::pull_image_with_policy`]).
1480 ///
1481 /// Backends override this with their native streaming pull
1482 /// (bollard's `create_image` for Docker, `zlayer-registry` for Youki).
1483 ///
1484 /// The default implementation performs a BLOCKING pull via
1485 /// [`Runtime::pull_image_with_policy`] and then synthesizes a minimal
1486 /// Docker-style progress stream (a `Status` line + a terminal `Done`). This
1487 /// keeps the streaming `POST /images/create` (e.g. `docker pull` through
1488 /// the `zlayer-docker` compat socket) working on runtimes that lack a native
1489 /// streaming pull — notably the macOS sandbox. The streaming form means
1490 /// "make this image available now", so it pulls if not already present.
1491 async fn pull_image_stream(
1492 &self,
1493 image: &str,
1494 auth: Option<&RegistryAuth>,
1495 ) -> Result<PullProgressStream> {
1496 self.pull_image_with_policy(
1497 image,
1498 PullPolicy::IfNotPresent,
1499 auth,
1500 zlayer_spec::SourcePolicy::default(),
1501 )
1502 .await?;
1503 let reference = image.to_string();
1504 let events: Vec<Result<PullProgress>> = vec![
1505 Ok(PullProgress::Status {
1506 id: None,
1507 status: format!("Pulling from {reference}"),
1508 progress: None,
1509 current: None,
1510 total: None,
1511 }),
1512 Ok(PullProgress::Done {
1513 reference,
1514 digest: None,
1515 }),
1516 ];
1517 Ok(Box::pin(futures_util::stream::iter(events)))
1518 }
1519
1520 /// List all images managed by this runtime's image storage.
1521 ///
1522 /// The default implementation returns `AgentError::Unsupported` — individual
1523 /// runtimes override this with backend-specific logic (bollard for Docker,
1524 /// zlayer-registry cache walk for Youki, etc.).
1525 async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1526 Err(AgentError::Unsupported(
1527 "list_images is not supported by this runtime".into(),
1528 ))
1529 }
1530
1531 /// Remove an image by reference from local storage.
1532 ///
1533 /// When `force` is true, also removes the image even when other containers
1534 /// reference it. The default implementation returns `AgentError::Unsupported`.
1535 async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
1536 Err(AgentError::Unsupported(
1537 "remove_image is not supported by this runtime".into(),
1538 ))
1539 }
1540
1541 /// Prune dangling / unused images from local storage.
1542 ///
1543 /// Returns a [`PruneResult`] describing what was removed. The default
1544 /// implementation returns `AgentError::Unsupported`.
1545 async fn prune_images(&self) -> Result<PruneResult> {
1546 Err(AgentError::Unsupported(
1547 "prune_images is not supported by this runtime".into(),
1548 ))
1549 }
1550
1551 /// Send a signal to a running container.
1552 ///
1553 /// When `signal` is `None`, the runtime sends `SIGKILL` (matching Docker's
1554 /// `docker kill` default). Backends validate the signal name and reject
1555 /// anything outside the standard POSIX set (`SIGKILL`, `SIGTERM`, `SIGINT`,
1556 /// `SIGHUP`, `SIGUSR1`, `SIGUSR2`).
1557 ///
1558 /// Used by `POST /api/v1/containers/{id}/kill` and Docker-compat
1559 /// `docker kill`. The default implementation returns
1560 /// [`AgentError::Unsupported`].
1561 async fn kill_container(&self, _id: &ContainerId, _signal: Option<&str>) -> Result<()> {
1562 Err(AgentError::Unsupported(
1563 "kill_container is not supported by this runtime".into(),
1564 ))
1565 }
1566
1567 /// Write a chunk of stdin to a running container's main process.
1568 ///
1569 /// Powers the host→guest direction of interactive (`-it`) sessions: the
1570 /// daemon's `POST /api/v1/containers/{id}/stdin` endpoint forwards raw
1571 /// terminal bytes here, which the runtime relays to the workload (for the
1572 /// macOS VZ-Linux backend, as `Msg::Stdin` frames to the in-guest agent).
1573 ///
1574 /// The default implementation returns [`AgentError::Unsupported`] so
1575 /// non-interactive backends keep compiling.
1576 async fn write_stdin(&self, _id: &ContainerId, _data: &[u8]) -> Result<()> {
1577 Err(AgentError::Unsupported(
1578 "write_stdin is not supported by this runtime".into(),
1579 ))
1580 }
1581
1582 /// Signal end-of-input (close stdin) for a running container.
1583 ///
1584 /// Powers Ctrl-D / detach for interactive sessions: the daemon's
1585 /// `DELETE /api/v1/containers/{id}/stdin` endpoint calls this, which causes
1586 /// the runtime to stop forwarding stdin and emit a final close marker (for
1587 /// the macOS VZ-Linux backend, `Msg::StdinEof` to the in-guest agent).
1588 ///
1589 /// The default implementation returns [`AgentError::Unsupported`].
1590 async fn close_stdin(&self, _id: &ContainerId) -> Result<()> {
1591 Err(AgentError::Unsupported(
1592 "close_stdin is not supported by this runtime".into(),
1593 ))
1594 }
1595
1596 /// Create a new tag pointing at an existing image.
1597 ///
1598 /// `source` is the reference to an already-cached image. `target` is the
1599 /// new reference to create — it must be a full reference (repository + tag).
1600 ///
1601 /// Used by `POST /api/v1/images/tag` and Docker-compat `docker tag`. The
1602 /// default implementation returns [`AgentError::Unsupported`].
1603 async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
1604 Err(AgentError::Unsupported(
1605 "tag_image is not supported by this runtime".into(),
1606 ))
1607 }
1608
1609 /// Inspect an image and return a Docker-shaped detail record.
1610 ///
1611 /// Mirrors Docker's `GET /images/{name}/json`. Backends translate their
1612 /// native inspect output into [`ImageInspectInfo`]; the API/Docker
1613 /// compat shim emits the JSON body. The default implementation returns
1614 /// [`AgentError::Unsupported`] so non-Docker backends keep compiling.
1615 async fn inspect_image_native(&self, _image: &str) -> Result<ImageInspectInfo> {
1616 Err(AgentError::Unsupported(
1617 "inspect_image_native is not supported by this runtime".into(),
1618 ))
1619 }
1620
1621 /// Return the parent-layer history for an image.
1622 ///
1623 /// Mirrors Docker's `GET /images/{name}/history`. The default
1624 /// implementation returns [`AgentError::Unsupported`].
1625 async fn image_history(&self, _image: &str) -> Result<Vec<ImageHistoryEntry>> {
1626 Err(AgentError::Unsupported(
1627 "image_history is not supported by this runtime".into(),
1628 ))
1629 }
1630
1631 /// Search a registry for images matching `term`.
1632 ///
1633 /// Mirrors Docker's `GET /images/search`. `limit` caps the number of
1634 /// returned items; `0` means "let the registry decide". The default
1635 /// implementation returns [`AgentError::Unsupported`].
1636 async fn search_images(&self, _term: &str, _limit: u32) -> Result<Vec<ImageSearchResult>> {
1637 Err(AgentError::Unsupported(
1638 "search_images is not supported by this runtime".into(),
1639 ))
1640 }
1641
1642 /// Stream a tar archive containing one or more images.
1643 ///
1644 /// Mirrors Docker's `GET /images/get?names=...`. Multi-image archives
1645 /// dedupe shared layers. The default implementation returns
1646 /// [`AgentError::Unsupported`].
1647 async fn save_images(&self, _names: &[String]) -> Result<ImageExportStream> {
1648 Err(AgentError::Unsupported(
1649 "save_images is not supported by this runtime".into(),
1650 ))
1651 }
1652
1653 /// Load images from a tar archive.
1654 ///
1655 /// Mirrors Docker's `POST /images/load`. `tar_bytes` is the
1656 /// uncompressed (or gzip-compressed) tar produced by [`Self::save_images`].
1657 /// `quiet` suppresses progress events when set. The default
1658 /// implementation returns [`AgentError::Unsupported`].
1659 async fn load_images(
1660 &self,
1661 _tar_bytes: bytes::Bytes,
1662 _quiet: bool,
1663 ) -> Result<LoadProgressStream> {
1664 Err(AgentError::Unsupported(
1665 "load_images is not supported by this runtime".into(),
1666 ))
1667 }
1668
1669 /// Import a single image from a tar root filesystem.
1670 ///
1671 /// Mirrors the `fromSrc=`-mode of Docker's `POST /images/create`.
1672 /// `tar_bytes` is a tar of the root filesystem; `repo`/`tag` are the
1673 /// reference to apply to the resulting image. The default
1674 /// implementation returns [`AgentError::Unsupported`].
1675 async fn import_image(
1676 &self,
1677 _tar_bytes: bytes::Bytes,
1678 _repo: Option<&str>,
1679 _tag: Option<&str>,
1680 ) -> Result<String> {
1681 Err(AgentError::Unsupported(
1682 "import_image is not supported by this runtime".into(),
1683 ))
1684 }
1685
1686 /// Stream a tar archive of the container's filesystem.
1687 ///
1688 /// Mirrors Docker's `GET /containers/{id}/export`. The default
1689 /// implementation returns [`AgentError::Unsupported`].
1690 async fn export_container_fs(&self, _id: &ContainerId) -> Result<ImageExportStream> {
1691 Err(AgentError::Unsupported(
1692 "export_container_fs is not supported by this runtime".into(),
1693 ))
1694 }
1695
1696 /// Commit a container's filesystem state to a new image.
1697 ///
1698 /// Mirrors Docker's `POST /commit?container=...`. `opts` carries the
1699 /// optional repo/tag/comment/author/pause/changes parameters. The
1700 /// default implementation returns [`AgentError::Unsupported`].
1701 async fn commit_container(
1702 &self,
1703 _id: &ContainerId,
1704 _opts: &CommitOptions,
1705 ) -> Result<CommitOutcome> {
1706 Err(AgentError::Unsupported(
1707 "commit_container is not supported by this runtime".into(),
1708 ))
1709 }
1710
1711 /// Return rich inspect details for a container: published ports, attached
1712 /// networks, first IPv4, health, and most-recent exit code.
1713 ///
1714 /// Runtimes implement this by translating the backend's native inspect
1715 /// response (bollard's `ContainerInspectResponse` for Docker) into the
1716 /// runtime-level [`ContainerInspectDetails`] struct. The API layer merges
1717 /// these fields into `ContainerInfo` on `GET /api/v1/containers` and
1718 /// `GET /api/v1/containers/{id}` (§3.15 of `ZLAYER_SDK_FIXES.md`).
1719 ///
1720 /// The default implementation returns [`ContainerInspectDetails::default`]
1721 /// — an all-empty record, which the API layer treats as "this runtime
1722 /// doesn't support rich inspect; skip all the extra fields". This keeps
1723 /// non-Docker runtimes (Youki, WASM, Mock) backwards compatible; they can
1724 /// override this later if they gain equivalent inspect capability.
1725 async fn inspect_detailed(&self, _id: &ContainerId) -> Result<ContainerInspectDetails> {
1726 Ok(ContainerInspectDetails::default())
1727 }
1728
1729 /// Pause all processes in the container by freezing its cgroup.
1730 ///
1731 /// Mirrors Docker's `POST /containers/{id}/pause`. After pause, the
1732 /// container's processes are suspended in the kernel via the cgroup
1733 /// freezer; calls to [`Runtime::container_state`] still report
1734 /// `Running` but no instructions execute until [`Runtime::unpause_container`].
1735 ///
1736 /// The default implementation returns [`AgentError::Unsupported`].
1737 /// Backends override this with their native pause API (bollard's
1738 /// `pause_container` for Docker, libcontainer's `Container::pause` for
1739 /// Youki).
1740 async fn pause_container(&self, _id: &ContainerId) -> Result<()> {
1741 Err(AgentError::Unsupported(
1742 "pause_container is not supported by this runtime".into(),
1743 ))
1744 }
1745
1746 /// Resume a previously-paused container by thawing its cgroup freezer.
1747 ///
1748 /// Mirrors Docker's `POST /containers/{id}/unpause`. The default
1749 /// implementation returns [`AgentError::Unsupported`].
1750 async fn unpause_container(&self, _id: &ContainerId) -> Result<()> {
1751 Err(AgentError::Unsupported(
1752 "unpause_container is not supported by this runtime".into(),
1753 ))
1754 }
1755
1756 /// Update a running container's resource limits and restart policy.
1757 ///
1758 /// Mirrors Docker's `POST /containers/{id}/update`. The fields on
1759 /// [`ContainerResourceUpdate`] are individually optional: backends
1760 /// apply only the fields that are `Some` and leave the rest of the
1761 /// container's runtime configuration untouched. A fully-empty update
1762 /// short-circuits to a no-op (no cgroup writes, no warnings).
1763 ///
1764 /// Returns a [`ContainerUpdateOutcome`] whose `warnings` vector
1765 /// surfaces non-fatal issues (e.g. "kernel memory limit is
1766 /// deprecated", or "real-time scheduling not supported on this
1767 /// kernel"). Empty `warnings` ⇒ every requested field was applied.
1768 ///
1769 /// The default implementation returns [`AgentError::Unsupported`].
1770 /// Backends override this:
1771 ///
1772 /// * Docker — calls bollard's `update_container` with a
1773 /// `ContainerUpdateBody` populated from the input.
1774 /// * Youki — writes the corresponding cgroup v2 files
1775 /// (`cpu.weight`, `memory.max`, `pids.max`, `io.weight`,
1776 /// `cpuset.cpus`, `cpuset.mems`) under
1777 /// `<container_root>/cgroup` and persists the new restart policy
1778 /// in the on-disk supervisor state.
1779 /// * Other backends (WASM, mocks) inherit the `Unsupported` default.
1780 async fn update_container_resources(
1781 &self,
1782 _id: &ContainerId,
1783 _update: &ContainerResourceUpdate,
1784 ) -> Result<ContainerUpdateOutcome> {
1785 Err(AgentError::Unsupported(
1786 "update_container_resources is not supported by this runtime".into(),
1787 ))
1788 }
1789
1790 /// List the processes running inside a container (`docker top`).
1791 ///
1792 /// `ps_args` is forwarded to the runtime as the `ps(1)` argument list when
1793 /// supported; an empty slice means "use the runtime's default columns".
1794 /// Mirrors Docker's `GET /containers/{id}/top?ps_args=<...>`.
1795 ///
1796 /// The default implementation returns [`AgentError::Unsupported`].
1797 async fn top_container(
1798 &self,
1799 _id: &ContainerId,
1800 _ps_args: &[String],
1801 ) -> Result<ContainerTopOutput> {
1802 Err(AgentError::Unsupported(
1803 "top_container is not supported by this runtime".into(),
1804 ))
1805 }
1806
1807 /// Report changes to a container's filesystem since it was created.
1808 ///
1809 /// Mirrors Docker's `GET /containers/{id}/changes`. Returns one
1810 /// [`FilesystemChangeEntry`] per added / modified / deleted path in the
1811 /// container's writable layer. Runtimes that don't compute layer diffs
1812 /// (e.g. youki, which uses raw bundle rootfs without a layered FS) return
1813 /// [`AgentError::Unsupported`].
1814 async fn changes_container(&self, _id: &ContainerId) -> Result<Vec<FilesystemChangeEntry>> {
1815 Err(AgentError::Unsupported(
1816 "changes_container is not supported by this runtime".into(),
1817 ))
1818 }
1819
1820 /// Report the published port mappings for a container.
1821 ///
1822 /// Mirrors Docker's `GET /containers/{id}/port`. Returns one
1823 /// [`PortMappingEntry`] per (container-port, protocol, host-binding)
1824 /// triple. Containers with no published ports return an empty vector.
1825 ///
1826 /// The default implementation returns [`AgentError::Unsupported`].
1827 async fn port_mappings_container(&self, _id: &ContainerId) -> Result<Vec<PortMappingEntry>> {
1828 Err(AgentError::Unsupported(
1829 "port_mappings_container is not supported by this runtime".into(),
1830 ))
1831 }
1832
1833 /// Prune stopped containers from the runtime.
1834 ///
1835 /// Mirrors Docker's `POST /containers/prune`. Returns the IDs of
1836 /// containers that were removed plus the bytes reclaimed. The default
1837 /// implementation returns [`AgentError::Unsupported`].
1838 async fn prune_containers(&self) -> Result<ContainerPruneResult> {
1839 Err(AgentError::Unsupported(
1840 "prune_containers is not supported by this runtime".into(),
1841 ))
1842 }
1843
1844 /// Enumerate all containers known to this runtime, including stopped /
1845 /// exited ones (Docker's `list_containers(all=true)` semantics).
1846 ///
1847 /// Used by `zlayer-api::handlers::standalone_reconcile` on daemon boot
1848 /// to match persisted standalone-container records against the
1849 /// runtime's actual inventory: entries the runtime no longer reports
1850 /// are pruned, surviving entries are re-registered in the
1851 /// `ContainerIdMap`, and runtime containers carrying a
1852 /// `com.zlayer.container_id` label that has no storage match are
1853 /// counted as orphans (logged but otherwise left alone).
1854 ///
1855 /// The default implementation returns an empty list, which makes
1856 /// reconcile degrade to a label-blind pass: storage entries can still
1857 /// be probed individually via [`Runtime::container_state`], but orphan
1858 /// detection is disabled until the backend overrides this method
1859 /// (Docker via `bollard::list_containers`, youki via state-dir walk).
1860 async fn list_containers(&self) -> Result<Vec<RuntimeContainerSummary>> {
1861 Ok(Vec::new())
1862 }
1863
1864 /// Stream a TAR archive of the file or directory at `path` inside the
1865 /// container.
1866 ///
1867 /// Mirrors Docker's `GET /containers/{id}/archive?path=<...>`. The
1868 /// returned [`ArchiveStream`] yields raw `application/x-tar` bytes.
1869 /// Backends decide whether to materialize the archive in memory or
1870 /// stream it on the fly:
1871 ///
1872 /// * Docker — bollard's `download_from_container` produces a chunked
1873 /// stream of TAR bytes; we forward it verbatim.
1874 /// * Youki — a rootfs walk under `<bundle>/rootfs<path>` produces a
1875 /// TAR archive in a worker task and streams it through an mpsc.
1876 /// * Other backends (WASM, mocks) inherit the `Unsupported` default.
1877 ///
1878 /// The default implementation returns [`AgentError::Unsupported`].
1879 async fn archive_get(&self, _id: &ContainerId, _path: &str) -> Result<ArchiveStream> {
1880 Err(AgentError::Unsupported(
1881 "archive_get is not supported by this runtime".into(),
1882 ))
1883 }
1884
1885 /// Extract a TAR archive into the container at `path`.
1886 ///
1887 /// Mirrors Docker's `PUT /containers/{id}/archive?path=<...>`. The
1888 /// runtime must extract `tar_bytes` (an uncompressed TAR archive) into
1889 /// `path` inside the container, honouring [`ArchivePutOptions`].
1890 ///
1891 /// `path` must already exist inside the container and must be a
1892 /// directory; if it does not exist the runtime returns
1893 /// [`AgentError::NotFound`]. Mismatched directory/non-directory
1894 /// replacements with `no_overwrite_dir_non_dir=true` return
1895 /// [`AgentError::InvalidSpec`].
1896 ///
1897 /// The default implementation returns [`AgentError::Unsupported`].
1898 async fn archive_put(
1899 &self,
1900 _id: &ContainerId,
1901 _path: &str,
1902 _tar_bytes: bytes::Bytes,
1903 _opts: ArchivePutOptions,
1904 ) -> Result<()> {
1905 Err(AgentError::Unsupported(
1906 "archive_put is not supported by this runtime".into(),
1907 ))
1908 }
1909
1910 /// Return stat metadata for the file or directory at `path` inside the
1911 /// container.
1912 ///
1913 /// Mirrors Docker's `HEAD /containers/{id}/archive?path=<...>`, which
1914 /// answers with the metadata that `GET /archive` *would* expose without
1915 /// materializing the TAR. Used by `docker cp` and the API layer to
1916 /// short-circuit on missing paths.
1917 ///
1918 /// The default implementation returns [`AgentError::Unsupported`].
1919 async fn archive_head(&self, _id: &ContainerId, _path: &str) -> Result<PathStat> {
1920 Err(AgentError::Unsupported(
1921 "archive_head is not supported by this runtime".into(),
1922 ))
1923 }
1924}
1925
1926/// Validate a signal name for [`Runtime::kill_container`].
1927///
1928/// Accepts both the `SIG`-prefixed form (`"SIGKILL"`) and the bare form
1929/// (`"KILL"`). Returns the canonical uppercase `SIG`-prefixed name on success.
1930///
1931/// # Errors
1932///
1933/// Returns [`AgentError::InvalidSpec`] when `signal` is not one of the
1934/// supported signals: `SIGKILL`, `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGUSR1`,
1935/// `SIGUSR2`.
1936pub fn validate_signal(signal: &str) -> Result<String> {
1937 let trimmed = signal.trim();
1938 if trimmed.is_empty() {
1939 return Err(AgentError::InvalidSpec(
1940 "signal must not be empty".to_string(),
1941 ));
1942 }
1943 let upper = trimmed.to_ascii_uppercase();
1944 let canonical = if upper.starts_with("SIG") {
1945 upper
1946 } else {
1947 format!("SIG{upper}")
1948 };
1949 match canonical.as_str() {
1950 "SIGKILL" | "SIGTERM" | "SIGINT" | "SIGHUP" | "SIGUSR1" | "SIGUSR2" => Ok(canonical),
1951 other => Err(AgentError::InvalidSpec(format!(
1952 "unsupported signal '{other}'; allowed: SIGKILL, SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2"
1953 ))),
1954 }
1955}
1956
1957/// Auth context injected into every container so it can talk back to the host
1958/// API without needing external credentials.
1959#[derive(Debug, Clone)]
1960pub struct ContainerAuthContext {
1961 /// Base URL of the `ZLayer` API, e.g. `"http://127.0.0.1:3669"`.
1962 pub api_url: String,
1963 /// JWT signing secret — used to mint per-container tokens at start time.
1964 pub jwt_secret: String,
1965 /// Absolute path of the Unix socket on the host (bind-mounted into Linux
1966 /// containers; added to `writable_dirs` for macOS sandbox).
1967 pub socket_path: String,
1968}
1969
1970/// In-memory mock runtime for testing and development.
1971///
1972/// In addition to tracking container lifecycle in memory, the mock exposes
1973/// per-container queues for streaming method outputs so unit tests can
1974/// pre-script the events that [`Runtime::logs_stream`],
1975/// [`Runtime::stats_stream`], and [`Runtime::pull_image_stream`] should
1976/// yield. See [`MockRuntime::enqueue_log_chunk`],
1977/// [`MockRuntime::enqueue_stats_sample`], and
1978/// [`MockRuntime::enqueue_pull_progress`].
1979pub struct MockRuntime {
1980 containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
1981 /// Pre-scripted log chunks per container. Each call to
1982 /// [`Runtime::logs_stream`] drains this queue in order; once empty, the
1983 /// stream either terminates (when `follow=false`) or hangs forever
1984 /// (when `follow=true`) so tests can exercise both branches.
1985 pub logs_to_yield:
1986 Arc<Mutex<std::collections::HashMap<ContainerId, VecDeque<Result<LogChunk>>>>>,
1987 /// Pre-scripted stats samples per container. Drained in order by
1988 /// [`Runtime::stats_stream`].
1989 pub stats_to_yield:
1990 Arc<Mutex<std::collections::HashMap<ContainerId, VecDeque<Result<StatsSample>>>>>,
1991 /// Pre-scripted pull progress events keyed by image reference. Drained
1992 /// in order by [`Runtime::pull_image_stream`].
1993 pub pull_progress_to_yield:
1994 Arc<Mutex<std::collections::HashMap<String, VecDeque<Result<PullProgress>>>>>,
1995}
1996
1997impl MockRuntime {
1998 #[must_use]
1999 pub fn new() -> Self {
2000 Self {
2001 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
2002 logs_to_yield: Arc::new(Mutex::new(std::collections::HashMap::new())),
2003 stats_to_yield: Arc::new(Mutex::new(std::collections::HashMap::new())),
2004 pull_progress_to_yield: Arc::new(Mutex::new(std::collections::HashMap::new())),
2005 }
2006 }
2007
2008 /// Push a single [`LogChunk`] onto the queue for `id`. Subsequent calls to
2009 /// [`Runtime::logs_stream`] will yield enqueued chunks in FIFO order.
2010 pub async fn enqueue_log_chunk(&self, id: &ContainerId, chunk: LogChunk) {
2011 self.logs_to_yield
2012 .lock()
2013 .await
2014 .entry(id.clone())
2015 .or_default()
2016 .push_back(Ok(chunk));
2017 }
2018
2019 /// Push a pre-built error onto the log queue for `id`. The next
2020 /// [`Runtime::logs_stream`] call drains this as the next yielded item.
2021 pub async fn enqueue_log_error(&self, id: &ContainerId, err: AgentError) {
2022 self.logs_to_yield
2023 .lock()
2024 .await
2025 .entry(id.clone())
2026 .or_default()
2027 .push_back(Err(err));
2028 }
2029
2030 /// Push a single [`StatsSample`] onto the queue for `id`. Subsequent calls
2031 /// to [`Runtime::stats_stream`] will yield enqueued samples in FIFO order.
2032 pub async fn enqueue_stats_sample(&self, id: &ContainerId, sample: StatsSample) {
2033 self.stats_to_yield
2034 .lock()
2035 .await
2036 .entry(id.clone())
2037 .or_default()
2038 .push_back(Ok(sample));
2039 }
2040
2041 /// Push a single [`PullProgress`] event onto the queue for `image`.
2042 /// Subsequent calls to [`Runtime::pull_image_stream`] for the same image
2043 /// reference will yield enqueued events in FIFO order.
2044 pub async fn enqueue_pull_progress(&self, image: &str, progress: PullProgress) {
2045 self.pull_progress_to_yield
2046 .lock()
2047 .await
2048 .entry(image.to_string())
2049 .or_default()
2050 .push_back(Ok(progress));
2051 }
2052}
2053
2054impl Default for MockRuntime {
2055 fn default() -> Self {
2056 Self::new()
2057 }
2058}
2059
2060#[async_trait::async_trait]
2061impl Runtime for MockRuntime {
2062 async fn pull_image(&self, _image: &str) -> Result<()> {
2063 self.pull_image_with_policy(
2064 _image,
2065 PullPolicy::IfNotPresent,
2066 None,
2067 zlayer_spec::SourcePolicy::default(),
2068 )
2069 .await
2070 }
2071
2072 async fn pull_image_with_policy(
2073 &self,
2074 _image: &str,
2075 _policy: PullPolicy,
2076 _auth: Option<&RegistryAuth>,
2077 _source: zlayer_spec::SourcePolicy,
2078 ) -> Result<()> {
2079 // Mock: always succeeds
2080 tokio::time::sleep(Duration::from_millis(100)).await;
2081 Ok(())
2082 }
2083
2084 async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
2085 let mut containers = self.containers.write().await;
2086 containers.insert(
2087 id.clone(),
2088 Container {
2089 id: id.clone(),
2090 image: spec.image.name.to_string(),
2091 state: ContainerState::Pending,
2092 pid: None,
2093 task: None,
2094 overlay_ip: None,
2095 health_monitor: None,
2096 port_override: None,
2097 },
2098 );
2099 Ok(())
2100 }
2101
2102 async fn start_container(&self, id: &ContainerId) -> Result<()> {
2103 let mut containers = self.containers.write().await;
2104 if let Some(container) = containers.get_mut(id) {
2105 container.state = ContainerState::Running;
2106 container.pid = Some(std::process::id()); // Mock PID
2107 }
2108 Ok(())
2109 }
2110
2111 async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
2112 let mut containers = self.containers.write().await;
2113 if let Some(container) = containers.get_mut(id) {
2114 container.state = ContainerState::Exited { code: 0 };
2115 }
2116 Ok(())
2117 }
2118
2119 async fn remove_container(&self, id: &ContainerId) -> Result<()> {
2120 let mut containers = self.containers.write().await;
2121 containers.remove(id);
2122 Ok(())
2123 }
2124
2125 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
2126 let containers = self.containers.read().await;
2127 containers
2128 .get(id)
2129 .map(|c| c.state.clone())
2130 .ok_or_else(|| AgentError::NotFound {
2131 container: id.to_string(),
2132 reason: "container not found".to_string(),
2133 })
2134 }
2135
2136 async fn container_logs(&self, _id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
2137 let entries = vec![
2138 LogEntry {
2139 timestamp: chrono::Utc::now(),
2140 stream: LogStream::Stdout,
2141 message: "mock log line 1".to_string(),
2142 source: LogSource::Container("mock".to_string()),
2143 service: None,
2144 deployment: None,
2145 },
2146 LogEntry {
2147 timestamp: chrono::Utc::now(),
2148 stream: LogStream::Stderr,
2149 message: "mock error line".to_string(),
2150 source: LogSource::Container("mock".to_string()),
2151 service: None,
2152 deployment: None,
2153 },
2154 ];
2155 let skip = entries.len().saturating_sub(tail);
2156 Ok(entries.into_iter().skip(skip).collect())
2157 }
2158
2159 async fn exec(&self, _id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
2160 Ok((0, cmd.join(" "), String::new()))
2161 }
2162
2163 async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
2164 // Mock: return dummy stats
2165 let containers = self.containers.read().await;
2166 if containers.contains_key(id) {
2167 Ok(ContainerStats {
2168 cpu_usage_usec: 1_000_000, // 1 second
2169 memory_bytes: 50 * 1024 * 1024, // 50 MB
2170 memory_limit: 256 * 1024 * 1024, // 256 MB
2171 timestamp: std::time::Instant::now(),
2172 })
2173 } else {
2174 Err(AgentError::NotFound {
2175 container: id.to_string(),
2176 reason: "container not found".to_string(),
2177 })
2178 }
2179 }
2180
2181 async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
2182 // Mock: simulate waiting for container to exit
2183 let containers = self.containers.read().await;
2184 if let Some(container) = containers.get(id) {
2185 match &container.state {
2186 ContainerState::Exited { code } => Ok(*code),
2187 ContainerState::Failed { .. } => Ok(1),
2188 _ => {
2189 // Simulate a brief wait and then return success
2190 drop(containers);
2191 tokio::time::sleep(Duration::from_millis(50)).await;
2192 Ok(0)
2193 }
2194 }
2195 } else {
2196 Err(AgentError::NotFound {
2197 container: id.to_string(),
2198 reason: "container not found".to_string(),
2199 })
2200 }
2201 }
2202
2203 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
2204 // Mock: return dummy structured log entries
2205 let containers = self.containers.read().await;
2206 if containers.contains_key(id) {
2207 let container_name = id.to_string();
2208 Ok(vec![
2209 LogEntry {
2210 timestamp: chrono::Utc::now(),
2211 stream: LogStream::Stdout,
2212 message: format!("[{container_name}] Container started"),
2213 source: LogSource::Container(container_name.clone()),
2214 service: None,
2215 deployment: None,
2216 },
2217 LogEntry {
2218 timestamp: chrono::Utc::now(),
2219 stream: LogStream::Stdout,
2220 message: format!("[{container_name}] Executing command..."),
2221 source: LogSource::Container(container_name.clone()),
2222 service: None,
2223 deployment: None,
2224 },
2225 LogEntry {
2226 timestamp: chrono::Utc::now(),
2227 stream: LogStream::Stdout,
2228 message: format!("[{container_name}] Command completed successfully"),
2229 source: LogSource::Container(container_name),
2230 service: None,
2231 deployment: None,
2232 },
2233 ])
2234 } else {
2235 Err(AgentError::NotFound {
2236 container: id.to_string(),
2237 reason: "container not found".to_string(),
2238 })
2239 }
2240 }
2241
2242 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
2243 let containers = self.containers.read().await;
2244 if let Some(container) = containers.get(id) {
2245 Ok(container.pid)
2246 } else {
2247 Err(AgentError::NotFound {
2248 container: id.to_string(),
2249 reason: "container not found".to_string(),
2250 })
2251 }
2252 }
2253
2254 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
2255 let containers = self.containers.read().await;
2256 if containers.contains_key(id) {
2257 // Mock: deterministic IP based on replica number (172.17.0.{replica+2})
2258 #[allow(clippy::cast_possible_truncation)]
2259 let last_octet = (id.replica + 2) as u8;
2260 Ok(Some(IpAddr::V4(std::net::Ipv4Addr::new(
2261 172, 17, 0, last_octet,
2262 ))))
2263 } else {
2264 Err(AgentError::NotFound {
2265 container: id.to_string(),
2266 reason: "container not found".to_string(),
2267 })
2268 }
2269 }
2270
2271 async fn list_images(&self) -> Result<Vec<ImageInfo>> {
2272 Ok(Vec::new())
2273 }
2274
2275 async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
2276 Ok(())
2277 }
2278
2279 async fn prune_images(&self) -> Result<PruneResult> {
2280 Ok(PruneResult::default())
2281 }
2282
2283 async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
2284 // Validate signal even in the mock so callers exercise the same error
2285 // path. Default to SIGKILL when omitted.
2286 let _canonical = validate_signal(signal.unwrap_or("SIGKILL"))?;
2287 let mut containers = self.containers.write().await;
2288 let container = containers.get_mut(id).ok_or_else(|| AgentError::NotFound {
2289 container: id.to_string(),
2290 reason: "container not found".to_string(),
2291 })?;
2292 container.state = ContainerState::Exited { code: 137 };
2293 Ok(())
2294 }
2295
2296 async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
2297 // The in-memory mock doesn't store images; treat tag as a no-op success.
2298 Ok(())
2299 }
2300
2301 async fn logs_stream(&self, id: &ContainerId, opts: LogsStreamOptions) -> Result<LogsStream> {
2302 use futures_util::StreamExt;
2303
2304 // Drain the per-container queue once at call time; the resulting
2305 // iterator is owned by the stream so the lock isn't held while the
2306 // consumer reads.
2307 let queued: Vec<Result<LogChunk>> = {
2308 let mut guard = self.logs_to_yield.lock().await;
2309 guard.remove(id).map(Vec::from).unwrap_or_default()
2310 };
2311 let head = futures_util::stream::iter(queued);
2312 if opts.follow {
2313 // After the queued items are drained, hang forever so callers can
2314 // exercise the "still-following, no more data" branch and cancel
2315 // by dropping the stream.
2316 let tail = futures_util::stream::pending::<Result<LogChunk>>();
2317 Ok(Box::pin(head.chain(tail)))
2318 } else {
2319 Ok(Box::pin(head))
2320 }
2321 }
2322
2323 async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
2324 let queued: Vec<Result<StatsSample>> = {
2325 let mut guard = self.stats_to_yield.lock().await;
2326 guard.remove(id).map(Vec::from).unwrap_or_default()
2327 };
2328 // `stats_stream` has no `follow` flag on the trait. The mock yields
2329 // exactly what was pre-loaded and then closes — tests that want a
2330 // forever-pending stream can simulate it by holding the receiver and
2331 // never enqueueing more. Closing on drain keeps tests bounded so a
2332 // forgotten consumer never deadlocks the test runner.
2333 Ok(Box::pin(futures_util::stream::iter(queued)))
2334 }
2335
2336 async fn pull_image_stream(
2337 &self,
2338 image: &str,
2339 _auth: Option<&RegistryAuth>,
2340 ) -> Result<PullProgressStream> {
2341 let queued: Vec<Result<PullProgress>> = {
2342 let mut guard = self.pull_progress_to_yield.lock().await;
2343 guard.remove(image).map(Vec::from).unwrap_or_default()
2344 };
2345 // Pulls are inherently bounded: the real backends emit a final `Done`
2346 // event and close the stream. The mock follows the same shape — it
2347 // just yields whatever the test pre-loaded and ends.
2348 Ok(Box::pin(futures_util::stream::iter(queued)))
2349 }
2350}
2351
#[cfg(test)]
mod tests {
    // Unit tests covering:
    // - `MockRuntime` behaviour;
    // - `ContainerId` display/parse round-trips;
    // - the signal helpers and wait-outcome types;
    // - the default implementations of the optional `Runtime` trait methods.
    use super::*;

    /// Pull, create, and start on the mock leave the container `Running`.
    #[tokio::test]
    async fn test_mock_runtime() {
        let runtime = MockRuntime::new();
        let id = ContainerId::new("test".to_string(), 1);

        runtime.pull_image("test:latest").await.unwrap();
        runtime.create_container(&id, &mock_spec()).await.unwrap();
        runtime.start_container(&id).await.unwrap();

        let state = runtime.container_state(&id).await.unwrap();
        assert_eq!(state, ContainerState::Running);
    }

    #[test]
    fn parse_display_round_trips_cluster_shape() {
        // The exact id the deployment-run path surfaces: service=alpine,
        // role=default, replica=1, node_id=1 → `alpine-default-1-on-1`.
        let cid = ContainerId::with_role_and_node("alpine", 1, "default", 1);
        let s = cid.to_string();
        assert_eq!(s, "alpine-default-1-on-1");
        assert_eq!(ContainerId::parse_display(&s), Some(cid));
    }

    /// Legacy ids (no role/node) display as `<service>-rep-<replica>` and
    /// parse back to the same value.
    #[test]
    fn parse_display_round_trips_legacy_shape() {
        let cid = ContainerId::new("alpine", 3);
        let s = cid.to_string();
        assert_eq!(s, "alpine-rep-3");
        assert_eq!(ContainerId::parse_display(&s), Some(cid));
    }

    #[test]
    fn parse_display_handles_hyphenated_service() {
        // A service name containing `-` must round-trip: the parser anchors on
        // the rightmost structural markers, not a left-to-right split.
        let cid = ContainerId::with_role_and_node("my-web-app", 2, "read", 7);
        let s = cid.to_string();
        assert_eq!(s, "my-web-app-read-2-on-7");
        assert_eq!(ContainerId::parse_display(&s), Some(cid));
    }

    #[test]
    fn parse_display_rejects_non_ids() {
        // A 64-char hex id, a bare name, and obvious garbage must not parse.
        assert_eq!(ContainerId::parse_display("alpine"), None);
        assert_eq!(
            ContainerId::parse_display(
                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
            ),
            None
        );
        // Non-numeric replica / node components are rejected.
        assert_eq!(ContainerId::parse_display("alpine-default-x-on-1"), None);
        assert_eq!(ContainerId::parse_display("alpine-default-1-on-y"), None);
        assert_eq!(ContainerId::parse_display(""), None);
    }

    /// `validate_signal` accepts allowlisted names with or without the `SIG`
    /// prefix, in any case, and returns the canonical `SIG`-prefixed form.
    #[test]
    fn validate_signal_accepts_known_signals() {
        // SIG-prefixed form
        assert_eq!(validate_signal("SIGKILL").unwrap(), "SIGKILL");
        assert_eq!(validate_signal("SIGTERM").unwrap(), "SIGTERM");
        assert_eq!(validate_signal("SIGINT").unwrap(), "SIGINT");
        assert_eq!(validate_signal("SIGHUP").unwrap(), "SIGHUP");
        assert_eq!(validate_signal("SIGUSR1").unwrap(), "SIGUSR1");
        assert_eq!(validate_signal("SIGUSR2").unwrap(), "SIGUSR2");

        // Bare form (no "SIG" prefix) should be canonicalised.
        assert_eq!(validate_signal("KILL").unwrap(), "SIGKILL");
        assert_eq!(validate_signal("term").unwrap(), "SIGTERM");
        // Whitespace around the name is tolerated.
        assert_eq!(validate_signal(" INT ").unwrap(), "SIGINT");
    }

    /// Empty/blank input and anything off the allowlist yield `InvalidSpec`.
    #[test]
    fn validate_signal_rejects_unknown_or_empty() {
        assert!(matches!(
            validate_signal(""),
            Err(AgentError::InvalidSpec(_))
        ));
        assert!(matches!(
            validate_signal(" "),
            Err(AgentError::InvalidSpec(_))
        ));
        assert!(matches!(
            validate_signal("SIGSEGV"),
            Err(AgentError::InvalidSpec(_))
        ));
        assert!(matches!(
            validate_signal("NOPE"),
            Err(AgentError::InvalidSpec(_))
        ));
        // Signals outside the POSIX allowlist are rejected even if real.
        assert!(matches!(
            validate_signal("SIGPIPE"),
            Err(AgentError::InvalidSpec(_))
        ));
    }

    #[tokio::test]
    async fn mock_kill_container_defaults_to_sigkill() {
        let runtime = MockRuntime::new();
        let id = ContainerId::new("kill-me".to_string(), 0);
        runtime.create_container(&id, &mock_spec()).await.unwrap();
        runtime.start_container(&id).await.unwrap();

        // `None` -> defaults to SIGKILL; returns Ok and marks the container
        // as exited.
        runtime.kill_container(&id, None).await.unwrap();
        let state = runtime.container_state(&id).await.unwrap();
        assert!(
            matches!(state, ContainerState::Exited { code: 137 }),
            "expected Exited(137), got {state:?}"
        );
    }

    /// `WaitReason` variants serialize to their snake_case JSON strings.
    #[test]
    fn wait_reason_serializes_as_snake_case() {
        assert_eq!(
            serde_json::to_string(&WaitReason::Exited).unwrap(),
            "\"exited\""
        );
        assert_eq!(
            serde_json::to_string(&WaitReason::Signal).unwrap(),
            "\"signal\""
        );
        assert_eq!(
            serde_json::to_string(&WaitReason::OomKilled).unwrap(),
            "\"oom_killed\""
        );
        assert_eq!(
            serde_json::to_string(&WaitReason::RuntimeError).unwrap(),
            "\"runtime_error\""
        );
    }

    /// Every `WaitReason` variant survives a JSON serialize/deserialize trip.
    #[test]
    fn wait_reason_deserialize_roundtrip() {
        for variant in [
            WaitReason::Exited,
            WaitReason::Signal,
            WaitReason::OomKilled,
            WaitReason::RuntimeError,
        ] {
            let s = serde_json::to_string(&variant).unwrap();
            let back: WaitReason = serde_json::from_str(&s).unwrap();
            assert_eq!(variant, back, "roundtrip failed for {variant:?}");
        }
    }

    /// Exit codes of the form `128 + signum` map back to well-known names.
    #[test]
    fn signal_name_from_exit_code_known_signals() {
        assert_eq!(signal_name_from_exit_code(137).as_deref(), Some("SIGKILL"));
        assert_eq!(signal_name_from_exit_code(143).as_deref(), Some("SIGTERM"));
        assert_eq!(signal_name_from_exit_code(130).as_deref(), Some("SIGINT"));
        assert_eq!(signal_name_from_exit_code(129).as_deref(), Some("SIGHUP"));
        assert_eq!(signal_name_from_exit_code(139).as_deref(), Some("SIGSEGV"));
    }

    #[test]
    fn signal_name_from_exit_code_handles_unknown_and_normal() {
        // Normal exits (<= 128) return None.
        assert_eq!(signal_name_from_exit_code(0), None);
        assert_eq!(signal_name_from_exit_code(1), None);
        assert_eq!(signal_name_from_exit_code(128), None);

        // Unknown signals produce a stable string form.
        assert_eq!(
            signal_name_from_exit_code(128 + 99).as_deref(),
            Some("signal_99")
        );
    }

    /// The default `wait_outcome` wraps `wait_container`'s exit code in a
    /// plain `Exited` outcome with no signal and no finish time.
    #[tokio::test]
    async fn default_wait_outcome_delegates_to_wait_container() {
        let runtime = MockRuntime::new();
        let id = ContainerId::new("wait-test".to_string(), 0);
        runtime.create_container(&id, &mock_spec()).await.unwrap();
        runtime.start_container(&id).await.unwrap();

        let outcome = runtime.wait_outcome(&id).await.unwrap();
        // MockRuntime::wait_container returns 0 for running containers.
        assert_eq!(outcome.exit_code, 0);
        assert_eq!(outcome.reason, WaitReason::Exited);
        assert!(outcome.signal.is_none());
        assert!(outcome.finished_at.is_none());
    }

    /// An unknown signal name is rejected by the mock with `InvalidSpec`.
    #[tokio::test]
    async fn mock_kill_container_rejects_bogus_signal() {
        let runtime = MockRuntime::new();
        let id = ContainerId::new("kill-me".to_string(), 0);
        runtime.create_container(&id, &mock_spec()).await.unwrap();
        runtime.start_container(&id).await.unwrap();

        let err = runtime
            .kill_container(&id, Some("SIGFOO"))
            .await
            .unwrap_err();
        assert!(
            matches!(err, AgentError::InvalidSpec(_)),
            "expected InvalidSpec, got {err:?}"
        );
    }

    // The default trait impls of `logs_stream` and `stats_stream` still return
    // `AgentError::Unsupported`; `pull_image_stream` now performs a blocking
    // pull and synthesizes a Status+Done progress stream. `MockRuntime`
    // overrides all three so tests can pre-script stream output (see
    // `mock_logs_stream_yields_queued_items_in_order` and friends below).
    // A trivial `BareRuntime` exercises the default trait impls without
    // dragging in MockRuntime's overrides.

    /// Minimal `Runtime` implementation used to exercise the default trait
    /// impls of `logs_stream` / `stats_stream` / `pull_image_stream`. Most
    /// methods panic, but `pull_image_with_policy` returns `Ok(())` so the
    /// default `pull_image_stream` (which delegates to it) can be exercised.
    struct BareRuntime;

    #[async_trait::async_trait]
    impl Runtime for BareRuntime {
        async fn pull_image(&self, _image: &str) -> Result<()> {
            unimplemented!()
        }
        async fn pull_image_with_policy(
            &self,
            _image: &str,
            _policy: PullPolicy,
            _auth: Option<&RegistryAuth>,
            _source: zlayer_spec::SourcePolicy,
        ) -> Result<()> {
            // The default `pull_image_stream` delegates here; return Ok so the
            // streaming default can be exercised without a real registry.
            Ok(())
        }
        async fn create_container(&self, _id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
            unimplemented!()
        }
        async fn start_container(&self, _id: &ContainerId) -> Result<()> {
            unimplemented!()
        }
        async fn stop_container(&self, _id: &ContainerId, _timeout: Duration) -> Result<()> {
            unimplemented!()
        }
        async fn remove_container(&self, _id: &ContainerId) -> Result<()> {
            unimplemented!()
        }
        async fn container_state(&self, _id: &ContainerId) -> Result<ContainerState> {
            unimplemented!()
        }
        async fn container_logs(&self, _id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
            unimplemented!()
        }
        async fn exec(&self, _id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
            unimplemented!()
        }
        async fn get_container_stats(&self, _id: &ContainerId) -> Result<ContainerStats> {
            unimplemented!()
        }
        async fn wait_container(&self, _id: &ContainerId) -> Result<i32> {
            unimplemented!()
        }
        async fn get_logs(&self, _id: &ContainerId) -> Result<Vec<LogEntry>> {
            unimplemented!()
        }
        async fn get_container_pid(&self, _id: &ContainerId) -> Result<Option<u32>> {
            unimplemented!()
        }
        async fn get_container_ip(&self, _id: &ContainerId) -> Result<Option<IpAddr>> {
            unimplemented!()
        }
    }

    #[tokio::test]
    async fn default_logs_stream_is_unsupported() {
        let runtime = BareRuntime;
        let id = ContainerId::new("stream-test".to_string(), 0);
        // The success-side `LogsStream` is not `Debug`, so we can't call
        // `unwrap_err`; pattern-match on the Result directly instead.
        match runtime.logs_stream(&id, LogsStreamOptions::default()).await {
            Err(AgentError::Unsupported(_)) => {}
            Err(other) => panic!("expected Unsupported, got {other:?}"),
            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
        }
    }

    /// `BareRuntime` does not override `stats_stream`, so the trait default
    /// must report `Unsupported`.
    #[tokio::test]
    async fn default_stats_stream_is_unsupported() {
        let runtime = BareRuntime;
        let id = ContainerId::new("stream-test".to_string(), 0);
        match runtime.stats_stream(&id).await {
            Err(AgentError::Unsupported(_)) => {}
            Err(other) => panic!("expected Unsupported, got {other:?}"),
            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
        }
    }

    #[tokio::test]
    async fn default_pull_image_stream_synthesizes_progress() {
        use futures_util::StreamExt as _;

        // The default `pull_image_stream` now performs a blocking pull (via
        // `pull_image_with_policy`, which `BareRuntime` answers with `Ok`) and
        // synthesizes a Status + Done progress stream.
        let runtime = BareRuntime;
        let stream = runtime
            .pull_image_stream("alpine:latest", None)
            .await
            .expect("default pull_image_stream should succeed when the pull succeeds");
        let events: Vec<_> = stream.collect().await;
        assert_eq!(events.len(), 2, "expected a Status then a Done event");
        assert!(
            matches!(events[0], Ok(PullProgress::Status { .. })),
            "first event should be a Status line, got {:?}",
            events[0]
        );
        assert!(
            matches!(events[1], Ok(PullProgress::Done { .. })),
            "second event should be the terminal Done, got {:?}",
            events[1]
        );
    }

    // The tests below check that each optional trait method `BareRuntime`
    // does not override falls back to a default returning
    // `AgentError::Unsupported`.

    #[tokio::test]
    async fn default_archive_get_is_unsupported() {
        let runtime = BareRuntime;
        let id = ContainerId::new("archive-test".to_string(), 0);
        match runtime.archive_get(&id, "/etc/hosts").await {
            Err(AgentError::Unsupported(_)) => {}
            Err(other) => panic!("expected Unsupported, got {other:?}"),
            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
        }
    }

    #[tokio::test]
    async fn default_archive_put_is_unsupported() {
        let runtime = BareRuntime;
        let id = ContainerId::new("archive-test".to_string(), 0);
        let err = runtime
            .archive_put(
                &id,
                "/tmp",
                bytes::Bytes::from_static(&[]),
                ArchivePutOptions::default(),
            )
            .await
            .unwrap_err();
        assert!(matches!(err, AgentError::Unsupported(_)));
    }

    #[tokio::test]
    async fn default_archive_head_is_unsupported() {
        let runtime = BareRuntime;
        let id = ContainerId::new("archive-test".to_string(), 0);
        let err = runtime.archive_head(&id, "/etc/hosts").await.unwrap_err();
        assert!(matches!(err, AgentError::Unsupported(_)));
    }

    #[tokio::test]
    async fn default_exec_pty_is_unsupported() {
        let runtime = BareRuntime;
        let id = ContainerId::new("exec-pty".to_string(), 0);
        // The success-side `ExecHandle` is not `Debug`, so we can't call
        // `unwrap_err`; pattern-match on the Result directly instead.
        match runtime.exec_pty(&id, ExecOptions::default()).await {
            Err(AgentError::Unsupported(_)) => {}
            Err(other) => panic!("expected Unsupported, got {other:?}"),
            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
        }
    }

    #[tokio::test]
    async fn default_inspect_image_native_is_unsupported() {
        let runtime = BareRuntime;
        let err = runtime.inspect_image_native("alpine").await.unwrap_err();
        assert!(matches!(err, AgentError::Unsupported(_)));
    }

    #[tokio::test]
    async fn default_image_history_is_unsupported() {
        let runtime = BareRuntime;
        let err = runtime.image_history("alpine").await.unwrap_err();
        assert!(matches!(err, AgentError::Unsupported(_)));
    }

    #[tokio::test]
    async fn default_search_images_is_unsupported() {
        let runtime = BareRuntime;
        let err = runtime.search_images("nginx", 10).await.unwrap_err();
        assert!(matches!(err, AgentError::Unsupported(_)));
    }

    #[tokio::test]
    async fn default_save_images_is_unsupported() {
        let runtime = BareRuntime;
        // The success-side stream isn't `Debug`, so pattern-match the Result.
        match runtime.save_images(&["alpine".to_string()]).await {
            Err(AgentError::Unsupported(_)) => {}
            Err(other) => panic!("expected Unsupported, got {other:?}"),
            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
        }
    }

    #[tokio::test]
    async fn default_load_images_is_unsupported() {
        let runtime = BareRuntime;
        match runtime
            .load_images(bytes::Bytes::from_static(&[]), false)
            .await
        {
            Err(AgentError::Unsupported(_)) => {}
            Err(other) => panic!("expected Unsupported, got {other:?}"),
            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
        }
    }

    #[tokio::test]
    async fn default_import_image_is_unsupported() {
        let runtime = BareRuntime;
        let err = runtime
            .import_image(bytes::Bytes::from_static(&[]), None, None)
            .await
            .unwrap_err();
        assert!(matches!(err, AgentError::Unsupported(_)));
    }

    #[tokio::test]
    async fn default_export_container_fs_is_unsupported() {
        let runtime = BareRuntime;
        let id = ContainerId::new("export".to_string(), 0);
        match runtime.export_container_fs(&id).await {
            Err(AgentError::Unsupported(_)) => {}
            Err(other) => panic!("expected Unsupported, got {other:?}"),
            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
        }
    }

    #[tokio::test]
    async fn default_commit_container_is_unsupported() {
        let runtime = BareRuntime;
        let id = ContainerId::new("commit".to_string(), 0);
        let err = runtime
            .commit_container(&id, &CommitOptions::default())
            .await
            .unwrap_err();
        assert!(matches!(err, AgentError::Unsupported(_)));
    }

    /// `LoadProgress` JSON carries a `kind` tag ("status" / "done") next to
    /// the variant's own fields.
    #[test]
    fn load_progress_serializes_with_kind_discriminator() {
        let status = LoadProgress::Status {
            id: Some("abc".to_string()),
            status: "Loading layer".to_string(),
        };
        let json = serde_json::to_value(&status).unwrap();
        assert_eq!(json["kind"], "status");
        assert_eq!(json["status"], "Loading layer");

        let done = LoadProgress::Done {
            references: vec!["alpine:latest".to_string()],
        };
        let json = serde_json::to_value(&done).unwrap();
        assert_eq!(json["kind"], "done");
        assert_eq!(json["references"], serde_json::json!(["alpine:latest"]));
    }

    /// `CommitOptions::default()` sets no repo/tag/comment/author/changes and
    /// does not pause the container.
    #[test]
    fn commit_options_default_is_no_op_pause_false() {
        let opts = CommitOptions::default();
        assert!(opts.repo.is_none());
        assert!(opts.tag.is_none());
        assert!(opts.comment.is_none());
        assert!(opts.author.is_none());
        assert!(!opts.pause);
        assert!(opts.changes.is_none());
    }

    /// A default `ImageInspectInfo` survives a JSON serialize/deserialize trip.
    #[test]
    fn image_inspect_info_default_round_trips_via_serde() {
        let info = ImageInspectInfo::default();
        let json = serde_json::to_string(&info).unwrap();
        let back: ImageInspectInfo = serde_json::from_str(&json).unwrap();
        assert_eq!(info, back);
    }

    /// Chunks enqueued on the mock come back from `logs_stream` in FIFO order,
    /// with their channel preserved.
    #[tokio::test]
    async fn mock_logs_stream_yields_queued_items_in_order() {
        use futures_util::StreamExt;

        let runtime = MockRuntime::new();
        let id = ContainerId::new("logs-order".to_string(), 0);

        let make_chunk = |s: &str, ch: LogChannel| LogChunk {
            stream: ch,
            bytes: bytes::Bytes::copy_from_slice(s.as_bytes()),
            timestamp: None,
        };

        runtime
            .enqueue_log_chunk(&id, make_chunk("first", LogChannel::Stdout))
            .await;
        runtime
            .enqueue_log_chunk(&id, make_chunk("second", LogChannel::Stderr))
            .await;
        runtime
            .enqueue_log_chunk(&id, make_chunk("third", LogChannel::Stdout))
            .await;

        // `follow=false` so the stream ends once the queue is drained.
        let opts = LogsStreamOptions {
            follow: false,
            ..LogsStreamOptions::default()
        };
        let mut stream = runtime.logs_stream(&id, opts).await.unwrap();

        let mut got = Vec::new();
        while let Some(item) = stream.next().await {
            let chunk = item.unwrap();
            got.push((
                chunk.stream,
                String::from_utf8(chunk.bytes.to_vec()).unwrap(),
            ));
        }
        assert_eq!(
            got,
            vec![
                (LogChannel::Stdout, "first".to_string()),
                (LogChannel::Stderr, "second".to_string()),
                (LogChannel::Stdout, "third".to_string()),
            ]
        );
    }

    #[tokio::test]
    async fn mock_logs_stream_empty_queue_ends_immediately_when_not_follow() {
        use futures_util::StreamExt;

        let runtime = MockRuntime::new();
        let id = ContainerId::new("logs-empty".to_string(), 0);

        let opts = LogsStreamOptions {
            follow: false,
            ..LogsStreamOptions::default()
        };
        let mut stream = runtime.logs_stream(&id, opts).await.unwrap();

        // Empty queue + follow=false => stream is closed on first poll.
        // Wrap in a short timeout so a regression that hangs would surface
        // as a test failure rather than the test runner stalling.
        let next = tokio::time::timeout(Duration::from_millis(500), stream.next())
            .await
            .expect("stream did not terminate; expected immediate close on empty queue");
        assert!(
            next.is_none(),
            "expected stream to be exhausted, got Some(_)"
        );
    }

    /// Samples enqueued on the mock come back from `stats_stream` in FIFO
    /// order, and the stream then closes.
    #[tokio::test]
    async fn mock_stats_stream_yields_queued_samples_in_order() {
        use futures_util::StreamExt;

        let runtime = MockRuntime::new();
        let id = ContainerId::new("stats-order".to_string(), 0);

        let now = chrono::Utc::now();
        // Build a sample that differs only in `cpu_total_ns`, so ordering is
        // observable through that one field.
        let mk = |cpu: u64| StatsSample {
            cpu_total_ns: cpu,
            cpu_system_ns: 0,
            online_cpus: 1,
            mem_used_bytes: 0,
            mem_limit_bytes: 0,
            net_rx_bytes: 0,
            net_tx_bytes: 0,
            blkio_read_bytes: 0,
            blkio_write_bytes: 0,
            pids_current: 0,
            pids_limit: None,
            timestamp: now,
        };

        runtime.enqueue_stats_sample(&id, mk(100)).await;
        runtime.enqueue_stats_sample(&id, mk(200)).await;
        runtime.enqueue_stats_sample(&id, mk(300)).await;

        let mut stream = runtime.stats_stream(&id).await.unwrap();

        let mut cpus = Vec::new();
        while let Some(item) = stream.next().await {
            cpus.push(item.unwrap().cpu_total_ns);
        }
        assert_eq!(cpus, vec![100, 200, 300]);
    }

    /// Progress events enqueued for an image reference come back from
    /// `pull_image_stream` in FIFO order with every field intact.
    #[tokio::test]
    async fn mock_pull_image_stream_yields_queued_progress_in_order() {
        use futures_util::StreamExt;

        let runtime = MockRuntime::new();
        let image = "alpine:latest";

        runtime
            .enqueue_pull_progress(
                image,
                PullProgress::Status {
                    id: Some("layer-1".to_string()),
                    status: "Pulling fs layer".to_string(),
                    progress: None,
                    current: None,
                    total: None,
                },
            )
            .await;
        runtime
            .enqueue_pull_progress(
                image,
                PullProgress::Status {
                    id: Some("layer-1".to_string()),
                    status: "Downloading".to_string(),
                    progress: Some("[==> ] 1MB/4MB".to_string()),
                    current: Some(1024 * 1024),
                    total: Some(4 * 1024 * 1024),
                },
            )
            .await;
        runtime
            .enqueue_pull_progress(
                image,
                PullProgress::Done {
                    reference: image.to_string(),
                    digest: Some("sha256:deadbeef".to_string()),
                },
            )
            .await;

        let mut stream = runtime.pull_image_stream(image, None).await.unwrap();
        let mut events = Vec::new();
        while let Some(item) = stream.next().await {
            events.push(item.unwrap());
        }

        assert_eq!(events.len(), 3);
        match &events[0] {
            PullProgress::Status { status, .. } => assert_eq!(status, "Pulling fs layer"),
            done @ PullProgress::Done { .. } => panic!("expected Status, got {done:?}"),
        }
        match &events[1] {
            PullProgress::Status {
                status,
                current,
                total,
                ..
            } => {
                assert_eq!(status, "Downloading");
                assert_eq!(*current, Some(1024 * 1024));
                assert_eq!(*total, Some(4 * 1024 * 1024));
            }
            done @ PullProgress::Done { .. } => panic!("expected Status, got {done:?}"),
        }
        match &events[2] {
            PullProgress::Done { reference, digest } => {
                assert_eq!(reference, image);
                assert_eq!(digest.as_deref(), Some("sha256:deadbeef"));
            }
            status @ PullProgress::Status { .. } => panic!("expected Done, got {status:?}"),
        }
    }

    /// `LogChannel` variants serialize to their lowercase JSON strings.
    #[test]
    fn log_channel_serializes_as_snake_case() {
        assert_eq!(
            serde_json::to_string(&LogChannel::Stdin).unwrap(),
            "\"stdin\""
        );
        assert_eq!(
            serde_json::to_string(&LogChannel::Stdout).unwrap(),
            "\"stdout\""
        );
        assert_eq!(
            serde_json::to_string(&LogChannel::Stderr).unwrap(),
            "\"stderr\""
        );
    }

    /// Builds a minimal `ServiceSpec` by parsing a one-service deployment
    /// (image `test:latest`, a single HTTP endpoint on port 8080) and taking
    /// its `test` service.
    ///
    /// # Panics
    ///
    /// Panics if the embedded YAML stops parsing as a `DeploymentSpec` or no
    /// longer defines a `test` service.
    fn mock_spec() -> ServiceSpec {
        use zlayer_spec::*;
        serde_yaml::from_str::<DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }
}