Skip to main content

zlayer_agent/
runtime.rs

1//! Abstract container runtime interface
2//!
3//! Defines the Runtime trait that can be implemented for different container runtimes
4//! (containerd, CRI-O, etc.)
5
6use crate::cgroups_stats::ContainerStats;
7use crate::error::{AgentError, Result};
8use futures_util::Stream;
9use std::collections::VecDeque;
10use std::future::Future;
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::io::{AsyncRead, AsyncWrite};
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
19use zlayer_spec::{PullPolicy, RegistryAuth, ServiceSpec};
20
/// Lifecycle state of a container as tracked by the agent.
///
/// The two terminal variants carry extra detail: [`ContainerState::Exited`]
/// records the process exit code and [`ContainerState::Failed`] records a
/// human-readable failure reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContainerState {
    /// The container is still being pulled or created.
    Pending,
    /// Init actions for the container are executing.
    Initializing,
    /// The container's process is running.
    Running,
    /// The container is shutting down.
    Stopping,
    /// The container's process has exited with `code`.
    Exited { code: i32 },
    /// The container failed; `reason` describes why.
    Failed { reason: String },
}

impl ContainerState {
    /// Returns the stable, lowercase name of this state.
    ///
    /// This is the spelling surfaced through the API and `ps` output. Variant
    /// payloads (exit code, failure reason) are intentionally left out. The
    /// raft e2e harness matches `"running"` case-insensitively when counting
    /// healthy replicas, so these strings must stay stable.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::Exited { .. } => "exited",
            Self::Failed { .. } => "failed",
            Self::Pending => "pending",
            Self::Initializing => "initializing",
            Self::Running => "running",
            Self::Stopping => "stopping",
        }
    }
}

