zlayer_agent/runtime.rs
1//! Abstract container runtime interface
2//!
3//! Defines the Runtime trait that can be implemented for different container runtimes
4//! (containerd, CRI-O, etc.)
5
6use crate::cgroups_stats::ContainerStats;
7use crate::error::{AgentError, Result};
8use futures_util::Stream;
9use std::net::IpAddr;
10use std::pin::Pin;
11use std::time::Duration;
12use tokio::task::JoinHandle;
13use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
14use zlayer_spec::{PullPolicy, RegistryAuth, ServiceSpec};
15
/// Container state
///
/// High-level lifecycle reported by a [`Runtime`]. `Exited` and `Failed`
/// carry the outcome details (exit code / failure reason respectively).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContainerState {
    /// Container is being pulled/created
    Pending,
    /// Init actions are running
    Initializing,
    /// Container is running
    Running,
    /// Container is stopping
    Stopping,
    /// Container has exited; `code` is the process exit code (0 = success).
    Exited { code: i32 },
    /// Container failed; `reason` is a human-readable description of the failure.
    Failed { reason: String },
}
32
/// Container identifier
///
/// Uniquely identifies one replica of a service. Rendered as
/// `"{service}-rep-{replica}"` by the `Display` impl.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ContainerId {
    /// Name of the owning service.
    pub service: String,
    /// Replica index within the service.
    pub replica: u32,
}
39
40impl std::fmt::Display for ContainerId {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 write!(f, "{}-rep-{}", self.service, self.replica)
43 }
44}
45
/// Container handle
///
/// In-memory record a runtime keeps for one managed container.
pub struct Container {
    /// Identity (service name + replica index).
    pub id: ContainerId,
    /// Last known lifecycle state.
    pub state: ContainerState,
    /// Host PID of the container's main process, when the runtime has one.
    pub pid: Option<u32>,
    /// Background task driving the container, if the runtime spawned one.
    pub task: Option<JoinHandle<std::io::Result<()>>>,
    /// Overlay network IP address assigned to this container
    pub overlay_ip: Option<IpAddr>,
    /// Health monitor task handle for this container
    pub health_monitor: Option<JoinHandle<()>>,
    /// Runtime-assigned port override (used by macOS sandbox where all
    /// containers share the host network and need unique ports).
    /// When `Some(port)`, the proxy should use this port instead of the
    /// spec-declared endpoint port for this specific container's backend address.
    pub port_override: Option<u16>,
}
62
/// Summary information about a cached image on the host runtime.
///
/// Returned by [`Runtime::list_images`].
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ImageInfo {
    /// Canonical image reference (e.g. `zachhandley/zlayer-manager:latest`).
    pub reference: String,
    /// Content-addressed digest if known (`sha256:...`). `None` when the
    /// backend only tracks images by tag.
    pub digest: Option<String>,
    /// Total on-disk / in-cache size in bytes, when available.
    pub size_bytes: Option<u64>,
}
74
/// Result of a prune operation.
///
/// Returned by [`Runtime::prune_images`]. `Default` is the empty result
/// (nothing deleted, zero bytes reclaimed).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PruneResult {
    /// Image references that were removed.
    pub deleted: Vec<String>,
    /// Bytes reclaimed from the cache. `0` when the backend cannot report.
    pub space_reclaimed: u64,
}
83
/// Reason a container stopped running, as reported by [`Runtime::wait_outcome`].
///
/// Serialized as `snake_case` strings on the wire (`exited`, `signal`,
/// `oom_killed`, `runtime_error`) so the API DTO can emit the reason as-is
/// without a second translation layer.
///
/// Classification is best-effort: the default `wait_outcome` implementation
/// always reports `Exited`; only runtimes that override it can distinguish
/// the other variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WaitReason {
    /// Container exited normally.
    Exited,
    /// Container was killed by a signal (e.g. `SIGKILL`, `SIGTERM`).
    Signal,
    /// Container was killed by the OOM killer.
    OomKilled,
    /// Runtime-side failure (pre-start error, runtime crash, etc.).
    RuntimeError,
}
101
/// Richer wait result returned by [`Runtime::wait_outcome`].
///
/// Backwards-compatible with [`Runtime::wait_container`] (which returns just
/// `exit_code`). The API handler uses this to populate the extended
/// `ContainerWaitResponse` fields (`reason`, `signal`, `finished_at`) while
/// existing callers that only need the exit code can keep using
/// `wait_container`. Construct via [`WaitOutcome::exited`] for the plain
/// no-metadata case.
#[derive(Debug, Clone)]
pub struct WaitOutcome {
    /// Process exit code (0 = success). When the container was killed by
    /// signal `N`, this is typically `128 + N`.
    pub exit_code: i32,
    /// Classification of the exit.
    pub reason: WaitReason,
    /// Signal name when `reason == WaitReason::Signal`, e.g. `"SIGKILL"`.
    /// Derived from `exit_code - 128` on a best-effort basis.
    pub signal: Option<String>,
    /// Time the container exited, if the runtime reports it.
    pub finished_at: Option<chrono::DateTime<chrono::Utc>>,
}
122
123impl WaitOutcome {
124 /// Build a plain `Exited` outcome with no signal/timestamp metadata — the
125 /// default that matches the pre-§3.12 behaviour.
126 #[must_use]
127 pub fn exited(exit_code: i32) -> Self {
128 Self {
129 exit_code,
130 reason: WaitReason::Exited,
131 signal: None,
132 finished_at: None,
133 }
134 }
135}
136
/// Map a signal-style exit code (`128 + N`) to a canonical signal name.
///
/// Recognises common POSIX signals and falls back to `signal_<n>` for
/// unknown numbers so the caller always gets *something* readable.
///
/// Returns `None` for exit codes `<= 128`, which are ordinary (non-signal)
/// exit statuses.
#[must_use]
pub fn signal_name_from_exit_code(exit_code: i32) -> Option<String> {
    if exit_code <= 128 {
        return None;
    }
    let n = exit_code - 128;
    // Numbers follow the Linux x86/ARM convention (see `signal(7)`).
    let name = match n {
        1 => "SIGHUP",
        2 => "SIGINT",
        3 => "SIGQUIT",
        4 => "SIGILL",
        6 => "SIGABRT",
        7 => "SIGBUS",
        8 => "SIGFPE",
        9 => "SIGKILL",
        10 => "SIGUSR1",
        11 => "SIGSEGV",
        12 => "SIGUSR2",
        13 => "SIGPIPE",
        14 => "SIGALRM",
        15 => "SIGTERM",
        // Fix: 17 is SIGCHLD on Linux, and SIGSTOP is 19. The previous
        // mapping (17 => SIGSTOP) mislabelled SIGCHLD-numbered exits.
        17 => "SIGCHLD",
        18 => "SIGCONT",
        19 => "SIGSTOP",
        _ => return Some(format!("signal_{n}")),
    };
    Some(name.to_string())
}
168
/// One streaming event emitted by [`Runtime::exec_stream`].
///
/// Runtimes push these events as the exec'd command produces output. The final
/// event for any successful stream is always an [`ExecEvent::Exit`] carrying
/// the process's exit code.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecEvent {
    /// A chunk of stdout from the running command. Emitted line-by-line by
    /// Docker; other runtimes may emit the full buffered output in one event.
    Stdout(String),
    /// A chunk of stderr from the running command.
    Stderr(String),
    /// The command has exited with this exit code. Always the final event.
    /// Matches the exit code [`Runtime::exec`] would report for the same command.
    Exit(i32),
}
184
/// Boxed async stream of [`ExecEvent`]s returned by [`Runtime::exec_stream`].
///
/// Pinned and boxed so the trait method can return it as an opaque object;
/// `Send` allows the stream to be driven from any executor thread.
pub type ExecEventStream = Pin<Box<dyn Stream<Item = ExecEvent> + Send>>;
187
/// Per-network attachment reported by [`Runtime::inspect_detailed`].
///
/// Mirrors the subset of bollard's `EndpointSettings` that the API needs to
/// populate `ContainerInfo.networks` for standalone containers. Kept in
/// `zlayer-agent` (rather than `zlayer-spec`) because it's a runtime-level
/// inspect result, not a deployment specification.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NetworkAttachmentDetail {
    /// Network name as reported by the runtime (Docker's key in
    /// `NetworkSettings.Networks`, e.g. `"bridge"` or a user-defined network
    /// name).
    pub network: String,
    /// DNS aliases the container answers to on this network. Empty when the
    /// runtime doesn't surface aliases.
    pub aliases: Vec<String>,
    /// Assigned IPv4 address on this network, if any. Empty strings are
    /// normalised to `None`. Stored as a string (not `IpAddr`) because it is
    /// passed through from the runtime's inspect output verbatim.
    pub ipv4: Option<String>,
}
207
/// Per-container health detail reported by [`Runtime::inspect_detailed`].
///
/// Sourced directly from bollard's `ContainerState.health` (Docker's native
/// healthcheck tracking). Our internal `HealthMonitor` in
/// `crates/zlayer-agent/src/health.rs` drives service-level health events; for
/// standalone containers the API reports the runtime-native status instead so
/// that images with a baked-in `HEALTHCHECK` show up correctly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HealthDetail {
    /// One of `"none"`, `"starting"`, `"healthy"`, `"unhealthy"` (Docker's
    /// `HealthStatusEnum`). Empty string is normalised to `"none"` upstream.
    pub status: String,
    /// Consecutive failing probe count, if the runtime reports it.
    pub failing_streak: Option<u32>,
    /// Output from the most recent failing probe, when available.
    pub last_output: Option<String>,
}
225
/// Rich inspect details for a single container, returned by
/// [`Runtime::inspect_detailed`].
///
/// Carries the fields `ContainerInfo` needs on top of the bare
/// [`ContainerState`] reported by [`Runtime::container_state`]:
/// published ports, attached networks, first IPv4, health, and `exit_code`.
///
/// Default is an all-empty record — that's what the default trait method
/// returns for runtimes that don't (yet) implement rich inspect, and the API
/// layer treats all fields as purely additive, so a default record still
/// produces a backwards-compatible `ContainerInfo`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerInspectDetails {
    /// Published port mappings (container → host), translated back from the
    /// runtime's internal port-binding map.
    pub ports: Vec<zlayer_spec::PortMapping>,
    /// Networks the container is attached to, plus the aliases + IPv4 for each.
    pub networks: Vec<NetworkAttachmentDetail>,
    /// First non-empty IPv4 address found across the container's networks,
    /// useful as a "primary" IP for simple clients that don't want to iterate
    /// `networks`. `None` when the container isn't on any network with an IP.
    pub ipv4: Option<String>,
    /// Health status when the container has a Docker-native `HEALTHCHECK` or
    /// the runtime otherwise reports a health state.
    pub health: Option<HealthDetail>,
    /// Most recent exit code, when the runtime reports one. `None` for
    /// containers that are still running and have never exited.
    pub exit_code: Option<i32>,
}
255
/// Abstract container runtime trait
///
/// This trait abstracts over different container runtimes (containerd, CRI-O, etc.)
///
/// Core lifecycle methods (pull/create/start/stop/remove, state, logs, exec,
/// stats, wait, pid, ip) are required. Everything else ships with a
/// conservative default — either a no-op, a synthesized result, or
/// `AgentError::Unsupported` — so a new backend only needs the core set to be
/// usable.
#[async_trait::async_trait]
pub trait Runtime: Send + Sync {
    /// Pull an image to local storage
    async fn pull_image(&self, image: &str) -> Result<()>;

