// crates/zlayer-agent/src/runtime.rs
//! Abstract container runtime interface
//!
//! Defines the Runtime trait that can be implemented for different container runtimes
//! (containerd, CRI-O, etc.)

use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use futures_util::Stream;
use std::net::IpAddr;
use std::pin::Pin;
use std::time::Duration;
use tokio::task::JoinHandle;
use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
use zlayer_spec::{PullPolicy, RegistryAuth, ServiceSpec};
15
/// Lifecycle state of a container as tracked by a [`Runtime`] backend.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContainerState {
    /// Container is being pulled/created.
    Pending,
    /// Init actions are running.
    Initializing,
    /// Container is running.
    Running,
    /// Container is stopping.
    Stopping,
    /// Container has exited with the given process exit code.
    Exited { code: i32 },
    /// Container failed; `reason` is a human-readable explanation.
    Failed { reason: String },
}
32
/// Container identifier: a (service, replica) pair.
///
/// Displays as `"{service}-rep-{replica}"`, which is the canonical container
/// name used throughout the agent (see the `Display` impl below).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ContainerId {
    /// Name of the service this container belongs to.
    pub service: String,
    /// Zero-based replica index within the service.
    pub replica: u32,
}

impl std::fmt::Display for ContainerId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}-rep-{}", self.service, self.replica)
    }
}
45
46/// Container handle
47pub struct Container {
48    pub id: ContainerId,
49    pub state: ContainerState,
50    pub pid: Option<u32>,
51    pub task: Option<JoinHandle<std::io::Result<()>>>,
52    /// Overlay network IP address assigned to this container
53    pub overlay_ip: Option<IpAddr>,
54    /// Health monitor task handle for this container
55    pub health_monitor: Option<JoinHandle<()>>,
56    /// Runtime-assigned port override (used by macOS sandbox where all
57    /// containers share the host network and need unique ports).
58    /// When `Some(port)`, the proxy should use this port instead of the
59    /// spec-declared endpoint port for this specific container's backend address.
60    pub port_override: Option<u16>,
61}
62
63/// Summary information about a cached image on the host runtime.
64#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
65pub struct ImageInfo {
66    /// Canonical image reference (e.g. `zachhandley/zlayer-manager:latest`).
67    pub reference: String,
68    /// Content-addressed digest if known (`sha256:...`). `None` when the
69    /// backend only tracks images by tag.
70    pub digest: Option<String>,
71    /// Total on-disk / in-cache size in bytes, when available.
72    pub size_bytes: Option<u64>,
73}
74
75/// Result of a prune operation.
76#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
77pub struct PruneResult {
78    /// Image references that were removed.
79    pub deleted: Vec<String>,
80    /// Bytes reclaimed from the cache. `0` when the backend cannot report.
81    pub space_reclaimed: u64,
82}
83
84/// Reason a container stopped running, as reported by [`Runtime::wait_outcome`].
85///
86/// Serialized as `snake_case` strings on the wire (`exited`, `signal`,
87/// `oom_killed`, `runtime_error`) so the API DTO can emit the reason as-is
88/// without a second translation layer.
89#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
90#[serde(rename_all = "snake_case")]
91pub enum WaitReason {
92    /// Container exited normally.
93    Exited,
94    /// Container was killed by a signal (e.g. `SIGKILL`, `SIGTERM`).
95    Signal,
96    /// Container was killed by the OOM killer.
97    OomKilled,
98    /// Runtime-side failure (pre-start error, runtime crash, etc.).
99    RuntimeError,
100}
101
102/// Richer wait result returned by [`Runtime::wait_outcome`].
103///
104/// Backwards-compatible with [`Runtime::wait_container`] (which returns just
105/// `exit_code`). The API handler uses this to populate the extended
106/// `ContainerWaitResponse` fields (`reason`, `signal`, `finished_at`) while
107/// existing callers that only need the exit code can keep using
108/// `wait_container`.
109#[derive(Debug, Clone)]
110pub struct WaitOutcome {
111    /// Process exit code (0 = success). When the container was killed by
112    /// signal `N`, this is typically `128 + N`.
113    pub exit_code: i32,
114    /// Classification of the exit.
115    pub reason: WaitReason,
116    /// Signal name when `reason == WaitReason::Signal`, e.g. `"SIGKILL"`.
117    /// Derived from `exit_code - 128` on a best-effort basis.
118    pub signal: Option<String>,
119    /// Time the container exited, if the runtime reports it.
120    pub finished_at: Option<chrono::DateTime<chrono::Utc>>,
121}
122
123impl WaitOutcome {
124    /// Build a plain `Exited` outcome with no signal/timestamp metadata — the
125    /// default that matches the pre-§3.12 behaviour.
126    #[must_use]
127    pub fn exited(exit_code: i32) -> Self {
128        Self {
129            exit_code,
130            reason: WaitReason::Exited,
131            signal: None,
132            finished_at: None,
133        }
134    }
135}
136
/// Map a signal-style exit code (`128 + N`) to a canonical signal name.
///
/// Recognises common POSIX signals and falls back to `signal_<n>` for
/// unknown numbers so the caller always gets *something* readable.
/// Returns `None` for exit codes `<= 128`, which cannot encode a signal.
///
/// NOTE(review): the `17 => SIGSTOP` / `18 => SIGCONT` entries mix numbering
/// conventions — on BSD/macOS SIGSTOP is 17, but on Linux 17 is SIGCHLD,
/// 18 is SIGCONT and 19 is SIGSTOP. Confirm which platform's containers
/// feed this mapping before relying on codes 145/146.
#[must_use]
pub fn signal_name_from_exit_code(exit_code: i32) -> Option<String> {
    if exit_code <= 128 {
        return None;
    }
    let n = exit_code - 128;
    let name = match n {
        1 => "SIGHUP",
        2 => "SIGINT",
        3 => "SIGQUIT",
        4 => "SIGILL",
        6 => "SIGABRT",
        7 => "SIGBUS",
        8 => "SIGFPE",
        9 => "SIGKILL",
        10 => "SIGUSR1",
        11 => "SIGSEGV",
        12 => "SIGUSR2",
        13 => "SIGPIPE",
        14 => "SIGALRM",
        15 => "SIGTERM",
        17 => "SIGSTOP",
        18 => "SIGCONT",
        // Unknown signal number: still return something readable.
        _ => return Some(format!("signal_{n}")),
    };
    Some(name.to_string())
}
168
/// One streaming event emitted by [`Runtime::exec_stream`].
///
/// Runtimes push these events as the exec'd command produces output. The final
/// event for any successful stream is always an [`ExecEvent::Exit`] carrying
/// the process's exit code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecEvent {
    /// A chunk of stdout from the running command. Emitted line-by-line by
    /// Docker; other runtimes may emit the full buffered output in one event.
    Stdout(String),
    /// A chunk of stderr from the running command.
    Stderr(String),
    /// The command has exited with this exit code. Always the final event.
    Exit(i32),
}
184
185/// Boxed async stream of [`ExecEvent`]s returned by [`Runtime::exec_stream`].
186pub type ExecEventStream = Pin<Box<dyn Stream<Item = ExecEvent> + Send>>;
187
/// Per-network attachment reported by [`Runtime::inspect_detailed`].
///
/// Mirrors the subset of bollard's `EndpointSettings` that the API needs to
/// populate `ContainerInfo.networks` for standalone containers. Kept in
/// `zlayer-agent` (rather than `zlayer-spec`) because it's a runtime-level
/// inspect result, not a deployment specification.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NetworkAttachmentDetail {
    /// Network name as reported by the runtime (Docker's key in
    /// `NetworkSettings.Networks`, e.g. `"bridge"` or a user-defined network
    /// name).
    pub network: String,
    /// DNS aliases the container answers to on this network. Empty when the
    /// runtime doesn't surface aliases.
    pub aliases: Vec<String>,
    /// Assigned IPv4 address on this network, if any. Empty strings are
    /// normalised to `None`.
    pub ipv4: Option<String>,
}
207
/// Per-container health detail reported by [`Runtime::inspect_detailed`].
///
/// Sourced directly from bollard's `ContainerState.health` (Docker's native
/// healthcheck tracking). Our internal `HealthMonitor` in
/// `crates/zlayer-agent/src/health.rs` drives service-level health events; for
/// standalone containers the API reports the runtime-native status instead so
/// that images with a baked-in `HEALTHCHECK` show up correctly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HealthDetail {
    /// One of `"none"`, `"starting"`, `"healthy"`, `"unhealthy"` (Docker's
    /// `HealthStatusEnum`). Empty string is normalised to `"none"` upstream.
    pub status: String,
    /// Consecutive failing probe count, if the runtime reports it.
    pub failing_streak: Option<u32>,
    /// Output from the most recent failing probe, when available.
    pub last_output: Option<String>,
}
225
226/// Rich inspect details for a single container, returned by
227/// [`Runtime::inspect_detailed`].
228///
229/// Carries the fields `ContainerInfo` needs on top of the bare
230/// [`ContainerState`] reported by [`Runtime::container_state`]:
231/// published ports, attached networks, first IPv4, health, and `exit_code`.
232///
233/// Default is an all-empty record — that's what the default trait method
234/// returns for runtimes that don't (yet) implement rich inspect, and the API
235/// layer treats all fields as purely additive, so a default record still
236/// produces a backwards-compatible `ContainerInfo`.
237#[derive(Debug, Clone, Default, PartialEq, Eq)]
238pub struct ContainerInspectDetails {
239    /// Published port mappings (container → host), translated back from the
240    /// runtime's internal port-binding map.
241    pub ports: Vec<zlayer_spec::PortMapping>,
242    /// Networks the container is attached to, plus the aliases + IPv4 for each.
243    pub networks: Vec<NetworkAttachmentDetail>,
244    /// First non-empty IPv4 address found across the container's networks,
245    /// useful as a "primary" IP for simple clients that don't want to iterate
246    /// `networks`. `None` when the container isn't on any network with an IP.
247    pub ipv4: Option<String>,
248    /// Health status when the container has a Docker-native `HEALTHCHECK` or
249    /// the runtime otherwise reports a health state.
250    pub health: Option<HealthDetail>,
251    /// Most recent exit code, when the runtime reports one. `None` for
252    /// containers that are still running and have never exited.
253    pub exit_code: Option<i32>,
254}
255
256/// Abstract container runtime trait
257///
258/// This trait abstracts over different container runtimes (containerd, CRI-O, etc.)
259#[async_trait::async_trait]
260pub trait Runtime: Send + Sync {
261    /// Pull an image to local storage
262    async fn pull_image(&self, image: &str) -> Result<()>;
263
264    /// Pull an image to local storage with a specific policy.
265    ///
266    /// When `auth` is `Some`, the runtime uses those inline credentials for
267    /// the pull (§3.10 of `ZLAYER_SDK_FIXES.md`). When `auth` is `None`, the
268    /// runtime falls back to its existing credential-store lookup keyed by
269    /// registry hostname (or anonymous access when no match exists).
270    ///
271    /// Non-Docker runtimes may accept but ignore the `auth` argument — their
272    /// OCI puller (`zlayer-registry`) already resolves credentials from the
273    /// store by hostname, and inline auth is primarily a Docker-backend
274    /// concern. Ignoring it is safe: callers that need inline auth should use
275    /// the Docker runtime.
276    async fn pull_image_with_policy(
277        &self,
278        image: &str,
279        policy: PullPolicy,
280        auth: Option<&RegistryAuth>,
281    ) -> Result<()>;
282
283    /// Create a container
284    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()>;
285
286    /// Start a container
287    async fn start_container(&self, id: &ContainerId) -> Result<()>;
288
289    /// Stop a container
290    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()>;
291
292    /// Remove a container
293    async fn remove_container(&self, id: &ContainerId) -> Result<()>;
294
295    /// Get container state
296    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState>;
297
298    /// Get container logs as structured entries
299    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>>;
300
301    /// Execute command in container
302    async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)>;
303
304    /// Execute a command in a container and stream stdout / stderr / exit
305    /// events as they are produced.
306    ///
307    /// The default implementation calls the buffered [`Runtime::exec`] and
308    /// emits everything as a single `Stdout` event, a single `Stderr` event,
309    /// and a final `Exit` event. Runtimes that support true streaming (e.g.
310    /// Docker via bollard) override this to produce line-by-line events as
311    /// the command runs.
312    ///
313    /// The returned stream always terminates with exactly one
314    /// [`ExecEvent::Exit`] as the final item on success. Errors that occur
315    /// before the stream is returned (e.g. container not found, failure to
316    /// create the exec) are surfaced via the outer `Result`; errors that
317    /// occur mid-stream are logged by the runtime and the stream closes.
318    async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
319        let (exit, stdout, stderr) = self.exec(id, cmd).await?;
320        let mut events: Vec<ExecEvent> = Vec::with_capacity(3);
321        if !stdout.is_empty() {
322            events.push(ExecEvent::Stdout(stdout));
323        }
324        if !stderr.is_empty() {
325            events.push(ExecEvent::Stderr(stderr));
326        }
327        events.push(ExecEvent::Exit(exit));
328        Ok(Box::pin(futures_util::stream::iter(events)))
329    }
330
331    /// Get container resource statistics from cgroups
332    ///
333    /// Returns CPU and memory statistics for the specified container.
334    /// Used for metrics collection and autoscaling decisions.
335    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats>;
336
337    /// Wait for a container to exit and return its exit code
338    ///
339    /// This method blocks until the container exits or an error occurs.
340    /// Used primarily for job execution to implement run-to-completion semantics.
341    async fn wait_container(&self, id: &ContainerId) -> Result<i32>;
342
343    /// Wait for a container to exit and return a [`WaitOutcome`] with richer
344    /// classification (exit code + reason + signal + `finished_at` timestamp).
345    ///
346    /// The default implementation delegates to [`Runtime::wait_container`] and
347    /// synthesizes a [`WaitReason::Exited`] result with no signal/timestamp.
348    /// Runtimes that can distinguish OOM kills, signal deaths, or report a
349    /// finished-at time (e.g. the Docker runtime, which has
350    /// `ContainerInspectResponse.state.oom_killed` / `.finished_at`) should
351    /// override this.
352    async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
353        let exit_code = self.wait_container(id).await?;
354        Ok(WaitOutcome::exited(exit_code))
355    }
356
357    /// Get container logs (stdout/stderr combined)
358    ///
359    /// Returns logs as structured entries.
360    /// Used to capture job output after completion.
361    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>>;
362
363    /// Get the PID of a container's main process
364    ///
365    /// Returns:
366    /// - `Ok(Some(pid))` for runtimes with real processes (Youki, Docker)
367    /// - `Ok(None)` for runtimes without separate PIDs (WASM in-process)
368    /// - `Err` if the container doesn't exist or there's an error
369    ///
370    /// Used for overlay network attachment and process management.
371    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>>;
372
373    /// Get the IP address of a container
374    ///
375    /// Returns:
376    /// - `Ok(Some(ip))` if the container has a known IP address
377    /// - `Ok(None)` if the container exists but has no IP assigned yet
378    /// - `Err` if the container doesn't exist or there's an error
379    ///
380    /// Used for proxy backend registration when overlay networking is unavailable.
381    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>>;
382
383    /// Get a runtime-assigned port override for a container.
384    ///
385    /// Returns:
386    /// - `Ok(Some(port))` if the runtime assigned a dynamic port to this container
387    /// - `Ok(None)` if the container should use the spec-declared endpoint port
388    ///
389    /// This exists for runtimes where all containers share the host network stack
390    /// (e.g., macOS sandbox). Without network namespaces, multiple replicas of
391    /// the same service would conflict on the same port. The runtime assigns
392    /// each replica a unique port and passes it via the `PORT` environment variable.
393    /// The proxy then routes to `container_ip:override_port` instead of
394    /// `container_ip:spec_port`.
395    ///
396    /// Runtimes with per-container networking (overlay, VMs, Docker) return `None`.
397    async fn get_container_port_override(&self, _id: &ContainerId) -> Result<Option<u16>> {
398        Ok(None)
399    }
400
401    /// Get the HCN namespace GUID of a Windows container.
402    ///
403    /// Windows-only. Linux/macOS runtimes have no HCN namespace concept and
404    /// return `Ok(None)`. The `HcsRuntime` overrides this to return the
405    /// namespace GUID attached during `create_container`; `OverlayManager`
406    /// then uses the GUID to register the container's assigned overlay IP
407    /// against the right HCN compartment (analogous to how Linux uses PID
408    /// to enter the netns via `/proc/{pid}/ns/net`).
409    #[cfg(target_os = "windows")]
410    async fn get_container_namespace_id(
411        &self,
412        _id: &ContainerId,
413    ) -> Result<Option<windows::core::GUID>> {
414        Ok(None)
415    }
416
417    /// Sync all named volumes associated with this container to S3.
418    ///
419    /// Called after a container is stopped but before it is removed, giving
420    /// the runtime a chance to flush persistent volume data to remote storage.
421    ///
422    /// The default implementation is a no-op. Runtimes that support S3-backed
423    /// volume sync (e.g., Youki with the `s3` feature) override this.
424    async fn sync_container_volumes(&self, _id: &ContainerId) -> Result<()> {
425        Ok(())
426    }
427
428    /// List all images managed by this runtime's image storage.
429    ///
430    /// The default implementation returns `AgentError::Unsupported` — individual
431    /// runtimes override this with backend-specific logic (bollard for Docker,
432    /// zlayer-registry cache walk for Youki, etc.).
433    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
434        Err(AgentError::Unsupported(
435            "list_images is not supported by this runtime".into(),
436        ))
437    }
438
439    /// Remove an image by reference from local storage.
440    ///
441    /// When `force` is true, also removes the image even when other containers
442    /// reference it. The default implementation returns `AgentError::Unsupported`.
443    async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
444        Err(AgentError::Unsupported(
445            "remove_image is not supported by this runtime".into(),
446        ))
447    }
448
449    /// Prune dangling / unused images from local storage.
450    ///
451    /// Returns a [`PruneResult`] describing what was removed. The default
452    /// implementation returns `AgentError::Unsupported`.
453    async fn prune_images(&self) -> Result<PruneResult> {
454        Err(AgentError::Unsupported(
455            "prune_images is not supported by this runtime".into(),
456        ))
457    }
458
459    /// Send a signal to a running container.
460    ///
461    /// When `signal` is `None`, the runtime sends `SIGKILL` (matching Docker's
462    /// `docker kill` default). Backends validate the signal name and reject
463    /// anything outside the standard POSIX set (`SIGKILL`, `SIGTERM`, `SIGINT`,
464    /// `SIGHUP`, `SIGUSR1`, `SIGUSR2`).
465    ///
466    /// Used by `POST /api/v1/containers/{id}/kill` and Docker-compat
467    /// `docker kill`. The default implementation returns
468    /// [`AgentError::Unsupported`].
469    async fn kill_container(&self, _id: &ContainerId, _signal: Option<&str>) -> Result<()> {
470        Err(AgentError::Unsupported(
471            "kill_container is not supported by this runtime".into(),
472        ))
473    }
474
475    /// Create a new tag pointing at an existing image.
476    ///
477    /// `source` is the reference to an already-cached image. `target` is the
478    /// new reference to create — it must be a full reference (repository + tag).
479    ///
480    /// Used by `POST /api/v1/images/tag` and Docker-compat `docker tag`. The
481    /// default implementation returns [`AgentError::Unsupported`].
482    async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
483        Err(AgentError::Unsupported(
484            "tag_image is not supported by this runtime".into(),
485        ))
486    }
487
488    /// Return rich inspect details for a container: published ports, attached
489    /// networks, first IPv4, health, and most-recent exit code.
490    ///
491    /// Runtimes implement this by translating the backend's native inspect
492    /// response (bollard's `ContainerInspectResponse` for Docker) into the
493    /// runtime-level [`ContainerInspectDetails`] struct. The API layer merges
494    /// these fields into `ContainerInfo` on `GET /api/v1/containers` and
495    /// `GET /api/v1/containers/{id}` (§3.15 of `ZLAYER_SDK_FIXES.md`).
496    ///
497    /// The default implementation returns [`ContainerInspectDetails::default`]
498    /// — an all-empty record, which the API layer treats as "this runtime
499    /// doesn't support rich inspect; skip all the extra fields". This keeps
500    /// non-Docker runtimes (Youki, WASM, Mock) backwards compatible; they can
501    /// override this later if they gain equivalent inspect capability.
502    async fn inspect_detailed(&self, _id: &ContainerId) -> Result<ContainerInspectDetails> {
503        Ok(ContainerInspectDetails::default())
504    }
505}
506
507/// Validate a signal name for [`Runtime::kill_container`].
508///
509/// Accepts both the `SIG`-prefixed form (`"SIGKILL"`) and the bare form
510/// (`"KILL"`). Returns the canonical uppercase `SIG`-prefixed name on success.
511///
512/// # Errors
513///
514/// Returns [`AgentError::InvalidSpec`] when `signal` is not one of the
515/// supported signals: `SIGKILL`, `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGUSR1`,
516/// `SIGUSR2`.
517pub fn validate_signal(signal: &str) -> Result<String> {
518    let trimmed = signal.trim();
519    if trimmed.is_empty() {
520        return Err(AgentError::InvalidSpec(
521            "signal must not be empty".to_string(),
522        ));
523    }
524    let upper = trimmed.to_ascii_uppercase();
525    let canonical = if upper.starts_with("SIG") {
526        upper
527    } else {
528        format!("SIG{upper}")
529    };
530    match canonical.as_str() {
531        "SIGKILL" | "SIGTERM" | "SIGINT" | "SIGHUP" | "SIGUSR1" | "SIGUSR2" => Ok(canonical),
532        other => Err(AgentError::InvalidSpec(format!(
533            "unsupported signal '{other}'; allowed: SIGKILL, SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2"
534        ))),
535    }
536}
537
/// Auth context injected into every container so it can talk back to the host
/// API without needing external credentials.
#[derive(Debug, Clone)]
pub struct ContainerAuthContext {
    /// Base URL of the `ZLayer` API, e.g. `"http://127.0.0.1:3669"`.
    pub api_url: String,
    /// JWT signing secret — used to mint per-container tokens at start time.
    pub jwt_secret: String,
    /// Absolute path of the Unix socket on the host (bind-mounted into Linux
    /// containers; added to `writable_dirs` for macOS sandbox).
    pub socket_path: String,
}
550
551/// In-memory mock runtime for testing and development
552pub struct MockRuntime {
553    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
554}
555
556impl MockRuntime {
557    #[must_use]
558    pub fn new() -> Self {
559        Self {
560            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
561        }
562    }
563}
564
565impl Default for MockRuntime {
566    fn default() -> Self {
567        Self::new()
568    }
569}
570
571#[async_trait::async_trait]
572impl Runtime for MockRuntime {
573    async fn pull_image(&self, _image: &str) -> Result<()> {
574        self.pull_image_with_policy(_image, PullPolicy::IfNotPresent, None)
575            .await
576    }
577
578    async fn pull_image_with_policy(
579        &self,
580        _image: &str,
581        _policy: PullPolicy,
582        _auth: Option<&RegistryAuth>,
583    ) -> Result<()> {
584        // Mock: always succeeds
585        tokio::time::sleep(Duration::from_millis(100)).await;
586        Ok(())
587    }
588
589    async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
590        let mut containers = self.containers.write().await;
591        containers.insert(
592            id.clone(),
593            Container {
594                id: id.clone(),
595                state: ContainerState::Pending,
596                pid: None,
597                task: None,
598                overlay_ip: None,
599                health_monitor: None,
600                port_override: None,
601            },
602        );
603        Ok(())
604    }
605
606    async fn start_container(&self, id: &ContainerId) -> Result<()> {
607        let mut containers = self.containers.write().await;
608        if let Some(container) = containers.get_mut(id) {
609            container.state = ContainerState::Running;
610            container.pid = Some(std::process::id()); // Mock PID
611        }
612        Ok(())
613    }
614
615    async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
616        let mut containers = self.containers.write().await;
617        if let Some(container) = containers.get_mut(id) {
618            container.state = ContainerState::Exited { code: 0 };
619        }
620        Ok(())
621    }
622
623    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
624        let mut containers = self.containers.write().await;
625        containers.remove(id);
626        Ok(())
627    }
628
629    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
630        let containers = self.containers.read().await;
631        containers
632            .get(id)
633            .map(|c| c.state.clone())
634            .ok_or_else(|| AgentError::NotFound {
635                container: id.to_string(),
636                reason: "container not found".to_string(),
637            })
638    }
639
640    async fn container_logs(&self, _id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
641        let entries = vec![
642            LogEntry {
643                timestamp: chrono::Utc::now(),
644                stream: LogStream::Stdout,
645                message: "mock log line 1".to_string(),
646                source: LogSource::Container("mock".to_string()),
647                service: None,
648                deployment: None,
649            },
650            LogEntry {
651                timestamp: chrono::Utc::now(),
652                stream: LogStream::Stderr,
653                message: "mock error line".to_string(),
654                source: LogSource::Container("mock".to_string()),
655                service: None,
656                deployment: None,
657            },
658        ];
659        let skip = entries.len().saturating_sub(tail);
660        Ok(entries.into_iter().skip(skip).collect())
661    }
662
663    async fn exec(&self, _id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
664        Ok((0, cmd.join(" "), String::new()))
665    }
666
667    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
668        // Mock: return dummy stats
669        let containers = self.containers.read().await;
670        if containers.contains_key(id) {
671            Ok(ContainerStats {
672                cpu_usage_usec: 1_000_000,       // 1 second
673                memory_bytes: 50 * 1024 * 1024,  // 50 MB
674                memory_limit: 256 * 1024 * 1024, // 256 MB
675                timestamp: std::time::Instant::now(),
676            })
677        } else {
678            Err(AgentError::NotFound {
679                container: id.to_string(),
680                reason: "container not found".to_string(),
681            })
682        }
683    }
684
685    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
686        // Mock: simulate waiting for container to exit
687        let containers = self.containers.read().await;
688        if let Some(container) = containers.get(id) {
689            match &container.state {
690                ContainerState::Exited { code } => Ok(*code),
691                ContainerState::Failed { .. } => Ok(1),
692                _ => {
693                    // Simulate a brief wait and then return success
694                    drop(containers);
695                    tokio::time::sleep(Duration::from_millis(50)).await;
696                    Ok(0)
697                }
698            }
699        } else {
700            Err(AgentError::NotFound {
701                container: id.to_string(),
702                reason: "container not found".to_string(),
703            })
704        }
705    }
706
707    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
708        // Mock: return dummy structured log entries
709        let containers = self.containers.read().await;
710        if containers.contains_key(id) {
711            let container_name = id.to_string();
712            Ok(vec![
713                LogEntry {
714                    timestamp: chrono::Utc::now(),
715                    stream: LogStream::Stdout,
716                    message: format!("[{container_name}] Container started"),
717                    source: LogSource::Container(container_name.clone()),
718                    service: None,
719                    deployment: None,
720                },
721                LogEntry {
722                    timestamp: chrono::Utc::now(),
723                    stream: LogStream::Stdout,
724                    message: format!("[{container_name}] Executing command..."),
725                    source: LogSource::Container(container_name.clone()),
726                    service: None,
727                    deployment: None,
728                },
729                LogEntry {
730                    timestamp: chrono::Utc::now(),
731                    stream: LogStream::Stdout,
732                    message: format!("[{container_name}] Command completed successfully"),
733                    source: LogSource::Container(container_name),
734                    service: None,
735                    deployment: None,
736                },
737            ])
738        } else {
739            Err(AgentError::NotFound {
740                container: id.to_string(),
741                reason: "container not found".to_string(),
742            })
743        }
744    }
745
746    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
747        let containers = self.containers.read().await;
748        if let Some(container) = containers.get(id) {
749            Ok(container.pid)
750        } else {
751            Err(AgentError::NotFound {
752                container: id.to_string(),
753                reason: "container not found".to_string(),
754            })
755        }
756    }
757
758    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
759        let containers = self.containers.read().await;
760        if containers.contains_key(id) {
761            // Mock: deterministic IP based on replica number (172.17.0.{replica+2})
762            #[allow(clippy::cast_possible_truncation)]
763            let last_octet = (id.replica + 2) as u8;
764            Ok(Some(IpAddr::V4(std::net::Ipv4Addr::new(
765                172, 17, 0, last_octet,
766            ))))
767        } else {
768            Err(AgentError::NotFound {
769                container: id.to_string(),
770                reason: "container not found".to_string(),
771            })
772        }
773    }
774
775    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
776        Ok(Vec::new())
777    }
778
779    async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
780        Ok(())
781    }
782
783    async fn prune_images(&self) -> Result<PruneResult> {
784        Ok(PruneResult::default())
785    }
786
787    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
788        // Validate signal even in the mock so callers exercise the same error
789        // path. Default to SIGKILL when omitted.
790        let _canonical = validate_signal(signal.unwrap_or("SIGKILL"))?;
791        let mut containers = self.containers.write().await;
792        let container = containers.get_mut(id).ok_or_else(|| AgentError::NotFound {
793            container: id.to_string(),
794            reason: "container not found".to_string(),
795        })?;
796        container.state = ContainerState::Exited { code: 137 };
797        Ok(())
798    }
799
800    async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
801        // The in-memory mock doesn't store images; treat tag as a no-op success.
802        Ok(())
803    }
804}
805
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand constructor for test container ids.
    fn cid(service: &str, replica: u32) -> ContainerId {
        ContainerId {
            service: service.to_string(),
            replica,
        }
    }

    #[tokio::test]
    async fn test_mock_runtime() {
        let runtime = MockRuntime::new();
        let id = cid("test", 1);

        runtime.pull_image("test:latest").await.unwrap();
        runtime.create_container(&id, &mock_spec()).await.unwrap();
        runtime.start_container(&id).await.unwrap();

        assert_eq!(
            runtime.container_state(&id).await.unwrap(),
            ContainerState::Running
        );
    }

    #[test]
    fn validate_signal_accepts_known_signals() {
        // SIG-prefixed names pass through unchanged.
        for sig in ["SIGKILL", "SIGTERM", "SIGINT", "SIGHUP", "SIGUSR1", "SIGUSR2"] {
            assert_eq!(validate_signal(sig).unwrap(), sig);
        }

        // Bare names (no "SIG" prefix) are canonicalised, case-insensitively.
        assert_eq!(validate_signal("KILL").unwrap(), "SIGKILL");
        assert_eq!(validate_signal("term").unwrap(), "SIGTERM");
        // Surrounding whitespace is tolerated.
        assert_eq!(validate_signal(" INT ").unwrap(), "SIGINT");
    }

    #[test]
    fn validate_signal_rejects_unknown_or_empty() {
        // Empty, blank, unknown, and real-but-unlisted signals all map to
        // InvalidSpec. SIGPIPE is real, but outside the POSIX allowlist.
        for bad in ["", "   ", "SIGSEGV", "NOPE", "SIGPIPE"] {
            assert!(
                matches!(validate_signal(bad), Err(AgentError::InvalidSpec(_))),
                "expected InvalidSpec for {bad:?}"
            );
        }
    }

    #[tokio::test]
    async fn mock_kill_container_defaults_to_sigkill() {
        let runtime = MockRuntime::new();
        let id = cid("kill-me", 0);
        runtime.create_container(&id, &mock_spec()).await.unwrap();
        runtime.start_container(&id).await.unwrap();

        // Omitting the signal defaults to SIGKILL; the mock records the
        // conventional 128+9 exit status.
        runtime.kill_container(&id, None).await.unwrap();
        let state = runtime.container_state(&id).await.unwrap();
        assert!(
            matches!(state, ContainerState::Exited { code: 137 }),
            "expected Exited(137), got {state:?}"
        );
    }

    #[test]
    fn wait_reason_serializes_as_snake_case() {
        let cases = [
            (WaitReason::Exited, "\"exited\""),
            (WaitReason::Signal, "\"signal\""),
            (WaitReason::OomKilled, "\"oom_killed\""),
            (WaitReason::RuntimeError, "\"runtime_error\""),
        ];
        for (variant, expected) in cases {
            assert_eq!(serde_json::to_string(&variant).unwrap(), expected);
        }
    }

    #[test]
    fn wait_reason_deserialize_roundtrip() {
        let variants = [
            WaitReason::Exited,
            WaitReason::Signal,
            WaitReason::OomKilled,
            WaitReason::RuntimeError,
        ];
        for variant in variants {
            let encoded = serde_json::to_string(&variant).unwrap();
            let decoded: WaitReason = serde_json::from_str(&encoded).unwrap();
            assert_eq!(variant, decoded, "roundtrip failed for {variant:?}");
        }
    }

    #[test]
    fn signal_name_from_exit_code_known_signals() {
        let known = [
            (137, "SIGKILL"),
            (143, "SIGTERM"),
            (130, "SIGINT"),
            (129, "SIGHUP"),
            (139, "SIGSEGV"),
        ];
        for (code, name) in known {
            assert_eq!(signal_name_from_exit_code(code).as_deref(), Some(name));
        }
    }

    #[test]
    fn signal_name_from_exit_code_handles_unknown_and_normal() {
        // Exit codes at or below 128 are plain exits, not signal deaths.
        for code in [0, 1, 128] {
            assert_eq!(signal_name_from_exit_code(code), None);
        }

        // Codes above 128 with no known signal get a stable fallback name.
        assert_eq!(
            signal_name_from_exit_code(128 + 99).as_deref(),
            Some("signal_99")
        );
    }

    #[tokio::test]
    async fn default_wait_outcome_delegates_to_wait_container() {
        let runtime = MockRuntime::new();
        let id = cid("wait-test", 0);
        runtime.create_container(&id, &mock_spec()).await.unwrap();
        runtime.start_container(&id).await.unwrap();

        // The default trait impl builds its outcome from wait_container,
        // which the mock resolves to exit code 0 for running containers.
        let outcome = runtime.wait_outcome(&id).await.unwrap();
        assert_eq!(outcome.exit_code, 0);
        assert_eq!(outcome.reason, WaitReason::Exited);
        assert!(outcome.signal.is_none());
        assert!(outcome.finished_at.is_none());
    }

    #[tokio::test]
    async fn mock_kill_container_rejects_bogus_signal() {
        let runtime = MockRuntime::new();
        let id = cid("kill-me", 0);
        runtime.create_container(&id, &mock_spec()).await.unwrap();
        runtime.start_container(&id).await.unwrap();

        let err = runtime
            .kill_container(&id, Some("SIGFOO"))
            .await
            .unwrap_err();
        assert!(
            matches!(err, AgentError::InvalidSpec(_)),
            "expected InvalidSpec, got {err:?}"
        );
    }

    /// Build a minimal `ServiceSpec` by parsing a tiny deployment YAML and
    /// extracting its single "test" service.
    fn mock_spec() -> ServiceSpec {
        use zlayer_spec::*;
        let mut deployment: DeploymentSpec = serde_yaml::from_str(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
",
        )
        .unwrap();
        deployment.services.remove("test").unwrap()
    }
}