impl std::fmt::Display for ContainerState {
    /// Writes the same text as [`ContainerState::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.as_str())
    }
}
62
63/// Container identifier.
64///
65/// Identifies a container by `(service, replica)` for the legacy single-group
66/// case, and extends with `role` + `node_id` for cluster-aware
67/// multi-group services and cross-node identification.
68///
69/// Defaults: `role = "default"`, `node_id = 0`. Existing constructors
70/// (`ContainerId::new(service, replica)`) produce these defaults. Use
71/// `ContainerId::with_role_and_node(...)` when the new fields matter.
72///
73/// Display:
74/// - With defaults: `{service}-rep-{replica}` (backward compat).
75/// - Otherwise: `{service}-{role}-{replica}-on-{node_id}`.
76#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
77pub struct ContainerId {
78    pub service: String,
79    pub replica: u32,
80    /// Role within `replica_groups`. `"default"` for services without groups.
81    #[serde(default = "default_container_role")]
82    pub role: String,
83    /// Cluster node that owns this container. `0` in single-node deployments
84    /// or before the cluster is initialized.
85    #[serde(default)]
86    pub node_id: u64,
87}
88
/// Default value of [`ContainerId::role`].
///
/// Serde uses it when the field is missing, and [`ContainerId::new`] uses it too.
fn default_container_role() -> String {
    String::from("default")
}
92
93impl ContainerId {
94    /// Build a legacy `{service, replica}` `ContainerId` with default `role`
95    /// and `node_id`. Used by all existing callsites — behavior is unchanged.
96    #[must_use]
97    pub fn new(service: impl Into<String>, replica: u32) -> Self {
98        Self {
99            service: service.into(),
100            replica,
101            role: default_container_role(),
102            node_id: 0,
103        }
104    }
105
106    /// Build a cluster-aware `ContainerId` with explicit `role` and `node_id`.
107    /// Used by `ServiceManager` when a service has `replica_groups` or when
108    /// the daemon participates in a multi-node cluster.
109    #[must_use]
110    pub fn with_role_and_node(
111        service: impl Into<String>,
112        replica: u32,
113        role: impl Into<String>,
114        node_id: u64,
115    ) -> Self {
116        Self {
117            service: service.into(),
118            replica,
119            role: role.into(),
120            node_id,
121        }
122    }
123
124    /// True when both `role` and `node_id` are at their defaults — i.e.
125    /// this is a legacy-shape `ContainerId`.
126    #[must_use]
127    pub fn is_legacy_shape(&self) -> bool {
128        self.role == "default" && self.node_id == 0
129    }
130
131    /// Parse a `ContainerId` back from its [`Display`](std::fmt::Display) form.
132    ///
133    /// This is the exact inverse of `Display`:
134    /// - `"{service}-rep-{replica}"` (legacy shape) → `ContainerId::new`.
135    /// - `"{service}-{role}-{replica}-on-{node_id}"` (cluster shape) →
136    ///   `ContainerId::with_role_and_node`.
137    ///
138    /// The service name may itself contain `-`, so parsing anchors on the
139    /// rightmost structural markers (`-on-` then the trailing `-rep-`/`-{role}-`
140    /// segment) rather than splitting left-to-right. Returns `None` for any
141    /// string that does not match either shape (e.g. a hex id or a bare name).
142    #[must_use]
143    pub fn parse_display(s: &str) -> Option<Self> {
144        // Cluster shape: `{service}-{role}-{replica}-on-{node_id}`.
145        if let Some((head, node_str)) = s.rsplit_once("-on-") {
146            let node_id: u64 = node_str.parse().ok()?;
147            // `head` = `{service}-{role}-{replica}`. The replica is the last
148            // `-`-segment; the role is the segment before it; everything left
149            // of that is the (possibly hyphenated) service.
150            let (service_role, replica_str) = head.rsplit_once('-')?;
151            let replica: u32 = replica_str.parse().ok()?;
152            let (service, role) = service_role.rsplit_once('-')?;
153            if service.is_empty() || role.is_empty() {
154                return None;
155            }
156            return Some(Self::with_role_and_node(service, replica, role, node_id));
157        }
158        // Legacy shape: `{service}-rep-{replica}`.
159        if let Some((service, replica_str)) = s.rsplit_once("-rep-") {
160            let replica: u32 = replica_str.parse().ok()?;
161            if service.is_empty() {
162                return None;
163            }
164            return Some(Self::new(service, replica));
165        }
166        None
167    }
168}
169
170impl std::fmt::Display for ContainerId {
171    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
172        if self.is_legacy_shape() {
173            write!(f, "{}-rep-{}", self.service, self.replica)
174        } else {
175            write!(
176                f,
177                "{}-{}-{}-on-{}",
178                self.service, self.role, self.replica, self.node_id
179            )
180        }
181    }
182}
183
184/// Container handle
185pub struct Container {
186    pub id: ContainerId,
187    /// Image reference this container was created from (canonical form, e.g.
188    /// `docker.io/library/nginx:1.29-alpine`). Surfaced through the API/`ps`.
189    pub image: String,
190    pub state: ContainerState,
191    pub pid: Option<u32>,
192    pub task: Option<JoinHandle<std::io::Result<()>>>,
193    /// Overlay network IP address assigned to this container
194    pub overlay_ip: Option<IpAddr>,
195    /// Health monitor task handle for this container
196    pub health_monitor: Option<JoinHandle<()>>,
197    /// Runtime-assigned port override (used by macOS sandbox where all
198    /// containers share the host network and need unique ports).
199    /// When `Some(port)`, the proxy should use this port instead of the
200    /// spec-declared endpoint port for this specific container's backend address.
201    pub port_override: Option<u16>,
202}
203
204/// Summary information about a cached image on the host runtime.
205#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
206pub struct ImageInfo {
207    /// Canonical image reference (e.g. `zachhandley/zlayer-manager:latest`).
208    pub reference: String,
209    /// Content-addressed digest if known (`sha256:...`). `None` when the
210    /// backend only tracks images by tag.
211    pub digest: Option<String>,
212    /// Total on-disk / in-cache size in bytes, when available.
213    pub size_bytes: Option<u64>,
214}
215
216/// Result of a prune operation.
217#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
218pub struct PruneResult {
219    /// Image references that were removed.
220    pub deleted: Vec<String>,
221    /// Bytes reclaimed from the cache. `0` when the backend cannot report.
222    pub space_reclaimed: u64,
223}
224
225/// Reason a container stopped running, as reported by [`Runtime::wait_outcome`].
226///
227/// Serialized as `snake_case` strings on the wire (`exited`, `signal`,
228/// `oom_killed`, `runtime_error`) so the API DTO can emit the reason as-is
229/// without a second translation layer.
230#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
231#[serde(rename_all = "snake_case")]
232pub enum WaitReason {
233    /// Container exited normally.
234    Exited,
235    /// Container was killed by a signal (e.g. `SIGKILL`, `SIGTERM`).
236    Signal,
237    /// Container was killed by the OOM killer.
238    OomKilled,
239    /// Runtime-side failure (pre-start error, runtime crash, etc.).
240    RuntimeError,
241}
242
243/// Wait condition mirroring Docker's `POST /containers/{id}/wait?condition=`.
244///
245/// Maps 1:1 to the wire form (`not-running`, `next-exit`, `removed`) via
246/// kebab-case serde so the daemon and the Docker compat shim can deserialize
247/// the query parameter in a single step. The default condition is
248/// [`WaitCondition::NotRunning`], matching Docker's behaviour when the
249/// `condition` query param is omitted.
250#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
251#[serde(rename_all = "kebab-case")]
252pub enum WaitCondition {
253    /// Wait until the container is not running. This is the default when
254    /// the caller doesn't specify a condition. Returns immediately if the
255    /// container has already exited; otherwise blocks until it does.
256    #[default]
257    NotRunning,
258    /// Wait for the next container exit, even if the container is already
259    /// stopped at the time of the call. Restarts the wait loop on each
260    /// observed exit.
261    NextExit,
262    /// Wait until the container is removed. Useful in conjunction with
263    /// `--rm` / `AutoRemove` lifecycle wiring.
264    Removed,
265}
266
267impl WaitCondition {
268    /// Return the wire string this variant serializes to (`"not-running"`,
269    /// `"next-exit"`, `"removed"`), matching Docker's `condition=` query
270    /// parameter spelling.
271    #[must_use]
272    pub const fn as_wire_str(self) -> &'static str {
273        match self {
274            Self::NotRunning => "not-running",
275            Self::NextExit => "next-exit",
276            Self::Removed => "removed",
277        }
278    }
279
280    /// Parse a Docker-style condition string (`"not-running"`, `"next-exit"`,
281    /// `"removed"`). Returns `None` for unknown values so callers can
282    /// distinguish "default" (omitted) from "rejected".
283    #[must_use]
284    pub fn from_wire_str(s: &str) -> Option<Self> {
285        match s {
286            "not-running" | "" => Some(Self::NotRunning),
287            "next-exit" => Some(Self::NextExit),
288            "removed" => Some(Self::Removed),
289            _ => None,
290        }
291    }
292}
293
294/// Richer wait result returned by [`Runtime::wait_outcome`].
295///
296/// Backwards-compatible with [`Runtime::wait_container`] (which returns just
297/// `exit_code`). The API handler uses this to populate the extended
298/// `ContainerWaitResponse` fields (`reason`, `signal`, `finished_at`) while
299/// existing callers that only need the exit code can keep using
300/// `wait_container`.
301#[derive(Debug, Clone)]
302pub struct WaitOutcome {
303    /// Process exit code (0 = success). When the container was killed by
304    /// signal `N`, this is typically `128 + N`.
305    pub exit_code: i32,
306    /// Classification of the exit.
307    pub reason: WaitReason,
308    /// Signal name when `reason == WaitReason::Signal`, e.g. `"SIGKILL"`.
309    /// Derived from `exit_code - 128` on a best-effort basis.
310    pub signal: Option<String>,
311    /// Time the container exited, if the runtime reports it.
312    pub finished_at: Option<chrono::DateTime<chrono::Utc>>,
313}
314
315impl WaitOutcome {
316    /// Build a plain `Exited` outcome with no signal/timestamp metadata — the
317    /// default that matches the pre-§3.12 behaviour.
318    #[must_use]
319    pub fn exited(exit_code: i32) -> Self {
320        Self {
321            exit_code,
322            reason: WaitReason::Exited,
323            signal: None,
324            finished_at: None,
325        }
326    }
327}
328
/// Map a signal-style exit code (`128 + N`) to a canonical signal name.
///
/// A process terminated by signal `N` is conventionally reported with exit
/// status `128 + N`. Codes `<= 128` are treated as ordinary exits and
/// yield `None`.
///
/// Signal numbers follow the Linux x86/ARM table (see `signal(7)`). That is
/// the same numbering the rest of this table already assumes (e.g.
/// `SIGUSR1 = 10`, `SIGCONT = 18`), so 17 is `SIGCHLD` and 19 is `SIGSTOP`.
/// Other architectures (MIPS, SPARC, Alpha) number some signals differently.
/// Numbers with no entry fall back to `signal_<n>`, so the caller always
/// gets something readable.
#[must_use]
pub fn signal_name_from_exit_code(exit_code: i32) -> Option<String> {
    if exit_code <= 128 {
        return None;
    }
    let n = exit_code - 128;
    let name = match n {
        1 => "SIGHUP",
        2 => "SIGINT",
        3 => "SIGQUIT",
        4 => "SIGILL",
        5 => "SIGTRAP",
        6 => "SIGABRT",
        7 => "SIGBUS",
        8 => "SIGFPE",
        9 => "SIGKILL",
        10 => "SIGUSR1",
        11 => "SIGSEGV",
        12 => "SIGUSR2",
        13 => "SIGPIPE",
        14 => "SIGALRM",
        15 => "SIGTERM",
        16 => "SIGSTKFLT",
        17 => "SIGCHLD",
        18 => "SIGCONT",
        19 => "SIGSTOP",
        20 => "SIGTSTP",
        21 => "SIGTTIN",
        22 => "SIGTTOU",
        23 => "SIGURG",
        24 => "SIGXCPU",
        25 => "SIGXFSZ",
        26 => "SIGVTALRM",
        27 => "SIGPROF",
        28 => "SIGWINCH",
        29 => "SIGIO",
        30 => "SIGPWR",
        31 => "SIGSYS",
        _ => return Some(format!("signal_{n}")),
    };
    Some(name.to_string())
}
360
/// A single event produced by [`Runtime::exec_stream`].
///
/// Runtimes emit these while the exec'd command writes output. A successful
/// stream always ends with an [`ExecEvent::Exit`] carrying the exit code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecEvent {
    /// Output written to stdout. Docker delivers it line by line; other
    /// runtimes may deliver all buffered output as a single event.
    Stdout(String),
    /// Output written to stderr.
    Stderr(String),
    /// The command finished with this exit code; always the last event.
    Exit(i32),
}
376
377/// Boxed async stream of [`ExecEvent`]s returned by [`Runtime::exec_stream`].
378pub type ExecEventStream = Pin<Box<dyn Stream<Item = ExecEvent> + Send>>;
379
/// Parameters for [`Runtime::exec_pty`].
///
/// The fields are the union of what Docker's `POST /containers/{id}/exec`
/// (`ExecConfig`) exposes, so the daemon can forward them with little
/// translation.
///
/// [`Runtime::exec`] and [`Runtime::exec_stream`] capture stdout and stderr
/// separately and return once the process has exited. `exec_pty` is the
/// interactive path instead: it allocates a PTY when `tty` is set and moves
/// I/O in both directions over one duplex byte stream. It returns an
/// [`ExecHandle`] that the caller drives while the process is still running.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[allow(clippy::struct_excessive_bools)] // mirrors Docker's `ExecConfig` 1:1
pub struct ExecOptions {
    /// Argument vector of the process to run; `command[0]` is the binary.
    pub command: Vec<String>,
    /// Additional `KEY=VALUE` environment entries. The runtime merges them
    /// into the container's existing environment.
    pub env: Vec<String>,
    /// Working directory inside the container. `None` keeps the container's
    /// default `WORKDIR`.
    pub working_dir: Option<String>,
    /// `user[:group]` to run as. `None` keeps the container's configured user.
    pub user: Option<String>,
    /// Grant privileged capabilities to the exec (Docker `Privileged`).
    pub privileged: bool,
    /// Allocate a TTY (Docker `Tty`). If `true`, the runtime should create a
    /// pseudo-terminal pair, and the duplex stream on the returned
    /// [`ExecHandle`] carries multiplexed PTY traffic. If `false`, the stream
    /// carries raw stdout/stderr without PTY framing.
    pub tty: bool,
    /// Connect the caller to the process's stdin (Docker `AttachStdin`).
    /// When `false`, writes to the duplex stream are effectively ignored.
    pub attach_stdin: bool,
    /// Deliver the process's stdout on the readable half of the duplex
    /// stream (Docker `AttachStdout`).
    pub attach_stdout: bool,
    /// Deliver the process's stderr on the readable half of the duplex
    /// stream (Docker `AttachStderr`).
    pub attach_stderr: bool,
}
422
423/// Marker supertrait combining [`AsyncRead`] + [`AsyncWrite`] so they can be
424/// used together as a single trait object. Rust forbids stacking two
425/// non-auto traits directly in `dyn`, so [`ExecPtyStream`] is built on top
426/// of this helper instead.
427///
428/// A blanket impl below covers every type that already satisfies the four
429/// component bounds, so callers never need to implement `ExecDuplex`
430/// manually — they just hand the runtime any concrete duplex stream that's
431/// `AsyncRead + AsyncWrite + Send + Unpin`.
432pub trait ExecDuplex: AsyncRead + AsyncWrite + Send + Unpin {}
433
434impl<T> ExecDuplex for T where T: AsyncRead + AsyncWrite + Send + Unpin + ?Sized {}
435
436/// Duplex byte stream used by [`ExecHandle`] to shuttle stdin/stdout (and,
437/// when [`ExecOptions::tty`] is set, multiplexed PTY traffic) between the
438/// caller and the exec'd process.
439///
440/// `Unpin` is required (via [`ExecDuplex`]) so callers can poll the trait
441/// object directly via the usual `tokio::io::AsyncReadExt` /
442/// `AsyncWriteExt` extension methods without having to pin the box
443/// themselves.
444pub type ExecPtyStream = Box<dyn ExecDuplex + 'static>;
445
446/// Future returned by [`ExecHandle`] that resolves with the exec'd process's
447/// exit code once the runtime observes it has terminated.
448pub type ExecExitFuture = Pin<Box<dyn Future<Output = Result<i32>> + Send>>;
449
450/// Runtime-side handle returned by [`Runtime::exec_pty`].
451///
452/// Bundles everything a long-lived interactive exec session needs:
453///
454/// 1. A duplex [`ExecPtyStream`] for shuttling stdin/stdout (or full PTY
455///    traffic when `tty` is set) between the caller and the running process.
456/// 2. A `tokio::sync::mpsc::Sender<(rows, cols)>` so the caller can resize
457///    the allocated PTY in response to terminal-size changes (mirrors
458///    Docker's `POST /exec/{id}/resize` endpoint). Runtimes that don't allocate
459///    a PTY should still accept the channel and treat resize messages as
460///    no-ops; the channel is dropped on the runtime side once the process
461///    exits.
462/// 3. A boxed [`ExecExitFuture`] that resolves with the exit code once the
463///    runtime detects the process has terminated.
464///
465/// `ExecHandle` deliberately does not implement `Debug` / `Clone` because
466/// every field holds a trait object (or, in the exit future's case, an opaque
467/// boxed future). Consumers move the fields out by destructuring and then
468/// drive the I/O stream, the resize channel, and the exit future
469/// independently.
470pub struct ExecHandle {
471    /// Bidirectional byte channel between the caller and the exec'd process.
472    pub stream: ExecPtyStream,
473    /// Channel for sending `(rows, cols)` resize requests for the PTY
474    /// allocated to the exec session. Bounded so a stuck runtime can't make
475    /// the caller buffer unbounded resize events; senders should drop the
476    /// most recent excess size on backpressure.
477    pub resize: tokio::sync::mpsc::Sender<(u16, u16)>,
478    /// Future that resolves with the process's exit code once the runtime
479    /// observes the exec has terminated. Consumers typically `await` this on
480    /// a dedicated task while pumping the duplex stream on another.
481    pub exit: ExecExitFuture,
482}
483
484/// Which standard stream a [`LogChunk`] originated from.
485///
486/// Mirrors the three POSIX file descriptors. `Stdin` is included for
487/// completeness — Docker's multiplexed log header carries a `stdin` channel
488/// for attached interactive containers — but most container runtimes only
489/// emit `Stdout` / `Stderr` chunks.
490#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
491#[serde(rename_all = "snake_case")]
492pub enum LogChannel {
493    /// Standard input (rarely emitted; included for completeness with
494    /// Docker's stdcopy framing).
495    Stdin,
496    /// Standard output.
497    Stdout,
498    /// Standard error.
499    Stderr,
500}
501
502/// One chunk of container log output emitted by [`Runtime::logs_stream`].
503///
504/// Streams emit one `LogChunk` per line (or per Docker stdcopy frame) as
505/// data is produced by the container. Backends that expose timestamps
506/// populate `timestamp`; ones that don't leave it `None`.
507#[derive(Debug, Clone)]
508pub struct LogChunk {
509    /// Which standard stream produced this chunk.
510    pub stream: LogChannel,
511    /// Raw bytes of the chunk. Not necessarily UTF-8 — container output is
512    /// arbitrary binary data and consumers must handle invalid UTF-8.
513    pub bytes: bytes::Bytes,
514    /// When the runtime reported this chunk, when known.
515    pub timestamp: Option<chrono::DateTime<chrono::Utc>>,
516}
517
/// Parameters for [`Runtime::logs_stream`].
///
/// The fields match the query parameters of Docker Engine's
/// `GET /containers/{id}/logs`, so backends can forward them with little
/// translation.
///
/// Note that the derived `Default` sets every `bool` to `false`, including
/// `stdout` and `stderr`. A default-constructed value therefore asks for
/// neither stream; callers should enable at least one.
#[derive(Debug, Clone, Default)]
#[allow(clippy::struct_excessive_bools)] // mirrors Docker's logs query params 1:1
pub struct LogsStreamOptions {
    /// Keep streaming past the current end of the log. When `false`, the
    /// stream ends once everything currently buffered has been emitted.
    pub follow: bool,
    /// Begin with only the last N lines. `None` means all available logs
    /// from the beginning.
    pub tail: Option<u64>,
    /// Lower time bound in Unix seconds; `None` means unbounded.
    pub since: Option<i64>,
    /// Upper time bound in Unix seconds; `None` means unbounded.
    pub until: Option<i64>,
    /// Ask the runtime to fill in [`LogChunk::timestamp`] on every chunk.
    /// Backends that always supply timestamps may ignore it.
    pub timestamps: bool,
    /// Whether stdout output is included.
    pub stdout: bool,
    /// Whether stderr output is included.
    pub stderr: bool,
}
547
548/// One periodic resource-usage sample emitted by [`Runtime::stats_stream`].
549///
550/// Mirrors the union of fields exposed by Docker's `/containers/{id}/stats`
551/// endpoint and Linux cgroup stat files so downstream consumers (autoscaler,
552/// `docker stats`-compat HTTP shim) can read a single shape regardless of
553/// backend. Counters that the runtime cannot report are left at zero —
554/// missing data is signalled separately via the surrounding stream
555/// metadata.
556#[derive(Debug, Clone)]
557pub struct StatsSample {
558    /// Cumulative container CPU time consumed, in nanoseconds.
559    pub cpu_total_ns: u64,
560    /// Cumulative system CPU time observed at the same moment, in
561    /// nanoseconds. Used to compute relative CPU percentage between
562    /// successive samples (Docker's classic `cpu_delta / system_delta`
563    /// formula).
564    pub cpu_system_ns: u64,
565    /// Number of CPUs currently online for this container. Used as the
566    /// final scaling factor in the CPU-percentage calculation.
567    pub online_cpus: u32,
568    /// Resident memory currently in use, in bytes (cgroup
569    /// `memory.usage_in_bytes` minus inactive page cache for v1, or
570    /// `memory.current` for v2).
571    pub mem_used_bytes: u64,
572    /// Memory limit configured on the container, in bytes. `0` when no
573    /// limit is set (cgroup reports its sentinel value).
574    pub mem_limit_bytes: u64,
575    /// Cumulative bytes received across all attached network interfaces.
576    pub net_rx_bytes: u64,
577    /// Cumulative bytes transmitted across all attached network interfaces.
578    pub net_tx_bytes: u64,
579    /// Cumulative bytes read from block devices.
580    pub blkio_read_bytes: u64,
581    /// Cumulative bytes written to block devices.
582    pub blkio_write_bytes: u64,
583    /// Number of process IDs currently running inside the container's pid
584    /// namespace.
585    pub pids_current: u64,
586    /// Configured pids limit, if any. `None` means unlimited.
587    pub pids_limit: Option<u64>,
588    /// Wallclock time the sample was taken.
589    pub timestamp: chrono::DateTime<chrono::Utc>,
590}
591
/// A progress event yielded by [`Runtime::pull_image_stream`].
///
/// While layers download and extract, backends emit a sequence of `Status`
/// events, then exactly one `Done` on success. A failure during the pull
/// arrives as the stream's `Err` item and ends the stream.
#[derive(Debug, Clone)]
pub enum PullProgress {
    /// Update for a layer or stage that is still in progress.
    Status {
        /// Layer id or another backend-specific identifier, if any.
        id: Option<String>,
        /// Human-readable status such as `"Pulling fs layer"`,
        /// `"Downloading"` or `"Extracting"`. Always present, but may be
        /// empty when the backend has nothing new to say.
        status: String,
        /// Pre-rendered progress bar, e.g. Docker's
        /// `"[========>          ] 12.3MB/45.6MB"`. `None` when the backend
        /// only reports raw `current`/`total`.
        progress: Option<String>,
        /// Bytes transferred so far for this layer, if reported.
        current: Option<u64>,
        /// Expected byte count for this layer, if reported.
        total: Option<u64>,
    },
    /// The pull finished successfully.
    Done {
        /// Canonical reference the backend resolved. Usually the requested
        /// reference, possibly with a resolved digest included.
        reference: String,
        /// Content-addressed digest, if the backend reports one.
        digest: Option<String>,
    },
}
627
628/// Boxed async stream of `Result<LogChunk, AgentError>` items returned by
629/// [`Runtime::logs_stream`].
630///
631/// `'static` lifetime so handlers can hold the stream past the trait method's
632/// borrow of `self`.
633pub type LogsStream = Pin<Box<dyn Stream<Item = Result<LogChunk>> + Send + 'static>>;
634
635/// Boxed async stream of `Result<StatsSample, AgentError>` items returned by
636/// [`Runtime::stats_stream`].
637pub type StatsStream = Pin<Box<dyn Stream<Item = Result<StatsSample>> + Send + 'static>>;
638
639/// Boxed async stream of `Result<PullProgress, AgentError>` items returned by
640/// [`Runtime::pull_image_stream`].
641pub type PullProgressStream = Pin<Box<dyn Stream<Item = Result<PullProgress>> + Send + 'static>>;
642
643/// Boxed async stream of TAR-archive byte chunks returned by
644/// [`Runtime::archive_get`].
645///
646/// Each yielded `Bytes` is a contiguous slice of the TAR archive that the
647/// runtime is producing for the requested container path. The stream ends
648/// once the archive is fully written. Mid-stream errors map to
649/// [`AgentError`] variants.
650pub type ArchiveStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send + 'static>>;
651
652/// Stat metadata for a single path inside a container, returned by
653/// [`Runtime::archive_head`].
654///
655/// Mirrors Docker's `X-Docker-Container-Path-Stat` header payload (a
656/// base64-encoded JSON object with `name`, `size`, `mode`, `mtime`, and
657/// `linkTarget` fields). The API layer serializes this back into the same
658/// header for the `HEAD /containers/{id}/archive` response.
659#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
660pub struct PathStat {
661    /// Base name of the path (e.g. `"foo.txt"` for `/etc/foo.txt`).
662    pub name: String,
663    /// File size in bytes. For directories this is the size reported by the
664    /// runtime (typically the directory entry size, not the recursive sum).
665    pub size: i64,
666    /// Unix file mode bits (`S_IFMT | S_IRWXU | ...`). Encoded as a `u32` so
667    /// it round-trips through the Docker JSON header losslessly.
668    pub mode: u32,
669    /// Last-modification time as an RFC 3339 string. `None` when the runtime
670    /// cannot report it.
671    pub mtime: Option<String>,
672    /// Target of a symbolic link, when the path is a symlink. Empty string
673    /// for non-symlink paths (matching Docker's wire shape).
674    pub link_target: String,
675}
676
/// Parameters for [`Runtime::archive_put`].
///
/// Correspond to the query parameters of Docker's
/// `PUT /containers/{id}/archive`:
///
/// * `noOverwriteDirNonDir=1` — do not let a directory replace a
///   non-directory or the reverse (default `false`);
/// * `copyUIDGID=1` — keep the archive's UID/GID instead of chown'ing to
///   the container user (default `false`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ArchivePutOptions {
    /// If `true`, the runtime must refuse a put that would replace a
    /// non-directory with a directory, or vice versa, at the destination.
    pub no_overwrite_dir_non_dir: bool,
    /// If `true`, file UID/GID are taken verbatim from the archive.
    pub copy_uid_gid: bool,
}
694
/// One network attachment of a container, as reported by
/// [`Runtime::inspect_detailed`].
///
/// Carries the subset of bollard's `EndpointSettings` needed to fill in
/// `ContainerInfo.networks` for standalone containers. It lives in
/// `zlayer-agent` rather than `zlayer-spec` because it is a runtime inspect
/// result, not part of a deployment specification.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NetworkAttachmentDetail {
    /// Network name as the runtime reports it (the key in Docker's
    /// `NetworkSettings.Networks`, e.g. `"bridge"` or a user-defined name).
    pub network: String,
    /// DNS aliases of the container on this network. Empty if the runtime
    /// does not expose aliases.
    pub aliases: Vec<String>,
    /// IPv4 address assigned on this network, if any. An empty string is
    /// normalised to `None`.
    pub ipv4: Option<String>,
}
714
/// Health information for one container, as reported by
/// [`Runtime::inspect_detailed`].
///
/// Taken straight from bollard's `ContainerState.health`, i.e. Docker's
/// native healthcheck tracking. The internal `HealthMonitor`
/// (`crates/zlayer-agent/src/health.rs`) drives service-level health events.
/// For standalone containers, the API reports this runtime-native status
/// instead, so images with a built-in `HEALTHCHECK` display correctly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HealthDetail {
    /// Docker `HealthStatusEnum` value: `"none"`, `"starting"`, `"healthy"`
    /// or `"unhealthy"`. An empty string is normalised to `"none"` upstream.
    /// Note that the derived `Default` leaves this field empty.
    pub status: String,
    /// Number of consecutive failed probes, if the runtime reports it.
    pub failing_streak: Option<u32>,
    /// Output of the latest failed probe, if available.
    pub last_output: Option<String>,
}
732
733/// Rich inspect details for a single container, returned by
734/// [`Runtime::inspect_detailed`].
735///
736/// Carries the fields `ContainerInfo` needs on top of the bare
737/// [`ContainerState`] reported by [`Runtime::container_state`]:
738/// published ports, attached networks, first IPv4, health, and `exit_code`.
739///
740/// Default is an all-empty record — that's what the default trait method
741/// returns for runtimes that don't (yet) implement rich inspect, and the API
742/// layer treats all fields as purely additive, so a default record still
743/// produces a backwards-compatible `ContainerInfo`.
744#[derive(Debug, Clone, Default, PartialEq, Eq)]
745pub struct ContainerInspectDetails {
746    /// Published port mappings (container → host), translated back from the
747    /// runtime's internal port-binding map.
748    pub ports: Vec<zlayer_spec::PortMapping>,
749    /// Networks the container is attached to, plus the aliases + IPv4 for each.
750    pub networks: Vec<NetworkAttachmentDetail>,
751    /// First non-empty IPv4 address found across the container's networks,
752    /// useful as a "primary" IP for simple clients that don't want to iterate
753    /// `networks`. `None` when the container isn't on any network with an IP.
754    pub ipv4: Option<String>,
755    /// Health status when the container has a Docker-native `HEALTHCHECK` or
756    /// the runtime otherwise reports a health state.
757    pub health: Option<HealthDetail>,
758    /// Most recent exit code, when the runtime reports one. `None` for
759    /// containers that are still running and have never exited.
760    pub exit_code: Option<i32>,
761}
762
/// Minimal per-container summary returned by [`Runtime::list_containers`].
///
/// Reconciliation only has to pair runtime containers with `ZLayer`'s own
/// metadata, which is keyed by the `com.zlayer.container_id` label (see
/// `zlayer-api::handlers::container_id_map::ZLAYER_CONTAINER_ID_LABEL`).
/// The label value plus the runtime-native id are enough for the
/// standalone-container reconcile pass. Add richer fields only once a
/// concrete caller needs them.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct RuntimeContainerSummary {
    /// The backend's own container handle (Docker's 64-char hex id, the youki
    /// state-dir name, ...). The reconciler treats it as opaque and uses it
    /// only for logging and for telling listings apart.
    pub runtime_id: String,
    /// The `com.zlayer.container_id` label value, when the runtime reports
    /// one. Containers lacking the label are foreign (not ZLayer-managed) and
    /// reconcile must skip them.
    pub zlayer_container_id_label: Option<String>,
}
783
/// Process table returned by [`Runtime::top_container`].
///
/// Shaped like Docker's `GET /containers/{id}/top` response:
/// - `titles` names each column.
/// - `processes` is a matrix with one row of column values per process.
///
/// Which `ps` fields appear is the runtime's choice. The API/Docker shim
/// passes them through untouched.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ContainerTopOutput {
    /// Column headers, e.g. `["UID", "PID", "PPID", "C", "STIME", "TTY", "TIME", "CMD"]`.
    pub titles: Vec<String>,
    /// Per-process rows; every row is as long as `titles`.
    pub processes: Vec<Vec<String>>,
}
797
798/// Filesystem change kind reported by [`Runtime::changes_container`].
799///
800/// Matches Docker's numeric encoding: `0 = Modified`, `1 = Added`, `2 = Deleted`.
801#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
802#[serde(rename_all = "snake_case")]
803pub enum FilesystemChangeKind {
804    /// File or directory was modified in the container's writable layer.
805    Modified,
806    /// File or directory was added.
807    Added,
808    /// File or directory was deleted.
809    Deleted,
810}
811
812impl FilesystemChangeKind {
813    /// Numeric wire value used by Docker's `/containers/{id}/changes`.
814    #[must_use]
815    pub const fn as_docker_kind(self) -> u8 {
816        match self {
817            Self::Modified => 0,
818            Self::Added => 1,
819            Self::Deleted => 2,
820        }
821    }
822}
823
824/// One filesystem change reported by [`Runtime::changes_container`].
825#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
826pub struct FilesystemChangeEntry {
827    /// Path inside the container that changed (absolute, e.g. `/etc/hosts`).
828    pub path: String,
829    /// Kind of change.
830    pub kind: FilesystemChangeKind,
831}
832
833/// One published port mapping entry returned by [`Runtime::port_mappings_container`].
834///
835/// Mirrors one entry of Docker's `/containers/{id}/port` map. A single
836/// container port may bind multiple host endpoints (e.g. IPv4 + IPv6); each
837/// such binding yields one [`PortMappingEntry`].
838#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
839pub struct PortMappingEntry {
840    /// Container port number that's published.
841    pub container_port: u16,
842    /// Protocol (`"tcp"`, `"udp"`, `"sctp"`).
843    pub protocol: String,
844    /// Host IP address that the container's port is mapped to.
845    pub host_ip: Option<String>,
846    /// Host port number that the container's port is mapped to.
847    pub host_port: Option<u16>,
848}
849
850/// Result of a [`Runtime::prune_containers`] call.
851#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
852pub struct ContainerPruneResult {
853    /// Container IDs that were removed (runtime-native or hex form).
854    pub deleted: Vec<String>,
855    /// Bytes reclaimed from the runtime's container storage. `0` when the
856    /// backend cannot report.
857    pub space_reclaimed: u64,
858}
859
/// Restart-policy replacement carried by a [`ContainerResourceUpdate`].
///
/// Mirrors Docker's `HostConfig.RestartPolicy`. `name` is one of `""`,
/// `"no"`, `"always"`, `"unless-stopped"`, or `"on-failure"`.
/// - Backends that can persist a restart policy honour it: Docker via
///   bollard, Youki via the supervisor's stored spec.
/// - Backends that cannot (WASM, Mock) leave the policy unmodified.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerRestartPolicyUpdate {
    /// Restart policy name.
    pub name: Option<String>,
    /// Maximum retry count; only honoured when `name == "on-failure"`.
    pub maximum_retry_count: Option<i64>,
}