    /// Pull an image to local storage with a specific policy.
    ///
    /// When `auth` is `Some`, the runtime uses those inline credentials for
    /// the pull (§3.10 of `ZLAYER_SDK_FIXES.md`). When `auth` is `None`, the
    /// runtime falls back to its existing credential-store lookup keyed by
    /// registry hostname (or anonymous access when no match exists).
    ///
    /// Non-Docker runtimes may accept but ignore the `auth` argument — their
    /// OCI puller (`zlayer-registry`) already resolves credentials from the
    /// store by hostname, and inline auth is primarily a Docker-backend
    /// concern. Ignoring it is safe: callers that need inline auth should use
    /// the Docker runtime.
    async fn pull_image_with_policy(
        &self,
        image: &str,
        policy: PullPolicy,
        auth: Option<&RegistryAuth>,
    ) -> Result<()>;

    /// Create a container
    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()>;

    /// Start a container
    async fn start_container(&self, id: &ContainerId) -> Result<()>;

    /// Stop a container
    ///
    /// `timeout` is the grace period granted before the runtime may
    /// force-terminate the process.
    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()>;

    /// Remove a container
    async fn remove_container(&self, id: &ContainerId) -> Result<()>;

    /// Get container state
    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState>;

    /// Get container logs as structured entries
    ///
    /// `tail` limits the result to the most recent entries.
    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>>;

