Skip to main content

zlayer_agent/runtimes/
composite.rs

1//! Composite runtime that dispatches per-container to a primary + optional delegate.
2//!
3//! The [`CompositeRuntime`] owns a *primary* runtime (the node-native runtime —
4//! e.g. `HcsRuntime` on Windows, `YoukiRuntime` on Linux, Docker elsewhere) and
5//! an optional *delegate* runtime used for foreign-OS workloads (e.g. a WSL2
6//! delegate on Windows that runs Linux containers). Each call is routed based
7//! on the container's identity:
8//!
//! * **[`Runtime::create_container`]** first honors explicit backend
//!   selectors (the request's `spec.runtime`, the `com.zlayer.isolation`
//!   label, an image-declared isolation default, and the `com.zlayer.runtime`
//!   manifest marker), then consults
//!   [`ServiceSpec::platform`](zlayer_spec::ServiceSpec); when the spec's OS
//!   targets the delegate we route there, otherwise primary. When
//!   `platform` is `None`, a secondary **image-OS cache** (populated by
//!   [`CompositeRuntime::record_image_os`] from local-only inspection of the
//!   pulled image at pull time) is consulted. If both are unknown we fall
//!   through to the default runtime (VZ-Linux when that delegate is
//!   configured, otherwise the primary). **Strict policy (H-7):** if either
//!   source identifies the workload as Linux and this node has no delegate
//!   configured, dispatch returns [`AgentError::RouteToPeer`] so the
//!   scheduler can re-place the workload on a Linux peer — the old permissive
//!   "fall through to primary" behavior is gone.
//! * All subsequent per-container operations (start/stop/remove/logs/exec/…)
//!   look up the container in an internal **dispatch cache** that records
//!   which runtime created it. This guarantees the same runtime sees the
//!   container for its whole lifecycle, even across daemon restarts within
//!   the same process. The cache is held in memory only, so it does not
//!   survive a process restart.
25//! * Cross-cutting image operations (`pull_image`, `pull_image_with_policy`,
26//!   `list_images`, `prune_images`) fan out to both runtimes — we cannot know
27//!   in advance which runtime will execute a pulled image, and merged image
28//!   listings give users a single coherent view. `remove_image` / `tag_image`
29//!   try primary first and fall back to delegate.
30//!
31//! The dispatch cache is populated on `create_container` and cleared on
32//! `remove_container`. Looking up an unknown id yields
33//! [`AgentError::NotFound`], which surfaces as a clean 404 at the API layer
34//! rather than silently forwarding to the wrong runtime.
35
36use std::collections::HashMap;
37use std::net::IpAddr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use tokio::sync::RwLock;
43use zlayer_observability::logs::{LogEntry, LogStream};
44use zlayer_spec::{OsKind, PullPolicy, RegistryAuth, RuntimeIsolation, ServiceSpec};
45
46use crate::cgroups_stats::ContainerStats;
47use crate::error::{AgentError, Result};
48use crate::runtime::{
49    ContainerId, ContainerInspectDetails, ContainerResourceUpdate, ContainerState,
50    ContainerUpdateOutcome, ExecEventStream, ImageInfo, ImageInspectInfo, ImageStoreHandles,
51    LogChannel, LogChunk, LogsStream, LogsStreamOptions, OverlayAttachKind, PruneResult, Runtime,
52    StatsSample, StatsStream, WaitCondition, WaitOutcome,
53};
54
/// Identifies the backing runtime a container was handed to.
///
/// Stored in the per-container dispatch cache so that every later lifecycle
/// call reaches the same runtime that created the container.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DispatchTarget {
    /// The node-native primary runtime.
    Primary,
    /// The optional foreign-OS delegate (e.g. the WSL2 delegate on Windows,
    /// or libkrun on macOS).
    Delegate,
    /// The Apple-Virtualization (VZ) delegate, macOS only. Picked
    /// automatically for `com.zlayer.runtime=vz` base bundles, or per service
    /// through the `com.zlayer.isolation=vz` label.
    Vz,
    /// The Apple-Virtualization delegate running a **Linux guest**, macOS
    /// only. This is the default Linux path on macOS, chosen for Linux
    /// images, the `com.zlayer.runtime=vz-linux` marker, or the
    /// `com.zlayer.isolation=vz-linux` label.
    VzLinux,
}
70
71/// Routes each container to either the primary runtime or an optional delegate.
72///
73/// See the module-level documentation for the dispatch rules.
74pub struct CompositeRuntime {
75    primary: Arc<dyn Runtime>,
76    delegate: Option<Arc<dyn Runtime>>,
77    /// Opt-in Apple-Virtualization delegate (macOS). Selected only when a
78    /// service carries `com.zlayer.isolation=vz`.
79    vz: Option<Arc<dyn Runtime>>,
80    /// Apple-Virtualization Linux-guest delegate (macOS). When present, it is
81    /// the default runtime for Linux images on this node; libkrun
82    /// (`delegate`) is then reachable only via `com.zlayer.isolation=vm`.
83    vz_linux: Option<Arc<dyn Runtime>>,
84    /// Per-container dispatch cache. Populated on `create_container`, removed
85    /// on `remove_container`.
86    dispatch: Arc<RwLock<HashMap<ContainerId, DispatchTarget>>>,
87    /// Image-OS cache consulted when a spec has no explicit `platform`.
88    /// Populated by [`CompositeRuntime::record_image_os`], which is driven
89    /// from [`zlayer_registry::fetch_image_os`] during `pull_image*`.
90    image_os: Arc<RwLock<HashMap<String, OsKind>>>,
91    /// Image runtime-marker cache (the `com.zlayer.runtime` manifest
92    /// annotation, e.g. `"vz"`). Populated from
93    /// [`zlayer_registry::fetch_image_runtime_marker`] during `pull_image*` so
94    /// `select_for` can auto-detect a VZ base bundle and prefer the VZ runtime
95    /// for it without requiring a per-service label.
96    image_runtime: Arc<RwLock<HashMap<String, String>>>,
97    /// Image-declared isolation default cache (the `com.zlayer.isolation` OCI
98    /// config LABEL, e.g. `"sandbox"` / `"vz-linux"`). Populated from
99    /// [`zlayer_registry::fetch_image_isolation_marker_in_cache_only`] during
100    /// `pull_image*` so `select_for` can honor an image's declared default
101    /// without requiring a per-service label — at a precedence BELOW an explicit
102    /// create-request `spec.runtime` / `com.zlayer.isolation` label but ABOVE
103    /// the `com.zlayer.runtime` manifest marker.
104    image_isolation: Arc<RwLock<HashMap<String, String>>>,
105    /// Filesystem paths of the persistent blob caches that the runtimes pull
106    /// into, tried IN ORDER for image-OS / runtime-marker inspection. Typically:
107    ///
108    /// 1. the VZ-Linux runtime's `{data_dir}/vz/linux/images/blobs.redb` (the
109    ///    delegate that actually runs the Linux workload), and
110    /// 2. the primary Sandbox runtime's `{data_dir}/images/blobs.redb`.
111    ///
112    /// Both stores matter because `pull_image` pulls into BOTH (primary first,
113    /// then VZ-Linux), and either pull may short-circuit under
114    /// `PullPolicy::IfNotPresent` when its rootfs already exists — leaving the
115    /// manifest/config in only ONE of the two caches. Inspection therefore
116    /// probes them in order and stops at the first store that resolves the OS,
117    /// LOCAL-ONLY via [`zlayer_registry::fetch_image_os_in_cache_only`] — so an
118    /// already-pulled Linux image is detected as Linux (and routed to VZ-Linux)
119    /// with NO network call, even under a Docker Hub rate-limit. For the OS
120    /// dispatch path there is intentionally **no** network fallback: a local
121    /// miss yields "OS unknown" and dispatch uses its safe macOS default rather
122    /// than risking a 429 (see [`CompositeRuntime::inspect_image_os`]).
123    os_inspect_cache_paths: Vec<std::path::PathBuf>,
124}
125
126impl CompositeRuntime {
127    /// Construct a new composite runtime.
128    ///
129    /// `primary` handles containers whose platform matches the host node.
130    /// `delegate`, when present, handles foreign-OS containers (currently:
131    /// Linux containers on a Windows host via the WSL2 delegate runtime).
132    #[must_use]
133    pub fn new(primary: Arc<dyn Runtime>, delegate: Option<Arc<dyn Runtime>>) -> Self {
134        Self {
135            primary,
136            delegate,
137            vz: None,
138            vz_linux: None,
139            dispatch: Arc::new(RwLock::new(HashMap::new())),
140            image_os: Arc::new(RwLock::new(HashMap::new())),
141            image_runtime: Arc::new(RwLock::new(HashMap::new())),
142            image_isolation: Arc::new(RwLock::new(HashMap::new())),
143            os_inspect_cache_paths: Vec::new(),
144        }
145    }
146
147    /// Point image-OS / runtime-marker inspection at a single persistent blob
148    /// cache the runtimes pull into, so the OS of an already-pulled image
149    /// resolves from the LOCAL config blob with no network round-trip.
150    ///
151    /// Convenience wrapper over [`CompositeRuntime::with_os_inspect_cache_paths`]
152    /// for callers that only have one store. `path` is the on-disk blob-cache
153    /// file (e.g. the VZ-Linux runtime's `{data_dir}/vz/linux/images/blobs.redb`).
154    #[must_use]
155    pub fn with_os_inspect_cache_path(self, path: Option<std::path::PathBuf>) -> Self {
156        self.with_os_inspect_cache_paths(path.into_iter().collect())
157    }
158
159    /// Point image-OS / runtime-marker inspection at an ORDERED list of
160    /// persistent blob caches the runtimes pull into.
161    ///
162    /// Inspection probes each store LOCAL-ONLY (no network) in order and stops
163    /// at the first that resolves the image's OS / marker. This matters because
164    /// `pull_image` pulls into BOTH the VZ-Linux store and the primary Sandbox
165    /// store, and either pull may short-circuit under `PullPolicy::IfNotPresent`
166    /// when its rootfs already exists — leaving the manifest/config in only ONE
167    /// of the two caches. Probing both (VZ-Linux first, then primary) is what
168    /// lets a locally-cached Linux image route to VZ-Linux under a Docker Hub
169    /// rate-limit (see [`zlayer_registry::fetch_image_os_in_cache_only`]).
170    #[must_use]
171    pub fn with_os_inspect_cache_paths(mut self, paths: Vec<std::path::PathBuf>) -> Self {
172        self.os_inspect_cache_paths = paths;
173        self
174    }
175
176    /// True when `image`'s reference is already present in ANY configured
177    /// backend's image store — the same local resolution `create_container`
178    /// launches from (`list_images` merges every backend and, on the Sandbox
179    /// store, surfaces a locally-built image by the `ref` file it wrote, even
180    /// when no manifest/digest was ever pulled). Drives the `IfNotPresent`
181    /// short-circuit so a built/never-pulled image is treated as present rather
182    /// than triggering a doomed remote manifest fetch. A `list_images` error
183    /// (e.g. every backend reports `Unsupported`) is treated as "unknown" →
184    /// `false`, so we fall back to the normal pull rather than mask the image.
185    async fn image_present_locally(&self, image: &str) -> bool {
186        match self.list_images().await {
187            Ok(images) => images.iter().any(|info| info.reference == image),
188            Err(e) => {
189                tracing::debug!(
190                    image,
191                    error = %e,
192                    "composite: list_images unavailable for the IfNotPresent presence check; \
193                     proceeding with a normal pull",
194                );
195                false
196            }
197        }
198    }
199
200    /// Resolve `image`'s OS for **dispatch**, probing each configured local blob
201    /// cache in order, **LOCAL-ONLY — never a network call**.
202    ///
203    /// This is the dispatch-population path: it runs inside `pull_image*` purely
204    /// to fill the image-OS cache that [`CompositeRuntime::select_for`] consults.
205    /// It MUST NOT touch the wire. The image's layers have already been pulled
206    /// and extracted by the time we get here, and the runtimes that did the pull
207    /// (VZ-Linux / Sandbox) wrote the manifest+config into the very blob caches
208    /// `os_inspect_cache_paths` points at — so the OS is knowable with zero
209    /// network round-trips.
210    ///
211    /// The old code fell back to a network inspection (`fetch_image_os`) when no
212    /// local cache resolved the OS. That network call was reachable on a Docker
213    /// Hub 429, and a failed inspection left the cache empty → a cached Linux
214    /// image (e.g. `alpine`) fell through to the Seatbelt sandbox (`Primary`),
215    /// which cannot exec a Linux ELF (exit 127). The network fallback is gone:
216    /// a registry rate-limit can no longer break dispatch here. A genuine local
217    /// miss simply returns `Ok(None)` (dispatch then uses its safe macOS
218    /// fallthrough), and it never errors or blocks.
219    async fn inspect_image_os(
220        &self,
221        image: &str,
222    ) -> std::result::Result<Option<OsKind>, zlayer_registry::RegistryError> {
223        // A natively-built image (macOS Seatbelt) records its platform in a
224        // `metadata.json` sidecar beside its rootfs, NOT in the blob cache — it
225        // never round-trips a registry, so nothing populates the manifest/config
226        // blob the cache probe below reads. Consult the sidecar FIRST: it is the
227        // durable source of truth (a plain file, no redb single-writer
228        // contention), and a `darwin` sidecar resolves to `OsKind::Macos` so
229        // `select_for` routes the image to the Seatbelt primary with NO
230        // `com.zlayer.isolation` label.
231        if let Some(os) = self.image_os_from_local_sidecar(image).await {
232            return Ok(Some(os));
233        }
234
235        for path in &self.os_inspect_cache_paths {
236            match zlayer_registry::CacheType::persistent_at(path)
237                .build()
238                .await
239            {
240                Ok(cache) => {
241                    match zlayer_registry::fetch_image_os_in_cache_only(image, cache, None).await {
242                        Ok(Some(os)) => return Ok(Some(os)),
243                        Ok(None) => {
244                            tracing::trace!(
245                                image,
246                                cache = %path.display(),
247                                "image OS not resolvable from this local cache; trying next",
248                            );
249                        }
250                        Err(e) => return Err(e),
251                    }
252                }
253                Err(e) => {
254                    tracing::debug!(
255                        image,
256                        cache = %path.display(),
257                        error = %e,
258                        "failed to open OS-inspect blob cache; trying next",
259                    );
260                }
261            }
262        }
263        // No local cache resolved it. We deliberately do NOT fall back to a
264        // network inspection: a Docker Hub 429 must never reach this
265        // dispatch-population path (see the doc comment above). A clean local
266        // miss is `Ok(None)` — dispatch falls through to its safe macOS default.
267        Ok(None)
268    }
269
270    /// Resolve `image`'s OS from a natively-built image's `metadata.json`
271    /// platform sidecar, probing the image store beside each configured
272    /// OS-inspect blob cache (`{cache_parent}/{sanitize_image_name(image)}/
273    /// metadata.json`).
274    ///
275    /// The macOS Seatbelt builder writes this sidecar; it is the source of truth
276    /// for a Mac-native bundle, which has no registry manifest for the
277    /// blob-cache probe to read. A `darwin` sidecar resolves to
278    /// [`OsKind::Macos`]. `None` when no sidecar resolves an OS (the caller then
279    /// falls through to the blob-cache probe). All I/O is local-disk only.
280    async fn image_os_from_local_sidecar(&self, image: &str) -> Option<OsKind> {
281        let sanitized = sanitize_image_name(image);
282        for path in &self.os_inspect_cache_paths {
283            let Some(images_dir) = path.parent() else {
284                continue;
285            };
286            let sidecar = images_dir
287                .join(&sanitized)
288                .join(zlayer_types::local_image::LOCAL_IMAGE_METADATA_FILE);
289            let Ok(bytes) = tokio::fs::read(&sidecar).await else {
290                continue;
291            };
292            let meta: zlayer_types::local_image::LocalImageMetadata =
293                match serde_json::from_slice(&bytes) {
294                    Ok(m) => m,
295                    Err(e) => {
296                        tracing::debug!(
297                            image,
298                            sidecar = %sidecar.display(),
299                            error = %e,
300                            "failed to parse local image metadata sidecar; trying next",
301                        );
302                        continue;
303                    }
304                };
305            if let Some(os) = OsKind::from_oci_str(&meta.os) {
306                return Some(os);
307            }
308        }
309        None
310    }
311
312    /// Resolve `image`'s `com.zlayer.runtime` marker, probing each configured
313    /// local blob cache in order (no network per cache) before any network call.
314    async fn inspect_image_runtime_marker(
315        &self,
316        image: &str,
317        auth: Option<&RegistryAuth>,
318    ) -> std::result::Result<Option<String>, zlayer_registry::RegistryError> {
319        for path in &self.os_inspect_cache_paths {
320            match zlayer_registry::CacheType::persistent_at(path)
321                .build()
322                .await
323            {
324                Ok(cache) => {
325                    match zlayer_registry::fetch_image_runtime_marker_in_cache_only(
326                        image, cache, None,
327                    )
328                    .await
329                    {
330                        Ok(Some(marker)) => return Ok(Some(marker)),
331                        Ok(None) => {
332                            tracing::trace!(
333                                image,
334                                cache = %path.display(),
335                                "runtime marker not resolvable from this local cache; trying next",
336                            );
337                        }
338                        Err(e) => return Err(e),
339                    }
340                }
341                Err(e) => {
342                    tracing::debug!(
343                        image,
344                        cache = %path.display(),
345                        error = %e,
346                        "failed to open marker-inspect blob cache; trying next",
347                    );
348                }
349            }
350        }
351        zlayer_registry::fetch_image_runtime_marker(image, auth).await
352    }
353
354    /// Resolve `image`'s `com.zlayer.isolation` declared isolation default,
355    /// probing each configured local blob cache in order (no network per cache)
356    /// before any network call. Mirrors
357    /// [`CompositeRuntime::inspect_image_runtime_marker`] exactly; only the
358    /// underlying registry reader differs (config LABEL vs manifest annotation).
359    async fn inspect_image_isolation_marker(
360        &self,
361        image: &str,
362        auth: Option<&RegistryAuth>,
363    ) -> std::result::Result<Option<String>, zlayer_registry::RegistryError> {
364        for path in &self.os_inspect_cache_paths {
365            match zlayer_registry::CacheType::persistent_at(path)
366                .build()
367                .await
368            {
369                Ok(cache) => {
370                    match zlayer_registry::fetch_image_isolation_marker_in_cache_only(
371                        image, cache, None,
372                    )
373                    .await
374                    {
375                        Ok(Some(marker)) => return Ok(Some(marker)),
376                        Ok(None) => {
377                            tracing::trace!(
378                                image,
379                                cache = %path.display(),
380                                "isolation default not resolvable from this local cache; trying next",
381                            );
382                        }
383                        Err(e) => return Err(e),
384                    }
385                }
386                Err(e) => {
387                    tracing::debug!(
388                        image,
389                        cache = %path.display(),
390                        error = %e,
391                        "failed to open isolation-inspect blob cache; trying next",
392                    );
393                }
394            }
395        }
396        zlayer_registry::fetch_image_isolation_marker(image, auth).await
397    }
398
399    /// Attach an opt-in Apple-Virtualization delegate. Services labelled
400    /// `com.zlayer.isolation=vz` route to it; everything else is unaffected.
401    #[must_use]
402    pub fn with_vz_delegate(mut self, vz: Option<Arc<dyn Runtime>>) -> Self {
403        self.vz = vz;
404        self
405    }
406
407    /// Attach the Apple-Virtualization Linux-guest delegate. When present it
408    /// becomes the **default** runtime for Linux images on this node (libkrun
409    /// is then reachable only via the explicit `com.zlayer.isolation=vm`
410    /// label); when `None`, Linux dispatch falls back to the libkrun delegate
411    /// or `RouteToPeer` as before.
412    #[must_use]
413    pub fn with_vz_linux_delegate(mut self, vz_linux: Option<Arc<dyn Runtime>>) -> Self {
414        self.vz_linux = vz_linux;
415        self
416    }
417
418    /// Access the primary runtime (for introspection / tests).
419    #[must_use]
420    pub fn primary(&self) -> &Arc<dyn Runtime> {
421        &self.primary
422    }
423
424    /// Access the delegate runtime, if one is configured.
425    #[must_use]
426    pub fn delegate(&self) -> Option<&Arc<dyn Runtime>> {
427        self.delegate.as_ref()
428    }
429
430    /// Record that `image` is known to target operating system `os`.
431    ///
432    /// Wired from [`zlayer_registry::fetch_image_os`] during `pull_image*`
433    /// (see [`CompositeRuntime::apply_image_os_inspection`]) so that specs
434    /// without an explicit `platform` still dispatch correctly.
435    pub(crate) async fn record_image_os(&self, image: &str, os: OsKind) {
436        self.image_os.write().await.insert(image.to_string(), os);
437    }
438
439    /// Record an image's `com.zlayer.runtime` marker (e.g. `"vz"`), used by
440    /// [`CompositeRuntime::select_for`] to auto-detect a runtime-specific bundle.
441    pub(crate) async fn record_image_runtime(&self, image: &str, marker: String) {
442        self.image_runtime
443            .write()
444            .await
445            .insert(image.to_string(), marker);
446    }
447
448    /// Record an image's `com.zlayer.isolation` declared isolation default
449    /// (e.g. `"sandbox"`), used by [`CompositeRuntime::select_for`] to honor an
450    /// image-declared default at create time.
451    pub(crate) async fn record_image_isolation(&self, image: &str, marker: String) {
452        self.image_isolation
453            .write()
454            .await
455            .insert(image.to_string(), marker);
456    }
457
458    /// Apply a manifest runtime-marker inspection to the cache. Mirrors
459    /// [`CompositeRuntime::apply_image_os_inspection`]'s non-fatal contract:
460    /// only a present marker updates the cache; absence or error leaves it
461    /// untouched (dispatch falls through to the OS/platform rules).
462    async fn apply_image_runtime_inspection(
463        &self,
464        image: &str,
465        result: std::result::Result<Option<String>, zlayer_registry::RegistryError>,
466    ) {
467        match result {
468            Ok(Some(marker)) => {
469                tracing::debug!(image, marker, "cached image runtime marker for dispatch");
470                self.record_image_runtime(image, marker).await;
471            }
472            Ok(None) => {}
473            Err(e) => {
474                tracing::trace!(
475                    image,
476                    error = %e,
477                    "failed to inspect image runtime marker — dispatch unaffected",
478                );
479            }
480        }
481    }
482
483    /// Apply a manifest/config isolation-default inspection to the cache.
484    /// Mirrors [`CompositeRuntime::apply_image_runtime_inspection`]'s non-fatal
485    /// contract: only a present default updates the cache; absence or error
486    /// leaves it untouched (dispatch falls through to the marker/OS/platform
487    /// rules).
488    async fn apply_image_isolation_inspection(
489        &self,
490        image: &str,
491        result: std::result::Result<Option<String>, zlayer_registry::RegistryError>,
492    ) {
493        match result {
494            Ok(Some(marker)) => {
495                tracing::debug!(image, marker, "cached image isolation default for dispatch");
496                self.record_image_isolation(image, marker).await;
497            }
498            Ok(None) => {}
499            Err(e) => {
500                tracing::trace!(
501                    image,
502                    error = %e,
503                    "failed to inspect image isolation default — dispatch unaffected",
504                );
505            }
506        }
507    }
508
509    /// Apply the result of a manifest OS inspection to the image-OS cache.
510    ///
511    /// Factored out of [`Runtime::pull_image`] and
512    /// [`Runtime::pull_image_with_policy`] so the cache-update policy can be
513    /// unit-tested without depending on a live registry. The three branches
514    /// mirror the contract of [`zlayer_registry::fetch_image_os`]:
515    ///
516    /// * `Ok(Some(os))` — populate the cache so future `create_container`
517    ///   calls without an explicit `spec.platform` dispatch to the right
518    ///   runtime.
519    /// * `Ok(None)` — the config blob had no (or an unrecognized) `os`
520    ///   field. Leave the cache untouched; dispatch falls through to primary.
521    /// * `Err(_)` — transient or registry error. Log at warn and leave the
522    ///   cache untouched. We never fail the overall `pull_image*` call on
523    ///   inspection failure: the primary runtime's own pull already
524    ///   succeeded, and the safe fall-through is "primary".
525    async fn apply_image_os_inspection(
526        &self,
527        image: &str,
528        result: std::result::Result<Option<OsKind>, zlayer_registry::RegistryError>,
529    ) {
530        match result {
531            Ok(Some(os)) => {
532                self.record_image_os(image, os).await;
533                tracing::debug!(image, ?os, "cached image OS for dispatch");
534            }
535            Ok(None) => {
536                tracing::trace!(
537                    image,
538                    "image manifest has no OS field — dispatch will fall through to primary",
539                );
540            }
541            Err(e) => {
542                tracing::warn!(
543                    image,
544                    error = %e,
545                    "failed to inspect image manifest OS — dispatch will fall through to primary",
546                );
547            }
548        }
549    }
550
551    /// Decide which runtime should handle a `create_container` call for `spec`.
552    ///
553    /// The `service` argument is the originating service name, used to build an
554    /// actionable [`AgentError::RouteToPeer`] when a Linux workload lands on
555    /// this node without a local delegate so the scheduler can re-place it on
556    /// a capable peer.
557    ///
558    /// Policy (H-7): Linux workloads are never silently routed to the primary
559    /// on nodes without a delegate. The old "permissive" fall-through (primary
560    /// handles everything) returned an `Unsupported` error only when
561    /// `spec.platform` was explicitly set, but fell through to primary for
562    /// specs without a platform — producing cryptic downstream errors when the
563    /// image-OS cache said `Linux`. We now return `RouteToPeer` in both cases.
564    ///
565    /// Routing precedence, locally-known OS only (NO network call ever happens
566    /// here — the image-OS cache is filled local-only at pull time):
567    /// 1. explicit create-request `spec.runtime`,
568    /// 2. explicit create-request `com.zlayer.isolation` label,
569    /// 3. image-DECLARED isolation default (the `com.zlayer.isolation` config
570    ///    label, cached at pull time): an image may declare its preferred
571    ///    backend, overridden by 1/2 but winning over the marker below,
572    /// 4. `com.zlayer.runtime` manifest marker (`vz` / `vz-linux`),
573    /// 5. `spec.platform.os`,
574    /// 6. the image-OS cache: `Linux` -> `VzLinux` (when present), `Macos` /
575    ///    `Windows` -> `Primary`,
576    /// 7. FINAL fallthrough — OS genuinely unknown: on a macOS host (proxied by
577    ///    the presence of a `vz_linux` delegate) default to `VzLinux`, because
578    ///    almost every registry image is Linux and the Seatbelt sandbox cannot
    ///    exec a Linux ELF. A macOS-native rootfs never reaches this branch: it
    ///    resolves `os == Macos` at step 6 (the image-OS cache) and routes to
    ///    `Primary`.
581    #[allow(clippy::too_many_lines)]
582    async fn select_for(&self, service: &str, spec: &ServiceSpec) -> Result<DispatchTarget> {
583        // Resolve the cached image OS once. Used by the Sandbox Linux-image
584        // guard (both the `spec.runtime` arm below and the `com.zlayer.isolation
585        // =sandbox` label arm) to avoid handing a Linux ELF to the Seatbelt
586        // sandbox (exit-127). `None` / non-Linux means "not KNOWN Linux" — the
587        // guard only overrides when the image OS is positively known to be Linux.
588        let image_known_linux = matches!(
589            self.image_os
590                .read()
591                .await
592                .get(&spec.image.name.to_string())
593                .copied(),
594            Some(OsKind::Linux)
595        );
596
597        // Highest-priority backend selector: the explicit `ServiceSpec.runtime`
598        // field. It overrides the back-compat `com.zlayer.isolation` label and
599        // every auto-detection rule below. `Auto` (or `None`) falls through to
600        // the label / marker / platform / image-OS logic unchanged.
601        if let Some(isolation) = spec.runtime {
602            match isolation {
603                RuntimeIsolation::Auto => {}
604                RuntimeIsolation::Sandbox => {
605                    // Seatbelt sandbox (primary). Guard: never dispatch a
606                    // KNOWN-Linux image to the native sandbox — it cannot exec a
607                    // Linux ELF (exit 127). Route to VZ-Linux when available.
608                    if self.vz_linux.is_some() && image_known_linux {
609                        tracing::warn!(
610                            service,
611                            image = %spec.image.name,
612                            "sandbox requested but image is Linux; routing to VZ-Linux"
613                        );
614                        return Ok(DispatchTarget::VzLinux);
615                    }
616                    return Ok(DispatchTarget::Primary);
617                }
618                RuntimeIsolation::Vz => {
619                    if self.vz.is_some() {
620                        return Ok(DispatchTarget::Vz);
621                    }
622                    return Err(AgentError::Configuration(
623                        "runtime `vz` requested but the native-VZ delegate is not \
624                         available on this node"
625                            .to_string(),
626                    ));
627                }
628                RuntimeIsolation::VzLinux => {
629                    if self.vz_linux.is_some() {
630                        return Ok(DispatchTarget::VzLinux);
631                    }
632                    return Err(AgentError::Configuration(
633                        "runtime `vz-linux` requested but the VZ-Linux delegate is not \
634                         available on this node"
635                            .to_string(),
636                    ));
637                }
638                RuntimeIsolation::Vm => {
639                    if self.delegate.is_some() {
640                        return Ok(DispatchTarget::Delegate);
641                    }
642                    return Err(AgentError::Configuration(
643                        "runtime `vm` requested but the libkrun delegate is not \
644                         available on this node"
645                            .to_string(),
646                    ));
647                }
648            }
649        }
650
651        // Explicit per-service isolation label wins over everything below.
652        //   `com.zlayer.isolation=vz`               -> VZ (native-macOS guest VM)
653        //   `com.zlayer.isolation=vz-linux`         -> VZ Linux-guest VM
654        //   `com.zlayer.isolation=vm|libkrun`       -> libkrun delegate (force VM)
655        //   `com.zlayer.isolation=sandbox|seatbelt` -> Seatbelt sandbox (primary),
656        //                                              opting OUT of VZ auto-detect.
657        if let Some(label) = spec.labels.get("com.zlayer.isolation") {
658            if self.vz.is_some() && label.eq_ignore_ascii_case("vz") {
659                return Ok(DispatchTarget::Vz);
660            }
661            if self.vz_linux.is_some() && label.eq_ignore_ascii_case("vz-linux") {
662                return Ok(DispatchTarget::VzLinux);
663            }
664            if label.eq_ignore_ascii_case("vm") || label.eq_ignore_ascii_case("libkrun") {
665                // Force the libkrun delegate. If no delegate exists the
666                // platform/image-OS rules below produce the appropriate
667                // `RouteToPeer`, so only short-circuit when one is present.
668                if self.delegate.is_some() {
669                    return Ok(DispatchTarget::Delegate);
670                }
671            }
672            if label.eq_ignore_ascii_case("sandbox") || label.eq_ignore_ascii_case("seatbelt") {
673                // Same Linux-image guard as the `spec.runtime = Sandbox` arm:
674                // a KNOWN-Linux image must not go to the Seatbelt sandbox
675                // (exit-127); route it to VZ-Linux when that delegate exists.
676                if self.vz_linux.is_some() && image_known_linux {
677                    tracing::warn!(
678                        service,
679                        image = %spec.image.name,
680                        "sandbox requested but image is Linux; routing to VZ-Linux"
681                    );
682                    return Ok(DispatchTarget::VzLinux);
683                }
684                return Ok(DispatchTarget::Primary);
685            }
686        }
687
688        // Image-DECLARED isolation default: the `com.zlayer.isolation` OCI config
689        // label the ZImage builder stamps into the image (cached at pull time via
690        // `apply_image_isolation_inspection`). It sits BELOW an explicit
691        // create-request `spec.runtime` / `com.zlayer.isolation` label (both
692        // handled above, so a caller always overrides the image) but ABOVE the
693        // `com.zlayer.runtime` manifest marker. The string→target mapping and the
694        // Linux-image guard mirror the `spec.labels` block above exactly; a missing
695        // or unavailable delegate falls through to the marker/OS/platform rules.
696        if let Some(label) = self
697            .image_isolation
698            .read()
699            .await
700            .get(&spec.image.name.to_string())
701            .cloned()
702        {
703            if self.vz.is_some() && label.eq_ignore_ascii_case("vz") {
704                return Ok(DispatchTarget::Vz);
705            }
706            if self.vz_linux.is_some() && label.eq_ignore_ascii_case("vz-linux") {
707                return Ok(DispatchTarget::VzLinux);
708            }
709            if label.eq_ignore_ascii_case("vm") || label.eq_ignore_ascii_case("libkrun") {
710                // Force the libkrun delegate when present; otherwise fall through
711                // so the platform/image-OS rules below produce the right routing.
712                if self.delegate.is_some() {
713                    return Ok(DispatchTarget::Delegate);
714                }
715            }
716            if label.eq_ignore_ascii_case("sandbox") || label.eq_ignore_ascii_case("seatbelt") {
717                // Same Linux-image guard as the `spec.labels` / `spec.runtime`
718                // sandbox arms: a KNOWN-Linux image must not go to the Seatbelt
719                // sandbox (exit-127); route it to VZ-Linux when that delegate exists.
720                if self.vz_linux.is_some() && image_known_linux {
721                    tracing::warn!(
722                        service,
723                        image = %spec.image.name,
724                        "image declares isolation=sandbox but image is Linux; routing to VZ-Linux"
725                    );
726                    return Ok(DispatchTarget::VzLinux);
727                }
728                return Ok(DispatchTarget::Primary);
729            }
730        }
731
732        // Auto-detect a VZ base bundle: when the image's manifest carries
733        // `com.zlayer.runtime=vz` (stamped by `zlayer vz build-base`), prefer the
734        // VZ runtime — it is the only runtime that can boot such a bundle. This
735        // is the "prefer VZ by default" behaviour: it only fires for genuine VZ
736        // bundles, so Seatbelt-rootfs and Linux images are unaffected.
737        if self.vz.is_some()
738            && self
739                .image_runtime
740                .read()
741                .await
742                .get(&spec.image.name.to_string())
743                .is_some_and(|m| m.eq_ignore_ascii_case(zlayer_registry::ZLAYER_RUNTIME_VZ))
744        {
745            return Ok(DispatchTarget::Vz);
746        }
747
748        // Auto-detect a VZ Linux-guest image: when the manifest carries
749        // `com.zlayer.runtime=vz-linux`, prefer the VZ Linux runtime.
750        if self.vz_linux.is_some()
751            && self
752                .image_runtime
753                .read()
754                .await
755                .get(&spec.image.name.to_string())
756                .is_some_and(|m| m.eq_ignore_ascii_case(zlayer_registry::ZLAYER_RUNTIME_LINUX_VZ))
757        {
758            return Ok(DispatchTarget::VzLinux);
759        }
760
761        if let Some(platform) = &spec.platform {
762            let target = match platform.os {
763                OsKind::Windows | OsKind::Macos => DispatchTarget::Primary,
764                // On macOS the VZ Linux-guest runtime is the default Linux path;
765                // only fall back to the libkrun delegate when it is absent.
766                OsKind::Linux if self.vz_linux.is_some() => DispatchTarget::VzLinux,
767                OsKind::Linux => DispatchTarget::Delegate,
768            };
769            if matches!(target, DispatchTarget::Delegate) && self.delegate.is_none() {
770                return Err(AgentError::RouteToPeer {
771                    service: service.to_string(),
772                    required_os: OsKind::Linux.as_oci_str().to_string(),
773                    reason: "spec.platform.os = linux but this node has no WSL2 delegate \
774                            configured; enable `--install-wsl yes` on this node or add a Linux \
775                            peer to the cluster"
776                        .to_string(),
777                });
778            }
779            return Ok(target);
780        }
781
782        if let Some(os) = self
783            .image_os
784            .read()
785            .await
786            .get(&spec.image.name.to_string())
787            .copied()
788        {
789            return match os {
790                OsKind::Linux => {
791                    if self.vz_linux.is_some() {
792                        // VZ Linux-guest is the default Linux path on macOS.
793                        Ok(DispatchTarget::VzLinux)
794                    } else if self.delegate.is_some() {
795                        Ok(DispatchTarget::Delegate)
796                    } else {
797                        // No delegate and the image manifest says Linux —
798                        // refuse at the composite layer so the scheduler can
799                        // re-place on a Linux peer instead of the primary
800                        // failing with a cryptic HCS error.
801                        Err(AgentError::RouteToPeer {
802                            service: service.to_string(),
803                            required_os: OsKind::Linux.as_oci_str().to_string(),
804                            reason: format!(
805                                "image '{}' manifest reports os=linux but this node has no WSL2 \
806                                 delegate configured; enable `--install-wsl yes` on this node or \
807                                 add a Linux peer to the cluster",
808                                spec.image.name
809                            ),
810                        })
811                    }
812                }
813                OsKind::Windows | OsKind::Macos => Ok(DispatchTarget::Primary),
814            };
815        }
816
817        // OS genuinely unknown (no isolation label, no runtime marker, no
818        // `spec.platform`, no image-OS cache hit). On a macOS host with a
819        // VZ-Linux delegate, default to VZ-Linux: the overwhelming majority of
820        // images pulled from public registries are Linux, and the Seatbelt
821        // sandbox (the primary) cannot exec a Linux ELF — sending an unknown
822        // image there is the exit-127 failure this fix exists to prevent. The
823        // user is fine with VZ-Linux as the default; the only hard rule is that
824        // a macOS-native rootfs must never go to the Linux VM, and that is
825        // already guaranteed above by the `image_os == Macos -> Primary` branch
826        // (a native bundle resolves its OS locally and never reaches here).
827        //
828        // The `vz_linux` delegate is only ever attached on a macOS host, so its
829        // presence is a sufficient proxy for "macOS host" — non-macOS hosts
830        // (Windows HCS, Linux) keep the historical primary fallthrough.
831        if self.vz_linux.is_some() {
832            return Ok(DispatchTarget::VzLinux);
833        }
834
835        Ok(DispatchTarget::Primary)
836    }
837
838    /// Look up an existing dispatch decision for `id`, or return `NotFound`.
839    async fn lookup(&self, id: &ContainerId) -> Result<Arc<dyn Runtime>> {
840        let target =
841            self.dispatch
842                .read()
843                .await
844                .get(id)
845                .copied()
846                .ok_or_else(|| AgentError::NotFound {
847                    container: id.to_string(),
848                    reason: "no dispatch record in CompositeRuntime".to_string(),
849                })?;
850        Ok(self.runtime_for(target).clone())
851    }
852
853    /// Resolve a [`DispatchTarget`] to the concrete runtime reference.
854    ///
855    /// Unwrapping the delegate is safe because [`Self::select_for`] returns
856    /// `Err` whenever a delegate would be required but is missing, so a
857    /// `DispatchTarget::Delegate` can never end up in the dispatch map
858    /// without a delegate being present.
859    fn runtime_for(&self, t: DispatchTarget) -> &Arc<dyn Runtime> {
860        match t {
861            DispatchTarget::Primary => &self.primary,
862            DispatchTarget::Delegate => self
863                .delegate
864                .as_ref()
865                .expect("delegate target requires delegate to exist"),
866            // `select_for` only returns `Vz` when a vz delegate is present;
867            // fall back to primary defensively.
868            DispatchTarget::Vz => self.vz.as_ref().unwrap_or(&self.primary),
869            // `select_for` only returns `VzLinux` when a vz-linux delegate is
870            // present; fall back to primary defensively.
871            DispatchTarget::VzLinux => self.vz_linux.as_ref().unwrap_or(&self.primary),
872        }
873    }
874
875    /// Build the ordered list of backends to try for a per-container read
876    /// (logs / stats), owning backend first.
877    ///
878    /// The container's dispatch record (recorded at `create_container`) names
879    /// the runtime that actually ran it, so we try that one first. The other
880    /// configured backends follow as a defensive fallback for the case where
881    /// the owning backend can answer container lifecycle calls but not a
882    /// particular read (e.g. the macOS `SandboxRuntime` primary implements
883    /// `container_logs`/`get_container_stats` but not the *streaming*
884    /// `logs_stream`/`stats_stream`, so it returns `Unsupported` for the
885    /// latter). Returns `NotFound` when the id was never dispatched.
886    async fn read_backends(
887        &self,
888        id: &ContainerId,
889    ) -> Result<Vec<(&'static str, Arc<dyn Runtime>)>> {
890        let owner =
891            self.dispatch
892                .read()
893                .await
894                .get(id)
895                .copied()
896                .ok_or_else(|| AgentError::NotFound {
897                    container: id.to_string(),
898                    reason: "no dispatch record in CompositeRuntime".to_string(),
899                })?;
900
901        // Owning backend first, then every other configured backend (de-duped
902        // against the owner) so a read the owner can't serve can still be
903        // satisfied elsewhere instead of 500-ing.
904        let all: [(DispatchTarget, Option<&Arc<dyn Runtime>>); 4] = [
905            (DispatchTarget::Primary, Some(&self.primary)),
906            (DispatchTarget::Delegate, self.delegate.as_ref()),
907            (DispatchTarget::Vz, self.vz.as_ref()),
908            (DispatchTarget::VzLinux, self.vz_linux.as_ref()),
909        ];
910
911        let label_for = |t: DispatchTarget| match t {
912            DispatchTarget::Primary => "primary",
913            DispatchTarget::Delegate => "delegate",
914            DispatchTarget::Vz => "vz",
915            DispatchTarget::VzLinux => "vz_linux",
916        };
917
918        let mut out: Vec<(&'static str, Arc<dyn Runtime>)> =
919            vec![(label_for(owner), self.runtime_for(owner).clone())];
920        for (target, rt) in all {
921            if target != owner {
922                if let Some(rt) = rt {
923                    out.push((label_for(target), rt.clone()));
924                }
925            }
926        }
927        Ok(out)
928    }
929}
930
931/// Accumulates per-backend errors while a read fans out across the
932/// owner-first fallback chain, so the *final* error reflects the right HTTP
933/// status.
934///
935/// Every backend in the chain is tried; a backend that does not own the
936/// container returns [`AgentError::NotFound`] (a *skip*, not authoritative),
937/// while a backend that owns it but cannot serve this particular read returns
938/// some other error (notably the `Unsupported` default for an unimplemented
939/// streaming read) — a *soft miss* we fall back from. The distinction matters
940/// for the final error: if **every** backend returned `NotFound`, the container
941/// genuinely does not exist here and we surface `NotFound` (→ 404); if any
942/// backend produced a non-`NotFound` error, that is the more informative
943/// failure to report (→ 500) once no backend could serve the read.
944#[derive(Default)]
945struct ReadMissAccumulator {
946    /// The most recent non-`NotFound` error, if any backend produced one.
947    soft_err: Option<AgentError>,
948    /// The most recent `NotFound`, used only when *no* soft error occurred.
949    not_found: Option<AgentError>,
950}
951
952impl ReadMissAccumulator {
953    fn record(&mut self, e: AgentError) {
954        if matches!(e, AgentError::NotFound { .. }) {
955            self.not_found = Some(e);
956        } else {
957            self.soft_err = Some(e);
958        }
959    }
960
961    /// Resolve the accumulated misses into the final error for a read where no
962    /// backend succeeded. Prefers a soft error (more informative → 500) over a
963    /// bare `NotFound`; falls back to a synthesised `Unsupported` only if
964    /// nothing was recorded at all (an empty backend list, which cannot happen
965    /// in practice since the owner is always present).
966    fn into_error(self, what: &str) -> AgentError {
967        self.soft_err
968            .or(self.not_found)
969            .unwrap_or_else(|| AgentError::Unsupported(format!("no backend could serve {what}")))
970    }
971}
972
973/// Build a bounded one-shot [`LogsStream`] from a captured-log snapshot.
974///
975/// Used by [`CompositeRuntime::logs_stream`] when no backend offers a native
976/// log stream but one can produce a `container_logs` snapshot (e.g. the macOS
977/// `SandboxRuntime`). Mirrors the VZ-Linux runtime's own snapshot-to-stream
978/// translation so the wire shape is identical regardless of which backend
979/// served the data: honour the per-channel filters and re-attach the newline
980/// the line-splitter stripped.
981fn one_shot_logs_stream(entries: Vec<LogEntry>, opts: &LogsStreamOptions) -> LogsStream {
982    use futures_util::stream;
983
984    // Docker's default (neither stdout nor stderr explicitly requested) means
985    // "both"; equivalently, keep stdout unless stderr was the *only* channel
986    // requested, and vice-versa.
987    let want_stdout = opts.stdout || !opts.stderr;
988    let want_stderr = opts.stderr || !opts.stdout;
989    let timestamps = opts.timestamps;
990
991    let chunks: Vec<Result<LogChunk>> = entries
992        .into_iter()
993        .filter_map(|e| {
994            let channel = match e.stream {
995                LogStream::Stdout => LogChannel::Stdout,
996                LogStream::Stderr => LogChannel::Stderr,
997            };
998            let keep = match channel {
999                LogChannel::Stdout => want_stdout,
1000                LogChannel::Stderr => want_stderr,
1001                LogChannel::Stdin => false,
1002            };
1003            if !keep {
1004                return None;
1005            }
1006            let mut bytes = e.message.into_bytes();
1007            bytes.push(b'\n');
1008            Some(Ok(LogChunk {
1009                stream: channel,
1010                bytes: bytes::Bytes::from(bytes),
1011                timestamp: timestamps.then_some(e.timestamp),
1012            }))
1013        })
1014        .collect();
1015
1016    Box::pin(stream::iter(chunks))
1017}
1018
1019/// Build a bounded one-shot [`StatsStream`] from a single [`ContainerStats`]
1020/// snapshot.
1021///
1022/// Used by [`CompositeRuntime::stats_stream`] when no backend offers a native
1023/// stats stream but one can produce a `get_container_stats` snapshot. The
1024/// [`ContainerStats`] CPU figure is microseconds; [`StatsSample::cpu_total_ns`]
1025/// is nanoseconds, so we scale. `online_cpus` is unknown from this coarse
1026/// snapshot (the non-streaming API does not carry it) and is reported as `1`
1027/// so the Docker-compat CPU-percent math has a sane divisor.
1028fn one_shot_stats_stream(stats: &ContainerStats) -> StatsStream {
1029    use futures_util::stream;
1030
1031    let sample = StatsSample {
1032        cpu_total_ns: stats.cpu_usage_usec.saturating_mul(1_000),
1033        cpu_system_ns: 0,
1034        online_cpus: 1,
1035        mem_used_bytes: stats.memory_bytes,
1036        mem_limit_bytes: stats.memory_limit,
1037        net_rx_bytes: 0,
1038        net_tx_bytes: 0,
1039        blkio_read_bytes: 0,
1040        blkio_write_bytes: 0,
1041        pids_current: 0,
1042        pids_limit: None,
1043        timestamp: chrono::Utc::now(),
1044    };
1045    Box::pin(stream::iter(vec![Ok(sample)]))
1046}
1047
1048#[async_trait]
1049impl Runtime for CompositeRuntime {
1050    /// Delegate to whichever backend actually owns the on-disk image store.
1051    /// The primary runtime (youki on Linux) holds the `LocalRegistry` + blob
1052    /// cache; probe it first, then fall back to the delegate so a composite
1053    /// whose Linux workloads run on a delegate still exposes that delegate's
1054    /// store. Returns `None` when neither backend owns a store.
1055    fn image_store_handles(&self) -> Option<ImageStoreHandles> {
1056        self.primary
1057            .image_store_handles()
1058            .or_else(|| self.delegate.as_ref().and_then(|d| d.image_store_handles()))
1059    }
1060
1061    async fn pull_image(&self, image: &str) -> Result<()> {
1062        // Primary pull. `WrongPlatform` here means the image's OCI config
1063        // reports an OS the primary cannot service (e.g. a Linux image on the
1064        // Windows HCS runtime). That is a *soft* failure: the delegate's pull
1065        // below owns the image, so we log and continue rather than failing
1066        // the whole composite call. Any other error is a real pull failure
1067        // and must bubble.
1068        if let Err(e) = self.primary.pull_image(image).await {
1069            if matches!(e, AgentError::WrongPlatform { .. }) {
1070                tracing::debug!(
1071                    image,
1072                    error = %e,
1073                    "primary runtime cannot service image (wrong platform); delegating",
1074                );
1075            } else {
1076                return Err(e);
1077            }
1078        }
1079        if let Some(delegate) = &self.delegate {
1080            if let Err(e) = delegate.pull_image(image).await {
1081                // Foreign-OS images will reliably fail one of the two pulls
1082                // (primary can't store a Linux image's config on Windows, or
1083                // vice versa). That's expected — the successful side owns the
1084                // layers we'll actually use — so we keep this at debug.
1085                tracing::debug!(
1086                    image,
1087                    error = %e,
1088                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
1089                );
1090            }
1091        }
1092        // VZ + VZ-Linux delegates (macOS). The VZ-Linux runtime is the default
1093        // execution path for Linux images on macOS and owns its OWN image store
1094        // (`image_rootfs`); if we never pull into it, the image is absent both
1095        // when `create_container` dispatches there AND from `list_images` /
1096        // `inspect_image` (which is what `docker pull` verifies). Pulling here
1097        // makes the image actually present where it runs and listable. Errors
1098        // are non-fatal for the same wrong-OS reason as the delegate above.
1099        for (label, rt) in [
1100            self.vz.as_ref().map(|r| ("vz", r)),
1101            self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
1102        ]
1103        .into_iter()
1104        .flatten()
1105        {
1106            if let Err(e) = rt.pull_image(image).await {
1107                tracing::debug!(
1108                    image,
1109                    runtime = label,
1110                    error = %e,
1111                    "vz delegate failed to pull image (likely wrong OS); continuing",
1112                );
1113            }
1114        }
1115
1116        // Inspect the OCI manifest's `config.os` so `select_for(spec)` can
1117        // dispatch correctly when `spec.platform` is `None`. Non-fatal: any
1118        // failure here just means dispatch falls through to primary.
1119        let os_result = self.inspect_image_os(image).await;
1120        self.apply_image_os_inspection(image, os_result).await;
1121        let marker_result = self.inspect_image_runtime_marker(image, None).await;
1122        self.apply_image_runtime_inspection(image, marker_result)
1123            .await;
1124        let isolation_result = self.inspect_image_isolation_marker(image, None).await;
1125        self.apply_image_isolation_inspection(image, isolation_result)
1126            .await;
1127
1128        Ok(())
1129    }
1130
1131    async fn pull_image_with_policy(
1132        &self,
1133        image: &str,
1134        policy: PullPolicy,
1135        auth: Option<&RegistryAuth>,
1136        source: zlayer_spec::SourcePolicy,
1137    ) -> Result<()> {
1138        // A locally-built image (`zlayer build`) lives in a backend's store as a
1139        // ref + rootfs + config with NO pulled manifest/digest. `create_container`
1140        // resolves it fine by that ref, but an `IfNotPresent` pull whose presence
1141        // check keys on a manifest finds none and falls through to a remote
1142        // manifest fetch — which 401s for a private ghcr ref the node was never
1143        // handed creds for, failing the step before create ever runs. Honor real
1144        // `IfNotPresent` semantics here: if ANY backend already lists this
1145        // reference (the SAME store `create_container` launches from), the image
1146        // is present — short-circuit as a no-op instead of reaching the wire.
1147        // `Always`/`Newer` are deliberately untouched (they must re-check the
1148        // origin), and a genuinely-absent image lists nowhere, so it still pulls
1149        // below.
1150        if matches!(policy, PullPolicy::IfNotPresent) && self.image_present_locally(image).await {
1151            tracing::debug!(
1152                image,
1153                "composite: image already present in a local store (resolvable by ref); \
1154                 skipping the remote presence check on the primary",
1155            );
1156            // Warm the OS-dispatch cache from the local blob caches exactly as a
1157            // real pull would, so `select_for` routes the cached image correctly.
1158            // Local-only (`inspect_image_os` never touches the wire).
1159            let os_result = self.inspect_image_os(image).await;
1160            self.apply_image_os_inspection(image, os_result).await;
1161            // The image is resolvable by ref in SOME store, but the runtime that
1162            // will EXECUTE it may not have its rootfs materialized yet — a
1163            // buildah-sidecar `zlayer build` (and `zlayer import`) lands the image
1164            // in the daemon-wide local OCI registry but NOT in the VZ-Linux store,
1165            // so `create_container` there would otherwise find no rootfs. Let the
1166            // delegate / VZ runtimes materialize their own store from that local
1167            // registry: each chains it, so an `IfNotPresent` pull resolves
1168            // entirely offline (manifest + layer blobs both served locally — no
1169            // wire) and is a no-op when the rootfs is already populated. The
1170            // PRIMARY is deliberately skipped — its remote presence check is
1171            // exactly the 401-prone fetch this short-circuit exists to avoid.
1172            for (label, rt) in [
1173                self.delegate.as_ref().map(|r| ("delegate", r)),
1174                self.vz.as_ref().map(|r| ("vz", r)),
1175                self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
1176            ]
1177            .into_iter()
1178            .flatten()
1179            {
1180                if let Err(e) = rt.pull_image_with_policy(image, policy, auth, source).await {
1181                    tracing::debug!(
1182                        image,
1183                        runtime = label,
1184                        error = %e,
1185                        "composite: local materialization in delegate runtime failed; continuing",
1186                    );
1187                }
1188            }
1189            return Ok(());
1190        }
1191
1192        // See `pull_image` above for the `WrongPlatform` soft-skip rationale.
1193        if let Err(e) = self
1194            .primary
1195            .pull_image_with_policy(image, policy, auth, source)
1196            .await
1197        {
1198            if matches!(e, AgentError::WrongPlatform { .. }) {
1199                tracing::debug!(
1200                    image,
1201                    error = %e,
1202                    "primary runtime cannot service image (wrong platform); delegating",
1203                );
1204            } else {
1205                return Err(e);
1206            }
1207        }
1208        if let Some(delegate) = &self.delegate {
1209            if let Err(e) = delegate
1210                .pull_image_with_policy(image, policy, auth, source)
1211                .await
1212            {
1213                tracing::debug!(
1214                    image,
1215                    error = %e,
1216                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
1217                );
1218            }
1219        }
1220        // See `pull_image` above: the VZ-Linux runtime owns its own image store
1221        // and is the default Linux execution path on macOS, so pull into it (and
1222        // the opt-in VZ delegate) too. Non-fatal per-backend errors.
1223        for (label, rt) in [
1224            self.vz.as_ref().map(|r| ("vz", r)),
1225            self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
1226        ]
1227        .into_iter()
1228        .flatten()
1229        {
1230            if let Err(e) = rt.pull_image_with_policy(image, policy, auth, source).await {
1231                tracing::debug!(
1232                    image,
1233                    runtime = label,
1234                    error = %e,
1235                    "vz delegate failed to pull image (likely wrong OS); continuing",
1236                );
1237            }
1238        }
1239
1240        let os_result = self.inspect_image_os(image).await;
1241        self.apply_image_os_inspection(image, os_result).await;
1242        let marker_result = self.inspect_image_runtime_marker(image, auth).await;
1243        self.apply_image_runtime_inspection(image, marker_result)
1244            .await;
1245        let isolation_result = self.inspect_image_isolation_marker(image, auth).await;
1246        self.apply_image_isolation_inspection(image, isolation_result)
1247            .await;
1248
1249        Ok(())
1250    }
1251
1252    /// Create a container on the backend chosen by [`Self::select_for`].
1253    ///
1254    /// On macOS the runtime preference order is **Seatbelt → native-VZ**: when
1255    /// `select_for` routes a positively-macOS image to the Seatbelt sandbox
1256    /// (`Primary`) and that create fails, this falls back to the native-VZ
1257    /// runtime (`Vz`) if one is attached, updating the dispatch cache so all
1258    /// subsequent operations on the id route to VZ. The fallback is
1259    /// collision-safe: Seatbelt registers in-memory container state only on
1260    /// success, and native-VZ owns disjoint on-disk state, so retrying the same
1261    /// id on VZ cannot clash (a Seatbelt partial on-disk dir self-heals on a
1262    /// future create). All other failures roll back the cache insert.
1263    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
1264        let target = self.select_for(&id.service, spec).await?;
1265        // Compute the macOS gate from the SAME image-OS cache `select_for`
1266        // reads, so the fallback only fires for images positively known to be
1267        // macOS (never Linux / unknown).
1268        let is_macos_image = matches!(
1269            self.image_os
1270                .read()
1271                .await
1272                .get(&spec.image.name.to_string())
1273                .copied(),
1274            Some(OsKind::Macos)
1275        );
1276        {
1277            let mut dispatch = self.dispatch.write().await;
1278            dispatch.insert(id.clone(), target);
1279        }
1280        let rt = self.runtime_for(target).clone();
1281        match rt.create_container(id, spec).await {
1282            Ok(()) => Ok(()),
1283            Err(e) => {
1284                // macOS preference order: Seatbelt (Primary) -> native-VZ (Vz).
1285                if target == DispatchTarget::Primary && is_macos_image && self.vz.is_some() {
1286                    tracing::warn!(
1287                        service = %id.service,
1288                        error = %e,
1289                        "seatbelt create failed for macOS image; falling back to native-VZ"
1290                    );
1291                    {
1292                        let mut dispatch = self.dispatch.write().await;
1293                        dispatch.insert(id.clone(), DispatchTarget::Vz);
1294                    }
1295                    return match self
1296                        .runtime_for(DispatchTarget::Vz)
1297                        .create_container(id, spec)
1298                        .await
1299                    {
1300                        Ok(()) => Ok(()),
1301                        Err(e2) => {
1302                            self.dispatch.write().await.remove(id);
1303                            Err(e2)
1304                        }
1305                    };
1306                }
1307                // Roll back the cache insert on failure so subsequent lookups
1308                // don't find a dangling entry.
1309                self.dispatch.write().await.remove(id);
1310                Err(e)
1311            }
1312        }
1313    }
1314
1315    async fn start_container(&self, id: &ContainerId) -> Result<()> {
1316        let rt = self.lookup(id).await?;
1317        rt.start_container(id).await
1318    }
1319
1320    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
1321        let rt = self.lookup(id).await?;
1322        rt.stop_container(id, timeout).await
1323    }
1324
1325    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
1326        let rt = self.lookup(id).await?;
1327        let res = rt.remove_container(id).await;
1328        self.dispatch.write().await.remove(id);
1329        res
1330    }
1331
1332    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
1333        let rt = self.lookup(id).await?;
1334        rt.container_state(id).await
1335    }
1336
1337    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
1338        let backends = self.read_backends(id).await?;
1339        let mut misses = ReadMissAccumulator::default();
1340        for (label, rt) in backends {
1341            match rt.container_logs(id, tail).await {
1342                Ok(logs) => return Ok(logs),
1343                Err(e) => {
1344                    tracing::warn!(
1345                        container = %id,
1346                        runtime = label,
1347                        error = %e,
1348                        "composite container_logs: backend could not serve logs; trying next backend",
1349                    );
1350                    misses.record(e);
1351                }
1352            }
1353        }
1354        Err(misses.into_error("container_logs"))
1355    }
1356
1357    async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
1358        let rt = self.lookup(id).await?;
1359        rt.exec(id, cmd).await
1360    }
1361
1362    async fn exec_with_opts(
1363        &self,
1364        id: &ContainerId,
1365        opts: &crate::runtime::ExecOptions,
1366    ) -> Result<(i32, String, String)> {
1367        // Forward to the resolved backend's `exec_with_opts` so Docker exec
1368        // options (`--user`, `-w`, `-e`) reach the runtime that actually owns
1369        // the container. Without this override the trait default would call
1370        // `self.exec(opts.command)` and silently drop user/cwd/env.
1371        let rt = self.lookup(id).await?;
1372        rt.exec_with_opts(id, opts).await
1373    }
1374
1375    async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
1376        let rt = self.lookup(id).await?;
1377        rt.exec_stream(id, cmd).await
1378    }
1379
1380    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
1381        let backends = self.read_backends(id).await?;
1382        let mut misses = ReadMissAccumulator::default();
1383        for (label, rt) in backends {
1384            match rt.get_container_stats(id).await {
1385                Ok(stats) => return Ok(stats),
1386                Err(e) => {
1387                    tracing::warn!(
1388                        container = %id,
1389                        runtime = label,
1390                        error = %e,
1391                        "composite get_container_stats: backend could not serve stats; \
1392                         trying next backend",
1393                    );
1394                    misses.record(e);
1395                }
1396            }
1397        }
1398        Err(misses.into_error("get_container_stats"))
1399    }
1400
1401    async fn update_container_resources(
1402        &self,
1403        id: &ContainerId,
1404        update: &ContainerResourceUpdate,
1405    ) -> Result<ContainerUpdateOutcome> {
1406        // Forward to the owning backend (owner-first, with the same defensive
1407        // fallback chain `get_container_stats` uses). Without this override the
1408        // trait default returns `Unsupported`, so the autoscaler's vertical
1409        // `Auto` mode would no-op on every composite-managed container instead
1410        // of resizing it. A non-owning backend returns `NotFound` (a skip); the
1411        // owner either applies the update or returns `Unsupported` (its own
1412        // signal that it cannot live-update a running cgroup — the autoscaler
1413        // then performs a rolling restart).
1414        let backends = self.read_backends(id).await?;
1415        let mut misses = ReadMissAccumulator::default();
1416        for (label, rt) in backends {
1417            match rt.update_container_resources(id, update).await {
1418                Ok(outcome) => return Ok(outcome),
1419                Err(e) => {
1420                    tracing::warn!(
1421                        container = %id,
1422                        runtime = label,
1423                        error = %e,
1424                        "composite update_container_resources: backend could not apply update; \
1425                         trying next backend",
1426                    );
1427                    misses.record(e);
1428                }
1429            }
1430        }
1431        Err(misses.into_error("update_container_resources"))
1432    }
1433
1434    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
1435        let rt = self.lookup(id).await?;
1436        rt.wait_container(id).await
1437    }
1438
1439    async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
1440        let rt = self.lookup(id).await?;
1441        rt.wait_outcome(id).await
1442    }
1443
1444    async fn wait_outcome_with_condition(
1445        &self,
1446        id: &ContainerId,
1447        condition: WaitCondition,
1448    ) -> Result<WaitOutcome> {
1449        let rt = self.lookup(id).await?;
1450        rt.wait_outcome_with_condition(id, condition).await
1451    }
1452
1453    async fn rename_container(&self, id: &ContainerId, new_name: &str) -> Result<()> {
1454        let rt = self.lookup(id).await?;
1455        rt.rename_container(id, new_name).await
1456    }
1457
1458    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
1459        let backends = self.read_backends(id).await?;
1460        let mut misses = ReadMissAccumulator::default();
1461        for (label, rt) in backends {
1462            match rt.get_logs(id).await {
1463                Ok(logs) => return Ok(logs),
1464                Err(e) => {
1465                    tracing::warn!(
1466                        container = %id,
1467                        runtime = label,
1468                        error = %e,
1469                        "composite get_logs: backend could not serve logs; trying next backend",
1470                    );
1471                    misses.record(e);
1472                }
1473            }
1474        }
1475        Err(misses.into_error("get_logs"))
1476    }
1477
1478    async fn logs_stream(&self, id: &ContainerId, opts: LogsStreamOptions) -> Result<LogsStream> {
1479        // Route to the backend that actually created the container. The default
1480        // trait impl returns `Unsupported`, which surfaced as a swallowed 500 on
1481        // `GET /containers/{id}/logs` whenever the owning backend did not
1482        // implement streaming (e.g. the macOS `SandboxRuntime` primary, which
1483        // implements `container_logs` but not `logs_stream`).
1484        //
1485        // Try each backend's `logs_stream` (owner first); on a soft miss
1486        // (`Unsupported`/error that is not `NotFound`) fall back to the same
1487        // backend's non-streaming `container_logs` and SYNTHESISE a one-shot
1488        // stream from it. Only a genuine `NotFound` propagates (→ 404).
1489        let backends = self.read_backends(id).await?;
1490        let mut misses = ReadMissAccumulator::default();
1491        for (label, rt) in &backends {
1492            match rt.logs_stream(id, opts.clone()).await {
1493                Ok(stream) => return Ok(stream),
1494                Err(e) => {
1495                    tracing::warn!(
1496                        container = %id,
1497                        runtime = label,
1498                        error = %e,
1499                        "composite logs_stream: backend has no native log stream; \
1500                         falling back to a one-shot snapshot",
1501                    );
1502                    misses.record(e);
1503                }
1504            }
1505        }
1506
1507        // No backend offered a native stream. Synthesise one from whichever
1508        // backend can produce a captured-log snapshot (`container_logs`).
1509        let tail = opts
1510            .tail
1511            .map_or(1000, |n| usize::try_from(n).unwrap_or(1000));
1512        for (label, rt) in &backends {
1513            match rt.container_logs(id, tail).await {
1514                Ok(entries) => {
1515                    return Ok(one_shot_logs_stream(entries, &opts));
1516                }
1517                Err(e) => {
1518                    tracing::warn!(
1519                        container = %id,
1520                        runtime = label,
1521                        error = %e,
1522                        "composite logs_stream: backend snapshot fallback failed; trying next",
1523                    );
1524                    misses.record(e);
1525                }
1526            }
1527        }
1528        Err(misses.into_error("container logs"))
1529    }
1530
1531    async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
1532        // Same rationale as `logs_stream`: forward to the owning backend so
1533        // `GET /containers/{id}/stats` reaches the runtime that ran the
1534        // container instead of hitting the `Unsupported` default (→ swallowed
1535        // 500). On a soft miss, fall back to the non-streaming
1536        // `get_container_stats` and synthesise a one-shot sample.
1537        let backends = self.read_backends(id).await?;
1538        let mut misses = ReadMissAccumulator::default();
1539        for (label, rt) in &backends {
1540            match rt.stats_stream(id).await {
1541                Ok(stream) => return Ok(stream),
1542                Err(e) => {
1543                    tracing::warn!(
1544                        container = %id,
1545                        runtime = label,
1546                        error = %e,
1547                        "composite stats_stream: backend has no native stats stream; \
1548                         falling back to a one-shot sample",
1549                    );
1550                    misses.record(e);
1551                }
1552            }
1553        }
1554
1555        for (label, rt) in &backends {
1556            match rt.get_container_stats(id).await {
1557                Ok(stats) => return Ok(one_shot_stats_stream(&stats)),
1558                Err(e) => {
1559                    tracing::warn!(
1560                        container = %id,
1561                        runtime = label,
1562                        error = %e,
1563                        "composite stats_stream: backend sample fallback failed; trying next",
1564                    );
1565                    misses.record(e);
1566                }
1567            }
1568        }
1569        Err(misses.into_error("container stats"))
1570    }
1571
1572    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
1573        let rt = self.lookup(id).await?;
1574        rt.get_container_pid(id).await
1575    }
1576
1577    fn overlay_attach_kind(&self) -> OverlayAttachKind {
1578        // Linux workloads on macOS execute in the VZ-Linux delegate, which joins
1579        // the overlay from inside the guest (`GuestManaged`). Defer to it when
1580        // present so the service layer takes the guest-managed attach path and
1581        // calls `push_overlay_config` (routed per-container below); otherwise use
1582        // the primary's kind. Non-VZ containers route to a runtime whose
1583        // `push_overlay_config` is unsupported and degrade gracefully.
1584        self.vz_linux.as_ref().map_or_else(
1585            || self.primary.overlay_attach_kind(),
1586            |vz| vz.overlay_attach_kind(),
1587        )
1588    }
1589
1590    async fn overlay_attach_kind_for(&self, id: &ContainerId) -> OverlayAttachKind {
1591        // Per-container dispatch: a mixed macOS node runs Seatbelt native
1592        // processes (`HostProxy`) AND VZ-Linux guests (`GuestManaged`) under the
1593        // same daemon, so the attach kind must be resolved from the backend that
1594        // actually owns `id`, not the runtime-wide default. Fall back to the
1595        // runtime-wide kind if the container can't be located.
1596        match self.lookup(id).await {
1597            Ok(rt) => rt.overlay_attach_kind(),
1598            Err(_) => self.overlay_attach_kind(),
1599        }
1600    }
1601
1602    async fn push_overlay_config(
1603        &self,
1604        id: &ContainerId,
1605        config: &zlayer_types::overlayd::GuestOverlayConfig,
1606    ) -> Result<()> {
1607        let rt = self.lookup(id).await?;
1608        rt.push_overlay_config(id, config).await
1609    }
1610
1611    async fn attach_overlay_ip(&self, id: &ContainerId, overlay_ip: IpAddr) -> Result<()> {
1612        let rt = self.lookup(id).await?;
1613        rt.attach_overlay_ip(id, overlay_ip).await
1614    }
1615
1616    async fn detach_overlay_ip(&self, id: &ContainerId) -> Result<()> {
1617        let rt = self.lookup(id).await?;
1618        rt.detach_overlay_ip(id).await
1619    }
1620
1621    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
1622        let rt = self.lookup(id).await?;
1623        rt.get_container_ip(id).await
1624    }
1625
1626    async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
1627        let rt = self.lookup(id).await?;
1628        rt.get_container_port_override(id).await
1629    }
1630
1631    #[cfg(target_os = "windows")]
1632    async fn get_container_namespace_id(
1633        &self,
1634        id: &ContainerId,
1635    ) -> Result<Option<windows::core::GUID>> {
1636        let rt = self.lookup(id).await?;
1637        rt.get_container_namespace_id(id).await
1638    }
1639
1640    async fn sync_container_volumes(&self, id: &ContainerId) -> Result<()> {
1641        let rt = self.lookup(id).await?;
1642        rt.sync_container_volumes(id).await
1643    }
1644
1645    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1646        // Fan out over every configured runtime and merge their image lists.
1647        // Crucially, a *single* backend's failure must not fail the whole
1648        // call: on macOS the `primary` (SandboxRuntime) does not implement
1649        // `list_images` at all (it returns `Unsupported`), yet pulled Linux
1650        // images live in the `vz_linux` delegate's store. Propagating the
1651        // primary's error via `?` here used to surface as a 500 on
1652        // `GET /images/json` (and, via the inspect fallback, broke every
1653        // `docker pull` verification). Tolerate per-backend errors the same
1654        // way we already tolerate the delegate's, and include the VZ +
1655        // VZ-Linux delegates so their images are actually listable.
1656        let mut out: Vec<ImageInfo> = Vec::new();
1657        let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1658        let mut any_ok = false;
1659        let mut last_err: Option<AgentError> = None;
1660
1661        for (label, rt) in [
1662            Some(("primary", &self.primary)),
1663            self.delegate.as_ref().map(|d| ("delegate", d)),
1664            self.vz.as_ref().map(|d| ("vz", d)),
1665            self.vz_linux.as_ref().map(|d| ("vz_linux", d)),
1666        ]
1667        .into_iter()
1668        .flatten()
1669        {
1670            match rt.list_images().await {
1671                Ok(images) => {
1672                    any_ok = true;
1673                    for img in images {
1674                        // De-dup by reference so an image registered in more
1675                        // than one backend isn't reported twice.
1676                        if seen.insert(img.reference.clone()) {
1677                            out.push(img);
1678                        }
1679                    }
1680                }
1681                Err(e) => {
1682                    tracing::debug!(
1683                        runtime = label,
1684                        error = %e,
1685                        "composite list_images: backend returned an error; skipping it",
1686                    );
1687                    last_err = Some(e);
1688                }
1689            }
1690        }
1691
1692        // Only fail if *every* backend errored. With at least one success we
1693        // return the merged (possibly empty) list — an empty image set is a
1694        // valid response, not an error.
1695        if any_ok {
1696            Ok(out)
1697        } else {
1698            Err(last_err.unwrap_or_else(|| {
1699                AgentError::Unsupported("no runtime implements list_images".into())
1700            }))
1701        }
1702    }
1703
1704    async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
1705        match self.primary.remove_image(image, force).await {
1706            Ok(()) => Ok(()),
1707            Err(primary_err) => {
1708                if let Some(delegate) = &self.delegate {
1709                    match delegate.remove_image(image, force).await {
1710                        Ok(()) => Ok(()),
1711                        Err(delegate_err) => {
1712                            tracing::debug!(
1713                                image,
1714                                %delegate_err,
1715                                "delegate remove_image also failed; returning primary error",
1716                            );
1717                            Err(primary_err)
1718                        }
1719                    }
1720                } else {
1721                    Err(primary_err)
1722                }
1723            }
1724        }
1725    }
1726
1727    async fn prune_images(&self) -> Result<PruneResult> {
1728        // Symmetric with `remove_image` / `tag_image`: a primary that does not
1729        // implement pruning (e.g. a cache-less backend that returns
1730        // `Unsupported`) must not 501 the whole call when a delegate exists and
1731        // could still reclaim space. Only a primary `Unsupported` is tolerated;
1732        // any other primary error still propagates.
1733        let mut result = match self.primary.prune_images().await {
1734            Ok(r) => r,
1735            Err(AgentError::Unsupported(reason)) if self.delegate.is_some() => {
1736                tracing::debug!(
1737                    %reason,
1738                    "primary runtime does not support prune_images; relying on delegate",
1739                );
1740                PruneResult::default()
1741            }
1742            Err(e) => return Err(e),
1743        };
1744        if let Some(delegate) = &self.delegate {
1745            match delegate.prune_images().await {
1746                Ok(extra) => {
1747                    result.deleted.extend(extra.deleted);
1748                    result.space_reclaimed =
1749                        result.space_reclaimed.saturating_add(extra.space_reclaimed);
1750                }
1751                Err(e) => tracing::warn!(
1752                    error = %e,
1753                    "delegate runtime prune_images failed; returning primary result only",
1754                ),
1755            }
1756        }
1757        Ok(result)
1758    }
1759
1760    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
1761        let rt = self.lookup(id).await?;
1762        rt.kill_container(id, signal).await
1763    }
1764
1765    async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
1766        match self.primary.tag_image(source, target).await {
1767            Ok(()) => Ok(()),
1768            Err(primary_err) => {
1769                if let Some(delegate) = &self.delegate {
1770                    match delegate.tag_image(source, target).await {
1771                        Ok(()) => Ok(()),
1772                        Err(delegate_err) => {
1773                            tracing::debug!(
1774                                source,
1775                                target,
1776                                %delegate_err,
1777                                "delegate tag_image also failed; returning primary error",
1778                            );
1779                            Err(primary_err)
1780                        }
1781                    }
1782                } else {
1783                    Err(primary_err)
1784                }
1785            }
1786        }
1787    }
1788
1789    async fn inspect_image_native(&self, image: &str) -> Result<ImageInspectInfo> {
1790        // Same shape as `remove_image` / `tag_image`: try the primary first,
1791        // fall back to the delegate. On macOS the primary is the Seatbelt
1792        // SandboxRuntime, which reads a natively-built image's os/config from
1793        // its `metadata.json` sidecar (returns `os = darwin`). Without this
1794        // override the trait default returns `Unsupported` and the inspect
1795        // handler 501s even though the primary can answer.
1796        match self.primary.inspect_image_native(image).await {
1797            Ok(info) => Ok(info),
1798            Err(primary_err) => {
1799                if let Some(delegate) = &self.delegate {
1800                    match delegate.inspect_image_native(image).await {
1801                        Ok(info) => Ok(info),
1802                        Err(delegate_err) => {
1803                            tracing::debug!(
1804                                image,
1805                                %delegate_err,
1806                                "delegate inspect_image_native also failed; returning primary error",
1807                            );
1808                            Err(primary_err)
1809                        }
1810                    }
1811                } else {
1812                    Err(primary_err)
1813                }
1814            }
1815        }
1816    }
1817
1818    async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
1819        let rt = self.lookup(id).await?;
1820        rt.inspect_detailed(id).await
1821    }
1822
1823    /// Forward the daemon's secrets provider to every inner runtime that can
1824    /// build containers.
1825    ///
1826    /// `create_container` dispatches via [`Self::select_for`] →
1827    /// [`Self::runtime_for`], which can return ANY of `primary`, `delegate`,
1828    /// `vz`, or `vz_linux` depending on the workload's OS / isolation label.
1829    /// The composite is the live daemon's `Arc<dyn Runtime>`, so if we did not
1830    /// override this the trait's no-op default would swallow the injection and
1831    /// `$S:` secret references would never resolve in whichever backend ends up
1832    /// creating the container. Forward to all four candidates so every possible
1833    /// dispatch target has the provider (each clones the `Arc`; the last moves
1834    /// it).
1835    fn set_secrets_provider(&self, provider: std::sync::Arc<dyn zlayer_secrets::SecretsProvider>) {
1836        self.primary.set_secrets_provider(provider.clone());
1837        if let Some(delegate) = &self.delegate {
1838            delegate.set_secrets_provider(provider.clone());
1839        }
1840        if let Some(vz) = &self.vz {
1841            vz.set_secrets_provider(provider.clone());
1842        }
1843        if let Some(vz_linux) = &self.vz_linux {
1844            vz_linux.set_secrets_provider(provider);
1845        }
1846    }
1847}
1848
/// Map an image reference to its on-disk image-directory name.
///
/// Every `/`, `:` and `@` becomes `_`; all other characters are kept as-is.
/// This must stay byte-for-byte identical to the scheme the Seatbelt builder
/// and sandbox runtime use (`{images}/{sanitized}/`). Otherwise the sidecar
/// probe in [`CompositeRuntime::image_os_from_local_sidecar`] would miss the
/// directory the builder wrote.
fn sanitize_image_name(image: &str) -> String {
    image
        .chars()
        .map(|ch| if matches!(ch, '/' | ':' | '@') { '_' } else { ch })
        .collect()
}
1857
1858#[cfg(test)]
1859mod tests {
1860    use super::*;
1861    use crate::cgroups_stats::ContainerStats;
1862    use std::sync::Mutex as StdMutex;
1863    use zlayer_spec::{ArchKind, DeploymentSpec, TargetPlatform};
1864
1865    /// A `TempDir` for tests that materialize a real on-disk **database** (the
1866    /// persistent blob cache these tests seed). It is rooted on a REAL disk,
1867    /// never the platform default `/tmp` — which on Linux is almost always a
1868    /// RAM-backed `tmpfs`. Under full-workspace `cargo test` parallelism,
1869    /// hundreds of these caches are created at once; in the ZQL build each
1870    /// cache writes a WAL(+SSTable) tree (orders of magnitude larger than the
1871    /// `sqlx` build's single small `SQLite` file), so placing them on `tmpfs`
1872    /// exhausts that RAM-backed mount. A write that hits `ENOSPC` mid-flush is
1873    /// then silently dropped (the engine's `Drop`-time flush ignores errors),
1874    /// and a later `get` on a *separate* cache instance reopened at the same
1875    /// path returns `None` — exactly the intermittent
1876    /// `cache.get(config_digest) == None` flake that only the ZQL build
1877    /// exhibits. Rooting these temp dirs on the workspace `target/` directory
1878    /// (a real filesystem) removes the tmpfs pressure entirely and is harmless
1879    /// to the `sqlx` build.
1880    ///
1881    /// Resolution: prefer `$CARGO_TARGET_DIR`, else the crate's
1882    /// `target/test-tmp` (via `CARGO_MANIFEST_DIR`); fall back to the platform
1883    /// temp dir only if that real-disk base cannot be created (e.g. a
1884    /// read-only source checkout) so the test never hard-fails on environment.
1885    fn db_tempdir() -> tempfile::TempDir {
1886        let base = std::env::var_os("CARGO_TARGET_DIR").map_or_else(
1887            || {
1888                std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
1889                    .join("../../target")
1890                    .join("test-tmp")
1891            },
1892            std::path::PathBuf::from,
1893        );
1894        if std::fs::create_dir_all(&base).is_ok() {
1895            if let Ok(td) = tempfile::Builder::new()
1896                .prefix("zql-test-")
1897                .tempdir_in(&base)
1898            {
1899                return td;
1900            }
1901        }
1902        // Last resort: platform default (may be tmpfs). Better a possibly-flaky
1903        // test than a hard failure on an unusual environment.
1904        tempfile::tempdir().unwrap()
1905    }
1906
1907    /// Which runtime a mock represents. Only used for labelling invocation
1908    /// records in tests.
1909    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1910    enum Role {
1911        Primary,
1912        Delegate,
1913        Vz,
1914        VzLinux,
1915    }
1916
1917    /// One recorded invocation: (runtime role, method name, container id).
1918    type CallRecord = (Role, String, Option<ContainerId>);
1919    /// Shared, thread-safe log of every mock call made in a single test.
1920    type CallLog = Arc<StdMutex<Vec<CallRecord>>>;
1921
1922    /// Mock runtime that records every method call it receives.
1923    ///
1924    /// This is intentionally minimal — just enough trait surface to exercise
1925    /// the composite's dispatch logic. Every recorded call includes the role
1926    /// (primary vs delegate), the method name, and the container id (or
1927    /// `None` for cross-cutting image operations).
    struct MockRuntime {
        /// Which slot this mock fills. Stamped onto every entry `record` pushes
        /// and used to name the runtime in the `WrongPlatform` errors built by
        /// `build_wrong_platform_error`.
        role: Role,
        /// Shared call log; `record` appends `(role, method, container id)` to
        /// it at the start of every trait method.
        calls: CallLog,
        /// Images returned by `list_images` when `list_images_error` is `None`.
        list_images_response: Vec<ImageInfo>,
        /// When set, `list_images` returns `AgentError::Unsupported(msg)`
        /// instead of `list_images_response`. Models a backend (e.g. the macOS
        /// `SandboxRuntime` primary) that does not implement image listing.
        list_images_error: Option<String>,
        /// When set, both `pull_image` and `pull_image_with_policy` return
        /// `AgentError::Internal(msg)` — unless `pull_image_wrong_platform` is
        /// also set, in which case that error is returned instead.
        pull_image_error: Option<String>,
        /// When set, both `pull_image` and `pull_image_with_policy` return a
        /// freshly-built [`AgentError::WrongPlatform`] using these fields
        /// (`expected`, `actual`). Takes precedence over `pull_image_error`
        /// so tests can simulate a wrong-platform soft skip end-to-end.
        pull_image_wrong_platform: Option<(&'static str, &'static str)>,
        /// When `true`, the *streaming* reads (`logs_stream` / `stats_stream`)
        /// return `AgentError::Unsupported`, modelling a backend (e.g. the macOS
        /// `SandboxRuntime` primary) that implements the snapshot reads
        /// (`container_logs` / `get_container_stats`) but not the streaming
        /// ones — exactly the case that used to surface as a swallowed 500.
        stream_unsupported: bool,
        /// When `true`, *every* per-container read (`container_logs`,
        /// `get_logs`, `get_container_stats`, `logs_stream`, `stats_stream`)
        /// returns `AgentError::NotFound`, modelling a backend that does not own
        /// the container at all. The composite must NOT mask this as success,
        /// and a genuine all-not-found must propagate as `NotFound` (404).
        reads_not_found: bool,
        /// Captured-log snapshot returned by `container_logs` / `get_logs`
        /// (unless `reads_not_found`). Lets a delegate model real workload
        /// output the composite's snapshot fallback should surface.
        logs_response: Vec<LogEntry>,
        /// When `true`, the snapshot `get_container_stats` returns
        /// `AgentError::Unsupported` (a soft miss), modelling a backend that
        /// owns the container but cannot report stats at all. Forces the
        /// composite to fall back to another backend.
        stats_snapshot_unsupported: bool,
        /// `prune_images` response. `None` models a backend that does not
        /// implement pruning (returns `AgentError::Unsupported`, like the trait
        /// default); `Some(result)` models a backend that prunes and reports
        /// the given [`PruneResult`].
        prune_images_response: Option<PruneResult>,
        /// When set, `create_container` returns
        /// `AgentError::CreateFailed { reason }` instead of `Ok(())`. Models a
        /// backend (e.g. the macOS Seatbelt sandbox) whose create fails so the
        /// composite must fall back to native-VZ.
        create_err: Option<String>,
    }
1974
1975    impl MockRuntime {
1976        fn new(role: Role, calls: CallLog) -> Self {
1977            Self {
1978                role,
1979                calls,
1980                list_images_response: Vec::new(),
1981                list_images_error: None,
1982                pull_image_error: None,
1983                pull_image_wrong_platform: None,
1984                stream_unsupported: false,
1985                reads_not_found: false,
1986                logs_response: Vec::new(),
1987                stats_snapshot_unsupported: false,
1988                prune_images_response: None,
1989                create_err: None,
1990            }
1991        }
1992
1993        /// `create_container` fails with `CreateFailed { reason: msg }`.
1994        fn with_create_error(mut self, msg: &str) -> Self {
1995            self.create_err = Some(msg.to_string());
1996            self
1997        }
1998
1999        /// Streaming reads return `Unsupported`; snapshot reads still work.
2000        fn with_stream_unsupported(mut self) -> Self {
2001            self.stream_unsupported = true;
2002            self
2003        }
2004
2005        /// Every per-container read returns `NotFound`.
2006        fn with_reads_not_found(mut self) -> Self {
2007            self.reads_not_found = true;
2008            self
2009        }
2010
2011        /// Set the captured-log snapshot returned by the snapshot reads.
2012        fn with_logs(mut self, logs: Vec<LogEntry>) -> Self {
2013            self.logs_response = logs;
2014            self
2015        }
2016
2017        /// Snapshot `get_container_stats` returns `Unsupported` (a soft miss).
2018        fn with_stats_snapshot_unsupported(mut self) -> Self {
2019            self.stats_snapshot_unsupported = true;
2020            self
2021        }
2022
2023        /// `prune_images` succeeds and reports the given [`PruneResult`].
2024        fn with_prune_result(mut self, result: PruneResult) -> Self {
2025            self.prune_images_response = Some(result);
2026            self
2027        }
2028
2029        fn build_wrong_platform_error(&self, image: &str) -> Option<AgentError> {
2030            self.pull_image_wrong_platform
2031                .map(|(expected, actual)| AgentError::WrongPlatform {
2032                    runtime: match self.role {
2033                        Role::Primary => "primary-mock".to_string(),
2034                        Role::Delegate => "delegate-mock".to_string(),
2035                        Role::Vz => "vz-mock".to_string(),
2036                        Role::VzLinux => "vz-linux-mock".to_string(),
2037                    },
2038                    expected: expected.to_string(),
2039                    actual: actual.to_string(),
2040                    image: image.to_string(),
2041                })
2042        }
2043
2044        fn record(&self, method: &str, id: Option<&ContainerId>) {
2045            self.calls
2046                .lock()
2047                .expect("mock call-log mutex poisoned")
2048                .push((self.role, method.to_string(), id.cloned()));
2049        }
2050    }
2051
2052    #[async_trait]
2053    impl Runtime for MockRuntime {
2054        async fn pull_image(&self, image: &str) -> Result<()> {
2055            self.record("pull_image", None);
2056            if let Some(err) = self.build_wrong_platform_error(image) {
2057                return Err(err);
2058            }
2059            if let Some(msg) = &self.pull_image_error {
2060                return Err(AgentError::Internal(msg.clone()));
2061            }
2062            Ok(())
2063        }
2064
2065        async fn pull_image_with_policy(
2066            &self,
2067            image: &str,
2068            _policy: PullPolicy,
2069            _auth: Option<&RegistryAuth>,
2070            _source: zlayer_spec::SourcePolicy,
2071        ) -> Result<()> {
2072            self.record("pull_image_with_policy", None);
2073            if let Some(err) = self.build_wrong_platform_error(image) {
2074                return Err(err);
2075            }
2076            if let Some(msg) = &self.pull_image_error {
2077                return Err(AgentError::Internal(msg.clone()));
2078            }
2079            Ok(())
2080        }
2081
2082        async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
2083            self.record("create_container", Some(id));
2084            if let Some(reason) = &self.create_err {
2085                return Err(AgentError::CreateFailed {
2086                    id: id.to_string(),
2087                    reason: reason.clone(),
2088                });
2089            }
2090            Ok(())
2091        }
2092
2093        async fn start_container(&self, id: &ContainerId) -> Result<()> {
2094            self.record("start_container", Some(id));
2095            Ok(())
2096        }
2097
2098        async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
2099            self.record("stop_container", Some(id));
2100            Ok(())
2101        }
2102
2103        async fn remove_container(&self, id: &ContainerId) -> Result<()> {
2104            self.record("remove_container", Some(id));
2105            Ok(())
2106        }
2107
2108        async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
2109            self.record("container_state", Some(id));
2110            Ok(ContainerState::Running)
2111        }
2112
2113        async fn container_logs(&self, id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
2114            self.record("container_logs", Some(id));
2115            if self.reads_not_found {
2116                return Err(mock_not_found());
2117            }
2118            Ok(self.logs_response.clone())
2119        }
2120
2121        async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
2122            self.record("exec", Some(id));
2123            Ok((0, String::new(), String::new()))
2124        }
2125
2126        async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
2127            self.record("get_container_stats", Some(id));
2128            if self.reads_not_found {
2129                return Err(mock_not_found());
2130            }
2131            if self.stats_snapshot_unsupported {
2132                return Err(AgentError::Unsupported("mock has no snapshot stats".into()));
2133            }
2134            Ok(ContainerStats {
2135                cpu_usage_usec: 1_000,
2136                memory_bytes: 4096,
2137                memory_limit: 8192,
2138                timestamp: std::time::Instant::now(),
2139            })
2140        }
2141
2142        async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
2143            self.record("wait_container", Some(id));
2144            Ok(0)
2145        }
2146
2147        async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
2148            self.record("get_logs", Some(id));
2149            if self.reads_not_found {
2150                return Err(mock_not_found());
2151            }
2152            Ok(self.logs_response.clone())
2153        }
2154
2155        async fn logs_stream(
2156            &self,
2157            id: &ContainerId,
2158            _opts: LogsStreamOptions,
2159        ) -> Result<LogsStream> {
2160            self.record("logs_stream", Some(id));
2161            if self.reads_not_found {
2162                return Err(mock_not_found());
2163            }
2164            if self.stream_unsupported {
2165                return Err(AgentError::Unsupported("mock has no log stream".into()));
2166            }
2167            // A backend that owns a native stream replays its captured logs.
2168            Ok(one_shot_logs_stream(
2169                self.logs_response.clone(),
2170                &LogsStreamOptions::default(),
2171            ))
2172        }
2173
2174        async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
2175            use futures_util::stream;
2176            self.record("stats_stream", Some(id));
2177            if self.reads_not_found {
2178                return Err(mock_not_found());
2179            }
2180            if self.stream_unsupported {
2181                return Err(AgentError::Unsupported("mock has no stats stream".into()));
2182            }
2183            Ok(Box::pin(stream::iter(vec![Ok(StatsSample {
2184                cpu_total_ns: 0,
2185                cpu_system_ns: 0,
2186                online_cpus: 1,
2187                mem_used_bytes: 4096,
2188                mem_limit_bytes: 8192,
2189                net_rx_bytes: 0,
2190                net_tx_bytes: 0,
2191                blkio_read_bytes: 0,
2192                blkio_write_bytes: 0,
2193                pids_current: 0,
2194                pids_limit: None,
2195                timestamp: chrono::Utc::now(),
2196            })])))
2197        }
2198
2199        async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
2200            self.record("get_container_pid", Some(id));
2201            Ok(None)
2202        }
2203
2204        async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
2205            self.record("get_container_ip", Some(id));
2206            Ok(None)
2207        }
2208
2209        async fn list_images(&self) -> Result<Vec<ImageInfo>> {
2210            self.record("list_images", None);
2211            if let Some(msg) = &self.list_images_error {
2212                return Err(AgentError::Unsupported(msg.clone()));
2213            }
2214            Ok(self.list_images_response.clone())
2215        }
2216
2217        async fn prune_images(&self) -> Result<PruneResult> {
2218            self.record("prune_images", None);
2219            match &self.prune_images_response {
2220                Some(result) => Ok(result.clone()),
2221                None => Err(AgentError::Unsupported(
2222                    "mock runtime does not support prune_images".into(),
2223                )),
2224            }
2225        }
2226    }
2227
2228    /// Build a [`ServiceSpec`] (with the given image name) from the minimal
2229    /// inline YAML the existing runtime tests use, then optionally set a
2230    /// target platform on it.
2231    fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
2232        let yaml = format!(
2233            r"
2234version: v1
2235deployment: test
2236services:
2237  test:
2238    rtype: service
2239    image:
2240      name: {image}
2241    endpoints:
2242      - name: http
2243        protocol: http
2244        port: 8080
2245"
2246        );
2247        let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
2248            .expect("valid deployment yaml")
2249            .services
2250            .remove("test")
2251            .expect("service 'test' present");
2252        spec.platform = platform;
2253        spec
2254    }
2255
2256    fn cid(service: &str, replica: u32) -> ContainerId {
2257        ContainerId::new(service.to_string(), replica)
2258    }
2259
2260    fn make_composite(with_delegate: bool) -> (CompositeRuntime, CallLog) {
2261        let calls = Arc::new(StdMutex::new(Vec::new()));
2262        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
2263        let delegate = if with_delegate {
2264            Some(Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>)
2265        } else {
2266            None
2267        };
2268        (
2269            CompositeRuntime::new(primary as Arc<dyn Runtime>, delegate),
2270            calls,
2271        )
2272    }
2273
2274    fn role_for(calls: &[CallRecord], method: &str) -> Option<Role> {
2275        calls
2276            .iter()
2277            .find(|(_, m, _)| m == method)
2278            .map(|(role, _, _)| *role)
2279    }
2280
2281    /// The `NotFound` a `MockRuntime` returns when it does not own a container.
2282    fn mock_not_found() -> AgentError {
2283        AgentError::NotFound {
2284            container: "mock".to_string(),
2285            reason: "mock backend does not own this container".to_string(),
2286        }
2287    }
2288
2289    #[tokio::test]
2290    async fn dispatch_windows_spec_goes_to_primary() {
2291        let (rt, calls) = make_composite(true);
2292        let id = cid("win-svc", 0);
2293        let spec = make_spec(
2294            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
2295            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
2296        );
2297
2298        rt.create_container(&id, &spec).await.unwrap();
2299        rt.start_container(&id).await.unwrap();
2300
2301        let calls = calls.lock().unwrap();
2302        assert_eq!(
2303            role_for(&calls, "create_container"),
2304            Some(Role::Primary),
2305            "create_container should hit primary for Windows spec"
2306        );
2307        assert_eq!(
2308            role_for(&calls, "start_container"),
2309            Some(Role::Primary),
2310            "start_container should hit primary for Windows spec"
2311        );
2312    }
2313
2314    #[tokio::test]
2315    async fn dispatch_linux_spec_goes_to_delegate() {
2316        let (rt, calls) = make_composite(true);
2317        let id = cid("lin-svc", 0);
2318        let spec = make_spec(
2319            "docker.io/library/alpine:3.19",
2320            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
2321        );
2322
2323        rt.create_container(&id, &spec).await.unwrap();
2324        rt.start_container(&id).await.unwrap();
2325
2326        let calls = calls.lock().unwrap();
2327        assert_eq!(
2328            role_for(&calls, "create_container"),
2329            Some(Role::Delegate),
2330            "create_container should hit delegate for Linux spec"
2331        );
2332        assert_eq!(
2333            role_for(&calls, "start_container"),
2334            Some(Role::Delegate),
2335            "start_container should hit delegate for Linux spec"
2336        );
2337    }
2338
2339    #[tokio::test]
2340    async fn dispatch_linux_without_delegate_errors() {
2341        // H-7 policy: a Linux spec on a node without a delegate must return
2342        // `RouteToPeer` (not `Unsupported`, not a silent primary fall-through)
2343        // so the scheduler can re-place the workload on a capable peer.
2344        let (rt, _calls) = make_composite(false);
2345        let id = cid("lin-svc", 0);
2346        let spec = make_spec(
2347            "docker.io/library/alpine:3.19",
2348            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
2349        );
2350
2351        let err = rt.create_container(&id, &spec).await.unwrap_err();
2352        match err {
2353            AgentError::RouteToPeer {
2354                service,
2355                required_os,
2356                reason,
2357            } => {
2358                assert_eq!(service, "lin-svc");
2359                assert_eq!(required_os, "linux");
2360                assert!(
2361                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
2362                    "reason must name both remediations, got: {reason}"
2363                );
2364            }
2365            other => panic!("expected RouteToPeer, got {other:?}"),
2366        }
2367    }
2368
2369    #[tokio::test]
2370    async fn dispatch_linux_image_cache_without_delegate_routes_to_peer() {
2371        // H-7 policy: even when `spec.platform` is unset, a Linux image in the
2372        // OS cache must route to a peer instead of falling through to primary.
2373        // This is the old permissive-fallthrough path the comment at lines
2374        // 172-178 used to describe; the behavior is now strict.
2375        let (rt, _calls) = make_composite(false);
2376        let id = cid("svc", 0);
2377        let image = "docker.io/library/nginx:1.25";
2378        rt.record_image_os(image, OsKind::Linux).await;
2379
2380        let spec = make_spec(image, None);
2381        let err = rt.create_container(&id, &spec).await.unwrap_err();
2382        match err {
2383            AgentError::RouteToPeer {
2384                service,
2385                required_os,
2386                reason,
2387            } => {
2388                assert_eq!(service, "svc");
2389                assert_eq!(required_os, "linux");
2390                assert!(
2391                    reason.contains(image),
2392                    "reason should mention the image name, got: {reason}"
2393                );
2394                assert!(
2395                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
2396                    "reason must name both remediations, got: {reason}"
2397                );
2398            }
2399            other => panic!("expected RouteToPeer, got {other:?}"),
2400        }
2401    }
2402
2403    #[tokio::test]
2404    async fn dispatch_macos_spec_goes_to_primary() {
2405        let (rt, calls) = make_composite(true);
2406        let id = cid("mac-svc", 0);
2407        let spec = make_spec(
2408            "ghcr.io/zlayer/macos:latest",
2409            Some(TargetPlatform::new(OsKind::Macos, ArchKind::Arm64)),
2410        );
2411
2412        rt.create_container(&id, &spec).await.unwrap();
2413
2414        let calls = calls.lock().unwrap();
2415        assert_eq!(
2416            role_for(&calls, "create_container"),
2417            Some(Role::Primary),
2418            "create_container should hit primary for Macos spec"
2419        );
2420    }
2421
2422    #[tokio::test]
2423    async fn dispatch_no_platform_no_image_os_falls_through_to_primary() {
2424        let (rt, calls) = make_composite(true);
2425        let id = cid("svc", 0);
2426        let spec = make_spec("docker.io/library/nginx:1.25", None);
2427
2428        rt.create_container(&id, &spec).await.unwrap();
2429
2430        let calls = calls.lock().unwrap();
2431        assert_eq!(
2432            role_for(&calls, "create_container"),
2433            Some(Role::Primary),
2434            "fall-through should pick primary when both platform and image-OS cache are unknown"
2435        );
2436    }
2437
2438    #[tokio::test]
2439    async fn dispatch_uses_image_os_cache_when_platform_missing() {
2440        let (rt, calls) = make_composite(true);
2441        let id = cid("svc", 0);
2442        let image = "docker.io/library/nginx:1.25";
2443        rt.record_image_os(image, OsKind::Linux).await;
2444
2445        let spec = make_spec(image, None);
2446        rt.create_container(&id, &spec).await.unwrap();
2447
2448        let calls = calls.lock().unwrap();
2449        assert_eq!(
2450            role_for(&calls, "create_container"),
2451            Some(Role::Delegate),
2452            "image-OS cache should route Linux images to the delegate"
2453        );
2454    }
2455
2456    /// Composite with primary + delegate + an attached VZ delegate, all sharing
2457    /// one call log.
2458    fn make_composite_with_vz() -> (CompositeRuntime, CallLog) {
2459        let calls = Arc::new(StdMutex::new(Vec::new()));
2460        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
2461        let delegate =
2462            Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
2463        let vz = Arc::new(MockRuntime::new(Role::Vz, Arc::clone(&calls))) as Arc<dyn Runtime>;
2464        let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
2465            .with_vz_delegate(Some(vz));
2466        (rt, calls)
2467    }
2468
2469    #[tokio::test]
2470    async fn dispatch_vz_bundle_annotation_auto_routes_to_vz() {
2471        let (rt, calls) = make_composite_with_vz();
2472        let id = cid("mac-svc", 0);
2473        let image = "ghcr.io/org/macos-vz:sequoia";
2474        // Simulate the manifest inspection having cached `com.zlayer.runtime=vz`.
2475        rt.record_image_runtime(image, "vz".to_string()).await;
2476
2477        let spec = make_spec(image, None);
2478        rt.create_container(&id, &spec).await.unwrap();
2479
2480        let calls = calls.lock().unwrap();
2481        assert_eq!(
2482            role_for(&calls, "create_container"),
2483            Some(Role::Vz),
2484            "a com.zlayer.runtime=vz bundle should auto-route to the VZ runtime"
2485        );
2486    }
2487
2488    #[tokio::test]
2489    async fn dispatch_vz_label_forces_vz() {
2490        let (rt, calls) = make_composite_with_vz();
2491        let id = cid("mac-svc", 0);
2492        let mut spec = make_spec("ghcr.io/org/whatever:1", None);
2493        spec.labels
2494            .insert("com.zlayer.isolation".to_string(), "vz".to_string());
2495
2496        rt.create_container(&id, &spec).await.unwrap();
2497
2498        let calls = calls.lock().unwrap();
2499        assert_eq!(
2500            role_for(&calls, "create_container"),
2501            Some(Role::Vz),
2502            "an explicit com.zlayer.isolation=vz label should force the VZ runtime"
2503        );
2504    }
2505
2506    #[tokio::test]
2507    async fn dispatch_sandbox_label_overrides_vz_bundle() {
2508        let (rt, calls) = make_composite_with_vz();
2509        let id = cid("mac-svc", 0);
2510        let image = "ghcr.io/org/macos-vz:sequoia";
2511        rt.record_image_runtime(image, "vz".to_string()).await;
2512
2513        let mut spec = make_spec(image, None);
2514        spec.labels
2515            .insert("com.zlayer.isolation".to_string(), "sandbox".to_string());
2516        rt.create_container(&id, &spec).await.unwrap();
2517
2518        let calls = calls.lock().unwrap();
2519        assert_eq!(
2520            role_for(&calls, "create_container"),
2521            Some(Role::Primary),
2522            "com.zlayer.isolation=sandbox should opt out of VZ auto-detect (force the sandbox)"
2523        );
2524    }
2525
2526    /// Like `make_composite_with_vz`, but the caller supplies a pre-configured
2527    /// primary (Seatbelt) mock — e.g. one set to fail `create_container` — so a
2528    /// test can exercise the Seatbelt -> native-VZ fallback.
2529    fn make_composite_with_vz_primary(primary: MockRuntime) -> (CompositeRuntime, CallLog) {
2530        let calls = Arc::clone(&primary.calls);
2531        let primary = Arc::new(primary);
2532        let delegate =
2533            Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
2534        let vz = Arc::new(MockRuntime::new(Role::Vz, Arc::clone(&calls))) as Arc<dyn Runtime>;
2535        let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
2536            .with_vz_delegate(Some(vz));
2537        (rt, calls)
2538    }
2539
2540    #[tokio::test]
2541    async fn macos_image_falls_back_to_vz_when_seatbelt_create_fails() {
2542        let primary = MockRuntime::new(Role::Primary, Arc::new(StdMutex::new(Vec::new())))
2543            .with_create_error("seatbelt boom");
2544        let (rt, calls) = make_composite_with_vz_primary(primary);
2545        let id = cid("mac-svc", 0);
2546        let image = "ghcr.io/org/macos-native:sequoia";
2547        rt.record_image_os(image, OsKind::Macos).await;
2548
2549        let spec = make_spec(image, None);
2550        // Seatbelt create fails, but native-VZ fallback succeeds.
2551        rt.create_container(&id, &spec).await.unwrap();
2552
2553        // A subsequent start must route to VZ (dispatch cache updated).
2554        rt.start_container(&id).await.unwrap();
2555
2556        let calls = calls.lock().unwrap();
2557        let creates: Vec<Role> = calls
2558            .iter()
2559            .filter(|(_, m, _)| m == "create_container")
2560            .map(|(role, _, _)| *role)
2561            .collect();
2562        assert_eq!(
2563            creates,
2564            vec![Role::Primary, Role::Vz],
2565            "seatbelt create should be attempted first, then native-VZ as fallback"
2566        );
2567        assert_eq!(
2568            role_for(&calls, "start_container"),
2569            Some(Role::Vz),
2570            "after the fallback the dispatch cache should route the container to VZ"
2571        );
2572    }
2573
2574    #[tokio::test]
2575    async fn macos_image_stays_on_primary_when_seatbelt_create_succeeds() {
2576        let (rt, calls) = make_composite_with_vz();
2577        let id = cid("mac-svc", 0);
2578        let image = "ghcr.io/org/macos-native:sequoia";
2579        rt.record_image_os(image, OsKind::Macos).await;
2580
2581        let spec = make_spec(image, None);
2582        rt.create_container(&id, &spec).await.unwrap();
2583
2584        let calls = calls.lock().unwrap();
2585        let creates: Vec<Role> = calls
2586            .iter()
2587            .filter(|(_, m, _)| m == "create_container")
2588            .map(|(role, _, _)| *role)
2589            .collect();
2590        assert_eq!(
2591            creates,
2592            vec![Role::Primary],
2593            "a successful Seatbelt create must stay on Primary with no VZ fallback"
2594        );
2595    }
2596
2597    #[tokio::test]
2598    async fn linux_image_create_failure_does_not_fall_back_to_vz() {
2599        // A Linux image routes to the VZ Linux-guest, not Primary, and a Primary
2600        // failure path must never be reached for it. Even if the chosen backend
2601        // fails, the macOS Seatbelt -> native-VZ fallback must not engage.
2602        let primary = MockRuntime::new(Role::Primary, Arc::new(StdMutex::new(Vec::new())))
2603            .with_create_error("seatbelt boom");
2604        let (rt, calls) = make_composite_with_vz_primary(primary);
2605        let id = cid("lin-svc", 0);
2606        let image = "docker.io/library/alpine:3.19";
2607        rt.record_image_os(image, OsKind::Linux).await;
2608
2609        let spec = make_spec(image, None);
2610        // With only a VZ (native) delegate attached and no vz_linux/delegate-less
2611        // routing surprises, a Linux image routes to the Delegate here, which
2612        // succeeds — no Primary create, no VZ fallback.
2613        rt.create_container(&id, &spec).await.unwrap();
2614
2615        let calls = calls.lock().unwrap();
2616        let creates: Vec<Role> = calls
2617            .iter()
2618            .filter(|(_, m, _)| m == "create_container")
2619            .map(|(role, _, _)| *role)
2620            .collect();
2621        assert_eq!(
2622            creates,
2623            vec![Role::Delegate],
2624            "a Linux image must route to the delegate and never trigger the macOS VZ fallback"
2625        );
2626    }
2627
2628    /// Composite with primary + delegate (libkrun) + a VZ Linux-guest delegate,
2629    /// all sharing one call log. Mirrors `make_composite_with_vz`.
2630    fn make_composite_with_vz_linux() -> (CompositeRuntime, CallLog) {
2631        let calls = Arc::new(StdMutex::new(Vec::new()));
2632        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
2633        let delegate =
2634            Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
2635        let vz_linux =
2636            Arc::new(MockRuntime::new(Role::VzLinux, Arc::clone(&calls))) as Arc<dyn Runtime>;
2637        let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
2638            .with_vz_linux_delegate(Some(vz_linux));
2639        (rt, calls)
2640    }
2641
2642    #[tokio::test]
2643    async fn dispatch_vz_linux_label_forces_vz_linux() {
2644        let (rt, calls) = make_composite_with_vz_linux();
2645        let id = cid("lin-svc", 0);
2646        let mut spec = make_spec("docker.io/library/alpine:3.19", None);
2647        spec.labels
2648            .insert("com.zlayer.isolation".to_string(), "vz-linux".to_string());
2649
2650        rt.create_container(&id, &spec).await.unwrap();
2651
2652        let calls = calls.lock().unwrap();
2653        assert_eq!(
2654            role_for(&calls, "create_container"),
2655            Some(Role::VzLinux),
2656            "com.zlayer.isolation=vz-linux must force the VZ Linux runtime"
2657        );
2658    }
2659
2660    #[tokio::test]
2661    async fn dispatch_vz_linux_marker_auto_routes_to_vz_linux() {
2662        let (rt, calls) = make_composite_with_vz_linux();
2663        let id = cid("lin-svc", 0);
2664        let image = "ghcr.io/org/linux-vz:bookworm";
2665        rt.record_image_runtime(image, "vz-linux".to_string()).await;
2666
2667        let spec = make_spec(image, None);
2668        rt.create_container(&id, &spec).await.unwrap();
2669
2670        let calls = calls.lock().unwrap();
2671        assert_eq!(
2672            role_for(&calls, "create_container"),
2673            Some(Role::VzLinux),
2674            "a com.zlayer.runtime=vz-linux marker should auto-route to the VZ Linux runtime"
2675        );
2676    }
2677
2678    #[tokio::test]
2679    async fn dispatch_image_isolation_default_sandbox_routes_to_primary() {
2680        // An image that DECLARES `com.zlayer.isolation=sandbox` (cached at pull
2681        // time) and is not known-Linux must route to the Seatbelt sandbox even
2682        // though a VZ-Linux delegate is present.
2683        let (rt, calls) = make_composite_with_vz_linux();
2684        let id = cid("mac-svc", 0);
2685        let image = "ghcr.io/org/seatbelt-app:latest";
2686        rt.record_image_isolation(image, "sandbox".to_string())
2687            .await;
2688
2689        let spec = make_spec(image, None);
2690        rt.create_container(&id, &spec).await.unwrap();
2691
2692        let calls = calls.lock().unwrap();
2693        assert_eq!(
2694            role_for(&calls, "create_container"),
2695            Some(Role::Primary),
2696            "an image-declared isolation=sandbox default must route to the Seatbelt sandbox"
2697        );
2698    }
2699
2700    #[tokio::test]
2701    async fn dispatch_image_isolation_default_vz_linux_routes_to_vz_linux() {
2702        // An image that DECLARES `com.zlayer.isolation=vz-linux` must route to
2703        // the VZ Linux runtime with no per-service label or platform.
2704        let (rt, calls) = make_composite_with_vz_linux();
2705        let id = cid("lin-svc", 0);
2706        let image = "ghcr.io/org/linux-app:latest";
2707        rt.record_image_isolation(image, "vz-linux".to_string())
2708            .await;
2709
2710        let spec = make_spec(image, None);
2711        rt.create_container(&id, &spec).await.unwrap();
2712
2713        let calls = calls.lock().unwrap();
2714        assert_eq!(
2715            role_for(&calls, "create_container"),
2716            Some(Role::VzLinux),
2717            "an image-declared isolation=vz-linux default must route to the VZ Linux runtime"
2718        );
2719    }
2720
2721    #[tokio::test]
2722    async fn dispatch_image_isolation_sandbox_guarded_to_vz_linux_for_linux_image() {
2723        // Guard: an image-declared sandbox default must NOT send a KNOWN-Linux
2724        // image to the Seatbelt sandbox (exit-127) — it is rerouted to VZ-Linux.
2725        let (rt, calls) = make_composite_with_vz_linux();
2726        let id = cid("lin-svc", 0);
2727        let image = "docker.io/library/nginx:1.25";
2728        rt.record_image_os(image, OsKind::Linux).await;
2729        rt.record_image_isolation(image, "sandbox".to_string())
2730            .await;
2731
2732        let spec = make_spec(image, None);
2733        rt.create_container(&id, &spec).await.unwrap();
2734
2735        let calls = calls.lock().unwrap();
2736        assert_eq!(
2737            role_for(&calls, "create_container"),
2738            Some(Role::VzLinux),
2739            "the Linux-image guard must override an image-declared sandbox default to VZ-Linux"
2740        );
2741    }
2742
2743    #[tokio::test]
2744    async fn dispatch_spec_runtime_overrides_image_isolation_default() {
2745        // Precedence #1 > #3: an explicit create-request `spec.runtime` beats an
2746        // image-declared isolation default.
2747        let (rt, calls) = make_composite_with_vz();
2748        let id = cid("mac-svc", 0);
2749        let image = "ghcr.io/org/whatever:1";
2750        rt.record_image_isolation(image, "sandbox".to_string())
2751            .await;
2752
2753        let mut spec = make_spec(image, None);
2754        spec.runtime = Some(RuntimeIsolation::Vz);
2755        rt.create_container(&id, &spec).await.unwrap();
2756
2757        let calls = calls.lock().unwrap();
2758        assert_eq!(
2759            role_for(&calls, "create_container"),
2760            Some(Role::Vz),
2761            "spec.runtime=Vz must override an image-declared isolation=sandbox default"
2762        );
2763    }
2764
2765    #[tokio::test]
2766    async fn dispatch_isolation_label_overrides_image_isolation_default() {
2767        // Precedence #2 > #3: an explicit `com.zlayer.isolation` service label
2768        // beats an image-declared isolation default.
2769        let (rt, calls) = make_composite_with_vz();
2770        let id = cid("mac-svc", 0);
2771        let image = "ghcr.io/org/whatever:1";
2772        rt.record_image_isolation(image, "sandbox".to_string())
2773            .await;
2774
2775        let mut spec = make_spec(image, None);
2776        spec.labels
2777            .insert("com.zlayer.isolation".to_string(), "vz".to_string());
2778        rt.create_container(&id, &spec).await.unwrap();
2779
2780        let calls = calls.lock().unwrap();
2781        assert_eq!(
2782            role_for(&calls, "create_container"),
2783            Some(Role::Vz),
2784            "an explicit com.zlayer.isolation=vz label must override an image-declared default"
2785        );
2786    }
2787
2788    #[tokio::test]
2789    async fn dispatch_linux_platform_with_vz_linux_routes_to_vz_linux() {
2790        let (rt, calls) = make_composite_with_vz_linux();
2791        let id = cid("lin-svc", 0);
2792        // platform.os = linux: with a VZ Linux delegate present this is the
2793        // default Linux path, NOT the libkrun delegate.
2794        let spec = make_spec(
2795            "docker.io/library/alpine:3.19",
2796            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Arm64)),
2797        );
2798
2799        rt.create_container(&id, &spec).await.unwrap();
2800
2801        let calls = calls.lock().unwrap();
2802        assert_eq!(
2803            role_for(&calls, "create_container"),
2804            Some(Role::VzLinux),
2805            "a Linux platform spec must default to the VZ Linux runtime when present"
2806        );
2807    }
2808
2809    #[tokio::test]
2810    async fn dispatch_linux_image_os_with_vz_linux_routes_to_vz_linux() {
2811        let (rt, calls) = make_composite_with_vz_linux();
2812        let id = cid("lin-svc", 0);
2813        let image = "docker.io/library/nginx:1.25";
2814        rt.record_image_os(image, OsKind::Linux).await;
2815
2816        let spec = make_spec(image, None);
2817        rt.create_container(&id, &spec).await.unwrap();
2818
2819        let calls = calls.lock().unwrap();
2820        assert_eq!(
2821            role_for(&calls, "create_container"),
2822            Some(Role::VzLinux),
2823            "a Linux image-OS cache hit must default to the VZ Linux runtime when present"
2824        );
2825    }
2826
2827    #[tokio::test]
2828    async fn dispatch_macos_image_os_with_vz_linux_routes_to_primary() {
2829        // A macOS-native rootfs must NEVER go to the Linux VM. Even with a
2830        // VZ-Linux delegate present (the default Linux path), an image whose
2831        // locally-known OS is macOS routes to the primary (Seatbelt sandbox).
2832        let (rt, calls) = make_composite_with_vz_linux();
2833        let id = cid("mac-svc", 0);
2834        let image = "ghcr.io/zlayer/macos-native:latest";
2835        rt.record_image_os(image, OsKind::Macos).await;
2836
2837        let spec = make_spec(image, None);
2838        rt.create_container(&id, &spec).await.unwrap();
2839
2840        let calls = calls.lock().unwrap();
2841        assert_eq!(
2842            role_for(&calls, "create_container"),
2843            Some(Role::Primary),
2844            "image_os == Macos must route to primary even when VZ-Linux is the default",
2845        );
2846    }
2847
2848    #[tokio::test]
2849    async fn dispatch_unknown_os_with_vz_linux_defaults_to_vz_linux() {
2850        // OS genuinely unknown (no isolation label, no runtime marker, no
2851        // platform, no image-OS cache hit) on a macOS host with a VZ-Linux
2852        // delegate: default to VZ-Linux. Sending an unknown (overwhelmingly
2853        // Linux) image to the Seatbelt sandbox is the exit-127 failure this fix
2854        // exists to prevent.
2855        let (rt, calls) = make_composite_with_vz_linux();
2856        let id = cid("svc", 0);
2857        let spec = make_spec("docker.io/library/whatever:latest", None);
2858
2859        rt.create_container(&id, &spec).await.unwrap();
2860
2861        let calls = calls.lock().unwrap();
2862        assert_eq!(
2863            role_for(&calls, "create_container"),
2864            Some(Role::VzLinux),
2865            "an unknown-OS image must default to VZ-Linux when the delegate is present",
2866        );
2867    }
2868
2869    #[tokio::test]
2870    async fn dispatch_unknown_os_without_vz_linux_falls_through_to_primary() {
2871        // The unknown-OS default to VZ-Linux is keyed on the delegate's
2872        // presence (a proxy for "macOS host"). Without a VZ-Linux delegate the
2873        // historical primary fallthrough is preserved for non-macOS hosts.
2874        let (rt, calls) = make_composite(true);
2875        let id = cid("svc", 0);
2876        let spec = make_spec("docker.io/library/whatever:latest", None);
2877
2878        rt.create_container(&id, &spec).await.unwrap();
2879
2880        let calls = calls.lock().unwrap();
2881        assert_eq!(
2882            role_for(&calls, "create_container"),
2883            Some(Role::Primary),
2884            "without a VZ-Linux delegate an unknown-OS image keeps the primary fallthrough",
2885        );
2886    }
2887
2888    /// Seed a persistent blob cache at `path` with a manifest + config blob for
2889    /// `image` whose config declares `os = linux`, mirroring what a real
2890    /// VZ-Linux pull writes to `{data_dir}/vz/linux/images/blobs.redb`.
2891    async fn seed_persistent_linux_cache(path: &std::path::Path, image: &str) {
2892        seed_persistent_cache_with_os(path, image, "linux").await;
2893    }
2894
2895    /// Like [`seed_persistent_linux_cache`] but lets the test pick the config
2896    /// `os` value (e.g. `"darwin"` for a macOS-native bundle).
2897    async fn seed_persistent_cache_with_os(path: &std::path::Path, image: &str, os: &str) {
2898        let cache = zlayer_registry::CacheType::persistent_at(path)
2899            .build()
2900            .await
2901            .expect("open persistent blob cache");
2902
2903        let config_json = serde_json::json!({
2904            "architecture": "arm64",
2905            "os": os,
2906            "config": {},
2907        });
2908        let config_bytes = serde_json::to_vec(&config_json).unwrap();
2909        let config_digest = zlayer_registry::compute_digest(&config_bytes);
2910        cache.put(&config_digest, &config_bytes).await.unwrap();
2911
2912        let manifest = zlayer_registry::OciImageManifest {
2913            schema_version: 2,
2914            media_type: Some("application/vnd.oci.image.manifest.v1+json".to_string()),
2915            artifact_type: None,
2916            config: oci_client::manifest::OciDescriptor {
2917                media_type: "application/vnd.oci.image.config.v1+json".to_string(),
2918                digest: config_digest.clone(),
2919                size: i64::try_from(config_bytes.len()).unwrap(),
2920                urls: None,
2921                annotations: None,
2922            },
2923            layers: vec![],
2924            annotations: None,
2925            subject: None,
2926        };
2927        let manifest_bytes = serde_json::to_vec(&manifest).unwrap();
2928        let manifest_digest = zlayer_registry::compute_digest(&manifest_bytes);
2929        cache
2930            .put(&zlayer_registry::manifest_cache_key(image), &manifest_bytes)
2931            .await
2932            .unwrap();
2933        cache
2934            .put(
2935                &zlayer_registry::manifest_digest_cache_key(image),
2936                manifest_digest.as_bytes(),
2937            )
2938            .await
2939            .unwrap();
2940    }
2941
2942    /// End-to-end of the macOS rate-limit routing fix: a Linux image whose OS
2943    /// lives ONLY in the local persistent blob cache (no network) must be
2944    /// inspected at `pull_image` time and then routed to the VZ-Linux runtime
2945    /// by `select_for` — exactly the path that breaks under a Docker Hub 429
2946    /// when inspection goes to the wire.
2947    #[tokio::test]
2948    async fn pull_then_dispatch_resolves_linux_os_from_local_cache_routes_to_vz_linux() {
2949        let tmp = db_tempdir();
2950        let cache_path = tmp.path().join("blobs.redb");
2951        let image = "docker.io/library/alpine:latest";
2952        seed_persistent_linux_cache(&cache_path, image).await;
2953
2954        let (rt, calls) = make_composite_with_vz_linux();
2955        let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2956
2957        // pull_image drives the real local-first OS inspection; no network.
2958        rt.pull_image(image).await.unwrap();
2959
2960        // The OS must now be cached as Linux purely from the local store.
2961        assert_eq!(
2962            rt.image_os.read().await.get(image).copied(),
2963            Some(OsKind::Linux),
2964            "pull_image must resolve Linux OS from the local persistent cache",
2965        );
2966
2967        // And select_for must route the (platform-less) spec to VZ-Linux.
2968        let id = cid("lin-svc", 0);
2969        let spec = make_spec(image, None);
2970        rt.create_container(&id, &spec).await.unwrap();
2971
2972        let calls = calls.lock().unwrap();
2973        assert_eq!(
2974            role_for(&calls, "create_container"),
2975            Some(Role::VzLinux),
2976            "a Linux image whose OS came from the local cache must route to VZ-Linux",
2977        );
2978    }
2979
2980    /// LIVE BUG #1, end-to-end: the cache is seeded under the QUALIFIED ref
2981    /// (`docker.io/library/alpine:latest`, as the pull writes it) but the spec —
2982    /// and therefore every `pull_image` / `inspect_image_os` / `select_for`
2983    /// lookup — uses the BARE `alpine:latest`. With the canonical manifest-key
2984    /// normalization, the bare-ref inspect hits the qualified-seeded cache with
2985    /// NO network call, so the Linux image still routes to VZ-Linux.
2986    #[tokio::test]
2987    async fn bare_ref_spec_resolves_os_from_qualified_seeded_cache_routes_to_vz_linux() {
2988        let tmp = db_tempdir();
2989        let cache_path = tmp.path().join("blobs.redb");
2990        // Seed under the QUALIFIED ref, exactly as a real pull persists it.
2991        seed_persistent_linux_cache(&cache_path, "docker.io/library/alpine:latest").await;
2992
2993        let (rt, calls) = make_composite_with_vz_linux();
2994        let rt = rt.with_os_inspect_cache_paths(vec![cache_path]);
2995
2996        // Everything below uses the BARE ref, exactly as the live daemon does
2997        // (`ImageRef::Display` yields the user-original string).
2998        let bare = "alpine:latest";
2999        rt.pull_image(bare).await.unwrap();
3000
3001        assert_eq!(
3002            rt.image_os.read().await.get(bare).copied(),
3003            Some(OsKind::Linux),
3004            "bare-ref inspect must resolve Linux from the qualified-seeded cache",
3005        );
3006
3007        let id = cid("lin-svc", 0);
3008        let spec = make_spec(bare, None);
3009        rt.create_container(&id, &spec).await.unwrap();
3010
3011        let calls = calls.lock().unwrap();
3012        assert_eq!(
3013            role_for(&calls, "create_container"),
3014            Some(Role::VzLinux),
3015            "bare-ref Linux image routes to VZ-Linux via the canonical-key cache hit",
3016        );
3017    }
3018
3019    /// LIVE BUG #2 / multi-cache fallback: the manifest+config live ONLY in the
3020    /// SECOND configured cache (the primary Sandbox store), because the
3021    /// VZ-Linux pull short-circuited under `IfNotPresent`. Inspection must probe
3022    /// the empty first cache (no network), then resolve from the second — still
3023    /// with NO network — and route to VZ-Linux.
3024    #[tokio::test]
3025    async fn os_resolves_from_second_cache_when_first_is_empty() {
3026        let tmp = db_tempdir();
3027        let empty_cache = tmp.path().join("vz-linux-blobs.redb");
3028        let primary_cache = tmp.path().join("primary-blobs.redb");
3029        // Create the first cache empty (so opening it succeeds but it misses).
3030        zlayer_registry::CacheType::persistent_at(&empty_cache)
3031            .build()
3032            .await
3033            .unwrap();
3034        // Only the SECOND cache has the image.
3035        seed_persistent_linux_cache(&primary_cache, "docker.io/library/alpine:latest").await;
3036
3037        let (rt, calls) = make_composite_with_vz_linux();
3038        let rt = rt.with_os_inspect_cache_paths(vec![empty_cache, primary_cache]);
3039
3040        let bare = "alpine:latest";
3041        rt.pull_image(bare).await.unwrap();
3042
3043        assert_eq!(
3044            rt.image_os.read().await.get(bare).copied(),
3045            Some(OsKind::Linux),
3046            "OS must resolve from the second cache after the first misses (no network)",
3047        );
3048
3049        let id = cid("lin-svc", 0);
3050        let spec = make_spec(bare, None);
3051        rt.create_container(&id, &spec).await.unwrap();
3052
3053        let calls = calls.lock().unwrap();
3054        assert_eq!(role_for(&calls, "create_container"), Some(Role::VzLinux),);
3055    }
3056
3057    /// A macOS Seatbelt build writes a `metadata.json` platform sidecar beside
3058    /// its rootfs (NOT the blob cache). Dispatch must resolve `os = darwin` from
3059    /// that sidecar — even with VZ-Linux present as the default Linux path — and
3060    /// route the image to the Seatbelt primary with no isolation label.
3061    #[tokio::test]
3062    async fn local_darwin_sidecar_resolves_macos_and_routes_to_primary() {
3063        let tmp = db_tempdir();
3064        // The OS-inspect cache path's PARENT is the image store; the sidecar
3065        // lives at `{parent}/{sanitized}/metadata.json`. No redb file need exist.
3066        let cache_path = tmp.path().join("blobs.redb");
3067        let image = "myapp:latest";
3068        let sanitized = sanitize_image_name(image);
3069        let image_dir = tmp.path().join(&sanitized);
3070        std::fs::create_dir_all(&image_dir).unwrap();
3071        let meta = zlayer_types::local_image::LocalImageMetadata::new(image, "darwin", "arm64");
3072        std::fs::write(
3073            image_dir.join(zlayer_types::local_image::LOCAL_IMAGE_METADATA_FILE),
3074            serde_json::to_vec(&meta).unwrap(),
3075        )
3076        .unwrap();
3077
3078        let (rt, calls) = make_composite_with_vz_linux();
3079        let rt = rt.with_os_inspect_cache_path(Some(cache_path));
3080
3081        // pull_image drives the local-first OS inspection (the sidecar probe).
3082        rt.pull_image(image).await.unwrap();
3083
3084        assert_eq!(
3085            rt.image_os.read().await.get(image).copied(),
3086            Some(OsKind::Macos),
3087            "the darwin sidecar must resolve os=Macos from the local image store",
3088        );
3089
3090        let id = cid("mac-svc", 0);
3091        let spec = make_spec(image, None);
3092        rt.create_container(&id, &spec).await.unwrap();
3093
3094        let calls = calls.lock().unwrap();
3095        assert_eq!(
3096            role_for(&calls, "create_container"),
3097            Some(Role::Primary),
3098            "a darwin sidecar image must route to the Seatbelt primary, not VZ-Linux",
3099        );
3100    }
3101
3102    /// The exact LIVE bug, simulated end-to-end: a `pull_image` whose network OS
3103    /// re-inspection WOULD 429 still leaves dispatch fully working, because the
3104    /// image's OS is resolved purely from the local persistent blob cache the
3105    /// runtime already populated during extract — with NO network call at all.
3106    ///
3107    /// We model the 429 by pointing `os_inspect_cache_paths` at a real seeded
3108    /// cache (so the local resolver succeeds) while using a synthetic
3109    /// `*.invalid` registry host: if the dispatch-population path ever reached
3110    /// the network it would fail to resolve, leaving the cache empty and routing
3111    /// the Linux image to the Seatbelt primary (exit 127). It must not — the
3112    /// local cache hit is authoritative and the image routes to VZ-Linux.
3113    #[tokio::test]
3114    async fn pull_with_network_429_still_dispatches_via_local_cache() {
3115        let tmp = db_tempdir();
3116        let cache_path = tmp.path().join("blobs.redb");
3117        // The image ref uses a host that cannot be resolved on the wire; only
3118        // the LOCAL cache knows its OS.
3119        let image = "registry.invalid.example/library/alpine:latest";
3120        seed_persistent_linux_cache(&cache_path, image).await;
3121
3122        let (rt, calls) = make_composite_with_vz_linux();
3123        let rt = rt.with_os_inspect_cache_path(Some(cache_path));
3124
3125        // `pull_image` drives the dispatch-population inspection. Even though a
3126        // real registry inspection of `*.invalid` would fail (our stand-in for a
3127        // 429), the local-only path resolves Linux and the call succeeds.
3128        rt.pull_image(image).await.unwrap();
3129        assert_eq!(
3130            rt.image_os.read().await.get(image).copied(),
3131            Some(OsKind::Linux),
3132            "OS must be resolved from the local cache with no network call",
3133        );
3134
3135        // And dispatch routes the Linux image to VZ-Linux, not the primary.
3136        let id = cid("lin-svc", 0);
3137        let spec = make_spec(image, None);
3138        rt.create_container(&id, &spec).await.unwrap();
3139
3140        let calls = calls.lock().unwrap();
3141        assert_eq!(
3142            role_for(&calls, "create_container"),
3143            Some(Role::VzLinux),
3144            "a would-be-429 pull must still route the cached Linux image to VZ-Linux",
3145        );
3146    }
3147
3148    /// Companion to the macOS-native dispatch guard, but driving the resolution
3149    /// through the real local-cache inspection at `pull_image` time: a bundle
3150    /// whose config declares `os = darwin` in the local cache must route to the
3151    /// primary, never the Linux VM.
3152    #[tokio::test]
3153    async fn pull_then_dispatch_resolves_macos_os_from_local_cache_routes_to_primary() {
3154        let tmp = db_tempdir();
3155        let cache_path = tmp.path().join("blobs.redb");
3156        let image = "ghcr.io/zlayer/macos-native:latest";
3157        seed_persistent_cache_with_os(&cache_path, image, "darwin").await;
3158
3159        let (rt, calls) = make_composite_with_vz_linux();
3160        let rt = rt.with_os_inspect_cache_path(Some(cache_path));
3161
3162        rt.pull_image(image).await.unwrap();
3163        assert_eq!(
3164            rt.image_os.read().await.get(image).copied(),
3165            Some(OsKind::Macos),
3166            "pull_image must resolve macOS OS from the local persistent cache",
3167        );
3168
3169        let id = cid("mac-svc", 0);
3170        let spec = make_spec(image, None);
3171        rt.create_container(&id, &spec).await.unwrap();
3172
3173        let calls = calls.lock().unwrap();
3174        assert_eq!(
3175            role_for(&calls, "create_container"),
3176            Some(Role::Primary),
3177            "a macOS-native rootfs must route to primary even with VZ-Linux as default",
3178        );
3179    }
3180
3181    #[tokio::test]
3182    async fn dispatch_vm_label_forces_libkrun_delegate() {
3183        let (rt, calls) = make_composite_with_vz_linux();
3184        let id = cid("lin-svc", 0);
3185        // Even with a VZ Linux delegate as the default, an explicit
3186        // `com.zlayer.isolation=vm` label forces the libkrun delegate.
3187        let mut spec = make_spec(
3188            "docker.io/library/alpine:3.19",
3189            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Arm64)),
3190        );
3191        spec.labels
3192            .insert("com.zlayer.isolation".to_string(), "vm".to_string());
3193
3194        rt.create_container(&id, &spec).await.unwrap();
3195
3196        let calls = calls.lock().unwrap();
3197        assert_eq!(
3198            role_for(&calls, "create_container"),
3199            Some(Role::Delegate),
3200            "com.zlayer.isolation=vm must force the libkrun delegate even when VZ Linux is default"
3201        );
3202    }
3203
3204    #[tokio::test]
3205    async fn dispatch_unmarked_image_with_vz_delegate_falls_through_to_primary() {
3206        let (rt, calls) = make_composite_with_vz();
3207        let id = cid("mac-svc", 0);
3208        // No runtime marker, no platform, no image-OS cache: VZ must NOT capture
3209        // ordinary images just because the delegate exists.
3210        let spec = make_spec("ghcr.io/org/plain:1", None);
3211        rt.create_container(&id, &spec).await.unwrap();
3212
3213        let calls = calls.lock().unwrap();
3214        assert_eq!(
3215            role_for(&calls, "create_container"),
3216            Some(Role::Primary),
3217            "an unmarked image must fall through to primary even when a VZ delegate is attached"
3218        );
3219    }
3220
3221    #[tokio::test]
3222    async fn per_container_dispatch_cache_persists_through_start_stop() {
3223        let (rt, calls) = make_composite(true);
3224        let id = cid("win-svc", 0);
3225        let spec = make_spec(
3226            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
3227            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
3228        );
3229
3230        rt.create_container(&id, &spec).await.unwrap();
3231        rt.start_container(&id).await.unwrap();
3232        rt.stop_container(&id, Duration::from_secs(1))
3233            .await
3234            .unwrap();
3235        rt.remove_container(&id).await.unwrap();
3236
3237        let recorded = calls.lock().unwrap().clone();
3238        for method in [
3239            "create_container",
3240            "start_container",
3241            "stop_container",
3242            "remove_container",
3243        ] {
3244            assert_eq!(
3245                role_for(&recorded, method),
3246                Some(Role::Primary),
3247                "{method} should have dispatched to primary"
3248            );
3249        }
3250
3251        // After remove, the dispatch cache entry should be gone.
3252        let after = rt
3253            .start_container(&id)
3254            .await
3255            .expect_err("lookup after remove should fail");
3256        assert!(
3257            matches!(after, AgentError::NotFound { .. }),
3258            "expected NotFound after remove, got {after:?}"
3259        );
3260    }
3261
3262    #[tokio::test]
3263    async fn pull_image_calls_both_runtimes() {
3264        let (rt, calls) = make_composite(true);
3265        rt.pull_image("docker.io/library/alpine:3.19")
3266            .await
3267            .unwrap();
3268
3269        let recorded = calls.lock().unwrap();
3270        let pull_calls: Vec<Role> = recorded
3271            .iter()
3272            .filter(|(_, m, _)| m == "pull_image")
3273            .map(|(r, _, _)| *r)
3274            .collect();
3275        assert!(
3276            pull_calls.contains(&Role::Primary),
3277            "primary should have been pulled: {pull_calls:?}",
3278        );
3279        assert!(
3280            pull_calls.contains(&Role::Delegate),
3281            "delegate should have been pulled: {pull_calls:?}",
3282        );
3283    }
3284
3285    #[tokio::test]
3286    async fn pull_image_delegate_error_does_not_fail() {
3287        // Build the composite by hand so we can flip the delegate's
3288        // pull_image_error before wrapping it in an Arc<dyn Runtime>.
3289        let calls = Arc::new(StdMutex::new(Vec::new()));
3290        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
3291        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
3292        delegate.pull_image_error = Some("simulated delegate pull failure".to_string());
3293        let rt = CompositeRuntime::new(
3294            primary as Arc<dyn Runtime>,
3295            Some(Arc::new(delegate) as Arc<dyn Runtime>),
3296        );
3297
3298        // Top-level call must succeed despite the delegate error.
3299        rt.pull_image("docker.io/library/alpine:3.19")
3300            .await
3301            .unwrap();
3302
3303        let recorded = calls.lock().unwrap();
3304        let pull_calls: Vec<Role> = recorded
3305            .iter()
3306            .filter(|(_, m, _)| m == "pull_image")
3307            .map(|(r, _, _)| *r)
3308            .collect();
3309        assert!(
3310            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
3311            "both runtimes should have been called: {pull_calls:?}",
3312        );
3313    }
3314
3315    #[tokio::test]
3316    async fn pull_image_primary_wrong_platform_does_not_fail() {
3317        // The HCS runtime returns `AgentError::WrongPlatform` when the image's
3318        // OCI config reports a non-Windows OS (calling `ProcessBaseImage` on a
3319        // Linux base layer is guaranteed to fail with 0x80070003). The
3320        // composite must treat that as a soft skip and let the delegate's
3321        // pull own the image — the overall pull must NOT fail.
3322        let calls = Arc::new(StdMutex::new(Vec::new()));
3323        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3324        primary.pull_image_wrong_platform = Some(("windows", "linux"));
3325        let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
3326        let rt = CompositeRuntime::new(
3327            Arc::new(primary) as Arc<dyn Runtime>,
3328            Some(Arc::new(delegate) as Arc<dyn Runtime>),
3329        );
3330
3331        // Top-level call must succeed despite the primary's wrong-platform err.
3332        rt.pull_image("docker.io/library/alpine:3.19")
3333            .await
3334            .expect("composite pull must tolerate WrongPlatform from primary");
3335
3336        let recorded = calls.lock().unwrap();
3337        let pull_calls: Vec<Role> = recorded
3338            .iter()
3339            .filter(|(_, m, _)| m == "pull_image")
3340            .map(|(r, _, _)| *r)
3341            .collect();
3342        assert!(
3343            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
3344            "delegate must still be called when primary soft-skips: {pull_calls:?}",
3345        );
3346    }
3347
3348    #[tokio::test]
3349    async fn pull_image_with_policy_primary_wrong_platform_does_not_fail() {
3350        // Same contract as `pull_image_primary_wrong_platform_does_not_fail`
3351        // but exercising the `pull_image_with_policy` entry point. The
3352        // policy/auth path is what the daemon's create-container hot loop
3353        // actually invokes, so it has to honour the same soft-skip rule.
3354        let calls = Arc::new(StdMutex::new(Vec::new()));
3355        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3356        primary.pull_image_wrong_platform = Some(("windows", "linux"));
3357        let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
3358        let rt = CompositeRuntime::new(
3359            Arc::new(primary) as Arc<dyn Runtime>,
3360            Some(Arc::new(delegate) as Arc<dyn Runtime>),
3361        );
3362
3363        rt.pull_image_with_policy(
3364            "docker.io/library/alpine:3.19",
3365            PullPolicy::IfNotPresent,
3366            None,
3367            zlayer_spec::SourcePolicy::default(),
3368        )
3369        .await
3370        .expect("composite pull_image_with_policy must tolerate WrongPlatform from primary");
3371
3372        let recorded = calls.lock().unwrap();
3373        let pull_calls: Vec<Role> = recorded
3374            .iter()
3375            .filter(|(_, m, _)| m == "pull_image_with_policy")
3376            .map(|(r, _, _)| *r)
3377            .collect();
3378        assert!(
3379            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
3380            "delegate must still be called when primary soft-skips: {pull_calls:?}",
3381        );
3382    }
3383
3384    #[tokio::test]
3385    async fn pull_image_primary_non_wrong_platform_error_still_fails() {
3386        // Sanity check: only `WrongPlatform` is soft-skipped. Any other error
3387        // from the primary must still bubble up so real pull failures aren't
3388        // silently swallowed.
3389        let calls = Arc::new(StdMutex::new(Vec::new()));
3390        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3391        primary.pull_image_error = Some("simulated real failure".to_string());
3392        let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
3393        let rt = CompositeRuntime::new(
3394            Arc::new(primary) as Arc<dyn Runtime>,
3395            Some(Arc::new(delegate) as Arc<dyn Runtime>),
3396        );
3397
3398        let err = rt
3399            .pull_image("docker.io/library/alpine:3.19")
3400            .await
3401            .expect_err("real primary error must propagate");
3402        assert!(
3403            matches!(err, AgentError::Internal(_)),
3404            "expected Internal, got {err:?}",
3405        );
3406    }
3407
3408    #[tokio::test]
3409    async fn list_images_merges_both() {
3410        // Hand-build so we can seed each mock's list_images_response.
3411        let calls = Arc::new(StdMutex::new(Vec::new()));
3412        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3413        primary.list_images_response = vec![ImageInfo {
3414            reference: "primary/image:1".to_string(),
3415            digest: None,
3416            size_bytes: None,
3417        }];
3418        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
3419        delegate.list_images_response = vec![ImageInfo {
3420            reference: "delegate/image:1".to_string(),
3421            digest: None,
3422            size_bytes: None,
3423        }];
3424        let rt = CompositeRuntime::new(
3425            Arc::new(primary) as Arc<dyn Runtime>,
3426            Some(Arc::new(delegate) as Arc<dyn Runtime>),
3427        );
3428
3429        let merged = rt.list_images().await.unwrap();
3430        let refs: Vec<&str> = merged.iter().map(|i| i.reference.as_str()).collect();
3431        assert!(
3432            refs.contains(&"primary/image:1") && refs.contains(&"delegate/image:1"),
3433            "merged list should contain both entries, got {refs:?}",
3434        );
3435    }
3436
3437    /// Regression (macOS `GET /images/json` 500): when the *primary* runtime
3438    /// does not implement `list_images` (the `SandboxRuntime` returns
3439    /// `Unsupported`), the composite must NOT propagate that error. It must
3440    /// fall back to the other backends — in particular the VZ-Linux delegate
3441    /// that actually owns pulled Linux images — and return their list. Before
3442    /// the fix the composite used `self.primary.list_images().await?`, which
3443    /// surfaced as a 500 and (via the inspect fallback) broke `docker pull`.
3444    #[tokio::test]
3445    async fn list_images_tolerates_primary_unsupported_and_uses_vz_linux() {
3446        let calls = Arc::new(StdMutex::new(Vec::new()));
3447        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3448        primary.list_images_error = Some("list_images is not supported".to_string());
3449        let mut vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
3450        vz_linux.list_images_response = vec![ImageInfo {
3451            reference: "docker.io/library/alpine:latest".to_string(),
3452            digest: None,
3453            size_bytes: None,
3454        }];
3455
3456        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
3457            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
3458
3459        let images = rt
3460            .list_images()
3461            .await
3462            .expect("primary Unsupported must not fail the composite list_images");
3463        let refs: Vec<&str> = images.iter().map(|i| i.reference.as_str()).collect();
3464        assert_eq!(
3465            refs,
3466            vec!["docker.io/library/alpine:latest"],
3467            "should return the VZ-Linux delegate's images, got {refs:?}",
3468        );
3469    }
3470
3471    /// When EVERY backend fails `list_images`, the composite surfaces an error
3472    /// (rather than silently returning an empty list, which would mask a total
3473    /// backend outage).
3474    #[tokio::test]
3475    async fn list_images_errors_only_when_all_backends_fail() {
3476        let calls = Arc::new(StdMutex::new(Vec::new()));
3477        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3478        primary.list_images_error = Some("unsupported".to_string());
3479        let mut vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
3480        vz_linux.list_images_error = Some("also unsupported".to_string());
3481
3482        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
3483            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
3484
3485        let err = rt.list_images().await.unwrap_err();
3486        assert!(
3487            matches!(err, AgentError::Unsupported(_)),
3488            "all-backends-fail should surface Unsupported, got {err:?}",
3489        );
3490    }
3491
3492    /// When the PRIMARY does not implement `prune_images` (returns
3493    /// `AgentError::Unsupported`) but a delegate does, the composite must
3494    /// tolerate the primary miss and return the delegate's result — symmetric
3495    /// with how `remove_image` / `tag_image` tolerate a primary failure when a
3496    /// delegate exists. A future cache-less primary must not 501 the whole call.
3497    #[tokio::test]
3498    async fn prune_images_tolerates_primary_unsupported_and_uses_delegate() {
3499        let calls = Arc::new(StdMutex::new(Vec::new()));
3500        // Primary leaves `prune_images_response` as `None` → returns Unsupported.
3501        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3502        let delegate =
3503            MockRuntime::new(Role::Delegate, Arc::clone(&calls)).with_prune_result(PruneResult {
3504                deleted: vec![
3505                    "docker.io/library/alpine:3.19".to_string(),
3506                    "docker.io/library/nginx:1.25".to_string(),
3507                ],
3508                space_reclaimed: 4096,
3509            });
3510
3511        let rt = CompositeRuntime::new(
3512            Arc::new(primary) as Arc<dyn Runtime>,
3513            Some(Arc::new(delegate) as Arc<dyn Runtime>),
3514        );
3515
3516        let result = rt
3517            .prune_images()
3518            .await
3519            .expect("primary Unsupported must not fail the composite prune_images");
3520        assert_eq!(
3521            result.deleted,
3522            vec![
3523                "docker.io/library/alpine:3.19".to_string(),
3524                "docker.io/library/nginx:1.25".to_string(),
3525            ],
3526            "should return the delegate's deleted images, got {:?}",
3527            result.deleted,
3528        );
3529        assert_eq!(
3530            result.space_reclaimed, 4096,
3531            "should return the delegate's reclaimed bytes",
3532        );
3533
3534        let calls = calls.lock().unwrap();
3535        assert_eq!(
3536            role_for(&calls, "prune_images"),
3537            Some(Role::Primary),
3538            "primary prune_images must still be attempted first",
3539        );
3540        assert!(
3541            calls
3542                .iter()
3543                .any(|(role, m, _)| *role == Role::Delegate && m == "prune_images"),
3544            "delegate prune_images must be invoked after the primary miss",
3545        );
3546    }
3547
3548    // ----------------------------------------------------------------------
3549    // Per-container read routing (logs / stats).
3550    //
3551    // These guard the macOS Docker-compat `/logs` and `/stats` 500 fix: when
3552    // the owning backend cannot serve a particular read (the primary
3553    // `SandboxRuntime` implements snapshot reads but returns `Unsupported` for
3554    // the *streaming* ones, or a different backend owns the container), the
    // composite must route to (or fall back across) backends and return real
    // data instead of propagating `Unsupported`, which surfaces as an opaque
    // 500. Only a genuine all-backends-not-found maps to a 404.