/// Resource-update payload passed to [`Runtime::update_container_resources`].
///
/// Mirrors the resource subset of Docker's `POST /containers/{id}/update`
/// body. Every field is `Option<...>`, and backends apply only the fields
/// that are `Some`. A request with all fields `None` is a no-op, consistent
/// with Docker's behaviour; see [`ContainerResourceUpdate::is_empty`].
///
/// Field semantics match Docker's wire shape. The JSON encoding is defined
/// by `zlayer_types::api::containers::ContainerUpdateRequest`. That type is
/// not in scope in this crate, so it is named in plain text rather than
/// linked.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerResourceUpdate {
    /// CPU shares (cgroup `cpu.weight` / `cpu.shares`).
    pub cpu_shares: Option<i64>,
    /// Memory limit in bytes.
    pub memory: Option<i64>,
    /// CPU CFS period in microseconds.
    pub cpu_period: Option<i64>,
    /// CPU CFS quota in microseconds.
    pub cpu_quota: Option<i64>,
    /// CPU real-time period in microseconds.
    pub cpu_realtime_period: Option<i64>,
    /// CPU real-time runtime in microseconds.
    pub cpu_realtime_runtime: Option<i64>,
    /// CPUs allowed for execution (e.g. `"0-3"`).
    pub cpuset_cpus: Option<String>,
    /// Memory nodes (NUMA) allowed for execution.
    pub cpuset_mems: Option<String>,
    /// Soft memory limit in bytes.
    pub memory_reservation: Option<i64>,
    /// Total memory limit (memory + swap) in bytes. `-1` removes swap.
    pub memory_swap: Option<i64>,
    /// Kernel memory limit in bytes (deprecated upstream).
    pub kernel_memory: Option<i64>,
    /// Block IO weight (10-1000).
    pub blkio_weight: Option<u16>,
    /// PIDs limit. `0` or `-1` for unlimited.
    pub pids_limit: Option<i64>,
    /// Replacement restart policy. `None` leaves the policy unchanged.
    pub restart_policy: Option<ContainerRestartPolicyUpdate>,
}

impl ContainerResourceUpdate {
    /// Returns `true` when this update would not change anything, i.e. every
    /// field is `None`.
    ///
    /// Backends short-circuit no-op updates rather than touching the cgroup
    /// hierarchy.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        // Exhaustive destructuring (deliberately no `..` rest pattern). If a
        // field is added to the struct, this stops compiling until the new
        // field is accounted for. Without that, a request setting only the
        // new field would be reported as empty and silently dropped by
        // backends.
        let Self {
            cpu_shares,
            memory,
            cpu_period,
            cpu_quota,
            cpu_realtime_period,
            cpu_realtime_runtime,
            cpuset_cpus,
            cpuset_mems,
            memory_reservation,
            memory_swap,
            kernel_memory,
            blkio_weight,
            pids_limit,
            restart_policy,
        } = self;

        cpu_shares.is_none()
            && memory.is_none()
            && cpu_period.is_none()
            && cpu_quota.is_none()
            && cpu_realtime_period.is_none()
            && cpu_realtime_runtime.is_none()
            && cpuset_cpus.is_none()
            && cpuset_mems.is_none()
            && memory_reservation.is_none()
            && memory_swap.is_none()
            && kernel_memory.is_none()
            && blkio_weight.is_none()
            && pids_limit.is_none()
            && restart_policy.is_none()
    }
}
939
/// Outcome of [`Runtime::update_container_resources`].
///
/// Shaped like Docker's `{"Warnings": [...]}` response. Backends add one
/// string for each field they accepted but did not apply, and for each
/// deprecated field used.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ContainerUpdateOutcome {
    /// Warnings produced while the update was being applied, for humans.
    pub warnings: Vec<String>,
}
949
950/// Detailed image inspect record returned by [`Runtime::inspect_image_native`].
951///
952/// Mirrors the union of fields exposed by Docker's `GET /images/{name}/json`
953/// (bollard's `ImageInspect`) so the API / Docker compat shim can surface a
954/// Docker-shaped JSON response without re-translating later. Optional fields
955/// remain `None` when the backend cannot provide them.
956#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
957pub struct ImageInspectInfo {
958    /// Content-addressed image id (`sha256:...`), when known.
959    pub id: Option<String>,
960    /// All tags (`repo:tag`) currently pointing at this image.
961    pub repo_tags: Vec<String>,
962    /// Manifest digests this image is known under (`repo@sha256:...`).
963    pub repo_digests: Vec<String>,
964    /// Parent image id (`sha256:...`), when the image was built locally.
965    pub parent: Option<String>,
966    /// Human-readable comment recorded at commit/import time.
967    pub comment: Option<String>,
968    /// Creation timestamp in RFC 3339 form.
969    pub created: Option<String>,
970    /// Container id this image was committed from, when applicable.
971    pub container: Option<String>,
972    /// Daemon version that built / imported this image.
973    pub docker_version: Option<String>,
974    /// Author recorded on the image (e.g. `MAINTAINER` instruction).
975    pub author: Option<String>,
976    /// Hardware architecture the image targets (`amd64`, `arm64`, ...).
977    pub architecture: Option<String>,
978    /// Operating system the image targets (`linux`, `windows`, ...).
979    pub os: Option<String>,
980    /// Total on-disk size in bytes, when known.
981    pub size: Option<u64>,
982    /// Layer order: list of `sha256:...` digests, root-most first.
983    pub layers: Vec<String>,
984    /// Container env (`KEY=VALUE`).
985    pub env: Vec<String>,
986    /// Default command vector.
987    pub cmd: Vec<String>,
988    /// Default entrypoint vector.
989    pub entrypoint: Vec<String>,
990    /// Working directory inside the image.
991    pub working_dir: Option<String>,
992    /// User the image runs as by default.
993    pub user: Option<String>,
994    /// Image labels.
995    pub labels: std::collections::BTreeMap<String, String>,
996}
997
998/// One row of an image's history, returned by [`Runtime::image_history`].
999///
1000/// Mirrors Docker's `GET /images/{name}/history` response.
1001#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
1002pub struct ImageHistoryEntry {
1003    /// Layer / image id (`sha256:...`). May be `<missing>` for layers that
1004    /// were dropped during a squash.
1005    pub id: String,
1006    /// Unix-seconds timestamp when this layer was created.
1007    pub created: i64,
1008    /// Dockerfile-style instruction that produced this layer.
1009    pub created_by: String,
1010    /// Tags that point at this specific layer.
1011    pub tags: Vec<String>,
1012    /// Layer size in bytes.
1013    pub size: u64,
1014    /// Optional comment recorded with the layer.
1015    pub comment: String,
1016}
1017
1018/// One result returned by [`Runtime::search_images`].
1019///
1020/// Mirrors Docker's `GET /images/search` response items.
1021#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
1022pub struct ImageSearchResult {
1023    /// Image name (e.g. `library/nginx`).
1024    pub name: String,
1025    /// Free-text description of the image.
1026    pub description: String,
1027    /// Number of stars on the source registry, when reported.
1028    pub star_count: u64,
1029    /// Whether the image is officially curated.
1030    pub official: bool,
1031    /// Whether the image was produced by an automated build (deprecated by
1032    /// Docker but still surfaced for compatibility).
1033    pub automated: bool,
1034}
1035
1036/// Result of a [`Runtime::commit_container`] call.
1037#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
1038pub struct CommitOutcome {
1039    /// Content-addressed image id of the freshly created image.
1040    pub id: String,
1041}
1042
/// Options accepted by [`Runtime::commit_container`].
///
/// Mirrors Docker's `POST /commit` query parameters. Every field except
/// `pause` is optional. When `repo` and `tag` are both unset, the runtime
/// creates an untagged image.
///
/// Caution about `pause`: Docker's `POST /commit` treats an omitted `pause`
/// parameter as `true`. The derived `CommitOptions::default()`, however,
/// sets `pause` to `false`. Callers translating a Docker request should set
/// `pause` explicitly instead of relying on `Default`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CommitOptions {
    /// Repository name to apply to the committed image (e.g. `myapp`).
    pub repo: Option<String>,
    /// Tag to apply (defaults to `latest` when `repo` is set and tag is empty).
    pub tag: Option<String>,
    /// Free-form comment to record on the committed image.
    pub comment: Option<String>,
    /// Author to record on the committed image.
    pub author: Option<String>,
    /// Whether to pause the container before committing. Docker's API
    /// default is `true`; this struct's derived `Default` is `false`.
    pub pause: bool,
    /// Dockerfile-style instructions to apply during commit.
    pub changes: Option<String>,
}
1062
1063/// Boxed async stream of TAR-archive byte chunks returned by image save /
1064/// container export endpoints. Each yielded `Bytes` is a contiguous slice
1065/// of an uncompressed TAR archive.
1066pub type ImageExportStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send + 'static>>;
1067
1068/// The runtime's already-open image-store handles: its
1069/// [`LocalRegistry`](zlayer_registry::LocalRegistry) plus the shared,
1070/// single-process-exclusive blob cache. Returned by
1071/// [`Runtime::image_store_handles`] so callers (the `POST /images/import`
1072/// handler) can reuse the daemon's open store rather than opening a second
1073/// handle (which would fail with "database is locked by another process").
1074pub type ImageStoreHandles = (
1075    Arc<zlayer_registry::LocalRegistry>,
1076    Arc<Box<dyn zlayer_registry::BlobCacheBackend>>,
1077);
1078
1079/// One progress event emitted by [`Runtime::load_images`].
1080///
1081/// `Status` events carry per-line progress reported by the daemon while
1082/// the tar is being unpacked; `Done` is emitted exactly once when load
1083/// completes successfully.
1084#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
1085#[serde(tag = "kind", rename_all = "snake_case")]
1086pub enum LoadProgress {
1087    /// Mid-load progress entry.
1088    Status {
1089        /// Layer or image id when reported.
1090        #[serde(default, skip_serializing_if = "Option::is_none")]
1091        id: Option<String>,
1092        /// Human-readable status text.
1093        status: String,
1094    },
1095    /// Load completed. `references` lists the image references that were
1096    /// loaded into the cache.
1097    Done {
1098        /// Loaded image references (`repo:tag` or `sha256:...`).
1099        references: Vec<String>,
1100    },
1101}
1102
1103/// Boxed async stream of [`LoadProgress`] events returned by
1104/// [`Runtime::load_images`].
1105pub type LoadProgressStream = Pin<Box<dyn Stream<Item = Result<LoadProgress>> + Send + 'static>>;
1106
/// How a runtime joins a container to the cross-node `WireGuard` overlay.
///
/// There are four attachment strategies. Which one applies depends on
/// whether the container:
/// - has a host-visible network namespace (a Linux host process),
/// - is a Windows compute system,
/// - runs inside a guest with its own kernel, or
/// - shares the host network stack with no per-container namespace.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverlayAttachKind {
    /// Linux host process: overlayd enters `/proc/<pid>/ns/net` and plumbs a
    /// veth into the container's network namespace (the default).
    HostNetns,
    /// Windows: the HCN endpoint and namespace were created at
    /// container-create time; the agent only registers the assigned IP.
    HostIp,
    /// Any guest with no host-visible netns/PID: macOS VZ-Linux over vsock
    /// today, and also macOS native guests and the Windows WSL2 distro.
    ///
    /// overlayd allocates the overlay identity and the agent pushes it into
    /// the guest, where a kernel `WireGuard` device is brought up. The push
    /// goes through [`Runtime::push_overlay_config`].
    GuestManaged,
    /// Host-shared native runtime with no per-container netns (macOS
    /// Seatbelt; macOS native-VZ).
    ///
    /// The NODE joins the overlay at host level, via a `utun` bound to the
    /// node overlay IP. Every container reaches the daemon and its siblings
    /// over the shared host stack (Seatbelt) or a host `node_ip:port -> guest`
    /// forward (native-VZ). There is no per-container veth, no host PID to
    /// attach, and no guest push. Per-replica addressing is the node overlay
    /// IP plus [`Runtime::get_container_port_override`].
    HostProxy,
}
1134
/// Abstract container runtime trait
///
/// This trait abstracts over the container backends the agent drives, e.g.
/// Docker (via bollard), Youki (libcontainer), WASM, Windows HCS, and macOS
/// VZ/Seatbelt.
1138#[async_trait::async_trait]
1139pub trait Runtime: Send + Sync {
1140    /// Pull an image to local storage
1141    async fn pull_image(&self, image: &str) -> Result<()>;
1142
1143    /// Pull an image to local storage with a specific policy.
1144    ///
1145    /// When `auth` is `Some`, the runtime uses those inline credentials for
1146    /// the pull (§3.10 of `ZLAYER_SDK_FIXES.md`). When `auth` is `None`, the
1147    /// runtime falls back to its existing credential-store lookup keyed by
1148    /// registry hostname (or anonymous access when no match exists).
1149    ///
1150    /// Non-Docker runtimes may accept but ignore the `auth` argument — their
1151    /// OCI puller (`zlayer-registry`) already resolves credentials from the
1152    /// store by hostname, and inline auth is primarily a Docker-backend
1153    /// concern. Ignoring it is safe: callers that need inline auth should use
1154    /// the Docker runtime.
1155    ///
1156    /// `source` is the per-image [`zlayer_spec::SourcePolicy`] selecting which
1157    /// tiers (local store, S3, remote registry) the pull may consult and in
1158    /// what order. Runtimes that build the `zlayer-registry` `ImagePuller`
1159    /// chain (youki, macOS VZ-Linux) honor it; runtimes that delegate to an
1160    /// external daemon (Docker/WSL/HCS) accept but ignore it.
1161    async fn pull_image_with_policy(
1162        &self,
1163        image: &str,
1164        policy: PullPolicy,
1165        auth: Option<&RegistryAuth>,
1166        source: zlayer_spec::SourcePolicy,
1167    ) -> Result<()>;
1168
1169    /// Create a container
1170    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()>;
1171
1172    /// Inject the daemon's secrets provider so container env vars using the
1173    /// `$S:` reference syntax can be resolved at container-create time.
1174    ///
1175    /// Called once, post-construction, after the daemon has selected its
1176    /// secrets store. Default is a no-op; runtimes that build OCI bundles with
1177    /// secret resolution (`YoukiRuntime`; `CompositeRuntime` forwards to its inner
1178    /// runtime) override this.
1179    fn set_secrets_provider(&self, _provider: std::sync::Arc<dyn zlayer_secrets::SecretsProvider>) {
1180    }
1181
1182    /// Start a container
1183    async fn start_container(&self, id: &ContainerId) -> Result<()>;
1184
1185    /// Stop a container
1186    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()>;
1187
1188    /// Remove a container
1189    async fn remove_container(&self, id: &ContainerId) -> Result<()>;
1190
1191    /// Get container state
1192    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState>;
1193
1194    /// Get container logs as structured entries
1195    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>>;
1196
1197    /// Execute command in container
1198    async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)>;
1199
1200    /// Execute a command in a container with Docker `exec` options (`--user`,
1201    /// `-w`/`--workdir`, `-e`/`--env`) applied.
1202    ///
1203    /// The default implementation ignores the user/cwd/env overrides and
1204    /// delegates to [`Runtime::exec`], so runtimes that don't (yet) plumb them
1205    /// keep working unchanged. Runtimes that can honour them — notably the macOS
1206    /// VZ-Linux runtime, which drives the in-guest agent over vsock — override
1207    /// this to drop to the requested uid/gid, chdir, and inject env before exec.
1208    ///
1209    /// `opts.command` is the argv (`command[0]` is the binary); `opts.user`,
1210    /// `opts.working_dir`, and `opts.env` carry the Docker overrides. The
1211    /// `tty`/`attach_*`/`privileged` fields of [`ExecOptions`] are not consulted
1212    /// by this buffered entry point.
1213    async fn exec_with_opts(
1214        &self,
1215        id: &ContainerId,
1216        opts: &ExecOptions,
1217    ) -> Result<(i32, String, String)> {
1218        self.exec(id, &opts.command).await
1219    }
1220
1221    /// Execute a command in a container and stream stdout / stderr / exit
1222    /// events as they are produced.
1223    ///
1224    /// The default implementation calls the buffered [`Runtime::exec`] and
1225    /// emits everything as a single `Stdout` event, a single `Stderr` event,
1226    /// and a final `Exit` event. Runtimes that support true streaming (e.g.
1227    /// Docker via bollard) override this to produce line-by-line events as
1228    /// the command runs.
1229    ///
1230    /// The returned stream always terminates with exactly one
1231    /// [`ExecEvent::Exit`] as the final item on success. Errors that occur
1232    /// before the stream is returned (e.g. container not found, failure to
1233    /// create the exec) are surfaced via the outer `Result`; errors that
1234    /// occur mid-stream are logged by the runtime and the stream closes.
1235    async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
1236        let (exit, stdout, stderr) = self.exec(id, cmd).await?;
1237        let mut events: Vec<ExecEvent> = Vec::with_capacity(3);
1238        if !stdout.is_empty() {
1239            events.push(ExecEvent::Stdout(stdout));
1240        }
1241        if !stderr.is_empty() {
1242            events.push(ExecEvent::Stderr(stderr));
1243        }
1244        events.push(ExecEvent::Exit(exit));
1245        Ok(Box::pin(futures_util::stream::iter(events)))
1246    }
1247
1248    /// Start an interactive exec session against a container, returning an
1249    /// [`ExecHandle`] the caller drives concurrently with the running process.
1250    ///
1251    /// Unlike [`Runtime::exec`] (which buffers stdout/stderr and returns only
1252    /// after the process exits) and [`Runtime::exec_stream`] (which streams
1253    /// line-by-line events one-way), `exec_pty` is the long-lived bidirectional
1254    /// entry point: when [`ExecOptions::tty`] is set the runtime allocates a
1255    /// pseudo-terminal pair and the returned [`ExecHandle::stream`] shuttles
1256    /// raw PTY bytes; when `tty` is false the stream still carries
1257    /// stdin/stdout/stderr but without PTY framing. The handle's
1258    /// [`ExecHandle::resize`] channel mirrors Docker's
1259    /// `POST /exec/{id}/resize` and the [`ExecHandle::exit`] future resolves
1260    /// with the process exit code once the runtime detects termination.
1261    ///
1262    /// The default implementation returns [`AgentError::Unsupported`].
1263    /// Backends that can host interactive execs (Docker via bollard's
1264    /// `start_exec` with hijacked stream, Youki via libcontainer's exec API,
1265    /// HCS via the Windows console) override this. Runtimes that have no
1266    /// notion of an interactive exec (WASM in-process, mocks that don't need
1267    /// PTY traffic) should leave the default in place — callers then surface
1268    /// a clear error rather than degrade silently to a buffered exec.
1269    async fn exec_pty(&self, _id: &ContainerId, _opts: ExecOptions) -> Result<ExecHandle> {
1270        Err(AgentError::Unsupported(
1271            "exec_pty is not supported by this runtime".into(),
1272        ))
1273    }
1274
1275    /// Get container resource statistics from cgroups
1276    ///
1277    /// Returns CPU and memory statistics for the specified container.
1278    /// Used for metrics collection and autoscaling decisions.
1279    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats>;
1280
1281    /// Wait for a container to exit and return its exit code
1282    ///
1283    /// This method blocks until the container exits or an error occurs.
1284    /// Used primarily for job execution to implement run-to-completion semantics.
1285    async fn wait_container(&self, id: &ContainerId) -> Result<i32>;
1286
1287    /// Wait for a container to exit and return a [`WaitOutcome`] with richer
1288    /// classification (exit code + reason + signal + `finished_at` timestamp).
1289    ///
1290    /// The default implementation delegates to [`Runtime::wait_container`] and
1291    /// synthesizes a [`WaitReason::Exited`] result with no signal/timestamp.
1292    /// Runtimes that can distinguish OOM kills, signal deaths, or report a
1293    /// finished-at time (e.g. the Docker runtime, which has
1294    /// `ContainerInspectResponse.state.oom_killed` / `.finished_at`) should
1295    /// override this.
1296    ///
1297    /// This is the legacy entry point that always uses
1298    /// [`WaitCondition::NotRunning`]. Callers that need to honour Docker's
1299    /// `condition=` query parameter should use
1300    /// [`Runtime::wait_outcome_with_condition`] instead.
1301    async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
1302        let exit_code = self.wait_container(id).await?;
1303        Ok(WaitOutcome::exited(exit_code))
1304    }
1305
1306    /// Wait for a container to reach a [`WaitCondition`] and return a
1307    /// [`WaitOutcome`].
1308    ///
1309    /// Mirrors Docker's `POST /containers/{id}/wait?condition=<...>` semantics:
1310    ///
1311    /// * [`WaitCondition::NotRunning`] (the default) — block until the
1312    ///   container is no longer running. Returns the exit-code outcome.
1313    /// * [`WaitCondition::NextExit`] — wait for the next observed exit, even
1314    ///   if the container is already stopped at call time. The default
1315    ///   implementation cannot distinguish "already stopped" from "next exit",
1316    ///   so it falls back to the same wait as `NotRunning`. Backends that can
1317    ///   subscribe to runtime events (Docker via bollard's wait stream)
1318    ///   override this to honour the semantic.
1319    /// * [`WaitCondition::Removed`] — block until the container has been
1320    ///   removed. The default implementation again falls back to a normal
1321    ///   wait; the Docker runtime overrides it via bollard's `condition`
1322    ///   parameter.
1323    ///
1324    /// Default implementation delegates to [`Runtime::wait_outcome`] for all
1325    /// conditions, ignoring the condition argument. This keeps existing
1326    /// runtimes (Youki, WASM, mocks) working without code changes.
1327    async fn wait_outcome_with_condition(
1328        &self,
1329        id: &ContainerId,
1330        _condition: WaitCondition,
1331    ) -> Result<WaitOutcome> {
1332        self.wait_outcome(id).await
1333    }
1334
1335    /// Rename a container. Mirrors Docker's
1336    /// `POST /containers/{id}/rename?name=<new>` endpoint.
1337    ///
1338    /// `new_name` is the requested human-readable name (without any leading
1339    /// `/`). Backends are expected to validate the name against their own
1340    /// constraints (e.g. uniqueness, allowed characters) and return an
1341    /// appropriate [`AgentError`] on rejection.
1342    ///
1343    /// The default implementation returns [`AgentError::Unsupported`].
1344    /// Runtimes that can rename a live container override this:
1345    ///
1346    /// * Docker — calls bollard's `rename_container` with
1347    ///   `RenameContainerOptions { name }`.
1348    /// * Youki — currently returns `Unsupported` because the libcontainer
1349    ///   state-dir is keyed off the immutable `ContainerId` and renaming the
1350    ///   on-disk layout safely while a container is alive would require
1351    ///   coordination with the supervisor that owns the bundle path.
1352    /// * Other backends (WASM, HCS, mocks) inherit the `Unsupported` default.
1353    async fn rename_container(&self, _id: &ContainerId, _new_name: &str) -> Result<()> {
1354        Err(AgentError::Unsupported(
1355            "rename_container is not supported by this runtime".into(),
1356        ))
1357    }
1358
1359    /// Get container logs (stdout/stderr combined)
1360    ///
1361    /// Returns logs as structured entries.
1362    /// Used to capture job output after completion.
1363    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>>;
1364
    /// Get the PID of a container's main process.
    ///
    /// Returns:
    /// - `Ok(Some(pid))` for runtimes backed by real host processes (Youki, Docker)
    /// - `Ok(None)` for runtimes with no separate PID (in-process WASM)
    /// - `Err` if the container does not exist or the lookup fails
    ///
    /// Callers use the PID for overlay network attachment and process
    /// management.
    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>>;