    /// Execute command in container
    ///
    /// Returns `(exit_code, stdout, stderr)` once the command completes.
    async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)>;

    /// Execute a command in a container and stream stdout / stderr / exit
    /// events as they are produced.
    ///
    /// The default implementation calls the buffered [`Runtime::exec`] and
    /// emits everything as a single `Stdout` event, a single `Stderr` event,
    /// and a final `Exit` event. Runtimes that support true streaming (e.g.
    /// Docker via bollard) override this to produce line-by-line events as
    /// the command runs.
    ///
    /// The returned stream always terminates with exactly one
    /// [`ExecEvent::Exit`] as the final item on success. Errors that occur
    /// before the stream is returned (e.g. container not found, failure to
    /// create the exec) are surfaced via the outer `Result`; errors that
    /// occur mid-stream are logged by the runtime and the stream closes.
    async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
        // Buffered fallback: run to completion, then replay as ≤3 events.
        let (exit, stdout, stderr) = self.exec(id, cmd).await?;
        let mut events: Vec<ExecEvent> = Vec::with_capacity(3);
        // Empty stdout/stderr chunks are skipped so consumers never see
        // zero-length output events.
        if !stdout.is_empty() {
            events.push(ExecEvent::Stdout(stdout));
        }
        if !stderr.is_empty() {
            events.push(ExecEvent::Stderr(stderr));
        }
        events.push(ExecEvent::Exit(exit));
        Ok(Box::pin(futures_util::stream::iter(events)))
    }

    /// Get container resource statistics from cgroups
    ///
    /// Returns CPU and memory statistics for the specified container.
    /// Used for metrics collection and autoscaling decisions.
    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats>;

    /// Wait for a container to exit and return its exit code
    ///
    /// This method blocks until the container exits or an error occurs.
    /// Used primarily for job execution to implement run-to-completion semantics.
    async fn wait_container(&self, id: &ContainerId) -> Result<i32>;

    /// Wait for a container to exit and return a [`WaitOutcome`] with richer
    /// classification (exit code + reason + signal + `finished_at` timestamp).
    ///
    /// The default implementation delegates to [`Runtime::wait_container`] and
    /// synthesizes a [`WaitReason::Exited`] result with no signal/timestamp.
    /// Runtimes that can distinguish OOM kills, signal deaths, or report a
    /// finished-at time (e.g. the Docker runtime, which has
    /// `ContainerInspectResponse.state.oom_killed` / `.finished_at`) should
    /// override this.
    async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
        let exit_code = self.wait_container(id).await?;
        Ok(WaitOutcome::exited(exit_code))
    }

    /// Get container logs (stdout/stderr combined)
    ///
    /// Returns logs as structured entries.
    /// Used to capture job output after completion.
    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>>;

    /// Get the PID of a container's main process
    ///
    /// Returns:
    /// - `Ok(Some(pid))` for runtimes with real processes (Youki, Docker)
    /// - `Ok(None)` for runtimes without separate PIDs (WASM in-process)
    /// - `Err` if the container doesn't exist or there's an error
    ///
    /// Used for overlay network attachment and process management.
    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>>;

    /// Get the IP address of a container
    ///
    /// Returns:
    /// - `Ok(Some(ip))` if the container has a known IP address
    /// - `Ok(None)` if the container exists but has no IP assigned yet
    /// - `Err` if the container doesn't exist or there's an error
    ///
    /// Used for proxy backend registration when overlay networking is unavailable.
    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>>;

    /// Get a runtime-assigned port override for a container.
    ///
    /// Returns:
    /// - `Ok(Some(port))` if the runtime assigned a dynamic port to this container
    /// - `Ok(None)` if the container should use the spec-declared endpoint port
    ///
    /// This exists for runtimes where all containers share the host network stack
    /// (e.g., macOS sandbox). Without network namespaces, multiple replicas of
    /// the same service would conflict on the same port. The runtime assigns
    /// each replica a unique port and passes it via the `PORT` environment variable.
    /// The proxy then routes to `container_ip:override_port` instead of
    /// `container_ip:spec_port`.
    ///
    /// Runtimes with per-container networking (overlay, VMs, Docker) return `None`.
    async fn get_container_port_override(&self, _id: &ContainerId) -> Result<Option<u16>> {
        Ok(None)
    }

    /// Get the HCN namespace GUID of a Windows container.
    ///
    /// Windows-only. Linux/macOS runtimes have no HCN namespace concept and
    /// return `Ok(None)`. The `HcsRuntime` overrides this to return the
    /// namespace GUID attached during `create_container`; `OverlayManager`
    /// then uses the GUID to register the container's assigned overlay IP
    /// against the right HCN compartment (analogous to how Linux uses PID
    /// to enter the netns via `/proc/{pid}/ns/net`).
    #[cfg(target_os = "windows")]
    async fn get_container_namespace_id(
        &self,
        _id: &ContainerId,
    ) -> Result<Option<windows::core::GUID>> {
        Ok(None)
    }

    /// Sync all named volumes associated with this container to S3.
    ///
    /// Called after a container is stopped but before it is removed, giving
    /// the runtime a chance to flush persistent volume data to remote storage.
    ///
    /// The default implementation is a no-op. Runtimes that support S3-backed
    /// volume sync (e.g., Youki with the `s3` feature) override this.
    async fn sync_container_volumes(&self, _id: &ContainerId) -> Result<()> {
        Ok(())
    }

    /// List all images managed by this runtime's image storage.
    ///
    /// The default implementation returns `AgentError::Unsupported` — individual
    /// runtimes override this with backend-specific logic (bollard for Docker,
    /// zlayer-registry cache walk for Youki, etc.).
    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
        Err(AgentError::Unsupported(
            "list_images is not supported by this runtime".into(),
        ))
    }

    /// Remove an image by reference from local storage.
    ///
    /// When `force` is true, also removes the image even when other containers
    /// reference it. The default implementation returns `AgentError::Unsupported`.
    async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
        Err(AgentError::Unsupported(
            "remove_image is not supported by this runtime".into(),
        ))
    }

    /// Prune dangling / unused images from local storage.
    ///
    /// Returns a [`PruneResult`] describing what was removed. The default
    /// implementation returns `AgentError::Unsupported`.
    async fn prune_images(&self) -> Result<PruneResult> {
        Err(AgentError::Unsupported(
            "prune_images is not supported by this runtime".into(),
        ))
    }

    /// Send a signal to a running container.
    ///
    /// When `signal` is `None`, the runtime sends `SIGKILL` (matching Docker's
    /// `docker kill` default). Backends validate the signal name and reject
    /// anything outside the standard POSIX set (`SIGKILL`, `SIGTERM`, `SIGINT`,
    /// `SIGHUP`, `SIGUSR1`, `SIGUSR2`). See [`validate_signal`].
    ///
    /// Used by `POST /api/v1/containers/{id}/kill` and Docker-compat
    /// `docker kill`. The default implementation returns
    /// [`AgentError::Unsupported`].
    async fn kill_container(&self, _id: &ContainerId, _signal: Option<&str>) -> Result<()> {
        Err(AgentError::Unsupported(
            "kill_container is not supported by this runtime".into(),
        ))
    }

    /// Create a new tag pointing at an existing image.
    ///
    /// `source` is the reference to an already-cached image. `target` is the
    /// new reference to create — it must be a full reference (repository + tag).
    ///
    /// Used by `POST /api/v1/images/tag` and Docker-compat `docker tag`. The
    /// default implementation returns [`AgentError::Unsupported`].
    async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
        Err(AgentError::Unsupported(
            "tag_image is not supported by this runtime".into(),
        ))
    }

    /// Return rich inspect details for a container: published ports, attached
    /// networks, first IPv4, health, and most-recent exit code.
    ///
    /// Runtimes implement this by translating the backend's native inspect
    /// response (bollard's `ContainerInspectResponse` for Docker) into the
    /// runtime-level [`ContainerInspectDetails`] struct. The API layer merges
    /// these fields into `ContainerInfo` on `GET /api/v1/containers` and
    /// `GET /api/v1/containers/{id}` (§3.15 of `ZLAYER_SDK_FIXES.md`).
    ///
    /// The default implementation returns [`ContainerInspectDetails::default`]
    /// — an all-empty record, which the API layer treats as "this runtime
    /// doesn't support rich inspect; skip all the extra fields". This keeps
    /// non-Docker runtimes (Youki, WASM, Mock) backwards compatible; they can
    /// override this later if they gain equivalent inspect capability.
    async fn inspect_detailed(&self, _id: &ContainerId) -> Result<ContainerInspectDetails> {
        Ok(ContainerInspectDetails::default())
    }
}
506
507/// Validate a signal name for [`Runtime::kill_container`].
508///
509/// Accepts both the `SIG`-prefixed form (`"SIGKILL"`) and the bare form
510/// (`"KILL"`). Returns the canonical uppercase `SIG`-prefixed name on success.
511///
512/// # Errors
513///
514/// Returns [`AgentError::InvalidSpec`] when `signal` is not one of the
515/// supported signals: `SIGKILL`, `SIGTERM`, `SIGINT`, `SIGHUP`, `SIGUSR1`,
516/// `SIGUSR2`.
517pub fn validate_signal(signal: &str) -> Result<String> {
518 let trimmed = signal.trim();
519 if trimmed.is_empty() {
520 return Err(AgentError::InvalidSpec(
521 "signal must not be empty".to_string(),
522 ));
523 }
524 let upper = trimmed.to_ascii_uppercase();
525 let canonical = if upper.starts_with("SIG") {
526 upper
527 } else {
528 format!("SIG{upper}")
529 };
530 match canonical.as_str() {
531 "SIGKILL" | "SIGTERM" | "SIGINT" | "SIGHUP" | "SIGUSR1" | "SIGUSR2" => Ok(canonical),
532 other => Err(AgentError::InvalidSpec(format!(
533 "unsupported signal '{other}'; allowed: SIGKILL, SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2"
534 ))),
535 }
536}
537
/// Auth context injected into every container so it can talk back to the host
/// API without needing external credentials.
#[derive(Debug, Clone)]
pub struct ContainerAuthContext {
    /// Base URL of the `ZLayer` API, e.g. `"http://127.0.0.1:3669"`.
    pub api_url: String,
    /// JWT signing secret — used to mint per-container tokens at start time.
    /// Sensitive: avoid logging this value.
    pub jwt_secret: String,
    /// Absolute path of the Unix socket on the host (bind-mounted into Linux
    /// containers; added to `writable_dirs` for macOS sandbox).
    pub socket_path: String,
}
550
/// In-memory mock runtime for testing and development
///
/// Holds all container records in a tokio `RwLock`-guarded map; no real
/// processes or images are involved.
pub struct MockRuntime {
    // Keyed by `ContainerId`; guarded for concurrent async access.
    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
}
555
556impl MockRuntime {
557 #[must_use]
558 pub fn new() -> Self {
559 Self {
560 containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
561 }
562 }
563}
564
impl Default for MockRuntime {
    // `Default` delegates to `new()` so the mock can satisfy `Default` bounds.
    fn default() -> Self {
        Self::new()
    }
}
570
#[async_trait::async_trait]
impl Runtime for MockRuntime {
    async fn pull_image(&self, _image: &str) -> Result<()> {
        // Delegate to the policy-aware variant with the default policy.
        // NOTE(review): `_image` is forwarded despite its underscore prefix;
        // consider renaming it to `image`.
        self.pull_image_with_policy(_image, PullPolicy::IfNotPresent, None)
            .await
    }