3558    // ----------------------------------------------------------------------
3559
3560    /// Build a `LogEntry` with the given stream + message for read tests.
3561    fn log_entry(stream: LogStream, message: &str) -> LogEntry {
3562        LogEntry {
3563            timestamp: chrono::Utc::now(),
3564            stream,
3565            source: zlayer_observability::logs::LogSource::Container("test".to_string()),
3566            message: message.to_string(),
3567            service: None,
3568            deployment: None,
3569        }
3570    }
3571
3572    /// Drain a `LogsStream` into the concatenated UTF-8 body bytes.
3573    async fn drain_logs(stream: LogsStream) -> String {
3574        use futures_util::StreamExt as _;
3575        let mut out = Vec::new();
3576        let mut s = stream;
3577        while let Some(item) = s.next().await {
3578            out.extend_from_slice(&item.expect("log chunk ok").bytes);
3579        }
3580        String::from_utf8(out).expect("utf8 log body")
3581    }
3582
3583    /// Collect a `StatsStream` into a Vec of samples.
3584    async fn drain_stats(stream: StatsStream) -> Vec<StatsSample> {
3585        use futures_util::StreamExt as _;
3586        let mut out = Vec::new();
3587        let mut s = stream;
3588        while let Some(item) = s.next().await {
3589            out.push(item.expect("stats sample ok"));
3590        }
3591        out
3592    }
3593
3594    /// Build a composite whose primary models the macOS `SandboxRuntime`
3595    /// (snapshot reads work, streaming reads return `Unsupported`) and whose
3596    /// VZ-Linux delegate owns the container with working native streams.
3597    /// Returns (composite, call-log) with a container already dispatched to the
3598    /// chosen owner.
3599    async fn make_read_composite(owner: Role) -> (CompositeRuntime, ContainerId, CallLog) {
3600        let calls = Arc::new(StdMutex::new(Vec::new()));
3601        let logs = vec![
3602            log_entry(LogStream::Stdout, "hello stdout"),
3603            log_entry(LogStream::Stderr, "hello stderr"),
3604        ];
3605        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls))
3606            .with_stream_unsupported()
3607            .with_logs(logs.clone());
3608        let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls)).with_logs(logs);
3609        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
3610            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
3611
3612        let id = cid("read-svc", 0);
3613        // Dispatch the container to the chosen owner without going through the
3614        // (platform-dependent) `select_for` path.
3615        let target = match owner {
3616            Role::Primary => DispatchTarget::Primary,
3617            Role::VzLinux => DispatchTarget::VzLinux,
3618            other => panic!("make_read_composite supports Primary/VzLinux, not {other:?}"),
3619        };
3620        rt.dispatch.write().await.insert(id.clone(), target);
3621        (rt, id, calls)
3622    }
3623
3624    #[tokio::test]
3625    async fn logs_stream_falls_back_to_snapshot_when_owner_has_no_stream() {
3626        // Sole backend = primary (SandboxRuntime model): `logs_stream` is
3627        // Unsupported, but `container_logs` works. With no other backend the
3628        // composite must synthesise a stream from the snapshot rather than 500.
3629        let calls = Arc::new(StdMutex::new(Vec::new()));
3630        let logs = vec![
3631            log_entry(LogStream::Stdout, "hello stdout"),
3632            log_entry(LogStream::Stderr, "hello stderr"),
3633        ];
3634        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls))
3635            .with_stream_unsupported()
3636            .with_logs(logs);
3637        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None);
3638        let id = cid("read-svc", 0);
3639        rt.dispatch
3640            .write()
3641            .await
3642            .insert(id.clone(), DispatchTarget::Primary);
3643
3644        let stream = rt
3645            .logs_stream(&id, LogsStreamOptions::default())
3646            .await
3647            .expect("logs_stream must not 500 when snapshot reads work");
3648        let body = drain_logs(stream).await;
3649        assert!(
3650            body.contains("hello stdout") && body.contains("hello stderr"),
3651            "synthesised stream must carry the captured logs, got: {body:?}",
3652        );
3653    }
3654
3655    #[tokio::test]
3656    async fn logs_stream_routes_to_delegate_owner_native_stream() {
3657        // Owner = VZ-Linux delegate with a working native stream; the primary's
3658        // streaming read is Unsupported but must not be consulted first.
3659        let (rt, id, calls) = make_read_composite(Role::VzLinux).await;
3660        let stream = rt
3661            .logs_stream(&id, LogsStreamOptions::default())
3662            .await
3663            .expect("delegate-owned logs_stream must succeed");
3664        let body = drain_logs(stream).await;
3665        assert!(body.contains("hello stdout"), "got: {body:?}");
3666
3667        let log = calls.lock().expect("call-log mutex poisoned");
3668        assert_eq!(
3669            role_for(&log, "logs_stream"),
3670            Some(Role::VzLinux),
3671            "logs_stream must hit the owning delegate first, calls: {log:?}",
3672        );
3673    }
3674
3675    #[tokio::test]
3676    async fn get_logs_falls_back_across_backends() {
3677        // Owner = primary; here snapshot `get_logs` works on primary directly,
3678        // so it should succeed on the owner without ever consulting the
3679        // delegate. (Soft-miss fallback is exercised by the stats test below.)
3680        let (rt, id, _calls) = make_read_composite(Role::Primary).await;
3681        let logs = rt.get_logs(&id).await.expect("get_logs must succeed");
3682        assert_eq!(logs.len(), 2, "owner snapshot logs should be returned");
3683    }
3684
3685    #[tokio::test]
3686    async fn stats_stream_falls_back_to_snapshot_when_owner_has_no_stream() {
3687        // Sole backend = primary (SandboxRuntime model): `stats_stream` is
3688        // Unsupported but `get_container_stats` works. With no other backend
3689        // offering a native stream, the composite must synthesise a single
3690        // non-empty sample from the snapshot rather than 500.
3691        let calls = Arc::new(StdMutex::new(Vec::new()));
3692        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_stream_unsupported();
3693        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None);
3694        let id = cid("read-svc", 0);
3695        rt.dispatch
3696            .write()
3697            .await
3698            .insert(id.clone(), DispatchTarget::Primary);
3699
3700        let stream = rt
3701            .stats_stream(&id)
3702            .await
3703            .expect("stats_stream must not 500 when get_container_stats works");
3704        let samples = drain_stats(stream).await;
3705        assert_eq!(samples.len(), 1, "snapshot fallback yields one sample");
3706        assert!(
3707            samples[0].mem_used_bytes > 0,
3708            "synthesised sample must carry non-zero memory, got {:?}",
3709            samples[0],
3710        );
3711        assert_eq!(
3712            samples[0].cpu_total_ns, 1_000_000,
3713            "cpu microseconds must be scaled to nanoseconds in the synthesised sample",
3714        );
3715    }
3716
3717    #[tokio::test]
3718    async fn get_container_stats_tolerates_owner_miss_and_uses_other_backend() {
3719        // Owner = primary whose snapshot `get_container_stats` returns
3720        // `Unsupported` (a soft miss); the delegate that follows in the fallback
3721        // chain serves it. The composite must NOT propagate the owner's
3722        // Unsupported as a 500.
3723        let calls = Arc::new(StdMutex::new(Vec::new()));
3724        let primary =
3725            MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_stats_snapshot_unsupported();
3726        let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
3727        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
3728            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
3729        let id = cid("read-svc", 0);
3730        rt.dispatch
3731            .write()
3732            .await
3733            .insert(id.clone(), DispatchTarget::Primary);
3734
3735        let stats = rt
3736            .get_container_stats(&id)
3737            .await
3738            .expect("owner Unsupported must fall back to the delegate, not 500");
3739        assert!(stats.memory_bytes > 0, "delegate stats should be returned");
3740
3741        let log = calls.lock().expect("call-log mutex poisoned");
3742        assert!(
3743            log.iter()
3744                .any(|(role, method, _)| *role == Role::Primary && method == "get_container_stats"),
3745            "primary must have been tried first, calls: {log:?}",
3746        );
3747        assert!(
3748            log.iter()
3749                .any(|(role, method, _)| *role == Role::VzLinux && method == "get_container_stats"),
3750            "delegate must have served the fallback, calls: {log:?}",
3751        );
3752    }
3753
3754    #[tokio::test]
3755    async fn reads_propagate_not_found_when_no_backend_owns_container() {
3756        // Every backend returns NotFound for the dispatched container: the
3757        // composite must surface NotFound (→ 404), NOT mask it as Unsupported
3758        // or empty success.
3759        let calls = Arc::new(StdMutex::new(Vec::new()));
3760        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_reads_not_found();
3761        let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls)).with_reads_not_found();
3762        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
3763            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
3764        let id = cid("read-svc", 0);
3765        rt.dispatch
3766            .write()
3767            .await
3768            .insert(id.clone(), DispatchTarget::Primary);
3769
3770        // `LogsStream`/`StatsStream` are not `Debug`, so match instead of
3771        // `unwrap_err()`.
3772        match rt.logs_stream(&id, LogsStreamOptions::default()).await {
3773            Err(AgentError::NotFound { .. }) => {}
3774            other => panic!(
3775                "all-not-found logs_stream must be NotFound (404), got {:?}",
3776                other.err(),
3777            ),
3778        }
3779        match rt.stats_stream(&id).await {
3780            Err(AgentError::NotFound { .. }) => {}
3781            other => panic!(
3782                "all-not-found stats_stream must be NotFound (404), got {:?}",
3783                other.err(),
3784            ),
3785        }
3786        let cl_err = rt.container_logs(&id, 10).await.unwrap_err();
3787        assert!(
3788            matches!(cl_err, AgentError::NotFound { .. }),
3789            "all-not-found container_logs must be NotFound (404), got {cl_err:?}",
3790        );
3791    }
3792
3793    #[tokio::test]
3794    async fn reads_on_undispatched_container_are_not_found() {
3795        // No dispatch record at all → NotFound (the id was never created here).
3796        let (rt, _calls) = make_composite(false);
3797        let id = cid("ghost", 0);
3798        match rt.logs_stream(&id, LogsStreamOptions::default()).await {
3799            Err(AgentError::NotFound { .. }) => {}
3800            other => panic!(
3801                "undispatched logs_stream must be NotFound, got {:?}",
3802                other.err()
3803            ),
3804        }
3805    }
3806
3807    /// Regression: `pull_image` must fan out to the VZ-Linux delegate so the
3808    /// image lands in the store where Linux containers actually execute on
3809    /// macOS (and so it becomes listable/inspectable). Before the fix the
3810    /// composite only pulled into `primary` + `delegate`, leaving the
3811    /// VZ-Linux `image_rootfs` empty.
3812    #[tokio::test]
3813    async fn pull_image_fans_out_to_vz_linux() {
3814        let calls = Arc::new(StdMutex::new(Vec::new()));
3815        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3816        let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
3817
3818        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
3819            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
3820
3821        rt.pull_image("docker.io/library/alpine:latest")
3822            .await
3823            .expect("pull should succeed");
3824
3825        let log = calls.lock().expect("call-log mutex poisoned");
3826        assert!(
3827            log.iter()
3828                .any(|(role, method, _)| *role == Role::VzLinux && method == "pull_image"),
3829            "pull_image must reach the VZ-Linux delegate, recorded calls: {log:?}",
3830        );
3831    }
3832
3833    #[tokio::test]
3834    async fn dispatch_lookup_unknown_container_errors() {
3835        let (rt, _calls) = make_composite(true);
3836        let id = cid("ghost", 0);
3837
3838        let err = rt.start_container(&id).await.unwrap_err();
3839        assert!(
3840            matches!(err, AgentError::NotFound { .. }),
3841            "expected NotFound for unknown container, got {err:?}"
3842        );
3843    }
3844
3845    /// Helper: read the internal image-OS cache for test assertions.
3846    async fn cached_os(rt: &CompositeRuntime, image: &str) -> Option<OsKind> {
3847        rt.image_os.read().await.get(image).copied()
3848    }
3849
3850    #[tokio::test]
3851    async fn apply_image_os_inspection_populates_cache_on_ok_some() {
3852        // Contract: when `fetch_image_os` resolves to a recognized OS, the
3853        // cache is populated so subsequent `select_for` calls for specs
3854        // without `platform` dispatch correctly.
3855        let (rt, _calls) = make_composite(true);
3856        let image = "docker.io/library/alpine:3.19";
3857
3858        rt.apply_image_os_inspection(image, Ok(Some(OsKind::Linux)))
3859            .await;
3860
3861        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
3862    }
3863
3864    #[tokio::test]
3865    async fn apply_image_os_inspection_leaves_cache_untouched_on_ok_none() {
3866        // Contract: when the manifest carries no (or an unrecognized) `os`
3867        // field the cache is left alone. Dispatch will fall through to the
3868        // primary on `create_container`.
3869        let (rt, _calls) = make_composite(true);
3870        let image = "docker.io/library/nginx:1.25";
3871
3872        rt.apply_image_os_inspection(image, Ok(None)).await;
3873
3874        assert_eq!(cached_os(&rt, image).await, None);
3875    }
3876
3877    #[tokio::test]
3878    async fn apply_image_os_inspection_leaves_cache_untouched_on_err() {
3879        // Contract: a registry error during inspection is non-fatal and must
3880        // not poison the cache. Dispatch falls through to primary on lookup.
3881        let (rt, _calls) = make_composite(true);
3882        let image = "docker.io/library/nginx:1.25";
3883
3884        // Pre-seed the cache so we can assert the error path doesn't
3885        // overwrite or clear an existing entry.
3886        rt.record_image_os(image, OsKind::Linux).await;
3887
3888        let err = zlayer_registry::RegistryError::NotFound {
3889            registry: "docker.io".to_string(),
3890            image: image.to_string(),
3891        };
3892        rt.apply_image_os_inspection(image, Err(err)).await;
3893
3894        // Cache is still whatever it was before the failed inspection.
3895        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
3896    }
3897
3898    #[tokio::test]
3899    async fn pull_image_inspection_failure_does_not_fail_pull() {
3900        // End-to-end: even when the registry fetch fails (inevitable for the
3901        // synthetic image refs used in unit tests), `pull_image` still
3902        // returns `Ok`. The mock primary/delegate both succeed; the
3903        // inspection step logs and moves on. The cache must remain empty
3904        // because there was no successful inspection to record.
3905        let (rt, _calls) = make_composite(true);
3906        let image = "invalid.example.invalid/ghost:v1";
3907
3908        rt.pull_image(image).await.unwrap();
3909
3910        assert_eq!(
3911            cached_os(&rt, image).await,
3912            None,
3913            "failed inspection must not populate the image-OS cache"
3914        );
3915    }
3916
3917    #[tokio::test]
3918    async fn pull_image_with_policy_inspection_failure_does_not_fail_pull() {
3919        // Same contract as `pull_image_inspection_failure_does_not_fail_pull`
3920        // but exercising the policy-aware entry point.
3921        let (rt, _calls) = make_composite(true);
3922        let image = "invalid.example.invalid/ghost:v1";
3923
3924        rt.pull_image_with_policy(
3925            image,
3926            PullPolicy::IfNotPresent,
3927            None,
3928            zlayer_spec::SourcePolicy::default(),
3929        )
3930        .await
3931        .unwrap();
3932
3933        assert_eq!(cached_os(&rt, image).await, None);
3934    }
3935
3936    #[test]
3937    fn os_kind_from_oci_str_roundtrip() {
3938        // Guards the `as_oci_str` ↔ `from_oci_str` relationship used by the
3939        // inspection path. If a new variant is added to `OsKind` without
3940        // updating `from_oci_str` we want the miss here, not a silent
3941        // "dispatch to primary" regression in production.
3942        for os in [OsKind::Linux, OsKind::Windows, OsKind::Macos] {
3943            assert_eq!(OsKind::from_oci_str(os.as_oci_str()), Some(os));
3944        }
3945        assert_eq!(OsKind::from_oci_str(""), None);
3946        assert_eq!(OsKind::from_oci_str("freebsd"), None);
3947    }
3948}