1374
    /// How this runtime joins a container to the cross-node overlay network.
    ///
    /// The default is [`OverlayAttachKind::HostNetns`], the Linux path that
    /// attaches a veth by PID. VM runtimes without a host netns (macOS
    /// VZ-Linux) override this to return [`OverlayAttachKind::GuestManaged`].
    /// That tells the service layer to push a host-allocated overlay config
    /// into the guest via [`push_overlay_config`] instead of attaching a veth
    /// by PID.
    ///
    /// [`push_overlay_config`]: Runtime::push_overlay_config
    fn overlay_attach_kind(&self) -> OverlayAttachKind {
        OverlayAttachKind::HostNetns
    }
1387
1388    /// Per-container variant of [`Runtime::overlay_attach_kind`]. A composite
1389    /// runtime that multiplexes several backends (e.g. Windows HCS + WSL2, or
1390    /// macOS Seatbelt + VZ) overrides this to return the attach kind of the
1391    /// specific backend that owns `id` (resolved via an async backend lookup),
1392    /// so a Seatbelt container reports [`OverlayAttachKind::HostProxy`] while a
1393    /// co-located VZ-Linux container reports [`OverlayAttachKind::GuestManaged`].
1394    /// Defaults to the runtime-wide [`Runtime::overlay_attach_kind`].
1395    async fn overlay_attach_kind_for(&self, _id: &ContainerId) -> OverlayAttachKind {
1396        self.overlay_attach_kind()
1397    }
1398
1399    /// Push a host-allocated overlay (`WireGuard`) config into a guest that manages
1400    /// its own overlay interface ([`OverlayAttachKind::GuestManaged`]).
1401    ///
1402    /// The service layer obtains `config` from overlayd (which allocated the
1403    /// address + keypair and registered the public key in the mesh) and calls
1404    /// this so the runtime can deliver it to the guest (over vsock) and bring up
1405    /// the in-guest interface. Runtimes that attach by netns/PID never call this
1406    /// and use the default, which errors.
1407    async fn push_overlay_config(
1408        &self,
1409        _id: &ContainerId,
1410        _config: &zlayer_types::overlayd::GuestOverlayConfig,
1411    ) -> Result<()> {
1412        Err(AgentError::Unsupported(
1413            "push_overlay_config is not supported by this runtime".to_string(),
1414        ))
1415    }
1416
    /// Record the overlay `/32` that overlayd assigned to a host-shared
    /// container, and start userspace forwarders binding
    /// `<overlay_ip>:<published_port>` to the container's local delivery
    /// address.
    ///
    /// The service/job layer calls this AFTER a successful
    /// [`OverlayAttachKind::HostProxy`] attach, once the node-side `utun` alias
    /// for `overlay_ip` is live.
    ///
    /// The default is a no-op that always returns `Ok(())`. Host-netns (Linux)
    /// and guest-managed (VZ-Linux) runtimes expose their overlay IP by other
    /// means.
    async fn attach_overlay_ip(&self, _id: &ContainerId, _overlay_ip: IpAddr) -> Result<()> {
        Ok(())
    }
1427
    /// Tear down the forwarders and clear the stored overlay IP for `id`.
    ///
    /// Called on container teardown and scale-down. The default is a no-op
    /// that always returns `Ok(())`.
    async fn detach_overlay_ip(&self, _id: &ContainerId) -> Result<()> {
        Ok(())
    }
1433
    /// Get the IP address of a container.
    ///
    /// Returns:
    /// - `Ok(Some(ip))` if the container has a known IP address
    /// - `Ok(None)` if the container exists but has no IP assigned yet
    /// - `Err` if the container does not exist or the lookup fails
    ///
    /// Used for proxy backend registration when overlay networking is
    /// unavailable.
    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>>;
1443
    /// Get a runtime-assigned port override for a container.
    ///
    /// Returns:
    /// - `Ok(Some(port))` if the runtime gave this container a dynamic port
    /// - `Ok(None)` if the container should use the spec-declared endpoint port
    ///
    /// This exists for runtimes where every container shares the host network
    /// stack (e.g. the macOS sandbox). Without network namespaces, replicas of
    /// the same service would collide on one port. Such a runtime therefore
    /// gives each replica a unique port and passes it in the `PORT`
    /// environment variable. The proxy then routes to
    /// `container_ip:override_port` rather than `container_ip:spec_port`.
    ///
    /// Runtimes with per-container networking (overlay, VMs, Docker) keep this
    /// default, which always returns `Ok(None)`.
    async fn get_container_port_override(&self, _id: &ContainerId) -> Result<Option<u16>> {
        Ok(None)
    }
1461
    /// Get the HCN namespace GUID of a Windows container.
    ///
    /// Only compiled on Windows (`cfg(target_os = "windows")`). The default
    /// always returns `Ok(None)`; Linux and macOS runtimes have no HCN
    /// namespace concept.
    ///
    /// `HcsRuntime` overrides this to return the namespace GUID attached during
    /// `create_container`. `OverlayManager` then uses that GUID to register the
    /// container's assigned overlay IP against the right HCN compartment. This
    /// is the analogue of Linux using the PID to enter the netns via
    /// `/proc/{pid}/ns/net`.
    #[cfg(target_os = "windows")]
    async fn get_container_namespace_id(
        &self,
        _id: &ContainerId,
    ) -> Result<Option<windows::core::GUID>> {
        Ok(None)
    }
1477
    /// Sync all named volumes associated with this container to S3.
    ///
    /// Called after a container is stopped but before it is removed. This gives
    /// the runtime a chance to flush persistent volume data to remote storage.
    ///
    /// The default is a no-op that always returns `Ok(())`. Runtimes that
    /// support S3-backed volume sync (e.g. Youki with the `s3` feature)
    /// override it.
    async fn sync_container_volumes(&self, _id: &ContainerId) -> Result<()> {
        Ok(())
    }