    async fn pull_image_with_policy(
        &self,
        _image: &str,
        _policy: PullPolicy,
        _auth: Option<&RegistryAuth>,
    ) -> Result<()> {
        // Mock: always succeeds
        // (the sleep simulates pull latency for callers that race on it)
        tokio::time::sleep(Duration::from_millis(100)).await;
        Ok(())
    }

    async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
        // Register an empty Pending record; the spec is ignored by the mock.
        let mut containers = self.containers.write().await;
        containers.insert(
            id.clone(),
            Container {
                id: id.clone(),
                state: ContainerState::Pending,
                pid: None,
                task: None,
                overlay_ip: None,
                health_monitor: None,
                port_override: None,
            },
        );
        Ok(())
    }

    async fn start_container(&self, id: &ContainerId) -> Result<()> {
        // Silently succeeds for unknown ids (no NotFound) — matches the
        // original mock behaviour; only known containers are updated.
        let mut containers = self.containers.write().await;
        if let Some(container) = containers.get_mut(id) {
            container.state = ContainerState::Running;
            container.pid = Some(std::process::id()); // Mock PID
        }
        Ok(())
    }

    async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
        // Immediate clean stop; the timeout is irrelevant for the mock.
        let mut containers = self.containers.write().await;
        if let Some(container) = containers.get_mut(id) {
            container.state = ContainerState::Exited { code: 0 };
        }
        Ok(())
    }

    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
        // Removing an unknown id is a no-op success.
        let mut containers = self.containers.write().await;
        containers.remove(id);
        Ok(())
    }

    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
        let containers = self.containers.read().await;
        containers
            .get(id)
            .map(|c| c.state.clone())
            .ok_or_else(|| AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
    }

    async fn container_logs(&self, _id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
        // Two canned entries (one stdout, one stderr); `tail` keeps the last N.
        let entries = vec![
            LogEntry {
                timestamp: chrono::Utc::now(),
                stream: LogStream::Stdout,
                message: "mock log line 1".to_string(),
                source: LogSource::Container("mock".to_string()),
                service: None,
                deployment: None,
            },
            LogEntry {
                timestamp: chrono::Utc::now(),
                stream: LogStream::Stderr,
                message: "mock error line".to_string(),
                source: LogSource::Container("mock".to_string()),
                service: None,
                deployment: None,
            },
        ];
        // saturating_sub keeps all entries when tail >= len.
        let skip = entries.len().saturating_sub(tail);
        Ok(entries.into_iter().skip(skip).collect())
    }

    async fn exec(&self, _id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
        // Echo the command back as stdout with exit code 0.
        Ok((0, cmd.join(" "), String::new()))
    }

    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
        // Mock: return dummy stats
        let containers = self.containers.read().await;
        if containers.contains_key(id) {
            Ok(ContainerStats {
                cpu_usage_usec: 1_000_000, // 1 second
                memory_bytes: 50 * 1024 * 1024, // 50 MB
                memory_limit: 256 * 1024 * 1024, // 256 MB
                timestamp: std::time::Instant::now(),
            })
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }

    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
        // Mock: simulate waiting for container to exit
        let containers = self.containers.read().await;
        if let Some(container) = containers.get(id) {
            match &container.state {
                ContainerState::Exited { code } => Ok(*code),
                ContainerState::Failed { .. } => Ok(1),
                _ => {
                    // Simulate a brief wait and then return success
                    // (drop the read guard before sleeping so writers aren't blocked)
                    drop(containers);
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok(0)
                }
            }
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }

    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
        // Mock: return dummy structured log entries
        let containers = self.containers.read().await;
        if containers.contains_key(id) {
            let container_name = id.to_string();
            Ok(vec![
                LogEntry {
                    timestamp: chrono::Utc::now(),
                    stream: LogStream::Stdout,
                    message: format!("[{container_name}] Container started"),
                    source: LogSource::Container(container_name.clone()),
                    service: None,
                    deployment: None,
                },
                LogEntry {
                    timestamp: chrono::Utc::now(),
                    stream: LogStream::Stdout,
                    message: format!("[{container_name}] Executing command..."),
                    source: LogSource::Container(container_name.clone()),
                    service: None,
                    deployment: None,
                },
                LogEntry {
                    timestamp: chrono::Utc::now(),
                    stream: LogStream::Stdout,
                    message: format!("[{container_name}] Command completed successfully"),
                    source: LogSource::Container(container_name),
                    service: None,
                    deployment: None,
                },
            ])
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }

    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
        let containers = self.containers.read().await;
        if let Some(container) = containers.get(id) {
            Ok(container.pid)
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }

    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
        let containers = self.containers.read().await;
        if containers.contains_key(id) {
            // Mock: deterministic IP based on replica number (172.17.0.{replica+2})
            // (truncation is acceptable here — mock replicas stay small)
            #[allow(clippy::cast_possible_truncation)]
            let last_octet = (id.replica + 2) as u8;
            Ok(Some(IpAddr::V4(std::net::Ipv4Addr::new(
                172, 17, 0, last_octet,
            ))))
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }

    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
        // The mock tracks no images; an empty list is the honest answer.
        Ok(Vec::new())
    }

    async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
        Ok(())
    }

    async fn prune_images(&self) -> Result<PruneResult> {
        Ok(PruneResult::default())
    }

    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
        // Validate signal even in the mock so callers exercise the same error
        // path. Default to SIGKILL when omitted.
        let _canonical = validate_signal(signal.unwrap_or("SIGKILL"))?;
        let mut containers = self.containers.write().await;
        let container = containers.get_mut(id).ok_or_else(|| AgentError::NotFound {
            container: id.to_string(),
            reason: "container not found".to_string(),
        })?;
        // 137 = 128 + 9 (SIGKILL), regardless of the signal actually requested.
        container.state = ContainerState::Exited { code: 137 };
        Ok(())
    }

    async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
        // The in-memory mock doesn't store images; treat tag as a no-op success.
        Ok(())
    }
}
805
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_mock_runtime() {
        // Drive a container through pull -> create -> start and confirm the
        // mock reports it as running.
        let rt = MockRuntime::new();
        let cid = ContainerId {
            service: "test".to_string(),
            replica: 1,
        };

        rt.pull_image("test:latest").await.unwrap();
        rt.create_container(&cid, &mock_spec()).await.unwrap();
        rt.start_container(&cid).await.unwrap();

        assert_eq!(
            rt.container_state(&cid).await.unwrap(),
            ContainerState::Running
        );
    }

    #[test]
    fn validate_signal_accepts_known_signals() {
        // Canonical SIG-prefixed names pass through unchanged.
        for sig in ["SIGKILL", "SIGTERM", "SIGINT", "SIGHUP", "SIGUSR1", "SIGUSR2"] {
            assert_eq!(validate_signal(sig).unwrap(), sig);
        }

        // Bare names (no "SIG" prefix) are canonicalised, case-insensitively.
        assert_eq!(validate_signal("KILL").unwrap(), "SIGKILL");
        assert_eq!(validate_signal("term").unwrap(), "SIGTERM");
        // Surrounding whitespace is tolerated and trimmed.
        assert_eq!(validate_signal(" INT ").unwrap(), "SIGINT");
    }

    #[test]
    fn validate_signal_rejects_unknown_or_empty() {
        // Empty input, whitespace-only input, unknown names, and real signals
        // outside the POSIX allowlist (e.g. SIGSEGV, SIGPIPE) all map to
        // InvalidSpec.
        for bad in ["", " ", "SIGSEGV", "NOPE", "SIGPIPE"] {
            assert!(matches!(
                validate_signal(bad),
                Err(AgentError::InvalidSpec(_))
            ));
        }
    }

    #[tokio::test]
    async fn mock_kill_container_defaults_to_sigkill() {
        let rt = MockRuntime::new();
        let cid = ContainerId {
            service: "kill-me".to_string(),
            replica: 0,
        };
        rt.create_container(&cid, &mock_spec()).await.unwrap();
        rt.start_container(&cid).await.unwrap();

        // Passing `None` falls back to SIGKILL; the mock records the
        // conventional 128 + 9 exit code.
        rt.kill_container(&cid, None).await.unwrap();
        let state = rt.container_state(&cid).await.unwrap();
        assert!(
            matches!(state, ContainerState::Exited { code: 137 }),
            "expected Exited(137), got {state:?}"
        );
    }

    #[test]
    fn wait_reason_serializes_as_snake_case() {
        // Each variant must serialize to its snake_case JSON string.
        let cases = [
            (WaitReason::Exited, "\"exited\""),
            (WaitReason::Signal, "\"signal\""),
            (WaitReason::OomKilled, "\"oom_killed\""),
            (WaitReason::RuntimeError, "\"runtime_error\""),
        ];
        for (reason, expected) in cases {
            assert_eq!(serde_json::to_string(&reason).unwrap(), expected);
        }
    }

    #[test]
    fn wait_reason_deserialize_roundtrip() {
        // Every variant must survive a serialize -> deserialize cycle intact.
        for variant in [
            WaitReason::Exited,
            WaitReason::Signal,
            WaitReason::OomKilled,
            WaitReason::RuntimeError,
        ] {
            let encoded = serde_json::to_string(&variant).unwrap();
            let decoded: WaitReason = serde_json::from_str(&encoded).unwrap();
            assert_eq!(variant, decoded, "roundtrip failed for {variant:?}");
        }
    }

    #[test]
    fn signal_name_from_exit_code_known_signals() {
        // Exit codes of 128 + signum map back to the well-known names.
        let known = [
            (137, "SIGKILL"),
            (143, "SIGTERM"),
            (130, "SIGINT"),
            (129, "SIGHUP"),
            (139, "SIGSEGV"),
        ];
        for (code, name) in known {
            assert_eq!(signal_name_from_exit_code(code).as_deref(), Some(name));
        }
    }

    #[test]
    fn signal_name_from_exit_code_handles_unknown_and_normal() {
        // Codes at or below 128 are ordinary exits, not signal deaths.
        for code in [0, 1, 128] {
            assert_eq!(signal_name_from_exit_code(code), None);
        }

        // A signal number without a known name still yields a stable label.
        assert_eq!(
            signal_name_from_exit_code(128 + 99).as_deref(),
            Some("signal_99")
        );
    }

    #[tokio::test]
    async fn default_wait_outcome_delegates_to_wait_container() {
        let rt = MockRuntime::new();
        let cid = ContainerId {
            service: "wait-test".to_string(),
            replica: 0,
        };
        rt.create_container(&cid, &mock_spec()).await.unwrap();
        rt.start_container(&cid).await.unwrap();

        // MockRuntime::wait_container reports exit code 0 for a running
        // container, so the trait's default wait_outcome mirrors that.
        let outcome = rt.wait_outcome(&cid).await.unwrap();
        assert_eq!(outcome.exit_code, 0);
        assert_eq!(outcome.reason, WaitReason::Exited);
        assert!(outcome.signal.is_none());
        assert!(outcome.finished_at.is_none());
    }

    #[tokio::test]
    async fn mock_kill_container_rejects_bogus_signal() {
        let rt = MockRuntime::new();
        let cid = ContainerId {
            service: "kill-me".to_string(),
            replica: 0,
        };
        rt.create_container(&cid, &mock_spec()).await.unwrap();
        rt.start_container(&cid).await.unwrap();

        // An unknown signal name must fail validation with InvalidSpec.
        let err = rt
            .kill_container(&cid, Some("SIGFOO"))
            .await
            .unwrap_err();
        assert!(
            matches!(err, AgentError::InvalidSpec(_)),
            "expected InvalidSpec, got {err:?}"
        );
    }

    /// Build a minimal single-service `ServiceSpec` by parsing a one-service
    /// deployment and extracting its only entry.
    fn mock_spec() -> ServiceSpec {
        use zlayer_spec::*;
        serde_yaml::from_str::<DeploymentSpec>(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
",
        )
        .unwrap()
        .services
        .remove("test")
        .unwrap()
    }
}