Skip to main content

zlayer_agent/
runtime.rs

1//! Abstract container runtime interface
2//!
3//! Defines the Runtime trait that can be implemented for different container runtimes
4//! (containerd, CRI-O, etc.)
5
6use crate::cgroups_stats::ContainerStats;
7use crate::error::{AgentError, Result};
8use futures_util::Stream;
9use std::collections::VecDeque;
10use std::future::Future;
11use std::net::IpAddr;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::io::{AsyncRead, AsyncWrite};
16use tokio::sync::Mutex;
17use tokio::task::JoinHandle;
18use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
19use zlayer_spec::{PullPolicy, RegistryAuth, ServiceSpec};
20
21/// Container state
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub enum ContainerState {
24    /// Container is being pulled/created
25    Pending,
26    /// Init actions are running
27    Initializing,
28    /// Container is running
29    Running,
30    /// Container is stopping
31    Stopping,
32    /// Container has exited
33    Exited { code: i32 },
34    /// Container failed
35    Failed { reason: String },
36}
37
38/// Container identifier
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40pub struct ContainerId {
41    pub service: String,
42    pub replica: u32,
43}
44
45impl std::fmt::Display for ContainerId {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        write!(f, "{}-rep-{}", self.service, self.replica)
48    }
49}
50
51/// Container handle
52pub struct Container {
53    pub id: ContainerId,
54    pub state: ContainerState,
55    pub pid: Option<u32>,
56    pub task: Option<JoinHandle<std::io::Result<()>>>,
57    /// Overlay network IP address assigned to this container
58    pub overlay_ip: Option<IpAddr>,
59    /// Health monitor task handle for this container
60    pub health_monitor: Option<JoinHandle<()>>,
61    /// Runtime-assigned port override (used by macOS sandbox where all
62    /// containers share the host network and need unique ports).
63    /// When `Some(port)`, the proxy should use this port instead of the
64    /// spec-declared endpoint port for this specific container's backend address.
65    pub port_override: Option<u16>,
66}
67
68/// Summary information about a cached image on the host runtime.
69#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
70pub struct ImageInfo {
71    /// Canonical image reference (e.g. `zachhandley/zlayer-manager:latest`).
72    pub reference: String,
73    /// Content-addressed digest if known (`sha256:...`). `None` when the
74    /// backend only tracks images by tag.
75    pub digest: Option<String>,
76    /// Total on-disk / in-cache size in bytes, when available.
77    pub size_bytes: Option<u64>,
78}
79
80/// Result of a prune operation.
81#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
82pub struct PruneResult {
83    /// Image references that were removed.
84    pub deleted: Vec<String>,
85    /// Bytes reclaimed from the cache. `0` when the backend cannot report.
86    pub space_reclaimed: u64,
87}
88
89/// Reason a container stopped running, as reported by [`Runtime::wait_outcome`].
90///
91/// Serialized as `snake_case` strings on the wire (`exited`, `signal`,
92/// `oom_killed`, `runtime_error`) so the API DTO can emit the reason as-is
93/// without a second translation layer.
94#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
95#[serde(rename_all = "snake_case")]
96pub enum WaitReason {
97    /// Container exited normally.
98    Exited,
99    /// Container was killed by a signal (e.g. `SIGKILL`, `SIGTERM`).
100    Signal,
101    /// Container was killed by the OOM killer.
102    OomKilled,
103    /// Runtime-side failure (pre-start error, runtime crash, etc.).
104    RuntimeError,
105}
106
107/// Wait condition mirroring Docker's `POST /containers/{id}/wait?condition=`.
108///
109/// Maps 1:1 to the wire form (`not-running`, `next-exit`, `removed`) via
110/// kebab-case serde so the daemon and the Docker compat shim can deserialize
111/// the query parameter in a single step. The default condition is
112/// [`WaitCondition::NotRunning`], matching Docker's behaviour when the
113/// `condition` query param is omitted.
114#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
115#[serde(rename_all = "kebab-case")]
116pub enum WaitCondition {
117    /// Wait until the container is not running. This is the default when
118    /// the caller doesn't specify a condition. Returns immediately if the
119    /// container has already exited; otherwise blocks until it does.
120    #[default]
121    NotRunning,
122    /// Wait for the next container exit, even if the container is already
123    /// stopped at the time of the call. Restarts the wait loop on each
124    /// observed exit.
125    NextExit,
126    /// Wait until the container is removed. Useful in conjunction with
127    /// `--rm` / `AutoRemove` lifecycle wiring.
128    Removed,
129}
130
131impl WaitCondition {
132    /// Return the wire string this variant serializes to (`"not-running"`,
133    /// `"next-exit"`, `"removed"`), matching Docker's `condition=` query
134    /// parameter spelling.
135    #[must_use]
136    pub const fn as_wire_str(self) -> &'static str {
137        match self {
138            Self::NotRunning => "not-running",
139            Self::NextExit => "next-exit",
140            Self::Removed => "removed",
141        }
142    }
143
144    /// Parse a Docker-style condition string (`"not-running"`, `"next-exit"`,
145    /// `"removed"`). Returns `None` for unknown values so callers can
146    /// distinguish "default" (omitted) from "rejected".
147    #[must_use]
148    pub fn from_wire_str(s: &str) -> Option<Self> {
149        match s {
150            "not-running" | "" => Some(Self::NotRunning),
151            "next-exit" => Some(Self::NextExit),
152            "removed" => Some(Self::Removed),
153            _ => None,
154        }
155    }
156}
157
158/// Richer wait result returned by [`Runtime::wait_outcome`].
159///
160/// Backwards-compatible with [`Runtime::wait_container`] (which returns just
161/// `exit_code`). The API handler uses this to populate the extended
162/// `ContainerWaitResponse` fields (`reason`, `signal`, `finished_at`) while
163/// existing callers that only need the exit code can keep using
164/// `wait_container`.
165#[derive(Debug, Clone)]
166pub struct WaitOutcome {
167    /// Process exit code (0 = success). When the container was killed by
168    /// signal `N`, this is typically `128 + N`.
169    pub exit_code: i32,
170    /// Classification of the exit.
171    pub reason: WaitReason,
172    /// Signal name when `reason == WaitReason::Signal`, e.g. `"SIGKILL"`.
173    /// Derived from `exit_code - 128` on a best-effort basis.
174    pub signal: Option<String>,
175    /// Time the container exited, if the runtime reports it.
176    pub finished_at: Option<chrono::DateTime<chrono::Utc>>,
177}
178
179impl WaitOutcome {
180    /// Build a plain `Exited` outcome with no signal/timestamp metadata โ€” the
181    /// default that matches the pre-ยง3.12 behaviour.
182    #[must_use]
183    pub fn exited(exit_code: i32) -> Self {
184        Self {
185            exit_code,
186            reason: WaitReason::Exited,
187            signal: None,
188            finished_at: None,
189        }
190    }
191}
192
193/// Map a signal-style exit code (`128 + N`) to a canonical signal name.
194///
195/// Recognises common POSIX signals and falls back to `signal_<n>` for
196/// unknown numbers so the caller always gets *something* readable.
197#[must_use]
198pub fn signal_name_from_exit_code(exit_code: i32) -> Option<String> {
199    if exit_code <= 128 {
200        return None;
201    }
202    let n = exit_code - 128;
203    let name = match n {
204        1 => "SIGHUP",
205        2 => "SIGINT",
206        3 => "SIGQUIT",
207        4 => "SIGILL",
208        6 => "SIGABRT",
209        7 => "SIGBUS",
210        8 => "SIGFPE",
211        9 => "SIGKILL",
212        10 => "SIGUSR1",
213        11 => "SIGSEGV",
214        12 => "SIGUSR2",
215        13 => "SIGPIPE",
216        14 => "SIGALRM",
217        15 => "SIGTERM",
218        17 => "SIGSTOP",
219        18 => "SIGCONT",
220        _ => return Some(format!("signal_{n}")),
221    };
222    Some(name.to_string())
223}
224
225/// One streaming event emitted by [`Runtime::exec_stream`].
226///
227/// Runtimes push these events as the exec'd command produces output. The final
228/// event for any successful stream is always an [`ExecEvent::Exit`] carrying
229/// the process's exit code.
230#[derive(Debug, Clone, PartialEq, Eq)]
231pub enum ExecEvent {
232    /// A chunk of stdout from the running command. Emitted line-by-line by
233    /// Docker; other runtimes may emit the full buffered output in one event.
234    Stdout(String),
235    /// A chunk of stderr from the running command.
236    Stderr(String),
237    /// The command has exited with this exit code. Always the final event.
238    Exit(i32),
239}
240
241/// Boxed async stream of [`ExecEvent`]s returned by [`Runtime::exec_stream`].
242pub type ExecEventStream = Pin<Box<dyn Stream<Item = ExecEvent> + Send>>;
243
244/// Options accepted by [`Runtime::exec_pty`].
245///
246/// Mirrors the union of fields exposed by Docker's `POST /containers/{id}/exec`
247/// (`ExecConfig`) so the daemon can pass them through with minimal translation.
248/// Unlike [`Runtime::exec`] / [`Runtime::exec_stream`] (which capture stdout
249/// and stderr separately and return only after the process exits), `exec_pty`
250/// is the interactive entry point: it allocates a PTY when `tty` is set,
251/// streams I/O bidirectionally over a single duplex byte stream, and returns
252/// an [`ExecHandle`] that the caller drives concurrently with the running
253/// process.
254#[derive(Debug, Clone, Default, PartialEq, Eq)]
255#[allow(clippy::struct_excessive_bools)] // mirrors Docker's `ExecConfig` 1:1
256pub struct ExecOptions {
257    /// The argv vector for the exec'd process (`command[0]` is the binary).
258    pub command: Vec<String>,
259    /// Extra environment variables, in `KEY=VALUE` form. Merged into the
260    /// container's existing env on the runtime side.
261    pub env: Vec<String>,
262    /// Optional working directory inside the container. `None` keeps the
263    /// container's default `WORKDIR`.
264    pub working_dir: Option<String>,
265    /// Optional `user[:group]` override. `None` keeps the container's
266    /// configured user.
267    pub user: Option<String>,
268    /// Run the exec with privileged capabilities (Docker `Privileged`).
269    pub privileged: bool,
270    /// Allocate a TTY for the exec'd process (Docker `Tty`). When `true`, the
271    /// runtime should set up a pseudo-terminal pair and the duplex stream on
272    /// the returned [`ExecHandle`] carries multiplexed PTY traffic; when
273    /// `false`, the stream carries raw stdout/stderr without PTY framing.
274    pub tty: bool,
275    /// Attach stdin so the caller can write to the process (Docker
276    /// `AttachStdin`). When `false`, the writable half of the duplex stream
277    /// is effectively a no-op.
278    pub attach_stdin: bool,
279    /// Attach stdout so the caller receives the process's stdout on the
280    /// readable half of the duplex stream (Docker `AttachStdout`).
281    pub attach_stdout: bool,
282    /// Attach stderr so the caller receives the process's stderr on the
283    /// readable half of the duplex stream (Docker `AttachStderr`).
284    pub attach_stderr: bool,
285}
286
287/// Marker supertrait combining [`AsyncRead`] + [`AsyncWrite`] so they can be
288/// used together as a single trait object. Rust forbids stacking two
289/// non-auto traits directly in `dyn`, so [`ExecPtyStream`] is built on top
290/// of this helper instead.
291///
292/// A blanket impl below covers every type that already satisfies the four
293/// component bounds, so callers never need to implement `ExecDuplex`
294/// manually โ€” they just hand the runtime any concrete duplex stream that's
295/// `AsyncRead + AsyncWrite + Send + Unpin`.
296pub trait ExecDuplex: AsyncRead + AsyncWrite + Send + Unpin {}
297
298impl<T> ExecDuplex for T where T: AsyncRead + AsyncWrite + Send + Unpin + ?Sized {}
299
300/// Duplex byte stream used by [`ExecHandle`] to shuttle stdin/stdout (and,
301/// when [`ExecOptions::tty`] is set, multiplexed PTY traffic) between the
302/// caller and the exec'd process.
303///
304/// `Unpin` is required (via [`ExecDuplex`]) so callers can poll the trait
305/// object directly via the usual `tokio::io::AsyncReadExt` /
306/// `AsyncWriteExt` extension methods without having to pin the box
307/// themselves.
308pub type ExecPtyStream = Box<dyn ExecDuplex + 'static>;
309
310/// Future returned by [`ExecHandle`] that resolves with the exec'd process's
311/// exit code once the runtime observes it has terminated.
312pub type ExecExitFuture = Pin<Box<dyn Future<Output = Result<i32>> + Send>>;
313
314/// Runtime-side handle returned by [`Runtime::exec_pty`].
315///
316/// Bundles everything a long-lived interactive exec session needs:
317///
318/// 1. A duplex [`ExecPtyStream`] for shuttling stdin/stdout (or full PTY
319///    traffic when `tty` is set) between the caller and the running process.
320/// 2. A `tokio::sync::mpsc::Sender<(rows, cols)>` so the caller can resize
321///    the allocated PTY in response to terminal-size changes (mirrors
322///    Docker's `POST /exec/{id}/resize` endpoint). Runtimes that don't allocate
323///    a PTY should still accept the channel and treat resize messages as
324///    no-ops; the channel is dropped on the runtime side once the process
325///    exits.
326/// 3. A boxed [`ExecExitFuture`] that resolves with the exit code once the
327///    runtime detects the process has terminated.
328///
329/// `ExecHandle` deliberately does not implement `Debug` / `Clone` because
330/// every field holds a trait object (or, in the exit future's case, an opaque
331/// boxed future). Consumers move the fields out by destructuring and then
332/// drive the I/O stream, the resize channel, and the exit future
333/// independently.
334pub struct ExecHandle {
335    /// Bidirectional byte channel between the caller and the exec'd process.
336    pub stream: ExecPtyStream,
337    /// Channel for sending `(rows, cols)` resize requests for the PTY
338    /// allocated to the exec session. Bounded so a stuck runtime can't make
339    /// the caller buffer unbounded resize events; senders should drop the
340    /// most recent excess size on backpressure.
341    pub resize: tokio::sync::mpsc::Sender<(u16, u16)>,
342    /// Future that resolves with the process's exit code once the runtime
343    /// observes the exec has terminated. Consumers typically `await` this on
344    /// a dedicated task while pumping the duplex stream on another.
345    pub exit: ExecExitFuture,
346}
347
348/// Which standard stream a [`LogChunk`] originated from.
349///
350/// Mirrors the three POSIX file descriptors. `Stdin` is included for
351/// completeness โ€” Docker's multiplexed log header carries a `stdin` channel
352/// for attached interactive containers โ€” but most container runtimes only
353/// emit `Stdout` / `Stderr` chunks.
354#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
355#[serde(rename_all = "snake_case")]
356pub enum LogChannel {
357    /// Standard input (rarely emitted; included for completeness with
358    /// Docker's stdcopy framing).
359    Stdin,
360    /// Standard output.
361    Stdout,
362    /// Standard error.
363    Stderr,
364}
365
366/// One chunk of container log output emitted by [`Runtime::logs_stream`].
367///
368/// Streams emit one `LogChunk` per line (or per Docker stdcopy frame) as
369/// data is produced by the container. Backends that expose timestamps
370/// populate `timestamp`; ones that don't leave it `None`.
371#[derive(Debug, Clone)]
372pub struct LogChunk {
373    /// Which standard stream produced this chunk.
374    pub stream: LogChannel,
375    /// Raw bytes of the chunk. Not necessarily UTF-8 โ€” container output is
376    /// arbitrary binary data and consumers must handle invalid UTF-8.
377    pub bytes: bytes::Bytes,
378    /// When the runtime reported this chunk, when known.
379    pub timestamp: Option<chrono::DateTime<chrono::Utc>>,
380}
381
382/// Options accepted by [`Runtime::logs_stream`].
383///
384/// Mirrors the `GET /containers/{id}/logs` query parameters in the Docker
385/// Engine API so backends can pass them through with minimal translation.
386#[derive(Debug, Clone, Default)]
387#[allow(clippy::struct_excessive_bools)] // mirrors Docker's logs query params 1:1
388pub struct LogsStreamOptions {
389    /// Continue streaming after the current end-of-log marker. When `false`,
390    /// the stream terminates once the runtime has emitted all currently
391    /// buffered logs.
392    pub follow: bool,
393    /// Tail the last N lines before starting to stream. `None` means "all
394    /// available logs from the start".
395    pub tail: Option<u64>,
396    /// Earliest timestamp (Unix seconds) to include. `None` means "no
397    /// lower bound".
398    pub since: Option<i64>,
399    /// Latest timestamp (Unix seconds) to include. `None` means "no upper
400    /// bound".
401    pub until: Option<i64>,
402    /// When `true`, the runtime should populate [`LogChunk::timestamp`] for
403    /// every chunk. Backends that always carry timestamps may ignore this
404    /// flag.
405    pub timestamps: bool,
406    /// Include stdout in the stream.
407    pub stdout: bool,
408    /// Include stderr in the stream.
409    pub stderr: bool,
410}
411
412/// One periodic resource-usage sample emitted by [`Runtime::stats_stream`].
413///
414/// Mirrors the union of fields exposed by Docker's `/containers/{id}/stats`
415/// endpoint and Linux cgroup stat files so downstream consumers (autoscaler,
416/// `docker stats`-compat HTTP shim) can read a single shape regardless of
417/// backend. Counters that the runtime cannot report are left at zero โ€”
418/// missing data is signalled separately via the surrounding stream
419/// metadata.
420#[derive(Debug, Clone)]
421pub struct StatsSample {
422    /// Cumulative container CPU time consumed, in nanoseconds.
423    pub cpu_total_ns: u64,
424    /// Cumulative system CPU time observed at the same moment, in
425    /// nanoseconds. Used to compute relative CPU percentage between
426    /// successive samples (Docker's classic `cpu_delta / system_delta`
427    /// formula).
428    pub cpu_system_ns: u64,
429    /// Number of CPUs currently online for this container. Used as the
430    /// final scaling factor in the CPU-percentage calculation.
431    pub online_cpus: u32,
432    /// Resident memory currently in use, in bytes (cgroup
433    /// `memory.usage_in_bytes` minus inactive page cache for v1, or
434    /// `memory.current` for v2).
435    pub mem_used_bytes: u64,
436    /// Memory limit configured on the container, in bytes. `0` when no
437    /// limit is set (cgroup reports its sentinel value).
438    pub mem_limit_bytes: u64,
439    /// Cumulative bytes received across all attached network interfaces.
440    pub net_rx_bytes: u64,
441    /// Cumulative bytes transmitted across all attached network interfaces.
442    pub net_tx_bytes: u64,
443    /// Cumulative bytes read from block devices.
444    pub blkio_read_bytes: u64,
445    /// Cumulative bytes written to block devices.
446    pub blkio_write_bytes: u64,
447    /// Number of process IDs currently running inside the container's pid
448    /// namespace.
449    pub pids_current: u64,
450    /// Configured pids limit, if any. `None` means unlimited.
451    pub pids_limit: Option<u64>,
452    /// Wallclock time the sample was taken.
453    pub timestamp: chrono::DateTime<chrono::Utc>,
454}
455
456/// One progress event emitted by [`Runtime::pull_image_stream`].
457///
458/// Backends emit a series of `Status` events as layers are downloaded /
459/// extracted, followed by exactly one `Done` event when the pull completes
460/// successfully. Errors mid-pull are propagated as the stream's `Err`
461/// variant and terminate the stream.
462#[derive(Debug, Clone)]
463pub enum PullProgress {
464    /// Progress update for an in-flight layer or stage.
465    Status {
466        /// Layer ID or other backend-specific identifier, when available.
467        id: Option<String>,
468        /// Human-readable status text, e.g. `"Pulling fs layer"`,
469        /// `"Downloading"`, `"Extracting"`. Always present; may be empty
470        /// when the backend has nothing to report this tick.
471        status: String,
472        /// Pre-formatted progress bar (Docker emits a string like
473        /// `"[========>          ] 12.3MB/45.6MB"`). `None` when the
474        /// backend reports raw `current`/`total` only.
475        progress: Option<String>,
476        /// Bytes transferred so far for this layer, when reported.
477        current: Option<u64>,
478        /// Expected total bytes for this layer, when reported.
479        total: Option<u64>,
480    },
481    /// Pull completed successfully.
482    Done {
483        /// Resolved canonical image reference (typically the same as the
484        /// requested reference, but may include a digest the backend
485        /// resolved).
486        reference: String,
487        /// Content-addressed digest, when the backend reports one.
488        digest: Option<String>,
489    },
490}
491
492/// Boxed async stream of `Result<LogChunk, AgentError>` items returned by
493/// [`Runtime::logs_stream`].
494///
495/// `'static` lifetime so handlers can hold the stream past the trait method's
496/// borrow of `self`.
497pub type LogsStream = Pin<Box<dyn Stream<Item = Result<LogChunk>> + Send + 'static>>;
498
499/// Boxed async stream of `Result<StatsSample, AgentError>` items returned by
500/// [`Runtime::stats_stream`].
501pub type StatsStream = Pin<Box<dyn Stream<Item = Result<StatsSample>> + Send + 'static>>;
502
503/// Boxed async stream of `Result<PullProgress, AgentError>` items returned by
504/// [`Runtime::pull_image_stream`].
505pub type PullProgressStream = Pin<Box<dyn Stream<Item = Result<PullProgress>> + Send + 'static>>;
506
507/// Boxed async stream of TAR-archive byte chunks returned by
508/// [`Runtime::archive_get`].
509///
510/// Each yielded `Bytes` is a contiguous slice of the TAR archive that the
511/// runtime is producing for the requested container path. The stream ends
512/// once the archive is fully written. Mid-stream errors map to
513/// [`AgentError`] variants.
514pub type ArchiveStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send + 'static>>;
515
516/// Stat metadata for a single path inside a container, returned by
517/// [`Runtime::archive_head`].
518///
519/// Mirrors Docker's `X-Docker-Container-Path-Stat` header payload (a
520/// base64-encoded JSON object with `name`, `size`, `mode`, `mtime`, and
521/// `linkTarget` fields). The API layer serializes this back into the same
522/// header for the `HEAD /containers/{id}/archive` response.
523#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
524pub struct PathStat {
525    /// Base name of the path (e.g. `"foo.txt"` for `/etc/foo.txt`).
526    pub name: String,
527    /// File size in bytes. For directories this is the size reported by the
528    /// runtime (typically the directory entry size, not the recursive sum).
529    pub size: i64,
530    /// Unix file mode bits (`S_IFMT | S_IRWXU | ...`). Encoded as a `u32` so
531    /// it round-trips through the Docker JSON header losslessly.
532    pub mode: u32,
533    /// Last-modification time as an RFC 3339 string. `None` when the runtime
534    /// cannot report it.
535    pub mtime: Option<String>,
536    /// Target of a symbolic link, when the path is a symlink. Empty string
537    /// for non-symlink paths (matching Docker's wire shape).
538    pub link_target: String,
539}
540
541/// Options accepted by [`Runtime::archive_put`].
542///
543/// Mirrors the query parameters Docker accepts on
544/// `PUT /containers/{id}/archive`:
545///
546/// * `noOverwriteDirNonDir=1` โ€” refuse to replace a non-directory with a
547///   directory or vice versa. Default `false`.
548/// * `copyUIDGID=1` โ€” preserve UID/GID of files in the archive instead of
549///   chown'ing them to the container's user. Default `false`.
550#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
551pub struct ArchivePutOptions {
552    /// When `true`, the runtime must reject puts that would replace a
553    /// non-directory with a directory (or vice versa) at the destination.
554    pub no_overwrite_dir_non_dir: bool,
555    /// When `true`, preserve UID/GID of files in the archive verbatim.
556    pub copy_uid_gid: bool,
557}
558
559/// Per-network attachment reported by [`Runtime::inspect_detailed`].
560///
561/// Mirrors the subset of bollard's `EndpointSettings` that the API needs to
562/// populate `ContainerInfo.networks` for standalone containers. Kept in
563/// `zlayer-agent` (rather than `zlayer-spec`) because it's a runtime-level
564/// inspect result, not a deployment specification.
565#[derive(Debug, Clone, Default, PartialEq, Eq)]
566pub struct NetworkAttachmentDetail {
567    /// Network name as reported by the runtime (Docker's key in
568    /// `NetworkSettings.Networks`, e.g. `"bridge"` or a user-defined network
569    /// name).
570    pub network: String,
571    /// DNS aliases the container answers to on this network. Empty when the
572    /// runtime doesn't surface aliases.
573    pub aliases: Vec<String>,
574    /// Assigned IPv4 address on this network, if any. Empty strings are
575    /// normalised to `None`.
576    pub ipv4: Option<String>,
577}
578
579/// Per-container health detail reported by [`Runtime::inspect_detailed`].
580///
581/// Sourced directly from bollard's `ContainerState.health` (Docker's native
582/// healthcheck tracking). Our internal `HealthMonitor` in
583/// `crates/zlayer-agent/src/health.rs` drives service-level health events; for
584/// standalone containers the API reports the runtime-native status instead so
585/// that images with a baked-in `HEALTHCHECK` show up correctly.
586#[derive(Debug, Clone, Default, PartialEq, Eq)]
587pub struct HealthDetail {
588    /// One of `"none"`, `"starting"`, `"healthy"`, `"unhealthy"` (Docker's
589    /// `HealthStatusEnum`). Empty string is normalised to `"none"` upstream.
590    pub status: String,
591    /// Consecutive failing probe count, if the runtime reports it.
592    pub failing_streak: Option<u32>,
593    /// Output from the most recent failing probe, when available.
594    pub last_output: Option<String>,
595}
596
597/// Rich inspect details for a single container, returned by
598/// [`Runtime::inspect_detailed`].
599///
600/// Carries the fields `ContainerInfo` needs on top of the bare
601/// [`ContainerState`] reported by [`Runtime::container_state`]:
602/// published ports, attached networks, first IPv4, health, and `exit_code`.
603///
604/// Default is an all-empty record โ€” that's what the default trait method
605/// returns for runtimes that don't (yet) implement rich inspect, and the API
606/// layer treats all fields as purely additive, so a default record still
607/// produces a backwards-compatible `ContainerInfo`.
608#[derive(Debug, Clone, Default, PartialEq, Eq)]
609pub struct ContainerInspectDetails {
610    /// Published port mappings (container โ†’ host), translated back from the
611    /// runtime's internal port-binding map.
612    pub ports: Vec<zlayer_spec::PortMapping>,
613    /// Networks the container is attached to, plus the aliases + IPv4 for each.
614    pub networks: Vec<NetworkAttachmentDetail>,
615    /// First non-empty IPv4 address found across the container's networks,
616    /// useful as a "primary" IP for simple clients that don't want to iterate
617    /// `networks`. `None` when the container isn't on any network with an IP.
618    pub ipv4: Option<String>,
619    /// Health status when the container has a Docker-native `HEALTHCHECK` or
620    /// the runtime otherwise reports a health state.
621    pub health: Option<HealthDetail>,
622    /// Most recent exit code, when the runtime reports one. `None` for
623    /// containers that are still running and have never exited.
624    pub exit_code: Option<i32>,
625}
626
627/// Lightweight summary of a single container reported by
628/// [`Runtime::list_containers`].
629///
630/// Reconciliation only needs to match runtime containers against
631/// `ZLayer`'s own metadata, which lives in the `com.zlayer.container_id`
632/// label (see `zlayer-api::handlers::container_id_map::ZLAYER_CONTAINER_ID_LABEL`).
633/// Carrying that label value plus the runtime-native id is sufficient for
634/// the standalone-container reconcile pass; richer fields can be added
635/// when concrete callers need them.
636#[derive(Debug, Clone, Default, PartialEq, Eq)]
637pub struct RuntimeContainerSummary {
638    /// Backend-native container handle (Docker's 64-char hex, the youki
639    /// state-dir name, etc.). Opaque to the reconciler โ€” only used for
640    /// logging and to disambiguate listings.
641    pub runtime_id: String,
642    /// Value of the `com.zlayer.container_id` label, if the runtime
643    /// reports one for this container. Containers without this label are
644    /// foreign (not ZLayer-managed) and should be ignored by reconcile.
645    pub zlayer_container_id_label: Option<String>,
646}
647
648/// One row of process information returned by [`Runtime::top_container`].
649///
650/// Mirrors Docker's `GET /containers/{id}/top` response shape: a `Titles`
651/// vector that names each column, plus a `Processes` matrix where each row is
652/// the per-process column values. The runtime decides which `ps` fields to
653/// emit; the API/Docker shim forwards them verbatim.
654#[derive(Debug, Clone, Default, PartialEq, Eq)]
655pub struct ContainerTopOutput {
656    /// Column titles (e.g. `["UID", "PID", "PPID", "C", "STIME", "TTY", "TIME", "CMD"]`).
657    pub titles: Vec<String>,
658    /// One row per process; each row has the same length as `titles`.
659    pub processes: Vec<Vec<String>>,
660}
661
662/// Filesystem change kind reported by [`Runtime::changes_container`].
663///
664/// Matches Docker's numeric encoding: `0 = Modified`, `1 = Added`, `2 = Deleted`.
665#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
666#[serde(rename_all = "snake_case")]
667pub enum FilesystemChangeKind {
668    /// File or directory was modified in the container's writable layer.
669    Modified,
670    /// File or directory was added.
671    Added,
672    /// File or directory was deleted.
673    Deleted,
674}
675
676impl FilesystemChangeKind {
677    /// Numeric wire value used by Docker's `/containers/{id}/changes`.
678    #[must_use]
679    pub const fn as_docker_kind(self) -> u8 {
680        match self {
681            Self::Modified => 0,
682            Self::Added => 1,
683            Self::Deleted => 2,
684        }
685    }
686}
687
688/// One filesystem change reported by [`Runtime::changes_container`].
689#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
690pub struct FilesystemChangeEntry {
691    /// Path inside the container that changed (absolute, e.g. `/etc/hosts`).
692    pub path: String,
693    /// Kind of change.
694    pub kind: FilesystemChangeKind,
695}
696
697/// One published port mapping entry returned by [`Runtime::port_mappings_container`].
698///
699/// Mirrors one entry of Docker's `/containers/{id}/port` map. A single
700/// container port may bind multiple host endpoints (e.g. IPv4 + IPv6); each
701/// such binding yields one [`PortMappingEntry`].
702#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
703pub struct PortMappingEntry {
704    /// Container port number that's published.
705    pub container_port: u16,
706    /// Protocol (`"tcp"`, `"udp"`, `"sctp"`).
707    pub protocol: String,
708    /// Host IP address that the container's port is mapped to.
709    pub host_ip: Option<String>,
710    /// Host port number that the container's port is mapped to.
711    pub host_port: Option<u16>,
712}
713
714/// Result of a [`Runtime::prune_containers`] call.
715#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
716pub struct ContainerPruneResult {
717    /// Container IDs that were removed (runtime-native or hex form).
718    pub deleted: Vec<String>,
719    /// Bytes reclaimed from the runtime's container storage. `0` when the
720    /// backend cannot report.
721    pub space_reclaimed: u64,
722}
723
724/// Runtime-level container restart policy attached to a
725/// [`ContainerResourceUpdate`].
726///
727/// Mirrors Docker's `HostConfig.RestartPolicy`. `name` is one of `""`,
728/// `"no"`, `"always"`, `"unless-stopped"`, or `"on-failure"`. Backends that
729/// can persist a restart policy (Docker via bollard, Youki via the
730/// supervisor's stored spec) honour it; backends that cannot
731/// (WASM, Mock) leave it unmodified.
732#[derive(Debug, Clone, Default, PartialEq, Eq)]
733pub struct ContainerRestartPolicyUpdate {
734    /// Restart policy name.
735    pub name: Option<String>,
736    /// Maximum retry count (only honoured when `name == "on-failure"`).
737    pub maximum_retry_count: Option<i64>,
738}
739
740/// Resource-update payload passed to [`Runtime::update_container_resources`].
741///
742/// Mirrors the resource subset of Docker's `POST /containers/{id}/update`
743/// body. Every field is `Option<...>`; backends apply only the fields that
744/// are `Some`. A request with all fields `None` is a no-op (consistent with
745/// Docker's behaviour).
746///
747/// Field semantics match Docker's wire shape: see [`ContainerUpdateRequest`]
748/// in `zlayer-types::api::containers` for the JSON encoding.
749#[derive(Debug, Clone, Default, PartialEq, Eq)]
750pub struct ContainerResourceUpdate {
751    /// CPU shares (cgroup `cpu.weight` / `cpu.shares`).
752    pub cpu_shares: Option<i64>,
753    /// Memory limit in bytes.
754    pub memory: Option<i64>,
755    /// CPU CFS period in microseconds.
756    pub cpu_period: Option<i64>,
757    /// CPU CFS quota in microseconds.
758    pub cpu_quota: Option<i64>,
759    /// CPU real-time period in microseconds.
760    pub cpu_realtime_period: Option<i64>,
761    /// CPU real-time runtime in microseconds.
762    pub cpu_realtime_runtime: Option<i64>,
763    /// CPUs allowed for execution (e.g. `"0-3"`).
764    pub cpuset_cpus: Option<String>,
765    /// Memory nodes (NUMA) allowed for execution.
766    pub cpuset_mems: Option<String>,
767    /// Soft memory limit in bytes.
768    pub memory_reservation: Option<i64>,
769    /// Total memory limit (memory + swap) in bytes. `-1` removes swap.
770    pub memory_swap: Option<i64>,
771    /// Kernel memory limit in bytes (deprecated upstream).
772    pub kernel_memory: Option<i64>,
773    /// Block IO weight (10-1000).
774    pub blkio_weight: Option<u16>,
775    /// PIDs limit. `0` or `-1` for unlimited.
776    pub pids_limit: Option<i64>,
777    /// Replacement restart policy. `None` leaves the policy unchanged.
778    pub restart_policy: Option<ContainerRestartPolicyUpdate>,
779}
780
781impl ContainerResourceUpdate {
782    /// Returns `true` when this update would not change anything โ€” every
783    /// field is `None`. Backends short-circuit no-op updates rather than
784    /// touching the cgroup hierarchy.
785    #[must_use]
786    pub fn is_empty(&self) -> bool {
787        self.cpu_shares.is_none()
788            && self.memory.is_none()
789            && self.cpu_period.is_none()
790            && self.cpu_quota.is_none()
791            && self.cpu_realtime_period.is_none()
792            && self.cpu_realtime_runtime.is_none()
793            && self.cpuset_cpus.is_none()
794            && self.cpuset_mems.is_none()
795            && self.memory_reservation.is_none()
796            && self.memory_swap.is_none()
797            && self.kernel_memory.is_none()
798            && self.blkio_weight.is_none()
799            && self.pids_limit.is_none()
800            && self.restart_policy.is_none()
801    }
802}
803
804/// Result of a [`Runtime::update_container_resources`] call. Mirrors
805/// Docker's `{"Warnings": [...]}` response shape: backends append a string
806/// per "we accepted this but did not apply it" or "field deprecated"
807/// warning.
808#[derive(Debug, Clone, Default, PartialEq, Eq)]
809pub struct ContainerUpdateOutcome {
810    /// Human-readable warnings emitted while applying the update.
811    pub warnings: Vec<String>,
812}
813
814/// Detailed image inspect record returned by [`Runtime::inspect_image_native`].
815///
816/// Mirrors the union of fields exposed by Docker's `GET /images/{name}/json`
817/// (bollard's `ImageInspect`) so the API / Docker compat shim can surface a
818/// Docker-shaped JSON response without re-translating later. Optional fields
819/// remain `None` when the backend cannot provide them.
820#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
821pub struct ImageInspectInfo {
822    /// Content-addressed image id (`sha256:...`), when known.
823    pub id: Option<String>,
824    /// All tags (`repo:tag`) currently pointing at this image.
825    pub repo_tags: Vec<String>,
826    /// Manifest digests this image is known under (`repo@sha256:...`).
827    pub repo_digests: Vec<String>,
828    /// Parent image id (`sha256:...`), when the image was built locally.
829    pub parent: Option<String>,
830    /// Human-readable comment recorded at commit/import time.
831    pub comment: Option<String>,
832    /// Creation timestamp in RFC 3339 form.
833    pub created: Option<String>,
834    /// Container id this image was committed from, when applicable.
835    pub container: Option<String>,
836    /// Daemon version that built / imported this image.
837    pub docker_version: Option<String>,
838    /// Author recorded on the image (e.g. `MAINTAINER` instruction).
839    pub author: Option<String>,
840    /// Hardware architecture the image targets (`amd64`, `arm64`, ...).
841    pub architecture: Option<String>,
842    /// Operating system the image targets (`linux`, `windows`, ...).
843    pub os: Option<String>,
844    /// Total on-disk size in bytes, when known.
845    pub size: Option<u64>,
846    /// Layer order: list of `sha256:...` digests, root-most first.
847    pub layers: Vec<String>,
848    /// Container env (`KEY=VALUE`).
849    pub env: Vec<String>,
850    /// Default command vector.
851    pub cmd: Vec<String>,
852    /// Default entrypoint vector.
853    pub entrypoint: Vec<String>,
854    /// Working directory inside the image.
855    pub working_dir: Option<String>,
856    /// User the image runs as by default.
857    pub user: Option<String>,
858    /// Image labels.
859    pub labels: std::collections::BTreeMap<String, String>,
860}
861
862/// One row of an image's history, returned by [`Runtime::image_history`].
863///
864/// Mirrors Docker's `GET /images/{name}/history` response.
865#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
866pub struct ImageHistoryEntry {
867    /// Layer / image id (`sha256:...`). May be `<missing>` for layers that
868    /// were dropped during a squash.
869    pub id: String,
870    /// Unix-seconds timestamp when this layer was created.
871    pub created: i64,
872    /// Dockerfile-style instruction that produced this layer.
873    pub created_by: String,
874    /// Tags that point at this specific layer.
875    pub tags: Vec<String>,
876    /// Layer size in bytes.
877    pub size: u64,
878    /// Optional comment recorded with the layer.
879    pub comment: String,
880}
881
882/// One result returned by [`Runtime::search_images`].
883///
884/// Mirrors Docker's `GET /images/search` response items.
885#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
886pub struct ImageSearchResult {
887    /// Image name (e.g. `library/nginx`).
888    pub name: String,
889    /// Free-text description of the image.
890    pub description: String,
891    /// Number of stars on the source registry, when reported.
892    pub star_count: u64,
893    /// Whether the image is officially curated.
894    pub official: bool,
895    /// Whether the image was produced by an automated build (deprecated by
896    /// Docker but still surfaced for compatibility).
897    pub automated: bool,
898}
899
900/// Result of a [`Runtime::commit_container`] call.
901#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
902pub struct CommitOutcome {
903    /// Content-addressed image id of the freshly created image.
904    pub id: String,
905}
906
907/// Options accepted by [`Runtime::commit_container`].
908///
909/// Mirrors Docker's `POST /commit` query parameters. All fields are optional
910/// โ€” when `repo`/`tag` are both empty the runtime creates an untagged image.
911#[derive(Debug, Clone, Default, PartialEq, Eq)]
912pub struct CommitOptions {
913    /// Repository name to apply to the committed image (e.g. `myapp`).
914    pub repo: Option<String>,
915    /// Tag to apply (defaults to `latest` when `repo` is set and tag is empty).
916    pub tag: Option<String>,
917    /// Free-form comment to record on the committed image.
918    pub comment: Option<String>,
919    /// Author to record on the committed image.
920    pub author: Option<String>,
921    /// Whether to pause the container before committing (defaults to `true`).
922    pub pause: bool,
923    /// Dockerfile-style instructions to apply during commit.
924    pub changes: Option<String>,
925}
926
927/// Boxed async stream of TAR-archive byte chunks returned by image save /
928/// container export endpoints. Each yielded `Bytes` is a contiguous slice
929/// of an uncompressed TAR archive.
930pub type ImageExportStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send + 'static>>;
931
932/// One progress event emitted by [`Runtime::load_images`].
933///
934/// `Status` events carry per-line progress reported by the daemon while
935/// the tar is being unpacked; `Done` is emitted exactly once when load
936/// completes successfully.
937#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
938#[serde(tag = "kind", rename_all = "snake_case")]
939pub enum LoadProgress {
940    /// Mid-load progress entry.
941    Status {
942        /// Layer or image id when reported.
943        #[serde(default, skip_serializing_if = "Option::is_none")]
944        id: Option<String>,
945        /// Human-readable status text.
946        status: String,
947    },
948    /// Load completed. `references` lists the image references that were
949    /// loaded into the cache.
950    Done {
951        /// Loaded image references (`repo:tag` or `sha256:...`).
952        references: Vec<String>,
953    },
954}
955
956/// Boxed async stream of [`LoadProgress`] events returned by
957/// [`Runtime::load_images`].
958pub type LoadProgressStream = Pin<Box<dyn Stream<Item = Result<LoadProgress>> + Send + 'static>>;
959
960/// Abstract container runtime trait
961///
962/// This trait abstracts over different container runtimes (containerd, CRI-O, etc.)
963#[async_trait::async_trait]
964pub trait Runtime: Send + Sync {
965    /// Pull an image to local storage
966    async fn pull_image(&self, image: &str) -> Result<()>;
967
968    /// Pull an image to local storage with a specific policy.
969    ///
970    /// When `auth` is `Some`, the runtime uses those inline credentials for
971    /// the pull (ยง3.10 of `ZLAYER_SDK_FIXES.md`). When `auth` is `None`, the
972    /// runtime falls back to its existing credential-store lookup keyed by
973    /// registry hostname (or anonymous access when no match exists).
974    ///
975    /// Non-Docker runtimes may accept but ignore the `auth` argument โ€” their
976    /// OCI puller (`zlayer-registry`) already resolves credentials from the
977    /// store by hostname, and inline auth is primarily a Docker-backend
978    /// concern. Ignoring it is safe: callers that need inline auth should use
979    /// the Docker runtime.
980    async fn pull_image_with_policy(
981        &self,
982        image: &str,
983        policy: PullPolicy,
984        auth: Option<&RegistryAuth>,
985    ) -> Result<()>;
986
987    /// Create a container
988    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()>;
989
990    /// Start a container
991    async fn start_container(&self, id: &ContainerId) -> Result<()>;
992
993    /// Stop a container
994    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()>;
995
996    /// Remove a container
997    async fn remove_container(&self, id: &ContainerId) -> Result<()>;
998
999    /// Get container state
1000    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState>;
1001
1002    /// Get container logs as structured entries
1003    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>>;
1004
1005    /// Execute command in container
1006    async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)>;
1007
1008    /// Execute a command in a container and stream stdout / stderr / exit
1009    /// events as they are produced.
1010    ///
1011    /// The default implementation calls the buffered [`Runtime::exec`] and
1012    /// emits everything as a single `Stdout` event, a single `Stderr` event,
1013    /// and a final `Exit` event. Runtimes that support true streaming (e.g.
1014    /// Docker via bollard) override this to produce line-by-line events as
1015    /// the command runs.
1016    ///
1017    /// The returned stream always terminates with exactly one
1018    /// [`ExecEvent::Exit`] as the final item on success. Errors that occur
1019    /// before the stream is returned (e.g. container not found, failure to
1020    /// create the exec) are surfaced via the outer `Result`; errors that
1021    /// occur mid-stream are logged by the runtime and the stream closes.
1022    async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
1023        let (exit, stdout, stderr) = self.exec(id, cmd).await?;
1024        let mut events: Vec<ExecEvent> = Vec::with_capacity(3);
1025        if !stdout.is_empty() {
1026            events.push(ExecEvent::Stdout(stdout));
1027        }
1028        if !stderr.is_empty() {
1029            events.push(ExecEvent::Stderr(stderr));
1030        }
1031        events.push(ExecEvent::Exit(exit));
1032        Ok(Box::pin(futures_util::stream::iter(events)))
1033    }
1034
1035    /// Start an interactive exec session against a container, returning an
1036    /// [`ExecHandle`] the caller drives concurrently with the running process.
1037    ///
1038    /// Unlike [`Runtime::exec`] (which buffers stdout/stderr and returns only
1039    /// after the process exits) and [`Runtime::exec_stream`] (which streams
1040    /// line-by-line events one-way), `exec_pty` is the long-lived bidirectional
1041    /// entry point: when [`ExecOptions::tty`] is set the runtime allocates a
1042    /// pseudo-terminal pair and the returned [`ExecHandle::stream`] shuttles
1043    /// raw PTY bytes; when `tty` is false the stream still carries
1044    /// stdin/stdout/stderr but without PTY framing. The handle's
1045    /// [`ExecHandle::resize`] channel mirrors Docker's
1046    /// `POST /exec/{id}/resize` and the [`ExecHandle::exit`] future resolves
1047    /// with the process exit code once the runtime detects termination.
1048    ///
1049    /// The default implementation returns [`AgentError::Unsupported`].
1050    /// Backends that can host interactive execs (Docker via bollard's
1051    /// `start_exec` with hijacked stream, Youki via libcontainer's exec API,
1052    /// HCS via the Windows console) override this. Runtimes that have no
1053    /// notion of an interactive exec (WASM in-process, mocks that don't need
1054    /// PTY traffic) should leave the default in place โ€” callers then surface
1055    /// a clear error rather than degrade silently to a buffered exec.
1056    async fn exec_pty(&self, _id: &ContainerId, _opts: ExecOptions) -> Result<ExecHandle> {
1057        Err(AgentError::Unsupported(
1058            "exec_pty is not supported by this runtime".into(),
1059        ))
1060    }
1061
1062    /// Get container resource statistics from cgroups
1063    ///
1064    /// Returns CPU and memory statistics for the specified container.
1065    /// Used for metrics collection and autoscaling decisions.
1066    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats>;
1067
1068    /// Wait for a container to exit and return its exit code
1069    ///
1070    /// This method blocks until the container exits or an error occurs.
1071    /// Used primarily for job execution to implement run-to-completion semantics.
1072    async fn wait_container(&self, id: &ContainerId) -> Result<i32>;
1073
1074    /// Wait for a container to exit and return a [`WaitOutcome`] with richer
1075    /// classification (exit code + reason + signal + `finished_at` timestamp).
1076    ///
1077    /// The default implementation delegates to [`Runtime::wait_container`] and
1078    /// synthesizes a [`WaitReason::Exited`] result with no signal/timestamp.
1079    /// Runtimes that can distinguish OOM kills, signal deaths, or report a
1080    /// finished-at time (e.g. the Docker runtime, which has
1081    /// `ContainerInspectResponse.state.oom_killed` / `.finished_at`) should
1082    /// override this.
1083    ///
1084    /// This is the legacy entry point that always uses
1085    /// [`WaitCondition::NotRunning`]. Callers that need to honour Docker's
1086    /// `condition=` query parameter should use
1087    /// [`Runtime::wait_outcome_with_condition`] instead.
1088    async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
1089        let exit_code = self.wait_container(id).await?;
1090        Ok(WaitOutcome::exited(exit_code))
1091    }
1092
1093    /// Wait for a container to reach a [`WaitCondition`] and return a
1094    /// [`WaitOutcome`].
1095    ///
1096    /// Mirrors Docker's `POST /containers/{id}/wait?condition=<...>` semantics:
1097    ///
1098    /// * [`WaitCondition::NotRunning`] (the default) โ€” block until the
1099    ///   container is no longer running. Returns the exit-code outcome.
1100    /// * [`WaitCondition::NextExit`] โ€” wait for the next observed exit, even
1101    ///   if the container is already stopped at call time. The default
1102    ///   implementation cannot distinguish "already stopped" from "next exit",
1103    ///   so it falls back to the same wait as `NotRunning`. Backends that can
1104    ///   subscribe to runtime events (Docker via bollard's wait stream)
1105    ///   override this to honour the semantic.
1106    /// * [`WaitCondition::Removed`] โ€” block until the container has been
1107    ///   removed. The default implementation again falls back to a normal
1108    ///   wait; the Docker runtime overrides it via bollard's `condition`
1109    ///   parameter.
1110    ///
1111    /// Default implementation delegates to [`Runtime::wait_outcome`] for all
1112    /// conditions, ignoring the condition argument. This keeps existing
1113    /// runtimes (Youki, WASM, mocks) working without code changes.
1114    async fn wait_outcome_with_condition(
1115        &self,
1116        id: &ContainerId,
1117        _condition: WaitCondition,
1118    ) -> Result<WaitOutcome> {
1119        self.wait_outcome(id).await
1120    }
1121
1122    /// Rename a container. Mirrors Docker's
1123    /// `POST /containers/{id}/rename?name=<new>` endpoint.
1124    ///
1125    /// `new_name` is the requested human-readable name (without any leading
1126    /// `/`). Backends are expected to validate the name against their own
1127    /// constraints (e.g. uniqueness, allowed characters) and return an
1128    /// appropriate [`AgentError`] on rejection.
1129    ///
1130    /// The default implementation returns [`AgentError::Unsupported`].
1131    /// Runtimes that can rename a live container override this:
1132    ///
1133    /// * Docker โ€” calls bollard's `rename_container` with
1134    ///   `RenameContainerOptions { name }`.
1135    /// * Youki โ€” currently returns `Unsupported` because the libcontainer
1136    ///   state-dir is keyed off the immutable `ContainerId` and renaming the
1137    ///   on-disk layout safely while a container is alive would require
1138    ///   coordination with the supervisor that owns the bundle path.
1139    /// * Other backends (WASM, HCS, mocks) inherit the `Unsupported` default.
1140    async fn rename_container(&self, _id: &ContainerId, _new_name: &str) -> Result<()> {
1141        Err(AgentError::Unsupported(
1142            "rename_container is not supported by this runtime".into(),
1143        ))
1144    }
1145
1146    /// Get container logs (stdout/stderr combined)
1147    ///
1148    /// Returns logs as structured entries.
1149    /// Used to capture job output after completion.
1150    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>>;
1151
1152    /// Get the PID of a container's main process
1153    ///
1154    /// Returns:
1155    /// - `Ok(Some(pid))` for runtimes with real processes (Youki, Docker)
1156    /// - `Ok(None)` for runtimes without separate PIDs (WASM in-process)
1157    /// - `Err` if the container doesn't exist or there's an error
1158    ///
1159    /// Used for overlay network attachment and process management.
1160    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>>;
1161
1162    /// Get the IP address of a container
1163    ///
1164    /// Returns:
1165    /// - `Ok(Some(ip))` if the container has a known IP address
1166    /// - `Ok(None)` if the container exists but has no IP assigned yet
1167    /// - `Err` if the container doesn't exist or there's an error
1168    ///
1169    /// Used for proxy backend registration when overlay networking is unavailable.
1170    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>>;
1171
1172    /// Get a runtime-assigned port override for a container.
1173    ///
1174    /// Returns:
1175    /// - `Ok(Some(port))` if the runtime assigned a dynamic port to this container
1176    /// - `Ok(None)` if the container should use the spec-declared endpoint port
1177    ///
1178    /// This exists for runtimes where all containers share the host network stack
1179    /// (e.g., macOS sandbox). Without network namespaces, multiple replicas of
1180    /// the same service would conflict on the same port. The runtime assigns
1181    /// each replica a unique port and passes it via the `PORT` environment variable.
1182    /// The proxy then routes to `container_ip:override_port` instead of
1183    /// `container_ip:spec_port`.
1184    ///
1185    /// Runtimes with per-container networking (overlay, VMs, Docker) return `None`.
1186    async fn get_container_port_override(&self, _id: &ContainerId) -> Result<Option<u16>> {
1187        Ok(None)
1188    }
1189
1190    /// Get the HCN namespace GUID of a Windows container.
1191    ///
1192    /// Windows-only. Linux/macOS runtimes have no HCN namespace concept and
1193    /// return `Ok(None)`. The `HcsRuntime` overrides this to return the
1194    /// namespace GUID attached during `create_container`; `OverlayManager`
1195    /// then uses the GUID to register the container's assigned overlay IP
1196    /// against the right HCN compartment (analogous to how Linux uses PID
1197    /// to enter the netns via `/proc/{pid}/ns/net`).
1198    #[cfg(target_os = "windows")]
1199    async fn get_container_namespace_id(
1200        &self,
1201        _id: &ContainerId,
1202    ) -> Result<Option<windows::core::GUID>> {
1203        Ok(None)
1204    }
1205
1206    /// Sync all named volumes associated with this container to S3.
1207    ///
1208    /// Called after a container is stopped but before it is removed, giving
1209    /// the runtime a chance to flush persistent volume data to remote storage.
1210    ///
1211    /// The default implementation is a no-op. Runtimes that support S3-backed
1212    /// volume sync (e.g., Youki with the `s3` feature) override this.
1213    async fn sync_container_volumes(&self, _id: &ContainerId) -> Result<()> {
1214        Ok(())
1215    }
1216
1217    /// Stream container logs as raw byte chunks tagged with their channel.
1218    ///
1219    /// Mirrors the `GET /containers/{id}/logs` endpoint of the Docker Engine
1220    /// API: callers can request `follow`, `tail`, `since`/`until` time
1221    /// windows, per-channel filtering (`stdout` / `stderr`), and inline
1222    /// timestamps via [`LogsStreamOptions`]. Backends that demultiplex
1223    /// Docker's stdcopy framing emit one [`LogChunk`] per frame; line-based
1224    /// runtimes emit one chunk per line.
1225    ///
1226    /// The default implementation returns [`AgentError::Unsupported`].
1227    /// Concrete runtimes override this with backend-specific streaming
1228    /// (bollard's `logs` for Docker, log-file tailing for Youki/HCS, etc.).
1229    /// The stream is `'static` so HTTP handlers can drive it independently
1230    /// of the trait-method borrow.
1231    async fn logs_stream(&self, _id: &ContainerId, _opts: LogsStreamOptions) -> Result<LogsStream> {
1232        Err(AgentError::Unsupported(
1233            "logs_stream is not supported by this runtime".into(),
1234        ))
1235    }
1236
1237    /// Stream periodic resource-usage samples for a container.
1238    ///
1239    /// Mirrors the streaming form of `GET /containers/{id}/stats` in the
1240    /// Docker Engine API: each yielded [`StatsSample`] is one full snapshot
1241    /// of CPU / memory / network / block-IO / pids counters at the moment
1242    /// it was taken. Sampling cadence is backend-defined (Docker emits one
1243    /// sample per second by default).
1244    ///
1245    /// The default implementation returns [`AgentError::Unsupported`].
1246    /// Backends that can produce this data implement it directly: Docker
1247    /// via bollard's `stats`, Youki by polling cgroup stat files,
1248    /// `MockRuntime` by emitting a deterministic single sample.
1249    async fn stats_stream(&self, _id: &ContainerId) -> Result<StatsStream> {
1250        Err(AgentError::Unsupported(
1251            "stats_stream is not supported by this runtime".into(),
1252        ))
1253    }
1254
1255    /// Pull an image, streaming progress events as layers are downloaded.
1256    ///
1257    /// Mirrors the streaming form of `POST /images/create` in the Docker
1258    /// Engine API. Backends emit a series of [`PullProgress::Status`]
1259    /// events for in-flight layers, followed by exactly one
1260    /// [`PullProgress::Done`] event on success. Errors that occur mid-pull
1261    /// surface as `Err` items on the stream and terminate it.
1262    ///
1263    /// `auth` carries inline credentials for this pull. When `None`, the
1264    /// runtime falls back to its credential-store lookup keyed by registry
1265    /// hostname (matching the semantics of [`Runtime::pull_image_with_policy`]).
1266    ///
1267    /// The default implementation returns [`AgentError::Unsupported`].
1268    /// Backends override this with their native streaming pull
1269    /// (bollard's `create_image` for Docker, `zlayer-registry` for Youki).
1270    async fn pull_image_stream(
1271        &self,
1272        _image: &str,
1273        _auth: Option<&RegistryAuth>,
1274    ) -> Result<PullProgressStream> {
1275        Err(AgentError::Unsupported(
1276            "pull_image_stream is not supported by this runtime".into(),
1277        ))
1278    }
1279
1280    /// List all images managed by this runtime's image storage.
1281    ///
1282    /// The default implementation returns `AgentError::Unsupported` โ€” individual
1283    /// runtimes override this with backend-specific logic (bollard for Docker,
1284    /// zlayer-registry cache walk for Youki, etc.).
1285    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1286        Err(AgentError::Unsupported(
1287            "list_images is not supported by this runtime".into(),
1288        ))
1289    }
1290
1291    /// Remove an image by reference from local storage.
1292    ///
1293    /// When `force` is true, also removes the image even when other containers
1294    /// reference it. The default implementation returns `AgentError::Unsupported`.
1295    async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
1296        Err(AgentError::Unsupported(
1297            "remove_image is not supported by this runtime".into(),
1298        ))
1299    }
1300
1301    /// Prune dangling / unused images from local storage.
1302    ///
1303    /// Returns a [`PruneResult`] describing what was removed. The default
1304    /// implementation returns `AgentError::Unsupported`.
1305    async fn prune_images(&self) -> Result<PruneResult> {
1306        Err(AgentError::Unsupported(
1307            "prune_images is not supported by this runtime".into(),
1308        ))
1309    }
1310
1311    /// Send a signal to a running container.
1312    ///
1313    /// When `signal` is `None`, the runtime sends `SIGKILL` (matching Docker's
1314    /// `docker kill` default). Backends validate the signal name and reject
1315    /// anything outside the standard POSIX set (`SIGKILL`, `SIGTERM`, `SIGINT`,
1316    /// `SIGHUP`, `SIGUSR1`, `SIGUSR2`).
1317    ///
1318    /// Used by `POST /api/v1/containers/{id}/kill` and Docker-compat
1319    /// `docker kill`. The default implementation returns
1320    /// [`AgentError::Unsupported`].
1321    async fn kill_container(&self, _id: &ContainerId, _signal: Option<&str>) -> Result<()> {
1322        Err(AgentError::Unsupported(
1323            "kill_container is not supported by this runtime".into(),
1324        ))
1325    }
1326
1327    /// Create a new tag pointing at an existing image.
1328    ///
1329    /// `source` is the reference to an already-cached image. `target` is the
1330    /// new reference to create โ€” it must be a full reference (repository + tag).
1331    ///
1332    /// Used by `POST /api/v1/images/tag` and Docker-compat `docker tag`. The
1333    /// default implementation returns [`AgentError::Unsupported`].
1334    async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
1335        Err(AgentError::Unsupported(
1336            "tag_image is not supported by this runtime".into(),
1337        ))
1338    }
1339
1340    /// Inspect an image and return a Docker-shaped detail record.
1341    ///
1342    /// Mirrors Docker's `GET /images/{name}/json`. Backends translate their
1343    /// native inspect output into [`ImageInspectInfo`]; the API/Docker
1344    /// compat shim emits the JSON body. The default implementation returns
1345    /// [`AgentError::Unsupported`] so non-Docker backends keep compiling.
1346    async fn inspect_image_native(&self, _image: &str) -> Result<ImageInspectInfo> {
1347        Err(AgentError::Unsupported(
1348            "inspect_image_native is not supported by this runtime".into(),
1349        ))
1350    }
1351
1352    /// Return the parent-layer history for an image.
1353    ///
1354    /// Mirrors Docker's `GET /images/{name}/history`. The default
1355    /// implementation returns [`AgentError::Unsupported`].
1356    async fn image_history(&self, _image: &str) -> Result<Vec<ImageHistoryEntry>> {
1357        Err(AgentError::Unsupported(
1358            "image_history is not supported by this runtime".into(),
1359        ))
1360    }
1361
1362    /// Search a registry for images matching `term`.
1363    ///
1364    /// Mirrors Docker's `GET /images/search`. `limit` caps the number of
1365    /// returned items; `0` means "let the registry decide". The default
1366    /// implementation returns [`AgentError::Unsupported`].
1367    async fn search_images(&self, _term: &str, _limit: u32) -> Result<Vec<ImageSearchResult>> {
1368        Err(AgentError::Unsupported(
1369            "search_images is not supported by this runtime".into(),
1370        ))
1371    }
1372
1373    /// Stream a tar archive containing one or more images.
1374    ///
1375    /// Mirrors Docker's `GET /images/get?names=...`. Multi-image archives
1376    /// dedupe shared layers. The default implementation returns
1377    /// [`AgentError::Unsupported`].
1378    async fn save_images(&self, _names: &[String]) -> Result<ImageExportStream> {
1379        Err(AgentError::Unsupported(
1380            "save_images is not supported by this runtime".into(),
1381        ))
1382    }
1383
1384    /// Load images from a tar archive.
1385    ///
1386    /// Mirrors Docker's `POST /images/load`. `tar_bytes` is the
1387    /// uncompressed (or gzip-compressed) tar produced by [`Self::save_images`].
1388    /// `quiet` suppresses progress events when set. The default
1389    /// implementation returns [`AgentError::Unsupported`].
1390    async fn load_images(
1391        &self,
1392        _tar_bytes: bytes::Bytes,
1393        _quiet: bool,
1394    ) -> Result<LoadProgressStream> {
1395        Err(AgentError::Unsupported(
1396            "load_images is not supported by this runtime".into(),
1397        ))
1398    }
1399
1400    /// Import a single image from a tar root filesystem.
1401    ///
1402    /// Mirrors the `fromSrc=`-mode of Docker's `POST /images/create`.
1403    /// `tar_bytes` is a tar of the root filesystem; `repo`/`tag` are the
1404    /// reference to apply to the resulting image. The default
1405    /// implementation returns [`AgentError::Unsupported`].
1406    async fn import_image(
1407        &self,
1408        _tar_bytes: bytes::Bytes,
1409        _repo: Option<&str>,
1410        _tag: Option<&str>,
1411    ) -> Result<String> {
1412        Err(AgentError::Unsupported(
1413            "import_image is not supported by this runtime".into(),
1414        ))
1415    }
1416
1417    /// Stream a tar archive of the container's filesystem.
1418    ///
1419    /// Mirrors Docker's `GET /containers/{id}/export`. The default
1420    /// implementation returns [`AgentError::Unsupported`].
1421    async fn export_container_fs(&self, _id: &ContainerId) -> Result<ImageExportStream> {
1422        Err(AgentError::Unsupported(
1423            "export_container_fs is not supported by this runtime".into(),
1424        ))
1425    }
1426
1427    /// Commit a container's filesystem state to a new image.
1428    ///
1429    /// Mirrors Docker's `POST /commit?container=...`. `opts` carries the
1430    /// optional repo/tag/comment/author/pause/changes parameters. The
1431    /// default implementation returns [`AgentError::Unsupported`].
1432    async fn commit_container(
1433        &self,
1434        _id: &ContainerId,
1435        _opts: &CommitOptions,
1436    ) -> Result<CommitOutcome> {
1437        Err(AgentError::Unsupported(
1438            "commit_container is not supported by this runtime".into(),
1439        ))
1440    }
1441
1442    /// Return rich inspect details for a container: published ports, attached
1443    /// networks, first IPv4, health, and most-recent exit code.
1444    ///
1445    /// Runtimes implement this by translating the backend's native inspect
1446    /// response (bollard's `ContainerInspectResponse` for Docker) into the
1447    /// runtime-level [`ContainerInspectDetails`] struct. The API layer merges
1448    /// these fields into `ContainerInfo` on `GET /api/v1/containers` and
1449    /// `GET /api/v1/containers/{id}` (ยง3.15 of `ZLAYER_SDK_FIXES.md`).
1450    ///
1451    /// The default implementation returns [`ContainerInspectDetails::default`]
1452    /// โ€” an all-empty record, which the API layer treats as "this runtime
1453    /// doesn't support rich inspect; skip all the extra fields". This keeps
1454    /// non-Docker runtimes (Youki, WASM, Mock) backwards compatible; they can
1455    /// override this later if they gain equivalent inspect capability.
1456    async fn inspect_detailed(&self, _id: &ContainerId) -> Result<ContainerInspectDetails> {
1457        Ok(ContainerInspectDetails::default())
1458    }
1459
1460    /// Pause all processes in the container by freezing its cgroup.
1461    ///
1462    /// Mirrors Docker's `POST /containers/{id}/pause`. After pause, the
1463    /// container's processes are suspended in the kernel via the cgroup
1464    /// freezer; calls to [`Runtime::container_state`] still report
1465    /// `Running` but no instructions execute until [`Runtime::unpause_container`].
1466    ///
1467    /// The default implementation returns [`AgentError::Unsupported`].
1468    /// Backends override this with their native pause API (bollard's
1469    /// `pause_container` for Docker, libcontainer's `Container::pause` for
1470    /// Youki).
1471    async fn pause_container(&self, _id: &ContainerId) -> Result<()> {
1472        Err(AgentError::Unsupported(
1473            "pause_container is not supported by this runtime".into(),
1474        ))
1475    }
1476
1477    /// Resume a previously-paused container by thawing its cgroup freezer.
1478    ///
1479    /// Mirrors Docker's `POST /containers/{id}/unpause`. The default
1480    /// implementation returns [`AgentError::Unsupported`].
1481    async fn unpause_container(&self, _id: &ContainerId) -> Result<()> {
1482        Err(AgentError::Unsupported(
1483            "unpause_container is not supported by this runtime".into(),
1484        ))
1485    }
1486
1487    /// Update a running container's resource limits and restart policy.
1488    ///
1489    /// Mirrors Docker's `POST /containers/{id}/update`. The fields on
1490    /// [`ContainerResourceUpdate`] are individually optional: backends
1491    /// apply only the fields that are `Some` and leave the rest of the
1492    /// container's runtime configuration untouched. A fully-empty update
1493    /// short-circuits to a no-op (no cgroup writes, no warnings).
1494    ///
1495    /// Returns a [`ContainerUpdateOutcome`] whose `warnings` vector
1496    /// surfaces non-fatal issues (e.g. "kernel memory limit is
1497    /// deprecated", or "real-time scheduling not supported on this
1498    /// kernel"). Empty `warnings` โ‡’ every requested field was applied.
1499    ///
1500    /// The default implementation returns [`AgentError::Unsupported`].
1501    /// Backends override this:
1502    ///
1503    /// * Docker โ€” calls bollard's `update_container` with a
1504    ///   `ContainerUpdateBody` populated from the input.
1505    /// * Youki โ€” writes the corresponding cgroup v2 files
1506    ///   (`cpu.weight`, `memory.max`, `pids.max`, `io.weight`,
1507    ///   `cpuset.cpus`, `cpuset.mems`) under
1508    ///   `<container_root>/cgroup` and persists the new restart policy
1509    ///   in the on-disk supervisor state.
1510    /// * Other backends (WASM, mocks) inherit the `Unsupported` default.
1511    async fn update_container_resources(
1512        &self,
1513        _id: &ContainerId,
1514        _update: &ContainerResourceUpdate,
1515    ) -> Result<ContainerUpdateOutcome> {
1516        Err(AgentError::Unsupported(
1517            "update_container_resources is not supported by this runtime".into(),
1518        ))
1519    }
1520
1521    /// List the processes running inside a container (`docker top`).
1522    ///
1523    /// `ps_args` is forwarded to the runtime as the `ps(1)` argument list when
1524    /// supported; an empty slice means "use the runtime's default columns".
1525    /// Mirrors Docker's `GET /containers/{id}/top?ps_args=<...>`.
1526    ///
1527    /// The default implementation returns [`AgentError::Unsupported`].
1528    async fn top_container(
1529        &self,
1530        _id: &ContainerId,
1531        _ps_args: &[String],
1532    ) -> Result<ContainerTopOutput> {
1533        Err(AgentError::Unsupported(
1534            "top_container is not supported by this runtime".into(),
1535        ))
1536    }
1537
1538    /// Report changes to a container's filesystem since it was created.
1539    ///
1540    /// Mirrors Docker's `GET /containers/{id}/changes`. Returns one
1541    /// [`FilesystemChangeEntry`] per added / modified / deleted path in the
1542    /// container's writable layer. Runtimes that don't compute layer diffs
1543    /// (e.g. youki, which uses raw bundle rootfs without a layered FS) return
1544    /// [`AgentError::Unsupported`].
1545    async fn changes_container(&self, _id: &ContainerId) -> Result<Vec<FilesystemChangeEntry>> {
1546        Err(AgentError::Unsupported(
1547            "changes_container is not supported by this runtime".into(),
1548        ))
1549    }
1550
1551    /// Report the published port mappings for a container.
1552    ///
1553    /// Mirrors Docker's `GET /containers/{id}/port`. Returns one
1554    /// [`PortMappingEntry`] per (container-port, protocol, host-binding)
1555    /// triple. Containers with no published ports return an empty vector.
1556    ///
1557    /// The default implementation returns [`AgentError::Unsupported`].
1558    async fn port_mappings_container(&self, _id: &ContainerId) -> Result<Vec<PortMappingEntry>> {
1559        Err(AgentError::Unsupported(
1560            "port_mappings_container is not supported by this runtime".into(),
1561        ))
1562    }
1563
1564    /// Prune stopped containers from the runtime.
1565    ///
1566    /// Mirrors Docker's `POST /containers/prune`. Returns the IDs of
1567    /// containers that were removed plus the bytes reclaimed. The default
1568    /// implementation returns [`AgentError::Unsupported`].
1569    async fn prune_containers(&self) -> Result<ContainerPruneResult> {
1570        Err(AgentError::Unsupported(
1571            "prune_containers is not supported by this runtime".into(),
1572        ))
1573    }
1574
1575    /// Enumerate all containers known to this runtime, including stopped /
1576    /// exited ones (Docker's `list_containers(all=true)` semantics).
1577    ///
1578    /// Used by `zlayer-api::handlers::standalone_reconcile` on daemon boot
1579    /// to match persisted standalone-container records against the
1580    /// runtime's actual inventory: entries the runtime no longer reports
1581    /// are pruned, surviving entries are re-registered in the
1582    /// `ContainerIdMap`, and runtime containers carrying a
1583    /// `com.zlayer.container_id` label that has no storage match are
1584    /// counted as orphans (logged but otherwise left alone).
1585    ///
1586    /// The default implementation returns an empty list, which makes
1587    /// reconcile degrade to a label-blind pass: storage entries can still
1588    /// be probed individually via [`Runtime::container_state`], but orphan
1589    /// detection is disabled until the backend overrides this method
1590    /// (Docker via `bollard::list_containers`, youki via state-dir walk).
1591    async fn list_containers(&self) -> Result<Vec<RuntimeContainerSummary>> {
1592        Ok(Vec::new())
1593    }
1594
1595    /// Stream a TAR archive of the file or directory at `path` inside the
1596    /// container.
1597    ///
1598    /// Mirrors Docker's `GET /containers/{id}/archive?path=<...>`. The
1599    /// returned [`ArchiveStream`] yields raw `application/x-tar` bytes.
1600    /// Backends decide whether to materialize the archive in memory or
1601    /// stream it on the fly:
1602    ///
1603    /// * Docker โ€” bollard's `download_from_container` produces a chunked
1604    ///   stream of TAR bytes; we forward it verbatim.
1605    /// * Youki โ€” a rootfs walk under `<bundle>/rootfs<path>` produces a
1606    ///   TAR archive in a worker task and streams it through an mpsc.
1607    /// * Other backends (WASM, mocks) inherit the `Unsupported` default.
1608    ///
1609    /// The default implementation returns [`AgentError::Unsupported`].
1610    async fn archive_get(&self, _id: &ContainerId, _path: &str) -> Result<ArchiveStream> {
1611        Err(AgentError::Unsupported(
1612            "archive_get is not supported by this runtime".into(),
1613        ))
1614    }
1615
1616    /// Extract a TAR archive into the container at `path`.
1617    ///
1618    /// Mirrors Docker's `PUT /containers/{id}/archive?path=<...>`. The
1619    /// runtime must extract `tar_bytes` (an uncompressed TAR archive) into
1620    /// `path` inside the container, honouring [`ArchivePutOptions`].
1621    ///
1622    /// `path` must already exist inside the container and must be a
1623    /// directory; if it does not exist the runtime returns
1624    /// [`AgentError::NotFound`]. Mismatched directory/non-directory
1625    /// replacements with `no_overwrite_dir_non_dir=true` return
1626    /// [`AgentError::InvalidSpec`].
1627    ///
1628    /// The default implementation returns [`AgentError::Unsupported`].
1629    async fn archive_put(
1630        &self,
1631        _id: &ContainerId,
1632        _path: &str,
1633        _tar_bytes: bytes::Bytes,
1634        _opts: ArchivePutOptions,
1635    ) -> Result<()> {
1636        Err(AgentError::Unsupported(
1637            "archive_put is not supported by this runtime".into(),
1638        ))
1639    }
1640
1641    /// Return stat metadata for the file or directory at `path` inside the
1642    /// container.
1643    ///
1644    /// Mirrors Docker's `HEAD /containers/{id}/archive?path=<...>`, which
1645    /// answers with the metadata that `GET /archive` *would* expose without
1646    /// materializing the TAR. Used by `docker cp` and the API layer to
1647    /// short-circuit on missing paths.
1648    ///
1649    /// The default implementation returns [`AgentError::Unsupported`].
1650    async fn archive_head(&self, _id: &ContainerId, _path: &str) -> Result<PathStat> {
1651        Err(AgentError::Unsupported(
1652            "archive_head is not supported by this runtime".into(),
1653        ))
1654    }
1655}
1656
1657/// Validate a signal name for [`Runtime::kill_container`].
1658///
1659/// Accepts both the `SIG`-prefixed form (`"SIGKILL"`) and the bare form
1660/// (`"KILL"`). Returns the canonical uppercase `SIG`-prefixed name on success.
1661///
1662/// # Errors
1663///
1664/// Returns [`AgentError::InvalidSpec`] when `signal` is not one of the
1665/// supported signals: `SIGKILL`, `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGUSR1`,
1666/// `SIGUSR2`.
1667pub fn validate_signal(signal: &str) -> Result<String> {
1668    let trimmed = signal.trim();
1669    if trimmed.is_empty() {
1670        return Err(AgentError::InvalidSpec(
1671            "signal must not be empty".to_string(),
1672        ));
1673    }
1674    let upper = trimmed.to_ascii_uppercase();
1675    let canonical = if upper.starts_with("SIG") {
1676        upper
1677    } else {
1678        format!("SIG{upper}")
1679    };
1680    match canonical.as_str() {
1681        "SIGKILL" | "SIGTERM" | "SIGINT" | "SIGHUP" | "SIGUSR1" | "SIGUSR2" => Ok(canonical),
1682        other => Err(AgentError::InvalidSpec(format!(
1683            "unsupported signal '{other}'; allowed: SIGKILL, SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2"
1684        ))),
1685    }
1686}
1687
1688/// Auth context injected into every container so it can talk back to the host
1689/// API without needing external credentials.
1690#[derive(Debug, Clone)]
1691pub struct ContainerAuthContext {
1692    /// Base URL of the `ZLayer` API, e.g. `"http://127.0.0.1:3669"`.
1693    pub api_url: String,
1694    /// JWT signing secret โ€” used to mint per-container tokens at start time.
1695    pub jwt_secret: String,
1696    /// Absolute path of the Unix socket on the host (bind-mounted into Linux
1697    /// containers; added to `writable_dirs` for macOS sandbox).
1698    pub socket_path: String,
1699}
1700
1701/// In-memory mock runtime for testing and development.
1702///
1703/// In addition to tracking container lifecycle in memory, the mock exposes
1704/// per-container queues for streaming method outputs so unit tests can
1705/// pre-script the events that [`Runtime::logs_stream`],
1706/// [`Runtime::stats_stream`], and [`Runtime::pull_image_stream`] should
1707/// yield. See [`MockRuntime::enqueue_log_chunk`],
1708/// [`MockRuntime::enqueue_stats_sample`], and
1709/// [`MockRuntime::enqueue_pull_progress`].
1710pub struct MockRuntime {
1711    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
1712    /// Pre-scripted log chunks per container. Each call to
1713    /// [`Runtime::logs_stream`] drains this queue in order; once empty, the
1714    /// stream either terminates (when `follow=false`) or hangs forever
1715    /// (when `follow=true`) so tests can exercise both branches.
1716    pub logs_to_yield:
1717        Arc<Mutex<std::collections::HashMap<ContainerId, VecDeque<Result<LogChunk>>>>>,
1718    /// Pre-scripted stats samples per container. Drained in order by
1719    /// [`Runtime::stats_stream`].
1720    pub stats_to_yield:
1721        Arc<Mutex<std::collections::HashMap<ContainerId, VecDeque<Result<StatsSample>>>>>,
1722    /// Pre-scripted pull progress events keyed by image reference. Drained
1723    /// in order by [`Runtime::pull_image_stream`].
1724    pub pull_progress_to_yield:
1725        Arc<Mutex<std::collections::HashMap<String, VecDeque<Result<PullProgress>>>>>,
1726}
1727
1728impl MockRuntime {
1729    #[must_use]
1730    pub fn new() -> Self {
1731        Self {
1732            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
1733            logs_to_yield: Arc::new(Mutex::new(std::collections::HashMap::new())),
1734            stats_to_yield: Arc::new(Mutex::new(std::collections::HashMap::new())),
1735            pull_progress_to_yield: Arc::new(Mutex::new(std::collections::HashMap::new())),
1736        }
1737    }
1738
1739    /// Push a single [`LogChunk`] onto the queue for `id`. Subsequent calls to
1740    /// [`Runtime::logs_stream`] will yield enqueued chunks in FIFO order.
1741    pub async fn enqueue_log_chunk(&self, id: &ContainerId, chunk: LogChunk) {
1742        self.logs_to_yield
1743            .lock()
1744            .await
1745            .entry(id.clone())
1746            .or_default()
1747            .push_back(Ok(chunk));
1748    }
1749
1750    /// Push a pre-built error onto the log queue for `id`. The next
1751    /// [`Runtime::logs_stream`] call drains this as the next yielded item.
1752    pub async fn enqueue_log_error(&self, id: &ContainerId, err: AgentError) {
1753        self.logs_to_yield
1754            .lock()
1755            .await
1756            .entry(id.clone())
1757            .or_default()
1758            .push_back(Err(err));
1759    }
1760
1761    /// Push a single [`StatsSample`] onto the queue for `id`. Subsequent calls
1762    /// to [`Runtime::stats_stream`] will yield enqueued samples in FIFO order.
1763    pub async fn enqueue_stats_sample(&self, id: &ContainerId, sample: StatsSample) {
1764        self.stats_to_yield
1765            .lock()
1766            .await
1767            .entry(id.clone())
1768            .or_default()
1769            .push_back(Ok(sample));
1770    }
1771
1772    /// Push a single [`PullProgress`] event onto the queue for `image`.
1773    /// Subsequent calls to [`Runtime::pull_image_stream`] for the same image
1774    /// reference will yield enqueued events in FIFO order.
1775    pub async fn enqueue_pull_progress(&self, image: &str, progress: PullProgress) {
1776        self.pull_progress_to_yield
1777            .lock()
1778            .await
1779            .entry(image.to_string())
1780            .or_default()
1781            .push_back(Ok(progress));
1782    }
1783}
1784
1785impl Default for MockRuntime {
1786    fn default() -> Self {
1787        Self::new()
1788    }
1789}
1790
1791#[async_trait::async_trait]
1792impl Runtime for MockRuntime {
1793    async fn pull_image(&self, _image: &str) -> Result<()> {
1794        self.pull_image_with_policy(_image, PullPolicy::IfNotPresent, None)
1795            .await
1796    }
1797
1798    async fn pull_image_with_policy(
1799        &self,
1800        _image: &str,
1801        _policy: PullPolicy,
1802        _auth: Option<&RegistryAuth>,
1803    ) -> Result<()> {
1804        // Mock: always succeeds
1805        tokio::time::sleep(Duration::from_millis(100)).await;
1806        Ok(())
1807    }
1808
1809    async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
1810        let mut containers = self.containers.write().await;
1811        containers.insert(
1812            id.clone(),
1813            Container {
1814                id: id.clone(),
1815                state: ContainerState::Pending,
1816                pid: None,
1817                task: None,
1818                overlay_ip: None,
1819                health_monitor: None,
1820                port_override: None,
1821            },
1822        );
1823        Ok(())
1824    }
1825
1826    async fn start_container(&self, id: &ContainerId) -> Result<()> {
1827        let mut containers = self.containers.write().await;
1828        if let Some(container) = containers.get_mut(id) {
1829            container.state = ContainerState::Running;
1830            container.pid = Some(std::process::id()); // Mock PID
1831        }
1832        Ok(())
1833    }
1834
1835    async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
1836        let mut containers = self.containers.write().await;
1837        if let Some(container) = containers.get_mut(id) {
1838            container.state = ContainerState::Exited { code: 0 };
1839        }
1840        Ok(())
1841    }
1842
1843    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
1844        let mut containers = self.containers.write().await;
1845        containers.remove(id);
1846        Ok(())
1847    }
1848
1849    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
1850        let containers = self.containers.read().await;
1851        containers
1852            .get(id)
1853            .map(|c| c.state.clone())
1854            .ok_or_else(|| AgentError::NotFound {
1855                container: id.to_string(),
1856                reason: "container not found".to_string(),
1857            })
1858    }
1859
1860    async fn container_logs(&self, _id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
1861        let entries = vec![
1862            LogEntry {
1863                timestamp: chrono::Utc::now(),
1864                stream: LogStream::Stdout,
1865                message: "mock log line 1".to_string(),
1866                source: LogSource::Container("mock".to_string()),
1867                service: None,
1868                deployment: None,
1869            },
1870            LogEntry {
1871                timestamp: chrono::Utc::now(),
1872                stream: LogStream::Stderr,
1873                message: "mock error line".to_string(),
1874                source: LogSource::Container("mock".to_string()),
1875                service: None,
1876                deployment: None,
1877            },
1878        ];
1879        let skip = entries.len().saturating_sub(tail);
1880        Ok(entries.into_iter().skip(skip).collect())
1881    }
1882
1883    async fn exec(&self, _id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
1884        Ok((0, cmd.join(" "), String::new()))
1885    }
1886
1887    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
1888        // Mock: return dummy stats
1889        let containers = self.containers.read().await;
1890        if containers.contains_key(id) {
1891            Ok(ContainerStats {
1892                cpu_usage_usec: 1_000_000,       // 1 second
1893                memory_bytes: 50 * 1024 * 1024,  // 50 MB
1894                memory_limit: 256 * 1024 * 1024, // 256 MB
1895                timestamp: std::time::Instant::now(),
1896            })
1897        } else {
1898            Err(AgentError::NotFound {
1899                container: id.to_string(),
1900                reason: "container not found".to_string(),
1901            })
1902        }
1903    }
1904
1905    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
1906        // Mock: simulate waiting for container to exit
1907        let containers = self.containers.read().await;
1908        if let Some(container) = containers.get(id) {
1909            match &container.state {
1910                ContainerState::Exited { code } => Ok(*code),
1911                ContainerState::Failed { .. } => Ok(1),
1912                _ => {
1913                    // Simulate a brief wait and then return success
1914                    drop(containers);
1915                    tokio::time::sleep(Duration::from_millis(50)).await;
1916                    Ok(0)
1917                }
1918            }
1919        } else {
1920            Err(AgentError::NotFound {
1921                container: id.to_string(),
1922                reason: "container not found".to_string(),
1923            })
1924        }
1925    }
1926
1927    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
1928        // Mock: return dummy structured log entries
1929        let containers = self.containers.read().await;
1930        if containers.contains_key(id) {
1931            let container_name = id.to_string();
1932            Ok(vec![
1933                LogEntry {
1934                    timestamp: chrono::Utc::now(),
1935                    stream: LogStream::Stdout,
1936                    message: format!("[{container_name}] Container started"),
1937                    source: LogSource::Container(container_name.clone()),
1938                    service: None,
1939                    deployment: None,
1940                },
1941                LogEntry {
1942                    timestamp: chrono::Utc::now(),
1943                    stream: LogStream::Stdout,
1944                    message: format!("[{container_name}] Executing command..."),
1945                    source: LogSource::Container(container_name.clone()),
1946                    service: None,
1947                    deployment: None,
1948                },
1949                LogEntry {
1950                    timestamp: chrono::Utc::now(),
1951                    stream: LogStream::Stdout,
1952                    message: format!("[{container_name}] Command completed successfully"),
1953                    source: LogSource::Container(container_name),
1954                    service: None,
1955                    deployment: None,
1956                },
1957            ])
1958        } else {
1959            Err(AgentError::NotFound {
1960                container: id.to_string(),
1961                reason: "container not found".to_string(),
1962            })
1963        }
1964    }
1965
1966    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
1967        let containers = self.containers.read().await;
1968        if let Some(container) = containers.get(id) {
1969            Ok(container.pid)
1970        } else {
1971            Err(AgentError::NotFound {
1972                container: id.to_string(),
1973                reason: "container not found".to_string(),
1974            })
1975        }
1976    }
1977
1978    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
1979        let containers = self.containers.read().await;
1980        if containers.contains_key(id) {
1981            // Mock: deterministic IP based on replica number (172.17.0.{replica+2})
1982            #[allow(clippy::cast_possible_truncation)]
1983            let last_octet = (id.replica + 2) as u8;
1984            Ok(Some(IpAddr::V4(std::net::Ipv4Addr::new(
1985                172, 17, 0, last_octet,
1986            ))))
1987        } else {
1988            Err(AgentError::NotFound {
1989                container: id.to_string(),
1990                reason: "container not found".to_string(),
1991            })
1992        }
1993    }
1994
1995    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1996        Ok(Vec::new())
1997    }
1998
1999    async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
2000        Ok(())
2001    }
2002
2003    async fn prune_images(&self) -> Result<PruneResult> {
2004        Ok(PruneResult::default())
2005    }
2006
2007    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
2008        // Validate signal even in the mock so callers exercise the same error
2009        // path. Default to SIGKILL when omitted.
2010        let _canonical = validate_signal(signal.unwrap_or("SIGKILL"))?;
2011        let mut containers = self.containers.write().await;
2012        let container = containers.get_mut(id).ok_or_else(|| AgentError::NotFound {
2013            container: id.to_string(),
2014            reason: "container not found".to_string(),
2015        })?;
2016        container.state = ContainerState::Exited { code: 137 };
2017        Ok(())
2018    }
2019
2020    async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
2021        // The in-memory mock doesn't store images; treat tag as a no-op success.
2022        Ok(())
2023    }
2024
2025    async fn logs_stream(&self, id: &ContainerId, opts: LogsStreamOptions) -> Result<LogsStream> {
2026        use futures_util::StreamExt;
2027
2028        // Drain the per-container queue once at call time; the resulting
2029        // iterator is owned by the stream so the lock isn't held while the
2030        // consumer reads.
2031        let queued: Vec<Result<LogChunk>> = {
2032            let mut guard = self.logs_to_yield.lock().await;
2033            guard.remove(id).map(Vec::from).unwrap_or_default()
2034        };
2035        let head = futures_util::stream::iter(queued);
2036        if opts.follow {
2037            // After the queued items are drained, hang forever so callers can
2038            // exercise the "still-following, no more data" branch and cancel
2039            // by dropping the stream.
2040            let tail = futures_util::stream::pending::<Result<LogChunk>>();
2041            Ok(Box::pin(head.chain(tail)))
2042        } else {
2043            Ok(Box::pin(head))
2044        }
2045    }
2046
2047    async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
2048        let queued: Vec<Result<StatsSample>> = {
2049            let mut guard = self.stats_to_yield.lock().await;
2050            guard.remove(id).map(Vec::from).unwrap_or_default()
2051        };
2052        // `stats_stream` has no `follow` flag on the trait. The mock yields
2053        // exactly what was pre-loaded and then closes โ€” tests that want a
2054        // forever-pending stream can simulate it by holding the receiver and
2055        // never enqueueing more. Closing on drain keeps tests bounded so a
2056        // forgotten consumer never deadlocks the test runner.
2057        Ok(Box::pin(futures_util::stream::iter(queued)))
2058    }
2059
2060    async fn pull_image_stream(
2061        &self,
2062        image: &str,
2063        _auth: Option<&RegistryAuth>,
2064    ) -> Result<PullProgressStream> {
2065        let queued: Vec<Result<PullProgress>> = {
2066            let mut guard = self.pull_progress_to_yield.lock().await;
2067            guard.remove(image).map(Vec::from).unwrap_or_default()
2068        };
2069        // Pulls are inherently bounded: the real backends emit a final `Done`
2070        // event and close the stream. The mock follows the same shape โ€” it
2071        // just yields whatever the test pre-loaded and ends.
2072        Ok(Box::pin(futures_util::stream::iter(queued)))
2073    }
2074}
2075
2076#[cfg(test)]
2077mod tests {
2078    use super::*;
2079
2080    #[tokio::test]
2081    async fn test_mock_runtime() {
2082        let runtime = MockRuntime::new();
2083        let id = ContainerId {
2084            service: "test".to_string(),
2085            replica: 1,
2086        };
2087
2088        runtime.pull_image("test:latest").await.unwrap();
2089        runtime.create_container(&id, &mock_spec()).await.unwrap();
2090        runtime.start_container(&id).await.unwrap();
2091
2092        let state = runtime.container_state(&id).await.unwrap();
2093        assert_eq!(state, ContainerState::Running);
2094    }
2095
2096    #[test]
2097    fn validate_signal_accepts_known_signals() {
2098        // SIG-prefixed form
2099        assert_eq!(validate_signal("SIGKILL").unwrap(), "SIGKILL");
2100        assert_eq!(validate_signal("SIGTERM").unwrap(), "SIGTERM");
2101        assert_eq!(validate_signal("SIGINT").unwrap(), "SIGINT");
2102        assert_eq!(validate_signal("SIGHUP").unwrap(), "SIGHUP");
2103        assert_eq!(validate_signal("SIGUSR1").unwrap(), "SIGUSR1");
2104        assert_eq!(validate_signal("SIGUSR2").unwrap(), "SIGUSR2");
2105
2106        // Bare form (no "SIG" prefix) should be canonicalised.
2107        assert_eq!(validate_signal("KILL").unwrap(), "SIGKILL");
2108        assert_eq!(validate_signal("term").unwrap(), "SIGTERM");
2109        // Whitespace around the name is tolerated.
2110        assert_eq!(validate_signal(" INT ").unwrap(), "SIGINT");
2111    }
2112
2113    #[test]
2114    fn validate_signal_rejects_unknown_or_empty() {
2115        assert!(matches!(
2116            validate_signal(""),
2117            Err(AgentError::InvalidSpec(_))
2118        ));
2119        assert!(matches!(
2120            validate_signal("   "),
2121            Err(AgentError::InvalidSpec(_))
2122        ));
2123        assert!(matches!(
2124            validate_signal("SIGSEGV"),
2125            Err(AgentError::InvalidSpec(_))
2126        ));
2127        assert!(matches!(
2128            validate_signal("NOPE"),
2129            Err(AgentError::InvalidSpec(_))
2130        ));
2131        // Signals outside the POSIX allowlist are rejected even if real.
2132        assert!(matches!(
2133            validate_signal("SIGPIPE"),
2134            Err(AgentError::InvalidSpec(_))
2135        ));
2136    }
2137
2138    #[tokio::test]
2139    async fn mock_kill_container_defaults_to_sigkill() {
2140        let runtime = MockRuntime::new();
2141        let id = ContainerId {
2142            service: "kill-me".to_string(),
2143            replica: 0,
2144        };
2145        runtime.create_container(&id, &mock_spec()).await.unwrap();
2146        runtime.start_container(&id).await.unwrap();
2147
2148        // `None` -> defaults to SIGKILL; returns Ok and marks the container
2149        // as exited.
2150        runtime.kill_container(&id, None).await.unwrap();
2151        let state = runtime.container_state(&id).await.unwrap();
2152        assert!(
2153            matches!(state, ContainerState::Exited { code: 137 }),
2154            "expected Exited(137), got {state:?}"
2155        );
2156    }
2157
2158    #[test]
2159    fn wait_reason_serializes_as_snake_case() {
2160        assert_eq!(
2161            serde_json::to_string(&WaitReason::Exited).unwrap(),
2162            "\"exited\""
2163        );
2164        assert_eq!(
2165            serde_json::to_string(&WaitReason::Signal).unwrap(),
2166            "\"signal\""
2167        );
2168        assert_eq!(
2169            serde_json::to_string(&WaitReason::OomKilled).unwrap(),
2170            "\"oom_killed\""
2171        );
2172        assert_eq!(
2173            serde_json::to_string(&WaitReason::RuntimeError).unwrap(),
2174            "\"runtime_error\""
2175        );
2176    }
2177
2178    #[test]
2179    fn wait_reason_deserialize_roundtrip() {
2180        for variant in [
2181            WaitReason::Exited,
2182            WaitReason::Signal,
2183            WaitReason::OomKilled,
2184            WaitReason::RuntimeError,
2185        ] {
2186            let s = serde_json::to_string(&variant).unwrap();
2187            let back: WaitReason = serde_json::from_str(&s).unwrap();
2188            assert_eq!(variant, back, "roundtrip failed for {variant:?}");
2189        }
2190    }
2191
2192    #[test]
2193    fn signal_name_from_exit_code_known_signals() {
2194        assert_eq!(signal_name_from_exit_code(137).as_deref(), Some("SIGKILL"));
2195        assert_eq!(signal_name_from_exit_code(143).as_deref(), Some("SIGTERM"));
2196        assert_eq!(signal_name_from_exit_code(130).as_deref(), Some("SIGINT"));
2197        assert_eq!(signal_name_from_exit_code(129).as_deref(), Some("SIGHUP"));
2198        assert_eq!(signal_name_from_exit_code(139).as_deref(), Some("SIGSEGV"));
2199    }
2200
2201    #[test]
2202    fn signal_name_from_exit_code_handles_unknown_and_normal() {
2203        // Normal exits (<= 128) return None.
2204        assert_eq!(signal_name_from_exit_code(0), None);
2205        assert_eq!(signal_name_from_exit_code(1), None);
2206        assert_eq!(signal_name_from_exit_code(128), None);
2207
2208        // Unknown signals produce a stable string form.
2209        assert_eq!(
2210            signal_name_from_exit_code(128 + 99).as_deref(),
2211            Some("signal_99")
2212        );
2213    }
2214
2215    #[tokio::test]
2216    async fn default_wait_outcome_delegates_to_wait_container() {
2217        let runtime = MockRuntime::new();
2218        let id = ContainerId {
2219            service: "wait-test".to_string(),
2220            replica: 0,
2221        };
2222        runtime.create_container(&id, &mock_spec()).await.unwrap();
2223        runtime.start_container(&id).await.unwrap();
2224
2225        let outcome = runtime.wait_outcome(&id).await.unwrap();
2226        // MockRuntime::wait_container returns 0 for running containers.
2227        assert_eq!(outcome.exit_code, 0);
2228        assert_eq!(outcome.reason, WaitReason::Exited);
2229        assert!(outcome.signal.is_none());
2230        assert!(outcome.finished_at.is_none());
2231    }
2232
2233    #[tokio::test]
2234    async fn mock_kill_container_rejects_bogus_signal() {
2235        let runtime = MockRuntime::new();
2236        let id = ContainerId {
2237            service: "kill-me".to_string(),
2238            replica: 0,
2239        };
2240        runtime.create_container(&id, &mock_spec()).await.unwrap();
2241        runtime.start_container(&id).await.unwrap();
2242
2243        let err = runtime
2244            .kill_container(&id, Some("SIGFOO"))
2245            .await
2246            .unwrap_err();
2247        assert!(
2248            matches!(err, AgentError::InvalidSpec(_)),
2249            "expected InvalidSpec, got {err:?}"
2250        );
2251    }
2252
2253    // The default trait impls of `logs_stream`, `stats_stream`, and
2254    // `pull_image_stream` still return `AgentError::Unsupported`. `MockRuntime`
2255    // overrides all three so tests can pre-script stream output (see
2256    // `mock_logs_stream_yields_queued_items_in_order` and friends below).
2257    // A trivial `BareRuntime` exercises the default trait impls without
2258    // dragging in MockRuntime's overrides.
2259
2260    /// Minimal `Runtime` implementation used to exercise the default trait
2261    /// impls of `logs_stream` / `stats_stream` / `pull_image_stream`. Every
2262    /// non-default method panics โ€” the bare runtime is only ever called for
2263    /// the three streaming methods the tests care about.
2264    struct BareRuntime;
2265
2266    #[async_trait::async_trait]
2267    impl Runtime for BareRuntime {
2268        async fn pull_image(&self, _image: &str) -> Result<()> {
2269            unimplemented!()
2270        }
2271        async fn pull_image_with_policy(
2272            &self,
2273            _image: &str,
2274            _policy: PullPolicy,
2275            _auth: Option<&RegistryAuth>,
2276        ) -> Result<()> {
2277            unimplemented!()
2278        }
2279        async fn create_container(&self, _id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
2280            unimplemented!()
2281        }
2282        async fn start_container(&self, _id: &ContainerId) -> Result<()> {
2283            unimplemented!()
2284        }
2285        async fn stop_container(&self, _id: &ContainerId, _timeout: Duration) -> Result<()> {
2286            unimplemented!()
2287        }
2288        async fn remove_container(&self, _id: &ContainerId) -> Result<()> {
2289            unimplemented!()
2290        }
2291        async fn container_state(&self, _id: &ContainerId) -> Result<ContainerState> {
2292            unimplemented!()
2293        }
2294        async fn container_logs(&self, _id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
2295            unimplemented!()
2296        }
2297        async fn exec(&self, _id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
2298            unimplemented!()
2299        }
2300        async fn get_container_stats(&self, _id: &ContainerId) -> Result<ContainerStats> {
2301            unimplemented!()
2302        }
2303        async fn wait_container(&self, _id: &ContainerId) -> Result<i32> {
2304            unimplemented!()
2305        }
2306        async fn get_logs(&self, _id: &ContainerId) -> Result<Vec<LogEntry>> {
2307            unimplemented!()
2308        }
2309        async fn get_container_pid(&self, _id: &ContainerId) -> Result<Option<u32>> {
2310            unimplemented!()
2311        }
2312        async fn get_container_ip(&self, _id: &ContainerId) -> Result<Option<IpAddr>> {
2313            unimplemented!()
2314        }
2315    }
2316
2317    #[tokio::test]
2318    async fn default_logs_stream_is_unsupported() {
2319        let runtime = BareRuntime;
2320        let id = ContainerId {
2321            service: "stream-test".to_string(),
2322            replica: 0,
2323        };
2324        // The success-side `LogsStream` is not `Debug`, so we can't call
2325        // `unwrap_err`; pattern-match on the Result directly instead.
2326        match runtime.logs_stream(&id, LogsStreamOptions::default()).await {
2327            Err(AgentError::Unsupported(_)) => {}
2328            Err(other) => panic!("expected Unsupported, got {other:?}"),
2329            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2330        }
2331    }
2332
2333    #[tokio::test]
2334    async fn default_stats_stream_is_unsupported() {
2335        let runtime = BareRuntime;
2336        let id = ContainerId {
2337            service: "stream-test".to_string(),
2338            replica: 0,
2339        };
2340        match runtime.stats_stream(&id).await {
2341            Err(AgentError::Unsupported(_)) => {}
2342            Err(other) => panic!("expected Unsupported, got {other:?}"),
2343            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2344        }
2345    }
2346
2347    #[tokio::test]
2348    async fn default_pull_image_stream_is_unsupported() {
2349        let runtime = BareRuntime;
2350        match runtime.pull_image_stream("alpine:latest", None).await {
2351            Err(AgentError::Unsupported(_)) => {}
2352            Err(other) => panic!("expected Unsupported, got {other:?}"),
2353            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2354        }
2355    }
2356
2357    #[tokio::test]
2358    async fn default_archive_get_is_unsupported() {
2359        let runtime = BareRuntime;
2360        let id = ContainerId {
2361            service: "archive-test".to_string(),
2362            replica: 0,
2363        };
2364        match runtime.archive_get(&id, "/etc/hosts").await {
2365            Err(AgentError::Unsupported(_)) => {}
2366            Err(other) => panic!("expected Unsupported, got {other:?}"),
2367            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2368        }
2369    }
2370
2371    #[tokio::test]
2372    async fn default_archive_put_is_unsupported() {
2373        let runtime = BareRuntime;
2374        let id = ContainerId {
2375            service: "archive-test".to_string(),
2376            replica: 0,
2377        };
2378        let err = runtime
2379            .archive_put(
2380                &id,
2381                "/tmp",
2382                bytes::Bytes::from_static(&[]),
2383                ArchivePutOptions::default(),
2384            )
2385            .await
2386            .unwrap_err();
2387        assert!(matches!(err, AgentError::Unsupported(_)));
2388    }
2389
2390    #[tokio::test]
2391    async fn default_archive_head_is_unsupported() {
2392        let runtime = BareRuntime;
2393        let id = ContainerId {
2394            service: "archive-test".to_string(),
2395            replica: 0,
2396        };
2397        let err = runtime.archive_head(&id, "/etc/hosts").await.unwrap_err();
2398        assert!(matches!(err, AgentError::Unsupported(_)));
2399    }
2400
2401    #[tokio::test]
2402    async fn default_exec_pty_is_unsupported() {
2403        let runtime = BareRuntime;
2404        let id = ContainerId {
2405            service: "exec-pty".to_string(),
2406            replica: 0,
2407        };
2408        // The success-side `ExecHandle` is not `Debug`, so we can't call
2409        // `unwrap_err`; pattern-match on the Result directly instead.
2410        match runtime.exec_pty(&id, ExecOptions::default()).await {
2411            Err(AgentError::Unsupported(_)) => {}
2412            Err(other) => panic!("expected Unsupported, got {other:?}"),
2413            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2414        }
2415    }
2416
2417    #[tokio::test]
2418    async fn default_inspect_image_native_is_unsupported() {
2419        let runtime = BareRuntime;
2420        let err = runtime.inspect_image_native("alpine").await.unwrap_err();
2421        assert!(matches!(err, AgentError::Unsupported(_)));
2422    }
2423
2424    #[tokio::test]
2425    async fn default_image_history_is_unsupported() {
2426        let runtime = BareRuntime;
2427        let err = runtime.image_history("alpine").await.unwrap_err();
2428        assert!(matches!(err, AgentError::Unsupported(_)));
2429    }
2430
2431    #[tokio::test]
2432    async fn default_search_images_is_unsupported() {
2433        let runtime = BareRuntime;
2434        let err = runtime.search_images("nginx", 10).await.unwrap_err();
2435        assert!(matches!(err, AgentError::Unsupported(_)));
2436    }
2437
2438    #[tokio::test]
2439    async fn default_save_images_is_unsupported() {
2440        let runtime = BareRuntime;
2441        // The success-side stream isn't `Debug`, so pattern-match the Result.
2442        match runtime.save_images(&["alpine".to_string()]).await {
2443            Err(AgentError::Unsupported(_)) => {}
2444            Err(other) => panic!("expected Unsupported, got {other:?}"),
2445            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2446        }
2447    }
2448
2449    #[tokio::test]
2450    async fn default_load_images_is_unsupported() {
2451        let runtime = BareRuntime;
2452        match runtime
2453            .load_images(bytes::Bytes::from_static(&[]), false)
2454            .await
2455        {
2456            Err(AgentError::Unsupported(_)) => {}
2457            Err(other) => panic!("expected Unsupported, got {other:?}"),
2458            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2459        }
2460    }
2461
2462    #[tokio::test]
2463    async fn default_import_image_is_unsupported() {
2464        let runtime = BareRuntime;
2465        let err = runtime
2466            .import_image(bytes::Bytes::from_static(&[]), None, None)
2467            .await
2468            .unwrap_err();
2469        assert!(matches!(err, AgentError::Unsupported(_)));
2470    }
2471
2472    #[tokio::test]
2473    async fn default_export_container_fs_is_unsupported() {
2474        let runtime = BareRuntime;
2475        let id = ContainerId {
2476            service: "export".to_string(),
2477            replica: 0,
2478        };
2479        match runtime.export_container_fs(&id).await {
2480            Err(AgentError::Unsupported(_)) => {}
2481            Err(other) => panic!("expected Unsupported, got {other:?}"),
2482            Ok(_) => panic!("expected Err(Unsupported), got Ok"),
2483        }
2484    }
2485
2486    #[tokio::test]
2487    async fn default_commit_container_is_unsupported() {
2488        let runtime = BareRuntime;
2489        let id = ContainerId {
2490            service: "commit".to_string(),
2491            replica: 0,
2492        };
2493        let err = runtime
2494            .commit_container(&id, &CommitOptions::default())
2495            .await
2496            .unwrap_err();
2497        assert!(matches!(err, AgentError::Unsupported(_)));
2498    }
2499
2500    #[test]
2501    fn load_progress_serializes_with_kind_discriminator() {
2502        let status = LoadProgress::Status {
2503            id: Some("abc".to_string()),
2504            status: "Loading layer".to_string(),
2505        };
2506        let json = serde_json::to_value(&status).unwrap();
2507        assert_eq!(json["kind"], "status");
2508        assert_eq!(json["status"], "Loading layer");
2509
2510        let done = LoadProgress::Done {
2511            references: vec!["alpine:latest".to_string()],
2512        };
2513        let json = serde_json::to_value(&done).unwrap();
2514        assert_eq!(json["kind"], "done");
2515        assert_eq!(json["references"], serde_json::json!(["alpine:latest"]));
2516    }
2517
2518    #[test]
2519    fn commit_options_default_is_no_op_pause_false() {
2520        let opts = CommitOptions::default();
2521        assert!(opts.repo.is_none());
2522        assert!(opts.tag.is_none());
2523        assert!(opts.comment.is_none());
2524        assert!(opts.author.is_none());
2525        assert!(!opts.pause);
2526        assert!(opts.changes.is_none());
2527    }
2528
2529    #[test]
2530    fn image_inspect_info_default_round_trips_via_serde() {
2531        let info = ImageInspectInfo::default();
2532        let json = serde_json::to_string(&info).unwrap();
2533        let back: ImageInspectInfo = serde_json::from_str(&json).unwrap();
2534        assert_eq!(info, back);
2535    }
2536
2537    #[tokio::test]
2538    async fn mock_logs_stream_yields_queued_items_in_order() {
2539        use futures_util::StreamExt;
2540
2541        let runtime = MockRuntime::new();
2542        let id = ContainerId {
2543            service: "logs-order".to_string(),
2544            replica: 0,
2545        };
2546
2547        let make_chunk = |s: &str, ch: LogChannel| LogChunk {
2548            stream: ch,
2549            bytes: bytes::Bytes::copy_from_slice(s.as_bytes()),
2550            timestamp: None,
2551        };
2552
2553        runtime
2554            .enqueue_log_chunk(&id, make_chunk("first", LogChannel::Stdout))
2555            .await;
2556        runtime
2557            .enqueue_log_chunk(&id, make_chunk("second", LogChannel::Stderr))
2558            .await;
2559        runtime
2560            .enqueue_log_chunk(&id, make_chunk("third", LogChannel::Stdout))
2561            .await;
2562
2563        // `follow=false` so the stream ends once the queue is drained.
2564        let opts = LogsStreamOptions {
2565            follow: false,
2566            ..LogsStreamOptions::default()
2567        };
2568        let mut stream = runtime.logs_stream(&id, opts).await.unwrap();
2569
2570        let mut got = Vec::new();
2571        while let Some(item) = stream.next().await {
2572            let chunk = item.unwrap();
2573            got.push((
2574                chunk.stream,
2575                String::from_utf8(chunk.bytes.to_vec()).unwrap(),
2576            ));
2577        }
2578        assert_eq!(
2579            got,
2580            vec![
2581                (LogChannel::Stdout, "first".to_string()),
2582                (LogChannel::Stderr, "second".to_string()),
2583                (LogChannel::Stdout, "third".to_string()),
2584            ]
2585        );
2586    }
2587
2588    #[tokio::test]
2589    async fn mock_logs_stream_empty_queue_ends_immediately_when_not_follow() {
2590        use futures_util::StreamExt;
2591
2592        let runtime = MockRuntime::new();
2593        let id = ContainerId {
2594            service: "logs-empty".to_string(),
2595            replica: 0,
2596        };
2597
2598        let opts = LogsStreamOptions {
2599            follow: false,
2600            ..LogsStreamOptions::default()
2601        };
2602        let mut stream = runtime.logs_stream(&id, opts).await.unwrap();
2603
2604        // Empty queue + follow=false => stream is closed on first poll.
2605        // Wrap in a short timeout so a regression that hangs would surface
2606        // as a test failure rather than the test runner stalling.
2607        let next = tokio::time::timeout(Duration::from_millis(500), stream.next())
2608            .await
2609            .expect("stream did not terminate; expected immediate close on empty queue");
2610        assert!(
2611            next.is_none(),
2612            "expected stream to be exhausted, got Some(_)"
2613        );
2614    }
2615
2616    #[tokio::test]
2617    async fn mock_stats_stream_yields_queued_samples_in_order() {
2618        use futures_util::StreamExt;
2619
2620        let runtime = MockRuntime::new();
2621        let id = ContainerId {
2622            service: "stats-order".to_string(),
2623            replica: 0,
2624        };
2625
2626        let now = chrono::Utc::now();
2627        let mk = |cpu: u64| StatsSample {
2628            cpu_total_ns: cpu,
2629            cpu_system_ns: 0,
2630            online_cpus: 1,
2631            mem_used_bytes: 0,
2632            mem_limit_bytes: 0,
2633            net_rx_bytes: 0,
2634            net_tx_bytes: 0,
2635            blkio_read_bytes: 0,
2636            blkio_write_bytes: 0,
2637            pids_current: 0,
2638            pids_limit: None,
2639            timestamp: now,
2640        };
2641
2642        runtime.enqueue_stats_sample(&id, mk(100)).await;
2643        runtime.enqueue_stats_sample(&id, mk(200)).await;
2644        runtime.enqueue_stats_sample(&id, mk(300)).await;
2645
2646        let mut stream = runtime.stats_stream(&id).await.unwrap();
2647
2648        let mut cpus = Vec::new();
2649        while let Some(item) = stream.next().await {
2650            cpus.push(item.unwrap().cpu_total_ns);
2651        }
2652        assert_eq!(cpus, vec![100, 200, 300]);
2653    }
2654
2655    #[tokio::test]
2656    async fn mock_pull_image_stream_yields_queued_progress_in_order() {
2657        use futures_util::StreamExt;
2658
2659        let runtime = MockRuntime::new();
2660        let image = "alpine:latest";
2661
2662        runtime
2663            .enqueue_pull_progress(
2664                image,
2665                PullProgress::Status {
2666                    id: Some("layer-1".to_string()),
2667                    status: "Pulling fs layer".to_string(),
2668                    progress: None,
2669                    current: None,
2670                    total: None,
2671                },
2672            )
2673            .await;
2674        runtime
2675            .enqueue_pull_progress(
2676                image,
2677                PullProgress::Status {
2678                    id: Some("layer-1".to_string()),
2679                    status: "Downloading".to_string(),
2680                    progress: Some("[==>  ] 1MB/4MB".to_string()),
2681                    current: Some(1024 * 1024),
2682                    total: Some(4 * 1024 * 1024),
2683                },
2684            )
2685            .await;
2686        runtime
2687            .enqueue_pull_progress(
2688                image,
2689                PullProgress::Done {
2690                    reference: image.to_string(),
2691                    digest: Some("sha256:deadbeef".to_string()),
2692                },
2693            )
2694            .await;
2695
2696        let mut stream = runtime.pull_image_stream(image, None).await.unwrap();
2697        let mut events = Vec::new();
2698        while let Some(item) = stream.next().await {
2699            events.push(item.unwrap());
2700        }
2701
2702        assert_eq!(events.len(), 3);
2703        match &events[0] {
2704            PullProgress::Status { status, .. } => assert_eq!(status, "Pulling fs layer"),
2705            done @ PullProgress::Done { .. } => panic!("expected Status, got {done:?}"),
2706        }
2707        match &events[1] {
2708            PullProgress::Status {
2709                status,
2710                current,
2711                total,
2712                ..
2713            } => {
2714                assert_eq!(status, "Downloading");
2715                assert_eq!(*current, Some(1024 * 1024));
2716                assert_eq!(*total, Some(4 * 1024 * 1024));
2717            }
2718            done @ PullProgress::Done { .. } => panic!("expected Status, got {done:?}"),
2719        }
2720        match &events[2] {
2721            PullProgress::Done { reference, digest } => {
2722                assert_eq!(reference, image);
2723                assert_eq!(digest.as_deref(), Some("sha256:deadbeef"));
2724            }
2725            status @ PullProgress::Status { .. } => panic!("expected Done, got {status:?}"),
2726        }
2727    }
2728
2729    #[test]
2730    fn log_channel_serializes_as_snake_case() {
2731        assert_eq!(
2732            serde_json::to_string(&LogChannel::Stdin).unwrap(),
2733            "\"stdin\""
2734        );
2735        assert_eq!(
2736            serde_json::to_string(&LogChannel::Stdout).unwrap(),
2737            "\"stdout\""
2738        );
2739        assert_eq!(
2740            serde_json::to_string(&LogChannel::Stderr).unwrap(),
2741            "\"stderr\""
2742        );
2743    }
2744
2745    fn mock_spec() -> ServiceSpec {
2746        use zlayer_spec::*;
2747        serde_yaml::from_str::<DeploymentSpec>(
2748            r"
2749version: v1
2750deployment: test
2751services:
2752  test:
2753    rtype: service
2754    image:
2755      name: test:latest
2756    endpoints:
2757      - name: http
2758        protocol: http
2759        port: 8080
2760",
2761        )
2762        .unwrap()
2763        .services
2764        .remove("test")
2765        .unwrap()
2766    }
2767}