1488
1489    /// Stream container logs as raw byte chunks tagged with their channel.
1490    ///
1491    /// Mirrors the `GET /containers/{id}/logs` endpoint of the Docker Engine
1492    /// API: callers can request `follow`, `tail`, `since`/`until` time
1493    /// windows, per-channel filtering (`stdout` / `stderr`), and inline
1494    /// timestamps via [`LogsStreamOptions`]. Backends that demultiplex
1495    /// Docker's stdcopy framing emit one [`LogChunk`] per frame; line-based
1496    /// runtimes emit one chunk per line.
1497    ///
1498    /// The default implementation returns [`AgentError::Unsupported`].
1499    /// Concrete runtimes override this with backend-specific streaming
1500    /// (bollard's `logs` for Docker, log-file tailing for Youki/HCS, etc.).
1501    /// The stream is `'static` so HTTP handlers can drive it independently
1502    /// of the trait-method borrow.
1503    async fn logs_stream(&self, _id: &ContainerId, _opts: LogsStreamOptions) -> Result<LogsStream> {
1504        Err(AgentError::Unsupported(
1505            "logs_stream is not supported by this runtime".into(),
1506        ))
1507    }
1508
1509    /// Stream periodic resource-usage samples for a container.
1510    ///
1511    /// Mirrors the streaming form of `GET /containers/{id}/stats` in the
1512    /// Docker Engine API: each yielded [`StatsSample`] is one full snapshot
1513    /// of CPU / memory / network / block-IO / pids counters at the moment
1514    /// it was taken. Sampling cadence is backend-defined (Docker emits one
1515    /// sample per second by default).
1516    ///
1517    /// The default implementation returns [`AgentError::Unsupported`].
1518    /// Backends that can produce this data implement it directly: Docker
1519    /// via bollard's `stats`, Youki by polling cgroup stat files,
1520    /// `MockRuntime` by emitting a deterministic single sample.
1521    async fn stats_stream(&self, _id: &ContainerId) -> Result<StatsStream> {
1522        Err(AgentError::Unsupported(
1523            "stats_stream is not supported by this runtime".into(),
1524        ))
1525    }
1526
1527    /// Pull an image, streaming progress events as layers are downloaded.
1528    ///
1529    /// Mirrors the streaming form of `POST /images/create` in the Docker
1530    /// Engine API. Backends emit a series of [`PullProgress::Status`]
1531    /// events for in-flight layers, followed by exactly one
1532    /// [`PullProgress::Done`] event on success. Errors that occur mid-pull
1533    /// surface as `Err` items on the stream and terminate it.
1534    ///
1535    /// `auth` carries inline credentials for this pull. When `None`, the
1536    /// runtime falls back to its credential-store lookup keyed by registry
1537    /// hostname (matching the semantics of [`Runtime::pull_image_with_policy`]).
1538    ///
1539    /// Backends override this with their native streaming pull
1540    /// (bollard's `create_image` for Docker, `zlayer-registry` for Youki).
1541    ///
1542    /// The default implementation performs a BLOCKING pull via
1543    /// [`Runtime::pull_image_with_policy`] and then synthesizes a minimal
1544    /// Docker-style progress stream (a `Status` line + a terminal `Done`). This
1545    /// keeps the streaming `POST /images/create` (e.g. `docker pull` through
1546    /// the `zlayer-docker` compat socket) working on runtimes that lack a native
1547    /// streaming pull — notably the macOS sandbox. The streaming form means
1548    /// "make this image available now", so it pulls if not already present.
1549    async fn pull_image_stream(
1550        &self,
1551        image: &str,
1552        auth: Option<&RegistryAuth>,
1553    ) -> Result<PullProgressStream> {
1554        self.pull_image_with_policy(
1555            image,
1556            PullPolicy::IfNotPresent,
1557            auth,
1558            zlayer_spec::SourcePolicy::default(),
1559        )
1560        .await?;
1561        let reference = image.to_string();
1562        let events: Vec<Result<PullProgress>> = vec![
1563            Ok(PullProgress::Status {
1564                id: None,
1565                status: format!("Pulling from {reference}"),
1566                progress: None,
1567                current: None,
1568                total: None,
1569            }),
1570            Ok(PullProgress::Done {
1571                reference,
1572                digest: None,
1573            }),
1574        ];
1575        Ok(Box::pin(futures_util::stream::iter(events)))
1576    }
1577
1578    /// List all images managed by this runtime's image storage.
1579    ///
1580    /// The default implementation returns `AgentError::Unsupported` — individual
1581    /// runtimes override this with backend-specific logic (bollard for Docker,
1582    /// zlayer-registry cache walk for Youki, etc.).
1583    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1584        Err(AgentError::Unsupported(
1585            "list_images is not supported by this runtime".into(),
1586        ))
1587    }
1588
1589    /// Remove an image by reference from local storage.
1590    ///
1591    /// When `force` is true, also removes the image even when other containers
1592    /// reference it. The default implementation returns `AgentError::Unsupported`.
1593    async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
1594        Err(AgentError::Unsupported(
1595            "remove_image is not supported by this runtime".into(),
1596        ))
1597    }
1598
1599    /// Prune dangling / unused images from local storage.
1600    ///
1601    /// Returns a [`PruneResult`] describing what was removed. The default
1602    /// implementation returns `AgentError::Unsupported`.
1603    async fn prune_images(&self) -> Result<PruneResult> {
1604        Err(AgentError::Unsupported(
1605            "prune_images is not supported by this runtime".into(),
1606        ))
1607    }
1608
1609    /// Send a signal to a running container.
1610    ///
1611    /// When `signal` is `None`, the runtime sends `SIGKILL` (matching Docker's
1612    /// `docker kill` default). Backends validate the signal name and reject
1613    /// anything outside the standard POSIX set (`SIGKILL`, `SIGTERM`, `SIGINT`,
1614    /// `SIGHUP`, `SIGUSR1`, `SIGUSR2`).
1615    ///
1616    /// Used by `POST /api/v1/containers/{id}/kill` and Docker-compat
1617    /// `docker kill`. The default implementation returns
1618    /// [`AgentError::Unsupported`].
1619    async fn kill_container(&self, _id: &ContainerId, _signal: Option<&str>) -> Result<()> {
1620        Err(AgentError::Unsupported(
1621            "kill_container is not supported by this runtime".into(),
1622        ))
1623    }
1624
1625    /// Write a chunk of stdin to a running container's main process.
1626    ///
1627    /// Powers the host→guest direction of interactive (`-it`) sessions: the
1628    /// daemon's `POST /api/v1/containers/{id}/stdin` endpoint forwards raw
1629    /// terminal bytes here, which the runtime relays to the workload (for the
1630    /// macOS VZ-Linux backend, as `Msg::Stdin` frames to the in-guest agent).
1631    ///
1632    /// The default implementation returns [`AgentError::Unsupported`] so
1633    /// non-interactive backends keep compiling.
1634    async fn write_stdin(&self, _id: &ContainerId, _data: &[u8]) -> Result<()> {
1635        Err(AgentError::Unsupported(
1636            "write_stdin is not supported by this runtime".into(),
1637        ))
1638    }
1639
1640    /// Signal end-of-input (close stdin) for a running container.
1641    ///
1642    /// Powers Ctrl-D / detach for interactive sessions: the daemon's
1643    /// `DELETE /api/v1/containers/{id}/stdin` endpoint calls this, which causes
1644    /// the runtime to stop forwarding stdin and emit a final close marker (for
1645    /// the macOS VZ-Linux backend, `Msg::StdinEof` to the in-guest agent).
1646    ///
1647    /// The default implementation returns [`AgentError::Unsupported`].
1648    async fn close_stdin(&self, _id: &ContainerId) -> Result<()> {
1649        Err(AgentError::Unsupported(
1650            "close_stdin is not supported by this runtime".into(),
1651        ))
1652    }
1653
1654    /// Create a new tag pointing at an existing image.
1655    ///
1656    /// `source` is the reference to an already-cached image. `target` is the
1657    /// new reference to create — it must be a full reference (repository + tag).
1658    ///
1659    /// Used by `POST /api/v1/images/tag` and Docker-compat `docker tag`. The
1660    /// default implementation returns [`AgentError::Unsupported`].
1661    async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
1662        Err(AgentError::Unsupported(
1663            "tag_image is not supported by this runtime".into(),
1664        ))
1665    }
1666
1667    /// Inspect an image and return a Docker-shaped detail record.
1668    ///
1669    /// Mirrors Docker's `GET /images/{name}/json`. Backends translate their
1670    /// native inspect output into [`ImageInspectInfo`]; the API/Docker
1671    /// compat shim emits the JSON body. The default implementation returns
1672    /// [`AgentError::Unsupported`] so non-Docker backends keep compiling.
1673    async fn inspect_image_native(&self, _image: &str) -> Result<ImageInspectInfo> {
1674        Err(AgentError::Unsupported(
1675            "inspect_image_native is not supported by this runtime".into(),
1676        ))
1677    }
1678
1679    /// Return the parent-layer history for an image.
1680    ///
1681    /// Mirrors Docker's `GET /images/{name}/history`. The default
1682    /// implementation returns [`AgentError::Unsupported`].
1683    async fn image_history(&self, _image: &str) -> Result<Vec<ImageHistoryEntry>> {
1684        Err(AgentError::Unsupported(
1685            "image_history is not supported by this runtime".into(),
1686        ))
1687    }
1688
1689    /// Search a registry for images matching `term`.
1690    ///
1691    /// Mirrors Docker's `GET /images/search`. `limit` caps the number of
1692    /// returned items; `0` means "let the registry decide". The default
1693    /// implementation returns [`AgentError::Unsupported`].
1694    async fn search_images(&self, _term: &str, _limit: u32) -> Result<Vec<ImageSearchResult>> {
1695        Err(AgentError::Unsupported(
1696            "search_images is not supported by this runtime".into(),
1697        ))
1698    }
1699
1700    /// Stream a tar archive containing one or more images.
1701    ///
1702    /// Mirrors Docker's `GET /images/get?names=...`. Multi-image archives
1703    /// dedupe shared layers. The default implementation returns
1704    /// [`AgentError::Unsupported`].
1705    async fn save_images(&self, _names: &[String]) -> Result<ImageExportStream> {
1706        Err(AgentError::Unsupported(
1707            "save_images is not supported by this runtime".into(),
1708        ))
1709    }
1710
1711    /// Load images from a tar archive.
1712    ///
1713    /// Mirrors Docker's `POST /images/load`. `tar_bytes` is the
1714    /// uncompressed (or gzip-compressed) tar produced by [`Self::save_images`].
1715    /// `quiet` suppresses progress events when set. The default
1716    /// implementation returns [`AgentError::Unsupported`].
1717    async fn load_images(
1718        &self,
1719        _tar_bytes: bytes::Bytes,
1720        _quiet: bool,
1721    ) -> Result<LoadProgressStream> {
1722        Err(AgentError::Unsupported(
1723            "load_images is not supported by this runtime".into(),
1724        ))
1725    }
1726
1727    /// Import a single image from a tar root filesystem.
1728    ///
1729    /// Mirrors the `fromSrc=`-mode of Docker's `POST /images/create`.
1730    /// `tar_bytes` is a tar of the root filesystem; `repo`/`tag` are the
1731    /// reference to apply to the resulting image. The default
1732    /// implementation returns [`AgentError::Unsupported`].
1733    async fn import_image(
1734        &self,
1735        _tar_bytes: bytes::Bytes,
1736        _repo: Option<&str>,
1737        _tag: Option<&str>,
1738    ) -> Result<String> {
1739        Err(AgentError::Unsupported(
1740            "import_image is not supported by this runtime".into(),
1741        ))
1742    }
1743
    /// Expose the runtime's already-open image store handles: its
    /// [`LocalRegistry`](zlayer_registry::LocalRegistry) and shared blob cache.
    ///
    /// `zlayer-registry`'s on-disk blob cache (ZQL / redb) is **single-process
    /// exclusive**: only one open handle may exist per process. The youki
    /// runtime opens both at construction and holds them for the daemon's
    /// lifetime.
    ///
    /// Code that must write into the *same* store the daemon serves from
    /// (notably the `POST /images/import` handler) has to reuse these shared
    /// `Arc`s. Opening a second handle would fail with
    /// "database is locked by another process".
    ///
    /// Runtimes that own an on-disk `zlayer-registry` store (youki, or a
    /// composite whose primary owns one) return `Some((registry, blob_cache))`.
    /// This default returns `None`. It is meant for runtimes that delegate
    /// image storage to an external daemon (Docker/WSL/HCS) or have no store
    /// at all (WASM, mocks); for them the import handler keeps its
    /// runtime-backed fallback.
    fn image_store_handles(&self) -> Option<ImageStoreHandles> {
        None
    }
1763
1764    /// Stream a tar archive of the container's filesystem.
1765    ///
1766    /// Mirrors Docker's `GET /containers/{id}/export`. The default
1767    /// implementation returns [`AgentError::Unsupported`].
1768    async fn export_container_fs(&self, _id: &ContainerId) -> Result<ImageExportStream> {
1769        Err(AgentError::Unsupported(
1770            "export_container_fs is not supported by this runtime".into(),
1771        ))
1772    }
1773
1774    /// Commit a container's filesystem state to a new image.
1775    ///
1776    /// Mirrors Docker's `POST /commit?container=...`. `opts` carries the
1777    /// optional repo/tag/comment/author/pause/changes parameters. The
1778    /// default implementation returns [`AgentError::Unsupported`].
1779    async fn commit_container(
1780        &self,
1781        _id: &ContainerId,
1782        _opts: &CommitOptions,
1783    ) -> Result<CommitOutcome> {
1784        Err(AgentError::Unsupported(
1785            "commit_container is not supported by this runtime".into(),
1786        ))
1787    }
1788
1789    /// Return rich inspect details for a container: published ports, attached
1790    /// networks, first IPv4, health, and most-recent exit code.
1791    ///
1792    /// Runtimes implement this by translating the backend's native inspect
1793    /// response (bollard's `ContainerInspectResponse` for Docker) into the
1794    /// runtime-level [`ContainerInspectDetails`] struct. The API layer merges
1795    /// these fields into `ContainerInfo` on `GET /api/v1/containers` and
1796    /// `GET /api/v1/containers/{id}` (§3.15 of `ZLAYER_SDK_FIXES.md`).
1797    ///
1798    /// The default implementation returns [`ContainerInspectDetails::default`]
1799    /// — an all-empty record, which the API layer treats as "this runtime
1800    /// doesn't support rich inspect; skip all the extra fields". This keeps
1801    /// non-Docker runtimes (Youki, WASM, Mock) backwards compatible; they can
1802    /// override this later if they gain equivalent inspect capability.
1803    async fn inspect_detailed(&self, _id: &ContainerId) -> Result<ContainerInspectDetails> {
1804        Ok(ContainerInspectDetails::default())
1805    }
1806
1807    /// Pause all processes in the container by freezing its cgroup.
1808    ///
1809    /// Mirrors Docker's `POST /containers/{id}/pause`. After pause, the
1810    /// container's processes are suspended in the kernel via the cgroup
1811    /// freezer; calls to [`Runtime::container_state`] still report
1812    /// `Running` but no instructions execute until [`Runtime::unpause_container`].
1813    ///
1814    /// The default implementation returns [`AgentError::Unsupported`].
1815    /// Backends override this with their native pause API (bollard's
1816    /// `pause_container` for Docker, libcontainer's `Container::pause` for
1817    /// Youki).
1818    async fn pause_container(&self, _id: &ContainerId) -> Result<()> {
1819        Err(AgentError::Unsupported(
1820            "pause_container is not supported by this runtime".into(),
1821        ))
1822    }
1823
1824    /// Resume a previously-paused container by thawing its cgroup freezer.
1825    ///
1826    /// Mirrors Docker's `POST /containers/{id}/unpause`. The default
1827    /// implementation returns [`AgentError::Unsupported`].
1828    async fn unpause_container(&self, _id: &ContainerId) -> Result<()> {
1829        Err(AgentError::Unsupported(
1830            "unpause_container is not supported by this runtime".into(),
1831        ))
1832    }
1833
1834    /// Update a running container's resource limits and restart policy.
1835    ///
1836    /// Mirrors Docker's `POST /containers/{id}/update`. The fields on
1837    /// [`ContainerResourceUpdate`] are individually optional: backends
1838    /// apply only the fields that are `Some` and leave the rest of the
1839    /// container's runtime configuration untouched. A fully-empty update
1840    /// short-circuits to a no-op (no cgroup writes, no warnings).
1841    ///
1842    /// Returns a [`ContainerUpdateOutcome`] whose `warnings` vector
1843    /// surfaces non-fatal issues (e.g. "kernel memory limit is
1844    /// deprecated", or "real-time scheduling not supported on this
1845    /// kernel"). Empty `warnings` ⇒ every requested field was applied.
1846    ///
1847    /// The default implementation returns [`AgentError::Unsupported`].
1848    /// Backends override this:
1849    ///
1850    /// * Docker — calls bollard's `update_container` with a
1851    ///   `ContainerUpdateBody` populated from the input.
1852    /// * Youki — writes the corresponding cgroup v2 files
1853    ///   (`cpu.weight`, `memory.max`, `pids.max`, `io.weight`,
1854    ///   `cpuset.cpus`, `cpuset.mems`) under
1855    ///   `<container_root>/cgroup` and persists the new restart policy
1856    ///   in the on-disk supervisor state.
1857    /// * Other backends (WASM, mocks) inherit the `Unsupported` default.
1858    async fn update_container_resources(
1859        &self,
1860        _id: &ContainerId,
1861        _update: &ContainerResourceUpdate,
1862    ) -> Result<ContainerUpdateOutcome> {
1863        Err(AgentError::Unsupported(
1864            "update_container_resources is not supported by this runtime".into(),
1865        ))
1866    }
1867
1868    /// List the processes running inside a container (`docker top`).
1869    ///
1870    /// `ps_args` is forwarded to the runtime as the `ps(1)` argument list when
1871    /// supported; an empty slice means "use the runtime's default columns".
1872    /// Mirrors Docker's `GET /containers/{id}/top?ps_args=<...>`.
1873    ///
1874    /// The default implementation returns [`AgentError::Unsupported`].
1875    async fn top_container(
1876        &self,
1877        _id: &ContainerId,
1878        _ps_args: &[String],
1879    ) -> Result<ContainerTopOutput> {
1880        Err(AgentError::Unsupported(
1881            "top_container is not supported by this runtime".into(),
1882        ))
1883    }
1884
1885    /// Report changes to a container's filesystem since it was created.
1886    ///
1887    /// Mirrors Docker's `GET /containers/{id}/changes`. Returns one
1888    /// [`FilesystemChangeEntry`] per added / modified / deleted path in the
1889    /// container's writable layer. Runtimes that don't compute layer diffs
1890    /// (e.g. youki, which uses raw bundle rootfs without a layered FS) return
1891    /// [`AgentError::Unsupported`].
1892    async fn changes_container(&self, _id: &ContainerId) -> Result<Vec<FilesystemChangeEntry>> {
1893        Err(AgentError::Unsupported(
1894            "changes_container is not supported by this runtime".into(),
1895        ))
1896    }
1897
1898    /// Report the published port mappings for a container.
1899    ///
1900    /// Mirrors Docker's `GET /containers/{id}/port`. Returns one
1901    /// [`PortMappingEntry`] per (container-port, protocol, host-binding)
1902    /// triple. Containers with no published ports return an empty vector.
1903    ///
1904    /// The default implementation returns [`AgentError::Unsupported`].
1905    async fn port_mappings_container(&self, _id: &ContainerId) -> Result<Vec<PortMappingEntry>> {
1906        Err(AgentError::Unsupported(
1907            "port_mappings_container is not supported by this runtime".into(),
1908        ))
1909    }
1910
1911    /// Prune stopped containers from the runtime.
1912    ///
1913    /// Mirrors Docker's `POST /containers/prune`. Returns the IDs of
1914    /// containers that were removed plus the bytes reclaimed. The default
1915    /// implementation returns [`AgentError::Unsupported`].
1916    async fn prune_containers(&self) -> Result<ContainerPruneResult> {
1917        Err(AgentError::Unsupported(
1918            "prune_containers is not supported by this runtime".into(),
1919        ))
1920    }
1921
1922    /// Enumerate all containers known to this runtime, including stopped /
1923    /// exited ones (Docker's `list_containers(all=true)` semantics).
1924    ///
1925    /// Used by `zlayer-api::handlers::standalone_reconcile` on daemon boot
1926    /// to match persisted standalone-container records against the
1927    /// runtime's actual inventory: entries the runtime no longer reports
1928    /// are pruned, surviving entries are re-registered in the
1929    /// `ContainerIdMap`, and runtime containers carrying a
1930    /// `com.zlayer.container_id` label that has no storage match are
1931    /// counted as orphans (logged but otherwise left alone).
1932    ///
1933    /// The default implementation returns an empty list, which makes
1934    /// reconcile degrade to a label-blind pass: storage entries can still
1935    /// be probed individually via [`Runtime::container_state`], but orphan
1936    /// detection is disabled until the backend overrides this method
1937    /// (Docker via `bollard::list_containers`, youki via state-dir walk).
1938    async fn list_containers(&self) -> Result<Vec<RuntimeContainerSummary>> {
1939        Ok(Vec::new())
1940    }
1941
1942    /// Stream a TAR archive of the file or directory at `path` inside the
1943    /// container.
1944    ///
1945    /// Mirrors Docker's `GET /containers/{id}/archive?path=<...>`. The
1946    /// returned [`ArchiveStream`] yields raw `application/x-tar` bytes.
1947    /// Backends decide whether to materialize the archive in memory or
1948    /// stream it on the fly:
1949    ///
1950    /// * Docker — bollard's `download_from_container` produces a chunked
1951    ///   stream of TAR bytes; we forward it verbatim.
1952    /// * Youki — a rootfs walk under `<bundle>/rootfs<path>` produces a
1953    ///   TAR archive in a worker task and streams it through an mpsc.
1954    /// * Other backends (WASM, mocks) inherit the `Unsupported` default.
1955    ///
1956    /// The default implementation returns [`AgentError::Unsupported`].
1957    async fn archive_get(&self, _id: &ContainerId, _path: &str) -> Result<ArchiveStream> {
1958        Err(AgentError::Unsupported(
1959            "archive_get is not supported by this runtime".into(),
1960        ))
1961    }
1962
1963    /// Extract a TAR archive into the container at `path`.
1964    ///
1965    /// Mirrors Docker's `PUT /containers/{id}/archive?path=<...>`. The
1966    /// runtime must extract `tar_bytes` (an uncompressed TAR archive) into
1967    /// `path` inside the container, honouring [`ArchivePutOptions`].
1968    ///
1969    /// `path` must already exist inside the container and must be a
1970    /// directory; if it does not exist the runtime returns
1971    /// [`AgentError::NotFound`]. Mismatched directory/non-directory
1972    /// replacements with `no_overwrite_dir_non_dir=true` return
1973    /// [`AgentError::InvalidSpec`].
1974    ///
1975    /// The default implementation returns [`AgentError::Unsupported`].
1976    async fn archive_put(
1977        &self,
1978        _id: &ContainerId,
1979        _path: &str,
1980        _tar_bytes: bytes::Bytes,
1981        _opts: ArchivePutOptions,
1982    ) -> Result<()> {
1983        Err(AgentError::Unsupported(
1984            "archive_put is not supported by this runtime".into(),
1985        ))
1986    }
1987
1988    /// Return stat metadata for the file or directory at `path` inside the
1989    /// container.
1990    ///
1991    /// Mirrors Docker's `HEAD /containers/{id}/archive?path=<...>`, which
1992    /// answers with the metadata that `GET /archive` *would* expose without
1993    /// materializing the TAR. Used by `docker cp` and the API layer to
1994    /// short-circuit on missing paths.
1995    ///
1996    /// The default implementation returns [`AgentError::Unsupported`].
1997    async fn archive_head(&self, _id: &ContainerId, _path: &str) -> Result<PathStat> {
1998        Err(AgentError::Unsupported(
1999            "archive_head is not supported by this runtime".into(),
2000        ))
2001    }
2002}
2003
2004/// Validate a signal name for [`Runtime::kill_container`].
2005///
2006/// Accepts both the `SIG`-prefixed form (`"SIGKILL"`) and the bare form
2007/// (`"KILL"`). Returns the canonical uppercase `SIG`-prefixed name on success.
2008///
2009/// # Errors
2010///
2011/// Returns [`AgentError::InvalidSpec`] when `signal` is not one of the
2012/// supported signals: `SIGKILL`, `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGUSR1`,
2013/// `SIGUSR2`.
2014pub fn validate_signal(signal: &str) -> Result<String> {
2015    let trimmed = signal.trim();
2016    if trimmed.is_empty() {
2017        return Err(AgentError::InvalidSpec(
2018            "signal must not be empty".to_string(),
2019        ));
2020    }
2021    let upper = trimmed.to_ascii_uppercase();
2022    let canonical = if upper.starts_with("SIG") {
2023        upper
2024    } else {
2025        format!("SIG{upper}")
2026    };
2027    match canonical.as_str() {
2028        "SIGKILL" | "SIGTERM" | "SIGINT" | "SIGHUP" | "SIGUSR1" | "SIGUSR2" => Ok(canonical),
2029        other => Err(AgentError::InvalidSpec(format!(
2030            "unsupported signal '{other}'; allowed: SIGKILL, SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2"
2031        ))),
2032    }
2033}
2034
2035/// Auth context injected into every container so it can talk back to the host
2036/// API without needing external credentials.
2037#[derive(Debug, Clone)]
2038pub struct ContainerAuthContext {
2039    /// Base URL of the `ZLayer` API, e.g. `"http://127.0.0.1:3669"`.
2040    pub api_url: String,
2041    /// JWT signing secret — used to mint per-container tokens at start time.
2042    pub jwt_secret: String,
2043    /// Absolute path of the Unix socket on the host (bind-mounted into Linux
2044    /// containers; added to `writable_dirs` for macOS sandbox).
2045    pub socket_path: String,
2046    /// Sink for persisting/revoking the per-container scoped token. `None`
2047    /// disables persistence (token minted without a `jti`, not revocable).
2048    pub token_sink: Option<std::sync::Arc<dyn crate::auth::ContainerTokenSink>>,
2049    /// Spawns/tears down the per-container Docker Engine API socket for
2050    /// containers that opt into `zlayer.io/docker-socket`. `None` disables the
2051    /// feature (no socket provisioned). Wired in the bin (the agent can't depend
2052    /// on `zlayer-docker`).
2053    pub docker_socket_spawner: Option<std::sync::Arc<dyn crate::auth::DockerSocketSpawner>>,
2054}
2055
2056/// In-memory mock runtime for testing and development.
2057///
2058/// In addition to tracking container lifecycle in memory, the mock exposes
2059/// per-container queues for streaming method outputs so unit tests can
2060/// pre-script the events that [`Runtime::logs_stream`],
2061/// [`Runtime::stats_stream`], and [`Runtime::pull_image_stream`] should
2062/// yield. See [`MockRuntime::enqueue_log_chunk`],
2063/// [`MockRuntime::enqueue_stats_sample`], and
2064/// [`MockRuntime::enqueue_pull_progress`].
2065pub struct MockRuntime {
2066    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
2067    /// Pre-scripted log chunks per container. Each call to
2068    /// [`Runtime::logs_stream`] drains this queue in order; once empty, the
2069    /// stream either terminates (when `follow=false`) or hangs forever
2070    /// (when `follow=true`) so tests can exercise both branches.
2071    pub logs_to_yield:
2072        Arc<Mutex<std::collections::HashMap<ContainerId, VecDeque<Result<LogChunk>>>>>,
2073    /// Pre-scripted stats samples per container. Drained in order by
2074    /// [`Runtime::stats_stream`].
2075    pub stats_to_yield:
2076        Arc<Mutex<std::collections::HashMap<ContainerId, VecDeque<Result<StatsSample>>>>>,
2077    /// Pre-scripted pull progress events keyed by image reference. Drained
2078    /// in order by [`Runtime::pull_image_stream`].
2079    pub pull_progress_to_yield:
2080        Arc<Mutex<std::collections::HashMap<String, VecDeque<Result<PullProgress>>>>>,
2081}
2082
2083impl MockRuntime {
2084    #[must_use]
2085    pub fn new() -> Self {
2086        Self {
2087            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
2088            logs_to_yield: Arc::new(Mutex::new(std::collections::HashMap::new())),
2089            stats_to_yield: Arc::new(Mutex::new(std::collections::HashMap::new())),
2090            pull_progress_to_yield: Arc::new(Mutex::new(std::collections::HashMap::new())),
2091        }
2092    }
2093
2094    /// Push a single [`LogChunk`] onto the queue for `id`. Subsequent calls to
2095    /// [`Runtime::logs_stream`] will yield enqueued chunks in FIFO order.
2096    pub async fn enqueue_log_chunk(&self, id: &ContainerId, chunk: LogChunk) {
2097        self.logs_to_yield
2098            .lock()
2099            .await
2100            .entry(id.clone())
2101            .or_default()
2102            .push_back(Ok(chunk));
2103    }
2104
2105    /// Push a pre-built error onto the log queue for `id`. The next
2106    /// [`Runtime::logs_stream`] call drains this as the next yielded item.
2107    pub async fn enqueue_log_error(&self, id: &ContainerId, err: AgentError) {
2108        self.logs_to_yield
2109            .lock()
2110            .await
2111            .entry(id.clone())
2112            .or_default()
2113            .push_back(Err(err));
2114    }
2115
2116    /// Push a single [`StatsSample`] onto the queue for `id`. Subsequent calls
2117    /// to [`Runtime::stats_stream`] will yield enqueued samples in FIFO order.
2118    pub async fn enqueue_stats_sample(&self, id: &ContainerId, sample: StatsSample) {
2119        self.stats_to_yield
2120            .lock()
2121            .await
2122            .entry(id.clone())
2123            .or_default()
2124            .push_back(Ok(sample));
2125    }
2126
2127    /// Push a single [`PullProgress`] event onto the queue for `image`.
2128    /// Subsequent calls to [`Runtime::pull_image_stream`] for the same image
2129    /// reference will yield enqueued events in FIFO order.
2130    pub async fn enqueue_pull_progress(&self, image: &str, progress: PullProgress) {
2131        self.pull_progress_to_yield
2132            .lock()
2133            .await
2134            .entry(image.to_string())
2135            .or_default()
2136            .push_back(Ok(progress));
2137    }
2138}
2139
2140impl Default for MockRuntime {
2141    fn default() -> Self {
2142        Self::new()
2143    }
2144}
2145
2146#[async_trait::async_trait]
2147impl Runtime for MockRuntime {
2148    async fn pull_image(&self, _image: &str) -> Result<()> {
2149        self.pull_image_with_policy(
2150            _image,
2151            PullPolicy::IfNotPresent,
2152            None,
2153            zlayer_spec::SourcePolicy::default(),
2154        )
2155        .await
2156    }
2157
2158    async fn pull_image_with_policy(
2159        &self,
2160        _image: &str,
2161        _policy: PullPolicy,
2162        _auth: Option<&RegistryAuth>,
2163        _source: zlayer_spec::SourcePolicy,
2164    ) -> Result<()> {
2165        // Mock: always succeeds
2166        tokio::time::sleep(Duration::from_millis(100)).await;
2167        Ok(())
2168    }
2169
2170    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
2171        let mut containers = self.containers.write().await;
2172        containers.insert(
2173            id.clone(),
2174            Container {
2175                id: id.clone(),
2176                image: spec.image.name.to_string(),
2177                state: ContainerState::Pending,
2178                pid: None,
2179                task: None,
2180                overlay_ip: None,
2181                health_monitor: None,
2182                port_override: None,
2183            },
2184        );
2185        Ok(())
2186    }
2187
2188    async fn start_container(&self, id: &ContainerId) -> Result<()> {
2189        let mut containers = self.containers.write().await;
2190        if let Some(container) = containers.get_mut(id) {
2191            container.state = ContainerState::Running;
2192            container.pid = Some(std::process::id()); // Mock PID
2193        }
2194        Ok(())
2195    }
2196
2197    async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
2198        let mut containers = self.containers.write().await;
2199        if let Some(container) = containers.get_mut(id) {
2200            container.state = ContainerState::Exited { code: 0 };
2201        }
2202        Ok(())
2203    }
2204
2205    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
2206        let mut containers = self.containers.write().await;
2207        containers.remove(id);
2208        Ok(())
2209    }
2210
2211    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
2212        let containers = self.containers.read().await;
2213        containers
2214            .get(id)
2215            .map(|c| c.state.clone())
2216            .ok_or_else(|| AgentError::NotFound {
2217                container: id.to_string(),
2218                reason: "container not found".to_string(),
2219            })
2220    }
2221
2222    async fn container_logs(&self, _id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
2223        let entries = vec![
2224            LogEntry {
2225                timestamp: chrono::Utc::now(),
2226                stream: LogStream::Stdout,
2227                message: "mock log line 1".to_string(),
2228                source: LogSource::Container("mock".to_string()),
2229                service: None,
2230                deployment: None,
2231            },
2232            LogEntry {
2233                timestamp: chrono::Utc::now(),
2234                stream: LogStream::Stderr,
2235                message: "mock error line".to_string(),
2236                source: LogSource::Container("mock".to_string()),
2237                service: None,
2238                deployment: None,
2239            },
2240        ];
2241        let skip = entries.len().saturating_sub(tail);
2242        Ok(entries.into_iter().skip(skip).collect())
2243    }
2244
2245    async fn exec(&self, _id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
2246        Ok((0, cmd.join(" "), String::new()))
2247    }
2248
2249    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
2250        // Mock: return dummy stats
2251        let containers = self.containers.read().await;
2252        if containers.contains_key(id) {
2253            Ok(ContainerStats {
2254                cpu_usage_usec: 1_000_000,       // 1 second
2255                memory_bytes: 50 * 1024 * 1024,  // 50 MB
2256                memory_limit: 256 * 1024 * 1024, // 256 MB
2257                timestamp: std::time::Instant::now(),
2258            })
2259        } else {
2260            Err(AgentError::NotFound {
2261                container: id.to_string(),
2262                reason: "container not found".to_string(),
2263            })
2264        }
2265    }
2266
2267    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
2268        // Mock: simulate waiting for container to exit
2269        let containers = self.containers.read().await;
2270        if let Some(container) = containers.get(id) {
2271            match &container.state {
2272                ContainerState::Exited { code } => Ok(*code),
2273                ContainerState::Failed { .. } => Ok(1),
2274                _ => {
2275                    // Simulate a brief wait and then return success
2276                    drop(containers);
2277                    tokio::time::sleep(Duration::from_millis(50)).await;
2278                    Ok(0)
2279                }
2280            }
2281        } else {
2282            Err(AgentError::NotFound {
2283                container: id.to_string(),
2284                reason: "container not found".to_string(),
2285            })
2286        }
2287    }
2288
2289    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
2290        // Mock: return dummy structured log entries
2291        let containers = self.containers.read().await;
2292        if containers.contains_key(id) {
2293            let container_name = id.to_string();
2294            Ok(vec![
2295                LogEntry {
2296                    timestamp: chrono::Utc::now(),
2297                    stream: LogStream::Stdout,
2298                    message: format!("[{container_name}] Container started"),
2299                    source: LogSource::Container(container_name.clone()),
2300                    service: None,
2301                    deployment: None,
2302                },
2303                LogEntry {
2304                    timestamp: chrono::Utc::now(),
2305                    stream: LogStream::Stdout,
2306                    message: format!("[{container_name}] Executing command..."),
2307                    source: LogSource::Container(container_name.clone()),
2308                    service: None,
2309                    deployment: None,
2310                },
2311                LogEntry {
2312                    timestamp: chrono::Utc::now(),
2313                    stream: LogStream::Stdout,
2314                    message: format!("[{container_name}] Command completed successfully"),
2315                    source: LogSource::Container(container_name),
2316                    service: None,
2317                    deployment: None,
2318                },
2319            ])
2320        } else {
2321            Err(AgentError::NotFound {
2322                container: id.to_string(),
2323                reason: "container not found".to_string(),
2324            })
2325        }
2326    }
2327
2328    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
2329        let containers = self.containers.read().await;
2330        if let Some(container) = containers.get(id) {
2331            Ok(container.pid)
2332        } else {
2333            Err(AgentError::NotFound {
2334                container: id.to_string(),
2335                reason: "container not found".to_string(),
2336            })
2337        }
2338    }
2339
2340    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
2341        let containers = self.containers.read().await;
2342        if containers.contains_key(id) {
2343            // Mock: deterministic IP based on replica number (172.17.0.{replica+2})
2344            #[allow(clippy::cast_possible_truncation)]
2345            let last_octet = (id.replica + 2) as u8;
2346            Ok(Some(IpAddr::V4(std::net::Ipv4Addr::new(
2347                172, 17, 0, last_octet,
2348            ))))
2349        } else {
2350            Err(AgentError::NotFound {
2351                container: id.to_string(),
2352                reason: "container not found".to_string(),
2353            })
2354        }
2355    }
2356
2357    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
2358        Ok(Vec::new())
2359    }
2360
2361    async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
2362        Ok(())
2363    }
2364
2365    async fn prune_images(&self) -> Result<PruneResult> {
2366        Ok(PruneResult::default())
2367    }
2368
2369    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
2370        // Validate signal even in the mock so callers exercise the same error
2371        // path. Default to SIGKILL when omitted.
2372        let _canonical = validate_signal(signal.unwrap_or("SIGKILL"))?;
2373        let mut containers = self.containers.write().await;
2374        let container = containers.get_mut(id).ok_or_else(|| AgentError::NotFound {
2375            container: id.to_string(),
2376            reason: "container not found".to_string(),
2377        })?;
2378        container.state = ContainerState::Exited { code: 137 };
2379        Ok(())
2380    }
2381
2382    async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
2383        // The in-memory mock doesn't store images; treat tag as a no-op success.
2384        Ok(())
2385    }
2386
2387    async fn logs_stream(&self, id: &ContainerId, opts: LogsStreamOptions) -> Result<LogsStream> {
2388        use futures_util::StreamExt;
2389
2390        // Drain the per-container queue once at call time; the resulting
2391        // iterator is owned by the stream so the lock isn't held while the
2392        // consumer reads.
2393        let queued: Vec<Result<LogChunk>> = {
2394            let mut guard = self.logs_to_yield.lock().await;
2395            guard.remove(id).map(Vec::from).unwrap_or_default()
2396        };
2397        let head = futures_util::stream::iter(queued);
2398        if opts.follow {
2399            // After the queued items are drained, hang forever so callers can
2400            // exercise the "still-following, no more data" branch and cancel
2401            // by dropping the stream.
2402            let tail = futures_util::stream::pending::<Result<LogChunk>>();
2403            Ok(Box::pin(head.chain(tail)))
2404        } else {
2405            Ok(Box::pin(head))
2406        }
2407    }
2408
2409    async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
2410        let queued: Vec<Result<StatsSample>> = {
2411            let mut guard = self.stats_to_yield.lock().await;
2412            guard.remove(id).map(Vec::from).unwrap_or_default()
2413        };
2414        // `stats_stream` has no `follow` flag on the trait. The mock yields
2415        // exactly what was pre-loaded and then closes — tests that want a
2416        // forever-pending stream can simulate it by holding the receiver and
2417        // never enqueueing more. Closing on drain keeps tests bounded so a
2418        // forgotten consumer never deadlocks the test runner.
2419        Ok(Box::pin(futures_util::stream::iter(queued)))
2420    }
2421
2422    async fn pull_image_stream(
2423        &self,
2424        image: &str,
2425        _auth: Option<&RegistryAuth>,
2426    ) -> Result<PullProgressStream> {
2427        let queued: Vec<Result<PullProgress>> = {
2428            let mut guard = self.pull_progress_to_yield.lock().await;
2429            guard.remove(image).map(Vec::from).unwrap_or_default()
2430        };
2431        // Pulls are inherently bounded: the real backends emit a final `Done`
2432        // event and close the stream. The mock follows the same shape — it
2433        // just yields whatever the test pre-loaded and ends.
2434        Ok(Box::pin(futures_util::stream::iter(queued)))
2435    }
2436}
2437
2438#[cfg(test)]
2439mod tests {
2440    use super::*;
2441
2442    #[tokio::test]
2443    async fn test_mock_runtime() {
2444        let runtime = MockRuntime::new();
2445        let id = ContainerId::new("test".to_string(), 1);
2446
2447        runtime.pull_image("test:latest").await.unwrap();
2448        runtime.create_container(&id, &mock_spec()).await.unwrap();
2449        runtime.start_container(&id).await.unwrap();
2450
2451        let state = runtime.container_state(&id).await.unwrap();
2452        assert_eq!(state, ContainerState::Running);
2453    }
2454
2455    #[test]
2456    fn parse_display_round_trips_cluster_shape() {
2457        // The exact id the deployment-run path surfaces: service=alpine,
2458        // role=default, replica=1, node_id=1 → `alpine-default-1-on-1`.
2459        let cid = ContainerId::with_role_and_node("alpine", 1, "default", 1);
2460        let s = cid.to_string();
2461        assert_eq!(s, "alpine-default-1-on-1");
2462        assert_eq!(ContainerId::parse_display(&s), Some(cid));
2463    }
2464
2465    #[test]
2466    fn parse_display_round_trips_legacy_shape() {
2467        let cid = ContainerId::new("alpine", 3);
2468        let s = cid.to_string();
2469        assert_eq!(s, "alpine-rep-3");
2470        assert_eq!(ContainerId::parse_display(&s), Some(cid));
2471    }
2472
2473    #[test]
2474    fn parse_display_handles_hyphenated_service() {
2475        // A service name containing `-` must round-trip: the parser anchors on
2476        // the rightmost structural markers, not a left-to-right split.
2477        let cid = ContainerId::with_role_and_node("my-web-app", 2, "read", 7);
2478        let s = cid.to_string();
2479        assert_eq!(s, "my-web-app-read-2-on-7");
2480        assert_eq!(ContainerId::parse_display(&s), Some(cid));
2481    }
2482
2483    #[test]
2484    fn parse_display_rejects_non_ids() {
2485        // A 64-char hex id, a bare name, and obvious garbage must not parse.
2486        assert_eq!(ContainerId::parse_display("alpine"), None);
2487        assert_eq!(
2488            ContainerId::parse_display(
2489                "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
2490            ),
2491            None
2492        );
2493        assert_eq!(ContainerId::parse_display("alpine-default-x-on-1"), None);
2494        assert_eq!(ContainerId::parse_display("alpine-default-1-on-y"), None);
2495        assert_eq!(ContainerId::parse_display(""), None);
2496    }
2497
2498    #[test]
2499    fn validate_signal_accepts_known_signals() {
2500        // SIG-prefixed form
2501        assert_eq!(validate_signal("SIGKILL").unwrap(), "SIGKILL");
2502        assert_eq!(validate_signal("SIGTERM").unwrap(), "SIGTERM");
2503        assert_eq!(validate_signal("SIGINT").unwrap(), "SIGINT");
2504        assert_eq!(validate_signal("SIGHUP").unwrap(), "SIGHUP");
2505        assert_eq!(validate_signal("SIGUSR1").unwrap(), "SIGUSR1");
2506        assert_eq!(validate_signal("SIGUSR2").unwrap(), "SIGUSR2");
2507
2508        // Bare form (no "SIG" prefix) should be canonicalised.
2509        assert_eq!(validate_signal("KILL").unwrap(), "SIGKILL");
2510        assert_eq!(validate_signal("term").unwrap(), "SIGTERM");
2511        // Whitespace around the name is tolerated.
2512        assert_eq!(validate_signal(" INT ").unwrap(), "SIGINT");
2513    }
2514
2515    #[test]
2516    fn validate_signal_rejects_unknown_or_empty() {
2517        assert!(matches!(
2518            validate_signal(""),
2519            Err(AgentError::InvalidSpec(_))
2520        ));
2521        assert!(matches!(
2522            validate_signal("   "),
2523            Err(AgentError::InvalidSpec(_))
2524        ));
2525        assert!(matches!(
2526            validate_signal("SIGSEGV"),
2527            Err(AgentError::InvalidSpec(_))
2528        ));
2529        assert!(matches!(
2530            validate_signal("NOPE"),
2531            Err(AgentError::InvalidSpec(_))
2532        ));
2533        // Signals outside the POSIX allowlist are rejected even if real.
2534        assert!(matches!(
2535            validate_signal("SIGPIPE"),
2536            Err(AgentError::InvalidSpec(_))
2537        ));
2538    }
2539
2540    #[tokio::test]
2541    async fn mock_kill_container_defaults_to_sigkill() {
2542        let runtime = MockRuntime::new();
2543        let id = ContainerId::new("kill-me".to_string(), 0);
2544        runtime.create_container(&id, &mock_spec()).await.unwrap();
2545        runtime.start_container(&id).await.unwrap();
2546
2547        // `None` -> defaults to SIGKILL; returns Ok and marks the container
2548        // as exited.
2549        runtime.kill_container(&id, None).await.unwrap();
2550        let state = runtime.container_state(&id).await.unwrap();
2551        assert!(
2552            matches!(state, ContainerState::Exited { code: 137 }),
2553            "expected Exited(137), got {state:?}"
2554        );
2555    }
2556
2557    #[test]
2558    fn wait_reason_serializes_as_snake_case() {
2559        assert_eq!(
2560            serde_json::to_string(&WaitReason::Exited).unwrap(),
2561            "\"exited\""
2562        );
2563        assert_eq!(
2564            serde_json::to_string(&WaitReason::Signal).unwrap(),
2565            "\"signal\""
2566        );
2567        assert_eq!(
2568            serde_json::to_string(&WaitReason::OomKilled).unwrap(),
2569            "\"oom_killed\""
2570        );
2571        assert_eq!(
2572            serde_json::to_string(&WaitReason::RuntimeError).unwrap(),
2573            "\"runtime_error\""
2574        );
2575    }
2576
2577    #[test]
2578    fn wait_reason_deserialize_roundtrip() {
2579        for variant in [
2580            WaitReason::Exited,
2581            WaitReason::Signal,
2582            WaitReason::OomKilled,
2583            WaitReason::RuntimeError,
2584        ] {
2585            let s = serde_json::to_string(&variant).unwrap();
2586            let back: WaitReason = serde_json::from_str(&s).unwrap();
2587            assert_eq!(variant, back, "roundtrip failed for {variant:?}");
2588        }
2589    }
2590
2591    #[test]
2592    fn signal_name_from_exit_code_known_signals() {
2593        assert_eq!(signal_name_from_exit_code(137).as_deref(), Some("SIGKILL"));
2594        assert_eq!(signal_name_from_exit_code(143).as_deref(), Some("SIGTERM"));
2595        assert_eq!(signal_name_from_exit_code(130).as_deref(), Some("SIGINT"));
2596        assert_eq!(signal_name_from_exit_code(129).as_deref(), Some("SIGHUP"));
2597        assert_eq!(signal_name_from_exit_code(139).as_deref(), Some("SIGSEGV"));
2598    }
2599
2600    #[test]
2601    fn signal_name_from_exit_code_handles_unknown_and_normal() {
2602        // Normal exits (<= 128) return None.
2603        assert_eq!(signal_name_from_exit_code(0), None);
2604        assert_eq!(signal_name_from_exit_code(1), None);
2605        assert_eq!(signal_name_from_exit_code(128), None);
2606
2607        // Unknown signals produce a stable string form.
2608        assert_eq!(
2609            signal_name_from_exit_code(128 + 99).as_deref(),
2610            Some("signal_99")
2611        );
2612    }
2613
2614    #[tokio::test]
2615    async fn default_wait_outcome_delegates_to_wait_container() {
2616        let runtime = MockRuntime::new();
2617        let id = ContainerId::new("wait-test".to_string(), 0);
2618        runtime.create_container(&id, &mock_spec()).await.unwrap();
2619        runtime.start_container(&id).await.unwrap();
2620
2621        let outcome = runtime.wait_outcome(&id).await.unwrap();
2622        // MockRuntime::wait_container returns 0 for running containers.
2623        assert_eq!(outcome.exit_code, 0);
2624        assert_eq!(outcome.reason, WaitReason::Exited);
2625        assert!(outcome.signal.is_none());
2626        assert!(outcome.finished_at.is_none());
2627    }
2628
2629    #[tokio::test]
2630    async fn mock_kill_container_rejects_bogus_signal() {
2631        let runtime = MockRuntime::new();
2632        let id = ContainerId::new("kill-me".to_string(), 0);
2633        runtime.create_container(&id, &mock_spec()).await.unwrap();
2634        runtime.start_container(&id).await.unwrap();
2635
2636        let err = runtime
2637            .kill_container(&id, Some("SIGFOO"))
2638            .await
2639            .unwrap_err();
2640        assert!(
2641            matches!(err, AgentError::InvalidSpec(_)),
2642            "expected InvalidSpec, got {err:?}"
2643        );
2644    }
2645
2646    // The default trait impls of `logs_stream` and `stats_stream` still return
2647    // `AgentError::Unsupported`; `pull_image_stream` now performs a blocking
2648    // pull and synthesizes a Status+Done progress stream. `MockRuntime`
2649    // overrides all three so tests can pre-script stream output (see
2650    // `mock_logs_stream_yields_queued_items_in_order` and friends below).
2651    // A trivial `BareRuntime` exercises the default trait impls without
2652    // dragging in MockRuntime's overrides.
2653
2654    /// Minimal `Runtime` implementation used to exercise the default trait
2655    /// impls of `logs_stream` / `stats_stream` / `pull_image_stream`. Most
2656    /// methods panic, but `pull_image_with_policy` returns `Ok(())` so the
2657    /// default `pull_image_stream` (which delegates to it) can be exercised.
2658    struct BareRuntime;
2659
2660    #[async_trait::async_trait]
2661    impl Runtime for BareRuntime {
2662        async fn pull_image(&self, _image: &str) -> Result<()> {
2663            unimplemented!()
2664        }
2665        async fn pull_image_with_policy(
2666            &self,
2667            _image: &str,
2668            _policy: PullPolicy,
2669            _auth: Option<&RegistryAuth>,
2670            _source: zlayer_spec::SourcePolicy,
2671        ) -> Result<()> {
2672            // The default `pull_image_stream` delegates here; return Ok so the
2673            // streaming default can be exercised without a real registry.
2674            Ok(())
2675        }
2676        async fn create_container(&self, _id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
2677            unimplemented!()
2678        }
2679        async fn start_container(&self, _id: &ContainerId) -> Result<()> {
2680            unimplemented!()
2681        }
2682        async fn stop_container(&self, _id: &ContainerId, _timeout: Duration) -> Result<()> {
2683            unimplemented!()
2684        }
2685        async fn remove_container(&self, _id: &ContainerId) -> Result<()> {
2686            unimplemented!()
2687        }
2688        async fn container_state(&self, _id: &ContainerId) -> Result<ContainerState> {
2689            unimplemented!()
2690        }
2691        async fn container_logs(&self, _id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
2692            unimplemented!()
2693        }
2694        async fn exec(&self, _id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
2695            unimplemented!()
2696        }
2697        async fn get_container_stats(&self, _id: &ContainerId) -> Result<ContainerStats> {
2698            unimplemented!()
2699        }
2700        async fn wait_container(&self, _id: &ContainerId) -> Result<i32> {
2701            unimplemented!()
2702        }
2703        async fn get_logs(&self, _id: &ContainerId) -> Result<Vec<LogEntry>> {
2704            unimplemented!()
2705        }
2706        async fn get_container_pid(&self, _id: &ContainerId) -> Result<Option<u32>> {
2707            unimplemented!()
2708        }
2709        async fn get_container_ip(&self, _id: &ContainerId) -> Result<Option<IpAddr>> {
2710            unimplemented!()
2711        }
2712    }
2713
2714    #[tokio::test]
2715    async fn default_logs_stream_is_unsupported() {
2716        let runtime = BareRuntime;
2717        let id = ContainerId::new("stream-test".to_string(), 0);
2718        // The success-side `LogsStream` is not `Debug`, so we can't call
2719        // `unwrap_err`; pattern-match on the Result directly instead.
2720        match runtime.logs_stream(&id, LogsStreamOptions::default()).await {
2721            Err(AgentError::Unsupported(_)) => {}
2722            Err(other) => panic!("expected Unsupported, got {other:?}"),
2723            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2724        }
2725    }
2726
2727    #[tokio::test]
2728    async fn default_stats_stream_is_unsupported() {
2729        let runtime = BareRuntime;
2730        let id = ContainerId::new("stream-test".to_string(), 0);
2731        match runtime.stats_stream(&id).await {
2732            Err(AgentError::Unsupported(_)) => {}
2733            Err(other) => panic!("expected Unsupported, got {other:?}"),
2734            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2735        }
2736    }
2737
2738    #[tokio::test]
2739    async fn default_pull_image_stream_synthesizes_progress() {
2740        use futures_util::StreamExt as _;
2741
2742        // The default `pull_image_stream` now performs a blocking pull (via
2743        // `pull_image_with_policy`, which `BareRuntime` answers with `Ok`) and
2744        // synthesizes a Status + Done progress stream.
2745        let runtime = BareRuntime;
2746        let stream = runtime
2747            .pull_image_stream("alpine:latest", None)
2748            .await
2749            .expect("default pull_image_stream should succeed when the pull succeeds");
2750        let events: Vec<_> = stream.collect().await;
2751        assert_eq!(events.len(), 2, "expected a Status then a Done event");
2752        assert!(
2753            matches!(events[0], Ok(PullProgress::Status { .. })),
2754            "first event should be a Status line, got {:?}",
2755            events[0]
2756        );
2757        assert!(
2758            matches!(events[1], Ok(PullProgress::Done { .. })),
2759            "second event should be the terminal Done, got {:?}",
2760            events[1]
2761        );
2762    }
2763
2764    #[tokio::test]
2765    async fn default_archive_get_is_unsupported() {
2766        let runtime = BareRuntime;
2767        let id = ContainerId::new("archive-test".to_string(), 0);
2768        match runtime.archive_get(&id, "/etc/hosts").await {
2769            Err(AgentError::Unsupported(_)) => {}
2770            Err(other) => panic!("expected Unsupported, got {other:?}"),
2771            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2772        }
2773    }
2774
2775    #[tokio::test]
2776    async fn default_archive_put_is_unsupported() {
2777        let runtime = BareRuntime;
2778        let id = ContainerId::new("archive-test".to_string(), 0);
2779        let err = runtime
2780            .archive_put(
2781                &id,
2782                "/tmp",
2783                bytes::Bytes::from_static(&[]),
2784                ArchivePutOptions::default(),
2785            )
2786            .await
2787            .unwrap_err();
2788        assert!(matches!(err, AgentError::Unsupported(_)));
2789    }
2790
2791    #[tokio::test]
2792    async fn default_archive_head_is_unsupported() {
2793        let runtime = BareRuntime;
2794        let id = ContainerId::new("archive-test".to_string(), 0);
2795        let err = runtime.archive_head(&id, "/etc/hosts").await.unwrap_err();
2796        assert!(matches!(err, AgentError::Unsupported(_)));
2797    }
2798
2799    #[tokio::test]
2800    async fn default_exec_pty_is_unsupported() {
2801        let runtime = BareRuntime;
2802        let id = ContainerId::new("exec-pty".to_string(), 0);
2803        // The success-side `ExecHandle` is not `Debug`, so we can't call
2804        // `unwrap_err`; pattern-match on the Result directly instead.
2805        match runtime.exec_pty(&id, ExecOptions::default()).await {
2806            Err(AgentError::Unsupported(_)) => {}
2807            Err(other) => panic!("expected Unsupported, got {other:?}"),
2808            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2809        }
2810    }
2811
2812    #[tokio::test]
2813    async fn default_inspect_image_native_is_unsupported() {
2814        let runtime = BareRuntime;
2815        let err = runtime.inspect_image_native("alpine").await.unwrap_err();
2816        assert!(matches!(err, AgentError::Unsupported(_)));
2817    }
2818
2819    #[tokio::test]
2820    async fn default_image_history_is_unsupported() {
2821        let runtime = BareRuntime;
2822        let err = runtime.image_history("alpine").await.unwrap_err();
2823        assert!(matches!(err, AgentError::Unsupported(_)));
2824    }
2825
2826    #[tokio::test]
2827    async fn default_search_images_is_unsupported() {
2828        let runtime = BareRuntime;
2829        let err = runtime.search_images("nginx", 10).await.unwrap_err();
2830        assert!(matches!(err, AgentError::Unsupported(_)));
2831    }
2832
2833    #[tokio::test]
2834    async fn default_save_images_is_unsupported() {
2835        let runtime = BareRuntime;
2836        // The success-side stream isn't `Debug`, so pattern-match the Result.
2837        match runtime.save_images(&["alpine".to_string()]).await {
2838            Err(AgentError::Unsupported(_)) => {}
2839            Err(other) => panic!("expected Unsupported, got {other:?}"),
2840            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2841        }
2842    }
2843
2844    #[tokio::test]
2845    async fn default_load_images_is_unsupported() {
2846        let runtime = BareRuntime;
2847        match runtime
2848            .load_images(bytes::Bytes::from_static(&[]), false)
2849            .await
2850        {
2851            Err(AgentError::Unsupported(_)) => {}
2852            Err(other) => panic!("expected Unsupported, got {other:?}"),
2853            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2854        }
2855    }
2856
2857    #[tokio::test]
2858    async fn default_import_image_is_unsupported() {
2859        let runtime = BareRuntime;
2860        let err = runtime
2861            .import_image(bytes::Bytes::from_static(&[]), None, None)
2862            .await
2863            .unwrap_err();
2864        assert!(matches!(err, AgentError::Unsupported(_)));
2865    }
2866
2867    #[tokio::test]
2868    async fn default_export_container_fs_is_unsupported() {
2869        let runtime = BareRuntime;
2870        let id = ContainerId::new("export".to_string(), 0);
2871        match runtime.export_container_fs(&id).await {
2872            Err(AgentError::Unsupported(_)) => {}
2873            Err(other) => panic!("expected Unsupported, got {other:?}"),
2874            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2875        }
2876    }
2877
2878    #[tokio::test]
2879    async fn default_commit_container_is_unsupported() {
2880        let runtime = BareRuntime;
2881        let id = ContainerId::new("commit".to_string(), 0);
2882        let err = runtime
2883            .commit_container(&id, &CommitOptions::default())
2884            .await
2885            .unwrap_err();
2886        assert!(matches!(err, AgentError::Unsupported(_)));
2887    }
2888
2889    #[test]
2890    fn load_progress_serializes_with_kind_discriminator() {
2891        let status = LoadProgress::Status {
2892            id: Some("abc".to_string()),
2893            status: "Loading layer".to_string(),
2894        };
2895        let json = serde_json::to_value(&status).unwrap();
2896        assert_eq!(json["kind"], "status");
2897        assert_eq!(json["status"], "Loading layer");
2898
2899        let done = LoadProgress::Done {
2900            references: vec!["alpine:latest".to_string()],
2901        };
2902        let json = serde_json::to_value(&done).unwrap();
2903        assert_eq!(json["kind"], "done");
2904        assert_eq!(json["references"], serde_json::json!(["alpine:latest"]));
2905    }
2906
2907    #[test]
2908    fn commit_options_default_is_no_op_pause_false() {
2909        let opts = CommitOptions::default();
2910        assert!(opts.repo.is_none());
2911        assert!(opts.tag.is_none());
2912        assert!(opts.comment.is_none());
2913        assert!(opts.author.is_none());
2914        assert!(!opts.pause);
2915        assert!(opts.changes.is_none());
2916    }
2917
2918    #[test]
2919    fn image_inspect_info_default_round_trips_via_serde() {
2920        let info = ImageInspectInfo::default();
2921        let json = serde_json::to_string(&info).unwrap();
2922        let back: ImageInspectInfo = serde_json::from_str(&json).unwrap();
2923        assert_eq!(info, back);
2924    }
2925
2926    #[tokio::test]
2927    async fn mock_logs_stream_yields_queued_items_in_order() {
2928        use futures_util::StreamExt;
2929
2930        let runtime = MockRuntime::new();
2931        let id = ContainerId::new("logs-order".to_string(), 0);
2932
2933        let make_chunk = |s: &str, ch: LogChannel| LogChunk {
2934            stream: ch,
2935            bytes: bytes::Bytes::copy_from_slice(s.as_bytes()),
2936            timestamp: None,
2937        };
2938
2939        runtime
2940            .enqueue_log_chunk(&id, make_chunk("first", LogChannel::Stdout))
2941            .await;
2942        runtime
2943            .enqueue_log_chunk(&id, make_chunk("second", LogChannel::Stderr))
2944            .await;
2945        runtime
2946            .enqueue_log_chunk(&id, make_chunk("third", LogChannel::Stdout))
2947            .await;
2948
2949        // `follow=false` so the stream ends once the queue is drained.
2950        let opts = LogsStreamOptions {
2951            follow: false,
2952            ..LogsStreamOptions::default()
2953        };
2954        let mut stream = runtime.logs_stream(&id, opts).await.unwrap();
2955
2956        let mut got = Vec::new();
2957        while let Some(item) = stream.next().await {
2958            let chunk = item.unwrap();
2959            got.push((
2960                chunk.stream,
2961                String::from_utf8(chunk.bytes.to_vec()).unwrap(),
2962            ));
2963        }
2964        assert_eq!(
2965            got,
2966            vec![
2967                (LogChannel::Stdout, "first".to_string()),
2968                (LogChannel::Stderr, "second".to_string()),
2969                (LogChannel::Stdout, "third".to_string()),
2970            ]
2971        );
2972    }
2973
2974    #[tokio::test]
2975    async fn mock_logs_stream_empty_queue_ends_immediately_when_not_follow() {
2976        use futures_util::StreamExt;
2977
2978        let runtime = MockRuntime::new();
2979        let id = ContainerId::new("logs-empty".to_string(), 0);
2980
2981        let opts = LogsStreamOptions {
2982            follow: false,
2983            ..LogsStreamOptions::default()
2984        };
2985        let mut stream = runtime.logs_stream(&id, opts).await.unwrap();
2986
2987        // Empty queue + follow=false => stream is closed on first poll.
2988        // Wrap in a short timeout so a regression that hangs would surface
2989        // as a test failure rather than the test runner stalling.
2990        let next = tokio::time::timeout(Duration::from_millis(500), stream.next())
2991            .await
2992            .expect("stream did not terminate; expected immediate close on empty queue");
2993        assert!(
2994            next.is_none(),
2995            "expected stream to be exhausted, got Some(_)"
2996        );
2997    }
2998
2999    #[tokio::test]
3000    async fn mock_stats_stream_yields_queued_samples_in_order() {
3001        use futures_util::StreamExt;
3002
3003        let runtime = MockRuntime::new();
3004        let id = ContainerId::new("stats-order".to_string(), 0);
3005
3006        let now = chrono::Utc::now();
3007        let mk = |cpu: u64| StatsSample {
3008            cpu_total_ns: cpu,
3009            cpu_system_ns: 0,
3010            online_cpus: 1,
3011            mem_used_bytes: 0,
3012            mem_limit_bytes: 0,
3013            net_rx_bytes: 0,
3014            net_tx_bytes: 0,
3015            blkio_read_bytes: 0,
3016            blkio_write_bytes: 0,
3017            pids_current: 0,
3018            pids_limit: None,
3019            timestamp: now,
3020        };
3021
3022        runtime.enqueue_stats_sample(&id, mk(100)).await;
3023        runtime.enqueue_stats_sample(&id, mk(200)).await;
3024        runtime.enqueue_stats_sample(&id, mk(300)).await;
3025
3026        let mut stream = runtime.stats_stream(&id).await.unwrap();
3027
3028        let mut cpus = Vec::new();
3029        while let Some(item) = stream.next().await {
3030            cpus.push(item.unwrap().cpu_total_ns);
3031        }
3032        assert_eq!(cpus, vec![100, 200, 300]);
3033    }
3034
3035    #[tokio::test]
3036    async fn mock_pull_image_stream_yields_queued_progress_in_order() {
3037        use futures_util::StreamExt;
3038
3039        let runtime = MockRuntime::new();
3040        let image = "alpine:latest";
3041
3042        runtime
3043            .enqueue_pull_progress(
3044                image,
3045                PullProgress::Status {
3046                    id: Some("layer-1".to_string()),
3047                    status: "Pulling fs layer".to_string(),
3048                    progress: None,
3049                    current: None,
3050                    total: None,
3051                },
3052            )
3053            .await;
3054        runtime
3055            .enqueue_pull_progress(
3056                image,
3057                PullProgress::Status {
3058                    id: Some("layer-1".to_string()),
3059                    status: "Downloading".to_string(),
3060                    progress: Some("[==>  ] 1MB/4MB".to_string()),
3061                    current: Some(1024 * 1024),
3062                    total: Some(4 * 1024 * 1024),
3063                },
3064            )
3065            .await;
3066        runtime
3067            .enqueue_pull_progress(
3068                image,
3069                PullProgress::Done {
3070                    reference: image.to_string(),
3071                    digest: Some("sha256:deadbeef".to_string()),
3072                },
3073            )
3074            .await;
3075
3076        let mut stream = runtime.pull_image_stream(image, None).await.unwrap();
3077        let mut events = Vec::new();
3078        while let Some(item) = stream.next().await {
3079            events.push(item.unwrap());
3080        }
3081
3082        assert_eq!(events.len(), 3);
3083        match &events[0] {
3084            PullProgress::Status { status, .. } => assert_eq!(status, "Pulling fs layer"),
3085            done @ PullProgress::Done { .. } => panic!("expected Status, got {done:?}"),
3086        }
3087        match &events[1] {
3088            PullProgress::Status {
3089                status,
3090                current,
3091                total,
3092                ..
3093            } => {
3094                assert_eq!(status, "Downloading");
3095                assert_eq!(*current, Some(1024 * 1024));
3096                assert_eq!(*total, Some(4 * 1024 * 1024));
3097            }
3098            done @ PullProgress::Done { .. } => panic!("expected Status, got {done:?}"),
3099        }
3100        match &events[2] {
3101            PullProgress::Done { reference, digest } => {
3102                assert_eq!(reference, image);
3103                assert_eq!(digest.as_deref(), Some("sha256:deadbeef"));
3104            }
3105            status @ PullProgress::Status { .. } => panic!("expected Done, got {status:?}"),
3106        }
3107    }
3108
3109    #[test]
3110    fn log_channel_serializes_as_snake_case() {
3111        assert_eq!(
3112            serde_json::to_string(&LogChannel::Stdin).unwrap(),
3113            "\"stdin\""
3114        );
3115        assert_eq!(
3116            serde_json::to_string(&LogChannel::Stdout).unwrap(),
3117            "\"stdout\""
3118        );
3119        assert_eq!(
3120            serde_json::to_string(&LogChannel::Stderr).unwrap(),
3121            "\"stderr\""
3122        );
3123    }
3124
3125    fn mock_spec() -> ServiceSpec {
3126        use zlayer_spec::*;
3127        serde_yaml::from_str::<DeploymentSpec>(
3128            r"
3129version: v1
3130deployment: test
3131services:
3132  test:
3133    rtype: service
3134    image:
3135      name: test:latest
3136    endpoints:
3137      - name: http
3138        protocol: http
3139        port: 8080
3140",
3141        )
3142        .unwrap()
3143        .services
3144        .remove("test")
3145        .unwrap()
3146    }
3147}