Skip to main content

zlayer_agent/runtimes/
composite.rs

1//! Composite runtime that dispatches per-container to a primary + optional delegate.
2//!
3//! The [`CompositeRuntime`] owns a *primary* runtime (the node-native runtime —
4//! e.g. `HcsRuntime` on Windows, `YoukiRuntime` on Linux, Docker elsewhere) and
5//! an optional *delegate* runtime used for foreign-OS workloads (e.g. a WSL2
6//! delegate on Windows that runs Linux containers). Each call is routed based
7//! on the container's identity:
8//!
9//! * **[`Runtime::create_container`]** consults
10//!   [`ServiceSpec::platform`](zlayer_spec::ServiceSpec) first; when the
11//!   spec's OS targets the delegate we route there, otherwise primary. When
12//!   `platform` is `None`, a secondary **image-OS cache** (populated by
13//!   [`CompositeRuntime::record_image_os`] from OCI manifest inspection at
14//!   pull time) is consulted. If both are unknown we fall through to the
15//!   primary. **Strict policy (H-7):** if either source identifies the
16//!   workload as Linux and this node has no delegate configured, dispatch
17//!   returns [`AgentError::RouteToPeer`] so the scheduler can re-place the
18//!   workload on a Linux peer — the old permissive "fall through to primary"
19//!   behavior is gone.
20//! * All subsequent per-container operations (start/stop/remove/logs/exec/…)
21//!   look up the container in an internal **dispatch cache** that records
22//!   which runtime created it. This guarantees the same runtime sees the
23//!   container for its whole lifecycle, even after daemon restarts within
24//!   the same process.
25//! * Cross-cutting image operations (`pull_image`, `pull_image_with_policy`,
26//!   `list_images`, `prune_images`) fan out to both runtimes — we cannot know
27//!   in advance which runtime will execute a pulled image, and merged image
28//!   listings give users a single coherent view. `remove_image` / `tag_image`
29//!   try primary first and fall back to delegate.
30//!
31//! The dispatch cache is populated on `create_container` and cleared on
32//! `remove_container`. Looking up an unknown id yields
33//! [`AgentError::NotFound`], which surfaces as a clean 404 at the API layer
34//! rather than silently forwarding to the wrong runtime.
35
36use std::collections::HashMap;
37use std::net::IpAddr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use tokio::sync::RwLock;
43use zlayer_observability::logs::{LogEntry, LogStream};
44use zlayer_spec::{OsKind, PullPolicy, RegistryAuth, ServiceSpec};
45
46use crate::cgroups_stats::ContainerStats;
47use crate::error::{AgentError, Result};
48use crate::runtime::{
49    ContainerId, ContainerInspectDetails, ContainerState, ExecEventStream, ImageInfo, LogChannel,
50    LogChunk, LogsStream, LogsStreamOptions, OverlayAttachKind, PruneResult, Runtime, StatsSample,
51    StatsStream, WaitCondition, WaitOutcome,
52};
53
/// Identifies the backend runtime that a container was routed to.
///
/// One value is stored per container in the dispatch cache so that every
/// later operation on that container reaches the runtime that created it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DispatchTarget {
    /// The runtime held in `CompositeRuntime::primary`.
    Primary,
    /// The optional runtime held in `CompositeRuntime::delegate`.
    Delegate,
    /// The Apple-Virtualization (VZ) delegate (macOS only). Chosen
    /// automatically for base bundles marked `com.zlayer.runtime=vz`, or per
    /// service through the `com.zlayer.isolation=vz` label.
    Vz,
    /// The Apple-Virtualization **Linux-guest** delegate (macOS only). This is
    /// the default Linux path on macOS: chosen for Linux images, for the
    /// `com.zlayer.runtime=vz-linux` marker, or for the
    /// `com.zlayer.isolation=vz-linux` label.
    VzLinux,
}
69
70/// Routes each container to either the primary runtime or an optional delegate.
71///
72/// See the module-level documentation for the dispatch rules.
73pub struct CompositeRuntime {
74    primary: Arc<dyn Runtime>,
75    delegate: Option<Arc<dyn Runtime>>,
76    /// Opt-in Apple-Virtualization delegate (macOS). Selected only when a
77    /// service carries `com.zlayer.isolation=vz`.
78    vz: Option<Arc<dyn Runtime>>,
79    /// Apple-Virtualization Linux-guest delegate (macOS). When present, it is
80    /// the default runtime for Linux images on this node; libkrun
81    /// (`delegate`) is then reachable only via `com.zlayer.isolation=vm`.
82    vz_linux: Option<Arc<dyn Runtime>>,
83    /// Per-container dispatch cache. Populated on `create_container`, removed
84    /// on `remove_container`.
85    dispatch: Arc<RwLock<HashMap<ContainerId, DispatchTarget>>>,
86    /// Image-OS cache consulted when a spec has no explicit `platform`.
87    /// Populated by [`CompositeRuntime::record_image_os`], which is driven
88    /// from [`zlayer_registry::fetch_image_os`] during `pull_image*`.
89    image_os: Arc<RwLock<HashMap<String, OsKind>>>,
90    /// Image runtime-marker cache (the `com.zlayer.runtime` manifest
91    /// annotation, e.g. `"vz"`). Populated from
92    /// [`zlayer_registry::fetch_image_runtime_marker`] during `pull_image*` so
93    /// `select_for` can auto-detect a VZ base bundle and prefer the VZ runtime
94    /// for it without requiring a per-service label.
95    image_runtime: Arc<RwLock<HashMap<String, String>>>,
96    /// Filesystem paths of the persistent blob caches that the runtimes pull
97    /// into, tried IN ORDER for image-OS / runtime-marker inspection. Typically:
98    ///
99    /// 1. the VZ-Linux runtime's `{data_dir}/vz/linux/images/blobs.redb` (the
100    ///    delegate that actually runs the Linux workload), and
101    /// 2. the primary Sandbox runtime's `{data_dir}/images/blobs.redb`.
102    ///
103    /// Both stores matter because `pull_image` pulls into BOTH (primary first,
104    /// then VZ-Linux), and either pull may short-circuit under
105    /// `PullPolicy::IfNotPresent` when its rootfs already exists — leaving the
106    /// manifest/config in only ONE of the two caches. Inspection therefore
107    /// probes them in order and stops at the first store that resolves the OS,
108    /// LOCAL-ONLY via [`zlayer_registry::fetch_image_os_in_cache_only`] — so an
109    /// already-pulled Linux image is detected as Linux (and routed to VZ-Linux)
110    /// with NO network call, even under a Docker Hub rate-limit. For the OS
111    /// dispatch path there is intentionally **no** network fallback: a local
112    /// miss yields "OS unknown" and dispatch uses its safe macOS default rather
113    /// than risking a 429 (see [`CompositeRuntime::inspect_image_os`]).
114    os_inspect_cache_paths: Vec<std::path::PathBuf>,
115}
116
117impl CompositeRuntime {
118    /// Construct a new composite runtime.
119    ///
120    /// `primary` handles containers whose platform matches the host node.
121    /// `delegate`, when present, handles foreign-OS containers (currently:
122    /// Linux containers on a Windows host via the WSL2 delegate runtime).
123    #[must_use]
124    pub fn new(primary: Arc<dyn Runtime>, delegate: Option<Arc<dyn Runtime>>) -> Self {
125        Self {
126            primary,
127            delegate,
128            vz: None,
129            vz_linux: None,
130            dispatch: Arc::new(RwLock::new(HashMap::new())),
131            image_os: Arc::new(RwLock::new(HashMap::new())),
132            image_runtime: Arc::new(RwLock::new(HashMap::new())),
133            os_inspect_cache_paths: Vec::new(),
134        }
135    }
136
137    /// Point image-OS / runtime-marker inspection at a single persistent blob
138    /// cache the runtimes pull into, so the OS of an already-pulled image
139    /// resolves from the LOCAL config blob with no network round-trip.
140    ///
141    /// Convenience wrapper over [`CompositeRuntime::with_os_inspect_cache_paths`]
142    /// for callers that only have one store. `path` is the on-disk blob-cache
143    /// file (e.g. the VZ-Linux runtime's `{data_dir}/vz/linux/images/blobs.redb`).
144    #[must_use]
145    pub fn with_os_inspect_cache_path(self, path: Option<std::path::PathBuf>) -> Self {
146        self.with_os_inspect_cache_paths(path.into_iter().collect())
147    }
148
149    /// Point image-OS / runtime-marker inspection at an ORDERED list of
150    /// persistent blob caches the runtimes pull into.
151    ///
152    /// Inspection probes each store LOCAL-ONLY (no network) in order and stops
153    /// at the first that resolves the image's OS / marker. This matters because
154    /// `pull_image` pulls into BOTH the VZ-Linux store and the primary Sandbox
155    /// store, and either pull may short-circuit under `PullPolicy::IfNotPresent`
156    /// when its rootfs already exists — leaving the manifest/config in only ONE
157    /// of the two caches. Probing both (VZ-Linux first, then primary) is what
158    /// lets a locally-cached Linux image route to VZ-Linux under a Docker Hub
159    /// rate-limit (see [`zlayer_registry::fetch_image_os_in_cache_only`]).
160    #[must_use]
161    pub fn with_os_inspect_cache_paths(mut self, paths: Vec<std::path::PathBuf>) -> Self {
162        self.os_inspect_cache_paths = paths;
163        self
164    }
165
166    /// Resolve `image`'s OS for **dispatch**, probing each configured local blob
167    /// cache in order, **LOCAL-ONLY — never a network call**.
168    ///
169    /// This is the dispatch-population path: it runs inside `pull_image*` purely
170    /// to fill the image-OS cache that [`CompositeRuntime::select_for`] consults.
171    /// It MUST NOT touch the wire. The image's layers have already been pulled
172    /// and extracted by the time we get here, and the runtimes that did the pull
173    /// (VZ-Linux / Sandbox) wrote the manifest+config into the very blob caches
174    /// `os_inspect_cache_paths` points at — so the OS is knowable with zero
175    /// network round-trips.
176    ///
177    /// The old code fell back to a network inspection (`fetch_image_os`) when no
178    /// local cache resolved the OS. That network call was reachable on a Docker
179    /// Hub 429, and a failed inspection left the cache empty → a cached Linux
180    /// image (e.g. `alpine`) fell through to the Seatbelt sandbox (`Primary`),
181    /// which cannot exec a Linux ELF (exit 127). The network fallback is gone:
182    /// a registry rate-limit can no longer break dispatch here. A genuine local
183    /// miss simply returns `Ok(None)` (dispatch then uses its safe macOS
184    /// fallthrough), and it never errors or blocks.
185    async fn inspect_image_os(
186        &self,
187        image: &str,
188    ) -> std::result::Result<Option<OsKind>, zlayer_registry::RegistryError> {
189        for path in &self.os_inspect_cache_paths {
190            match zlayer_registry::CacheType::persistent_at(path)
191                .build()
192                .await
193            {
194                Ok(cache) => {
195                    match zlayer_registry::fetch_image_os_in_cache_only(image, cache, None).await {
196                        Ok(Some(os)) => return Ok(Some(os)),
197                        Ok(None) => {
198                            tracing::trace!(
199                                image,
200                                cache = %path.display(),
201                                "image OS not resolvable from this local cache; trying next",
202                            );
203                        }
204                        Err(e) => return Err(e),
205                    }
206                }
207                Err(e) => {
208                    tracing::debug!(
209                        image,
210                        cache = %path.display(),
211                        error = %e,
212                        "failed to open OS-inspect blob cache; trying next",
213                    );
214                }
215            }
216        }
217        // No local cache resolved it. We deliberately do NOT fall back to a
218        // network inspection: a Docker Hub 429 must never reach this
219        // dispatch-population path (see the doc comment above). A clean local
220        // miss is `Ok(None)` — dispatch falls through to its safe macOS default.
221        Ok(None)
222    }
223
224    /// Resolve `image`'s `com.zlayer.runtime` marker, probing each configured
225    /// local blob cache in order (no network per cache) before any network call.
226    async fn inspect_image_runtime_marker(
227        &self,
228        image: &str,
229        auth: Option<&RegistryAuth>,
230    ) -> std::result::Result<Option<String>, zlayer_registry::RegistryError> {
231        for path in &self.os_inspect_cache_paths {
232            match zlayer_registry::CacheType::persistent_at(path)
233                .build()
234                .await
235            {
236                Ok(cache) => {
237                    match zlayer_registry::fetch_image_runtime_marker_in_cache_only(
238                        image, cache, None,
239                    )
240                    .await
241                    {
242                        Ok(Some(marker)) => return Ok(Some(marker)),
243                        Ok(None) => {
244                            tracing::trace!(
245                                image,
246                                cache = %path.display(),
247                                "runtime marker not resolvable from this local cache; trying next",
248                            );
249                        }
250                        Err(e) => return Err(e),
251                    }
252                }
253                Err(e) => {
254                    tracing::debug!(
255                        image,
256                        cache = %path.display(),
257                        error = %e,
258                        "failed to open marker-inspect blob cache; trying next",
259                    );
260                }
261            }
262        }
263        zlayer_registry::fetch_image_runtime_marker(image, auth).await
264    }
265
266    /// Attach an opt-in Apple-Virtualization delegate. Services labelled
267    /// `com.zlayer.isolation=vz` route to it; everything else is unaffected.
268    #[must_use]
269    pub fn with_vz_delegate(mut self, vz: Option<Arc<dyn Runtime>>) -> Self {
270        self.vz = vz;
271        self
272    }
273
274    /// Attach the Apple-Virtualization Linux-guest delegate. When present it
275    /// becomes the **default** runtime for Linux images on this node (libkrun
276    /// is then reachable only via the explicit `com.zlayer.isolation=vm`
277    /// label); when `None`, Linux dispatch falls back to the libkrun delegate
278    /// or `RouteToPeer` as before.
279    #[must_use]
280    pub fn with_vz_linux_delegate(mut self, vz_linux: Option<Arc<dyn Runtime>>) -> Self {
281        self.vz_linux = vz_linux;
282        self
283    }
284
285    /// Access the primary runtime (for introspection / tests).
286    #[must_use]
287    pub fn primary(&self) -> &Arc<dyn Runtime> {
288        &self.primary
289    }
290
291    /// Access the delegate runtime, if one is configured.
292    #[must_use]
293    pub fn delegate(&self) -> Option<&Arc<dyn Runtime>> {
294        self.delegate.as_ref()
295    }
296
297    /// Record that `image` is known to target operating system `os`.
298    ///
299    /// Wired from [`zlayer_registry::fetch_image_os`] during `pull_image*`
300    /// (see [`CompositeRuntime::apply_image_os_inspection`]) so that specs
301    /// without an explicit `platform` still dispatch correctly.
302    pub(crate) async fn record_image_os(&self, image: &str, os: OsKind) {
303        self.image_os.write().await.insert(image.to_string(), os);
304    }
305
306    /// Record an image's `com.zlayer.runtime` marker (e.g. `"vz"`), used by
307    /// [`CompositeRuntime::select_for`] to auto-detect a runtime-specific bundle.
308    pub(crate) async fn record_image_runtime(&self, image: &str, marker: String) {
309        self.image_runtime
310            .write()
311            .await
312            .insert(image.to_string(), marker);
313    }
314
315    /// Apply a manifest runtime-marker inspection to the cache. Mirrors
316    /// [`CompositeRuntime::apply_image_os_inspection`]'s non-fatal contract:
317    /// only a present marker updates the cache; absence or error leaves it
318    /// untouched (dispatch falls through to the OS/platform rules).
319    async fn apply_image_runtime_inspection(
320        &self,
321        image: &str,
322        result: std::result::Result<Option<String>, zlayer_registry::RegistryError>,
323    ) {
324        match result {
325            Ok(Some(marker)) => {
326                tracing::debug!(image, marker, "cached image runtime marker for dispatch");
327                self.record_image_runtime(image, marker).await;
328            }
329            Ok(None) => {}
330            Err(e) => {
331                tracing::trace!(
332                    image,
333                    error = %e,
334                    "failed to inspect image runtime marker — dispatch unaffected",
335                );
336            }
337        }
338    }
339
340    /// Apply the result of a manifest OS inspection to the image-OS cache.
341    ///
342    /// Factored out of [`Runtime::pull_image`] and
343    /// [`Runtime::pull_image_with_policy`] so the cache-update policy can be
344    /// unit-tested without depending on a live registry. The three branches
345    /// mirror the contract of [`zlayer_registry::fetch_image_os`]:
346    ///
347    /// * `Ok(Some(os))` — populate the cache so future `create_container`
348    ///   calls without an explicit `spec.platform` dispatch to the right
349    ///   runtime.
350    /// * `Ok(None)` — the config blob had no (or an unrecognized) `os`
351    ///   field. Leave the cache untouched; dispatch falls through to primary.
352    /// * `Err(_)` — transient or registry error. Log at warn and leave the
353    ///   cache untouched. We never fail the overall `pull_image*` call on
354    ///   inspection failure: the primary runtime's own pull already
355    ///   succeeded, and the safe fall-through is "primary".
356    async fn apply_image_os_inspection(
357        &self,
358        image: &str,
359        result: std::result::Result<Option<OsKind>, zlayer_registry::RegistryError>,
360    ) {
361        match result {
362            Ok(Some(os)) => {
363                self.record_image_os(image, os).await;
364                tracing::debug!(image, ?os, "cached image OS for dispatch");
365            }
366            Ok(None) => {
367                tracing::trace!(
368                    image,
369                    "image manifest has no OS field — dispatch will fall through to primary",
370                );
371            }
372            Err(e) => {
373                tracing::warn!(
374                    image,
375                    error = %e,
376                    "failed to inspect image manifest OS — dispatch will fall through to primary",
377                );
378            }
379        }
380    }
381
382    /// Decide which runtime should handle a `create_container` call for `spec`.
383    ///
384    /// The `service` argument is the originating service name, used to build an
385    /// actionable [`AgentError::RouteToPeer`] when a Linux workload lands on
386    /// this node without a local delegate so the scheduler can re-place it on
387    /// a capable peer.
388    ///
389    /// Policy (H-7): Linux workloads are never silently routed to the primary
390    /// on nodes without a delegate. The old "permissive" fall-through (primary
391    /// handles everything) returned an `Unsupported` error only when
392    /// `spec.platform` was explicitly set, but fell through to primary for
393    /// specs without a platform — producing cryptic downstream errors when the
394    /// image-OS cache said `Linux`. We now return `RouteToPeer` in both cases.
395    ///
396    /// Routing precedence, locally-known OS only (NO network call ever happens
397    /// here — the image-OS cache is filled local-only at pull time):
398    /// 1. explicit `com.zlayer.isolation` label,
399    /// 2. `com.zlayer.runtime` manifest marker (`vz` / `vz-linux`),
400    /// 3. `spec.platform.os`,
401    /// 4. the image-OS cache: `Linux` -> `VzLinux` (when present), `Macos` /
402    ///    `Windows` -> `Primary`,
403    /// 5. FINAL fallthrough — OS genuinely unknown: on a macOS host (proxied by
404    ///    the presence of a `vz_linux` delegate) default to `VzLinux`, because
405    ///    almost every registry image is Linux and the Seatbelt sandbox cannot
406    ///    exec a Linux ELF. A macOS-native rootfs never reaches this branch: it
407    ///    resolves `os == Macos` at step 4 and routes to `Primary`.
408    async fn select_for(&self, service: &str, spec: &ServiceSpec) -> Result<DispatchTarget> {
409        // Explicit per-service isolation label wins over everything below.
410        //   `com.zlayer.isolation=vz`               -> VZ (native-macOS guest VM)
411        //   `com.zlayer.isolation=vz-linux`         -> VZ Linux-guest VM
412        //   `com.zlayer.isolation=vm|libkrun`       -> libkrun delegate (force VM)
413        //   `com.zlayer.isolation=sandbox|seatbelt` -> Seatbelt sandbox (primary),
414        //                                              opting OUT of VZ auto-detect.
415        if let Some(label) = spec.labels.get("com.zlayer.isolation") {
416            if self.vz.is_some() && label.eq_ignore_ascii_case("vz") {
417                return Ok(DispatchTarget::Vz);
418            }
419            if self.vz_linux.is_some() && label.eq_ignore_ascii_case("vz-linux") {
420                return Ok(DispatchTarget::VzLinux);
421            }
422            if label.eq_ignore_ascii_case("vm") || label.eq_ignore_ascii_case("libkrun") {
423                // Force the libkrun delegate. If no delegate exists the
424                // platform/image-OS rules below produce the appropriate
425                // `RouteToPeer`, so only short-circuit when one is present.
426                if self.delegate.is_some() {
427                    return Ok(DispatchTarget::Delegate);
428                }
429            }
430            if label.eq_ignore_ascii_case("sandbox") || label.eq_ignore_ascii_case("seatbelt") {
431                return Ok(DispatchTarget::Primary);
432            }
433        }
434
435        // Auto-detect a VZ base bundle: when the image's manifest carries
436        // `com.zlayer.runtime=vz` (stamped by `zlayer vz build-base`), prefer the
437        // VZ runtime — it is the only runtime that can boot such a bundle. This
438        // is the "prefer VZ by default" behaviour: it only fires for genuine VZ
439        // bundles, so Seatbelt-rootfs and Linux images are unaffected.
440        if self.vz.is_some()
441            && self
442                .image_runtime
443                .read()
444                .await
445                .get(&spec.image.name.to_string())
446                .is_some_and(|m| m.eq_ignore_ascii_case(zlayer_registry::ZLAYER_RUNTIME_VZ))
447        {
448            return Ok(DispatchTarget::Vz);
449        }
450
451        // Auto-detect a VZ Linux-guest image: when the manifest carries
452        // `com.zlayer.runtime=vz-linux`, prefer the VZ Linux runtime.
453        if self.vz_linux.is_some()
454            && self
455                .image_runtime
456                .read()
457                .await
458                .get(&spec.image.name.to_string())
459                .is_some_and(|m| m.eq_ignore_ascii_case(zlayer_registry::ZLAYER_RUNTIME_LINUX_VZ))
460        {
461            return Ok(DispatchTarget::VzLinux);
462        }
463
464        if let Some(platform) = &spec.platform {
465            let target = match platform.os {
466                OsKind::Windows | OsKind::Macos => DispatchTarget::Primary,
467                // On macOS the VZ Linux-guest runtime is the default Linux path;
468                // only fall back to the libkrun delegate when it is absent.
469                OsKind::Linux if self.vz_linux.is_some() => DispatchTarget::VzLinux,
470                OsKind::Linux => DispatchTarget::Delegate,
471            };
472            if matches!(target, DispatchTarget::Delegate) && self.delegate.is_none() {
473                return Err(AgentError::RouteToPeer {
474                    service: service.to_string(),
475                    required_os: OsKind::Linux.as_oci_str().to_string(),
476                    reason: "spec.platform.os = linux but this node has no WSL2 delegate \
477                            configured; enable `--install-wsl yes` on this node or add a Linux \
478                            peer to the cluster"
479                        .to_string(),
480                });
481            }
482            return Ok(target);
483        }
484
485        if let Some(os) = self
486            .image_os
487            .read()
488            .await
489            .get(&spec.image.name.to_string())
490            .copied()
491        {
492            return match os {
493                OsKind::Linux => {
494                    if self.vz_linux.is_some() {
495                        // VZ Linux-guest is the default Linux path on macOS.
496                        Ok(DispatchTarget::VzLinux)
497                    } else if self.delegate.is_some() {
498                        Ok(DispatchTarget::Delegate)
499                    } else {
500                        // No delegate and the image manifest says Linux —
501                        // refuse at the composite layer so the scheduler can
502                        // re-place on a Linux peer instead of the primary
503                        // failing with a cryptic HCS error.
504                        Err(AgentError::RouteToPeer {
505                            service: service.to_string(),
506                            required_os: OsKind::Linux.as_oci_str().to_string(),
507                            reason: format!(
508                                "image '{}' manifest reports os=linux but this node has no WSL2 \
509                                 delegate configured; enable `--install-wsl yes` on this node or \
510                                 add a Linux peer to the cluster",
511                                spec.image.name
512                            ),
513                        })
514                    }
515                }
516                OsKind::Windows | OsKind::Macos => Ok(DispatchTarget::Primary),
517            };
518        }
519
520        // OS genuinely unknown (no isolation label, no runtime marker, no
521        // `spec.platform`, no image-OS cache hit). On a macOS host with a
522        // VZ-Linux delegate, default to VZ-Linux: the overwhelming majority of
523        // images pulled from public registries are Linux, and the Seatbelt
524        // sandbox (the primary) cannot exec a Linux ELF — sending an unknown
525        // image there is the exit-127 failure this fix exists to prevent. The
526        // user is fine with VZ-Linux as the default; the only hard rule is that
527        // a macOS-native rootfs must never go to the Linux VM, and that is
528        // already guaranteed above by the `image_os == Macos -> Primary` branch
529        // (a native bundle resolves its OS locally and never reaches here).
530        //
531        // The `vz_linux` delegate is only ever attached on a macOS host, so its
532        // presence is a sufficient proxy for "macOS host" — non-macOS hosts
533        // (Windows HCS, Linux) keep the historical primary fallthrough.
534        if self.vz_linux.is_some() {
535            return Ok(DispatchTarget::VzLinux);
536        }
537
538        Ok(DispatchTarget::Primary)
539    }
540
541    /// Look up an existing dispatch decision for `id`, or return `NotFound`.
542    async fn lookup(&self, id: &ContainerId) -> Result<Arc<dyn Runtime>> {
543        let target =
544            self.dispatch
545                .read()
546                .await
547                .get(id)
548                .copied()
549                .ok_or_else(|| AgentError::NotFound {
550                    container: id.to_string(),
551                    reason: "no dispatch record in CompositeRuntime".to_string(),
552                })?;
553        Ok(self.runtime_for(target).clone())
554    }
555
556    /// Resolve a [`DispatchTarget`] to the concrete runtime reference.
557    ///
558    /// Unwrapping the delegate is safe because [`Self::select_for`] returns
559    /// `Err` whenever a delegate would be required but is missing, so a
560    /// `DispatchTarget::Delegate` can never end up in the dispatch map
561    /// without a delegate being present.
562    fn runtime_for(&self, t: DispatchTarget) -> &Arc<dyn Runtime> {
563        match t {
564            DispatchTarget::Primary => &self.primary,
565            DispatchTarget::Delegate => self
566                .delegate
567                .as_ref()
568                .expect("delegate target requires delegate to exist"),
569            // `select_for` only returns `Vz` when a vz delegate is present;
570            // fall back to primary defensively.
571            DispatchTarget::Vz => self.vz.as_ref().unwrap_or(&self.primary),
572            // `select_for` only returns `VzLinux` when a vz-linux delegate is
573            // present; fall back to primary defensively.
574            DispatchTarget::VzLinux => self.vz_linux.as_ref().unwrap_or(&self.primary),
575        }
576    }
577
578    /// Build the ordered list of backends to try for a per-container read
579    /// (logs / stats), owning backend first.
580    ///
581    /// The container's dispatch record (recorded at `create_container`) names
582    /// the runtime that actually ran it, so we try that one first. The other
583    /// configured backends follow as a defensive fallback for the case where
584    /// the owning backend can answer container lifecycle calls but not a
585    /// particular read (e.g. the macOS `SandboxRuntime` primary implements
586    /// `container_logs`/`get_container_stats` but not the *streaming*
587    /// `logs_stream`/`stats_stream`, so it returns `Unsupported` for the
588    /// latter). Returns `NotFound` when the id was never dispatched.
589    async fn read_backends(
590        &self,
591        id: &ContainerId,
592    ) -> Result<Vec<(&'static str, Arc<dyn Runtime>)>> {
593        let owner =
594            self.dispatch
595                .read()
596                .await
597                .get(id)
598                .copied()
599                .ok_or_else(|| AgentError::NotFound {
600                    container: id.to_string(),
601                    reason: "no dispatch record in CompositeRuntime".to_string(),
602                })?;
603
604        // Owning backend first, then every other configured backend (de-duped
605        // against the owner) so a read the owner can't serve can still be
606        // satisfied elsewhere instead of 500-ing.
607        let all: [(DispatchTarget, Option<&Arc<dyn Runtime>>); 4] = [
608            (DispatchTarget::Primary, Some(&self.primary)),
609            (DispatchTarget::Delegate, self.delegate.as_ref()),
610            (DispatchTarget::Vz, self.vz.as_ref()),
611            (DispatchTarget::VzLinux, self.vz_linux.as_ref()),
612        ];
613
614        let label_for = |t: DispatchTarget| match t {
615            DispatchTarget::Primary => "primary",
616            DispatchTarget::Delegate => "delegate",
617            DispatchTarget::Vz => "vz",
618            DispatchTarget::VzLinux => "vz_linux",
619        };
620
621        let mut out: Vec<(&'static str, Arc<dyn Runtime>)> =
622            vec![(label_for(owner), self.runtime_for(owner).clone())];
623        for (target, rt) in all {
624            if target != owner {
625                if let Some(rt) = rt {
626                    out.push((label_for(target), rt.clone()));
627                }
628            }
629        }
630        Ok(out)
631    }
632}
633
634/// Accumulates per-backend errors while a read fans out across the
635/// owner-first fallback chain, so the *final* error reflects the right HTTP
636/// status.
637///
638/// Every backend in the chain is tried; a backend that does not own the
639/// container returns [`AgentError::NotFound`] (a *skip*, not authoritative),
640/// while a backend that owns it but cannot serve this particular read returns
641/// some other error (notably the `Unsupported` default for an unimplemented
642/// streaming read) — a *soft miss* we fall back from. The distinction matters
643/// for the final error: if **every** backend returned `NotFound`, the container
644/// genuinely does not exist here and we surface `NotFound` (→ 404); if any
645/// backend produced a non-`NotFound` error, that is the more informative
646/// failure to report (→ 500) once no backend could serve the read.
647#[derive(Default)]
648struct ReadMissAccumulator {
649    /// The most recent non-`NotFound` error, if any backend produced one.
650    soft_err: Option<AgentError>,
651    /// The most recent `NotFound`, used only when *no* soft error occurred.
652    not_found: Option<AgentError>,
653}
654
655impl ReadMissAccumulator {
656    fn record(&mut self, e: AgentError) {
657        if matches!(e, AgentError::NotFound { .. }) {
658            self.not_found = Some(e);
659        } else {
660            self.soft_err = Some(e);
661        }
662    }
663
664    /// Resolve the accumulated misses into the final error for a read where no
665    /// backend succeeded. Prefers a soft error (more informative → 500) over a
666    /// bare `NotFound`; falls back to a synthesised `Unsupported` only if
667    /// nothing was recorded at all (an empty backend list, which cannot happen
668    /// in practice since the owner is always present).
669    fn into_error(self, what: &str) -> AgentError {
670        self.soft_err
671            .or(self.not_found)
672            .unwrap_or_else(|| AgentError::Unsupported(format!("no backend could serve {what}")))
673    }
674}
675
676/// Build a bounded one-shot [`LogsStream`] from a captured-log snapshot.
677///
678/// Used by [`CompositeRuntime::logs_stream`] when no backend offers a native
679/// log stream but one can produce a `container_logs` snapshot (e.g. the macOS
680/// `SandboxRuntime`). Mirrors the VZ-Linux runtime's own snapshot-to-stream
681/// translation so the wire shape is identical regardless of which backend
682/// served the data: honour the per-channel filters and re-attach the newline
683/// the line-splitter stripped.
684fn one_shot_logs_stream(entries: Vec<LogEntry>, opts: &LogsStreamOptions) -> LogsStream {
685    use futures_util::stream;
686
687    // Docker's default (neither stdout nor stderr explicitly requested) means
688    // "both"; equivalently, keep stdout unless stderr was the *only* channel
689    // requested, and vice-versa.
690    let want_stdout = opts.stdout || !opts.stderr;
691    let want_stderr = opts.stderr || !opts.stdout;
692    let timestamps = opts.timestamps;
693
694    let chunks: Vec<Result<LogChunk>> = entries
695        .into_iter()
696        .filter_map(|e| {
697            let channel = match e.stream {
698                LogStream::Stdout => LogChannel::Stdout,
699                LogStream::Stderr => LogChannel::Stderr,
700            };
701            let keep = match channel {
702                LogChannel::Stdout => want_stdout,
703                LogChannel::Stderr => want_stderr,
704                LogChannel::Stdin => false,
705            };
706            if !keep {
707                return None;
708            }
709            let mut bytes = e.message.into_bytes();
710            bytes.push(b'\n');
711            Some(Ok(LogChunk {
712                stream: channel,
713                bytes: bytes::Bytes::from(bytes),
714                timestamp: timestamps.then_some(e.timestamp),
715            }))
716        })
717        .collect();
718
719    Box::pin(stream::iter(chunks))
720}
721
722/// Build a bounded one-shot [`StatsStream`] from a single [`ContainerStats`]
723/// snapshot.
724///
725/// Used by [`CompositeRuntime::stats_stream`] when no backend offers a native
726/// stats stream but one can produce a `get_container_stats` snapshot. The
727/// [`ContainerStats`] CPU figure is microseconds; [`StatsSample::cpu_total_ns`]
728/// is nanoseconds, so we scale. `online_cpus` is unknown from this coarse
729/// snapshot (the non-streaming API does not carry it) and is reported as `1`
730/// so the Docker-compat CPU-percent math has a sane divisor.
731fn one_shot_stats_stream(stats: &ContainerStats) -> StatsStream {
732    use futures_util::stream;
733
734    let sample = StatsSample {
735        cpu_total_ns: stats.cpu_usage_usec.saturating_mul(1_000),
736        cpu_system_ns: 0,
737        online_cpus: 1,
738        mem_used_bytes: stats.memory_bytes,
739        mem_limit_bytes: stats.memory_limit,
740        net_rx_bytes: 0,
741        net_tx_bytes: 0,
742        blkio_read_bytes: 0,
743        blkio_write_bytes: 0,
744        pids_current: 0,
745        pids_limit: None,
746        timestamp: chrono::Utc::now(),
747    };
748    Box::pin(stream::iter(vec![Ok(sample)]))
749}
750
751#[async_trait]
752impl Runtime for CompositeRuntime {
753    async fn pull_image(&self, image: &str) -> Result<()> {
754        // Primary pull. `WrongPlatform` here means the image's OCI config
755        // reports an OS the primary cannot service (e.g. a Linux image on the
756        // Windows HCS runtime). That is a *soft* failure: the delegate's pull
757        // below owns the image, so we log and continue rather than failing
758        // the whole composite call. Any other error is a real pull failure
759        // and must bubble.
760        if let Err(e) = self.primary.pull_image(image).await {
761            if matches!(e, AgentError::WrongPlatform { .. }) {
762                tracing::debug!(
763                    image,
764                    error = %e,
765                    "primary runtime cannot service image (wrong platform); delegating",
766                );
767            } else {
768                return Err(e);
769            }
770        }
771        if let Some(delegate) = &self.delegate {
772            if let Err(e) = delegate.pull_image(image).await {
773                // Foreign-OS images will reliably fail one of the two pulls
774                // (primary can't store a Linux image's config on Windows, or
775                // vice versa). That's expected — the successful side owns the
776                // layers we'll actually use — so we keep this at debug.
777                tracing::debug!(
778                    image,
779                    error = %e,
780                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
781                );
782            }
783        }
784        // VZ + VZ-Linux delegates (macOS). The VZ-Linux runtime is the default
785        // execution path for Linux images on macOS and owns its OWN image store
786        // (`image_rootfs`); if we never pull into it, the image is absent both
787        // when `create_container` dispatches there AND from `list_images` /
788        // `inspect_image` (which is what `docker pull` verifies). Pulling here
789        // makes the image actually present where it runs and listable. Errors
790        // are non-fatal for the same wrong-OS reason as the delegate above.
791        for (label, rt) in [
792            self.vz.as_ref().map(|r| ("vz", r)),
793            self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
794        ]
795        .into_iter()
796        .flatten()
797        {
798            if let Err(e) = rt.pull_image(image).await {
799                tracing::debug!(
800                    image,
801                    runtime = label,
802                    error = %e,
803                    "vz delegate failed to pull image (likely wrong OS); continuing",
804                );
805            }
806        }
807
808        // Inspect the OCI manifest's `config.os` so `select_for(spec)` can
809        // dispatch correctly when `spec.platform` is `None`. Non-fatal: any
810        // failure here just means dispatch falls through to primary.
811        let os_result = self.inspect_image_os(image).await;
812        self.apply_image_os_inspection(image, os_result).await;
813        let marker_result = self.inspect_image_runtime_marker(image, None).await;
814        self.apply_image_runtime_inspection(image, marker_result)
815            .await;
816
817        Ok(())
818    }
819
820    async fn pull_image_with_policy(
821        &self,
822        image: &str,
823        policy: PullPolicy,
824        auth: Option<&RegistryAuth>,
825        source: zlayer_spec::SourcePolicy,
826    ) -> Result<()> {
827        // See `pull_image` above for the `WrongPlatform` soft-skip rationale.
828        if let Err(e) = self
829            .primary
830            .pull_image_with_policy(image, policy, auth, source)
831            .await
832        {
833            if matches!(e, AgentError::WrongPlatform { .. }) {
834                tracing::debug!(
835                    image,
836                    error = %e,
837                    "primary runtime cannot service image (wrong platform); delegating",
838                );
839            } else {
840                return Err(e);
841            }
842        }
843        if let Some(delegate) = &self.delegate {
844            if let Err(e) = delegate
845                .pull_image_with_policy(image, policy, auth, source)
846                .await
847            {
848                tracing::debug!(
849                    image,
850                    error = %e,
851                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
852                );
853            }
854        }
855        // See `pull_image` above: the VZ-Linux runtime owns its own image store
856        // and is the default Linux execution path on macOS, so pull into it (and
857        // the opt-in VZ delegate) too. Non-fatal per-backend errors.
858        for (label, rt) in [
859            self.vz.as_ref().map(|r| ("vz", r)),
860            self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
861        ]
862        .into_iter()
863        .flatten()
864        {
865            if let Err(e) = rt.pull_image_with_policy(image, policy, auth, source).await {
866                tracing::debug!(
867                    image,
868                    runtime = label,
869                    error = %e,
870                    "vz delegate failed to pull image (likely wrong OS); continuing",
871                );
872            }
873        }
874
875        let os_result = self.inspect_image_os(image).await;
876        self.apply_image_os_inspection(image, os_result).await;
877        let marker_result = self.inspect_image_runtime_marker(image, auth).await;
878        self.apply_image_runtime_inspection(image, marker_result)
879            .await;
880
881        Ok(())
882    }
883
884    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
885        let target = self.select_for(&id.service, spec).await?;
886        {
887            let mut dispatch = self.dispatch.write().await;
888            dispatch.insert(id.clone(), target);
889        }
890        let rt = self.runtime_for(target).clone();
891        match rt.create_container(id, spec).await {
892            Ok(()) => Ok(()),
893            Err(e) => {
894                // Roll back the cache insert on failure so subsequent lookups
895                // don't find a dangling entry.
896                self.dispatch.write().await.remove(id);
897                Err(e)
898            }
899        }
900    }
901
902    async fn start_container(&self, id: &ContainerId) -> Result<()> {
903        let rt = self.lookup(id).await?;
904        rt.start_container(id).await
905    }
906
907    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
908        let rt = self.lookup(id).await?;
909        rt.stop_container(id, timeout).await
910    }
911
912    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
913        let rt = self.lookup(id).await?;
914        let res = rt.remove_container(id).await;
915        self.dispatch.write().await.remove(id);
916        res
917    }
918
919    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
920        let rt = self.lookup(id).await?;
921        rt.container_state(id).await
922    }
923
924    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
925        let backends = self.read_backends(id).await?;
926        let mut misses = ReadMissAccumulator::default();
927        for (label, rt) in backends {
928            match rt.container_logs(id, tail).await {
929                Ok(logs) => return Ok(logs),
930                Err(e) => {
931                    tracing::warn!(
932                        container = %id,
933                        runtime = label,
934                        error = %e,
935                        "composite container_logs: backend could not serve logs; trying next backend",
936                    );
937                    misses.record(e);
938                }
939            }
940        }
941        Err(misses.into_error("container_logs"))
942    }
943
944    async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
945        let rt = self.lookup(id).await?;
946        rt.exec(id, cmd).await
947    }
948
949    async fn exec_with_opts(
950        &self,
951        id: &ContainerId,
952        opts: &crate::runtime::ExecOptions,
953    ) -> Result<(i32, String, String)> {
954        // Forward to the resolved backend's `exec_with_opts` so Docker exec
955        // options (`--user`, `-w`, `-e`) reach the runtime that actually owns
956        // the container. Without this override the trait default would call
957        // `self.exec(opts.command)` and silently drop user/cwd/env.
958        let rt = self.lookup(id).await?;
959        rt.exec_with_opts(id, opts).await
960    }
961
962    async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
963        let rt = self.lookup(id).await?;
964        rt.exec_stream(id, cmd).await
965    }
966
967    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
968        let backends = self.read_backends(id).await?;
969        let mut misses = ReadMissAccumulator::default();
970        for (label, rt) in backends {
971            match rt.get_container_stats(id).await {
972                Ok(stats) => return Ok(stats),
973                Err(e) => {
974                    tracing::warn!(
975                        container = %id,
976                        runtime = label,
977                        error = %e,
978                        "composite get_container_stats: backend could not serve stats; \
979                         trying next backend",
980                    );
981                    misses.record(e);
982                }
983            }
984        }
985        Err(misses.into_error("get_container_stats"))
986    }
987
988    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
989        let rt = self.lookup(id).await?;
990        rt.wait_container(id).await
991    }
992
993    async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
994        let rt = self.lookup(id).await?;
995        rt.wait_outcome(id).await
996    }
997
998    async fn wait_outcome_with_condition(
999        &self,
1000        id: &ContainerId,
1001        condition: WaitCondition,
1002    ) -> Result<WaitOutcome> {
1003        let rt = self.lookup(id).await?;
1004        rt.wait_outcome_with_condition(id, condition).await
1005    }
1006
1007    async fn rename_container(&self, id: &ContainerId, new_name: &str) -> Result<()> {
1008        let rt = self.lookup(id).await?;
1009        rt.rename_container(id, new_name).await
1010    }
1011
1012    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
1013        let backends = self.read_backends(id).await?;
1014        let mut misses = ReadMissAccumulator::default();
1015        for (label, rt) in backends {
1016            match rt.get_logs(id).await {
1017                Ok(logs) => return Ok(logs),
1018                Err(e) => {
1019                    tracing::warn!(
1020                        container = %id,
1021                        runtime = label,
1022                        error = %e,
1023                        "composite get_logs: backend could not serve logs; trying next backend",
1024                    );
1025                    misses.record(e);
1026                }
1027            }
1028        }
1029        Err(misses.into_error("get_logs"))
1030    }
1031
1032    async fn logs_stream(&self, id: &ContainerId, opts: LogsStreamOptions) -> Result<LogsStream> {
1033        // Route to the backend that actually created the container. The default
1034        // trait impl returns `Unsupported`, which surfaced as a swallowed 500 on
1035        // `GET /containers/{id}/logs` whenever the owning backend did not
1036        // implement streaming (e.g. the macOS `SandboxRuntime` primary, which
1037        // implements `container_logs` but not `logs_stream`).
1038        //
1039        // Try each backend's `logs_stream` (owner first); on a soft miss
1040        // (`Unsupported`/error that is not `NotFound`) fall back to the same
1041        // backend's non-streaming `container_logs` and SYNTHESISE a one-shot
1042        // stream from it. Only a genuine `NotFound` propagates (→ 404).
1043        let backends = self.read_backends(id).await?;
1044        let mut misses = ReadMissAccumulator::default();
1045        for (label, rt) in &backends {
1046            match rt.logs_stream(id, opts.clone()).await {
1047                Ok(stream) => return Ok(stream),
1048                Err(e) => {
1049                    tracing::warn!(
1050                        container = %id,
1051                        runtime = label,
1052                        error = %e,
1053                        "composite logs_stream: backend has no native log stream; \
1054                         falling back to a one-shot snapshot",
1055                    );
1056                    misses.record(e);
1057                }
1058            }
1059        }
1060
1061        // No backend offered a native stream. Synthesise one from whichever
1062        // backend can produce a captured-log snapshot (`container_logs`).
1063        let tail = opts
1064            .tail
1065            .map_or(1000, |n| usize::try_from(n).unwrap_or(1000));
1066        for (label, rt) in &backends {
1067            match rt.container_logs(id, tail).await {
1068                Ok(entries) => {
1069                    return Ok(one_shot_logs_stream(entries, &opts));
1070                }
1071                Err(e) => {
1072                    tracing::warn!(
1073                        container = %id,
1074                        runtime = label,
1075                        error = %e,
1076                        "composite logs_stream: backend snapshot fallback failed; trying next",
1077                    );
1078                    misses.record(e);
1079                }
1080            }
1081        }
1082        Err(misses.into_error("container logs"))
1083    }
1084
1085    async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
1086        // Same rationale as `logs_stream`: forward to the owning backend so
1087        // `GET /containers/{id}/stats` reaches the runtime that ran the
1088        // container instead of hitting the `Unsupported` default (→ swallowed
1089        // 500). On a soft miss, fall back to the non-streaming
1090        // `get_container_stats` and synthesise a one-shot sample.
1091        let backends = self.read_backends(id).await?;
1092        let mut misses = ReadMissAccumulator::default();
1093        for (label, rt) in &backends {
1094            match rt.stats_stream(id).await {
1095                Ok(stream) => return Ok(stream),
1096                Err(e) => {
1097                    tracing::warn!(
1098                        container = %id,
1099                        runtime = label,
1100                        error = %e,
1101                        "composite stats_stream: backend has no native stats stream; \
1102                         falling back to a one-shot sample",
1103                    );
1104                    misses.record(e);
1105                }
1106            }
1107        }
1108
1109        for (label, rt) in &backends {
1110            match rt.get_container_stats(id).await {
1111                Ok(stats) => return Ok(one_shot_stats_stream(&stats)),
1112                Err(e) => {
1113                    tracing::warn!(
1114                        container = %id,
1115                        runtime = label,
1116                        error = %e,
1117                        "composite stats_stream: backend sample fallback failed; trying next",
1118                    );
1119                    misses.record(e);
1120                }
1121            }
1122        }
1123        Err(misses.into_error("container stats"))
1124    }
1125
1126    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
1127        let rt = self.lookup(id).await?;
1128        rt.get_container_pid(id).await
1129    }
1130
1131    fn overlay_attach_kind(&self) -> OverlayAttachKind {
1132        // Linux workloads on macOS execute in the VZ-Linux delegate, which joins
1133        // the overlay from inside the guest (`InGuestVsock`). Defer to it when
1134        // present so the service layer takes the guest-managed attach path and
1135        // calls `push_overlay_config` (routed per-container below); otherwise use
1136        // the primary's kind. Non-VZ containers route to a runtime whose
1137        // `push_overlay_config` is unsupported and degrade gracefully.
1138        self.vz_linux.as_ref().map_or_else(
1139            || self.primary.overlay_attach_kind(),
1140            |vz| vz.overlay_attach_kind(),
1141        )
1142    }
1143
1144    async fn push_overlay_config(
1145        &self,
1146        id: &ContainerId,
1147        config: &zlayer_types::overlayd::GuestOverlayConfig,
1148    ) -> Result<()> {
1149        let rt = self.lookup(id).await?;
1150        rt.push_overlay_config(id, config).await
1151    }
1152
1153    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
1154        let rt = self.lookup(id).await?;
1155        rt.get_container_ip(id).await
1156    }
1157
1158    async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
1159        let rt = self.lookup(id).await?;
1160        rt.get_container_port_override(id).await
1161    }
1162
1163    #[cfg(target_os = "windows")]
1164    async fn get_container_namespace_id(
1165        &self,
1166        id: &ContainerId,
1167    ) -> Result<Option<windows::core::GUID>> {
1168        let rt = self.lookup(id).await?;
1169        rt.get_container_namespace_id(id).await
1170    }
1171
1172    async fn sync_container_volumes(&self, id: &ContainerId) -> Result<()> {
1173        let rt = self.lookup(id).await?;
1174        rt.sync_container_volumes(id).await
1175    }
1176
1177    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1178        // Fan out over every configured runtime and merge their image lists.
1179        // Crucially, a *single* backend's failure must not fail the whole
1180        // call: on macOS the `primary` (SandboxRuntime) does not implement
1181        // `list_images` at all (it returns `Unsupported`), yet pulled Linux
1182        // images live in the `vz_linux` delegate's store. Propagating the
1183        // primary's error via `?` here used to surface as a 500 on
1184        // `GET /images/json` (and, via the inspect fallback, broke every
1185        // `docker pull` verification). Tolerate per-backend errors the same
1186        // way we already tolerate the delegate's, and include the VZ +
1187        // VZ-Linux delegates so their images are actually listable.
1188        let mut out: Vec<ImageInfo> = Vec::new();
1189        let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1190        let mut any_ok = false;
1191        let mut last_err: Option<AgentError> = None;
1192
1193        for (label, rt) in [
1194            Some(("primary", &self.primary)),
1195            self.delegate.as_ref().map(|d| ("delegate", d)),
1196            self.vz.as_ref().map(|d| ("vz", d)),
1197            self.vz_linux.as_ref().map(|d| ("vz_linux", d)),
1198        ]
1199        .into_iter()
1200        .flatten()
1201        {
1202            match rt.list_images().await {
1203                Ok(images) => {
1204                    any_ok = true;
1205                    for img in images {
1206                        // De-dup by reference so an image registered in more
1207                        // than one backend isn't reported twice.
1208                        if seen.insert(img.reference.clone()) {
1209                            out.push(img);
1210                        }
1211                    }
1212                }
1213                Err(e) => {
1214                    tracing::debug!(
1215                        runtime = label,
1216                        error = %e,
1217                        "composite list_images: backend returned an error; skipping it",
1218                    );
1219                    last_err = Some(e);
1220                }
1221            }
1222        }
1223
1224        // Only fail if *every* backend errored. With at least one success we
1225        // return the merged (possibly empty) list — an empty image set is a
1226        // valid response, not an error.
1227        if any_ok {
1228            Ok(out)
1229        } else {
1230            Err(last_err.unwrap_or_else(|| {
1231                AgentError::Unsupported("no runtime implements list_images".into())
1232            }))
1233        }
1234    }
1235
1236    async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
1237        match self.primary.remove_image(image, force).await {
1238            Ok(()) => Ok(()),
1239            Err(primary_err) => {
1240                if let Some(delegate) = &self.delegate {
1241                    match delegate.remove_image(image, force).await {
1242                        Ok(()) => Ok(()),
1243                        Err(delegate_err) => {
1244                            tracing::debug!(
1245                                image,
1246                                %delegate_err,
1247                                "delegate remove_image also failed; returning primary error",
1248                            );
1249                            Err(primary_err)
1250                        }
1251                    }
1252                } else {
1253                    Err(primary_err)
1254                }
1255            }
1256        }
1257    }
1258
1259    async fn prune_images(&self) -> Result<PruneResult> {
1260        let mut result = self.primary.prune_images().await?;
1261        if let Some(delegate) = &self.delegate {
1262            match delegate.prune_images().await {
1263                Ok(extra) => {
1264                    result.deleted.extend(extra.deleted);
1265                    result.space_reclaimed =
1266                        result.space_reclaimed.saturating_add(extra.space_reclaimed);
1267                }
1268                Err(e) => tracing::warn!(
1269                    error = %e,
1270                    "delegate runtime prune_images failed; returning primary result only",
1271                ),
1272            }
1273        }
1274        Ok(result)
1275    }
1276
1277    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
1278        let rt = self.lookup(id).await?;
1279        rt.kill_container(id, signal).await
1280    }
1281
1282    async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
1283        match self.primary.tag_image(source, target).await {
1284            Ok(()) => Ok(()),
1285            Err(primary_err) => {
1286                if let Some(delegate) = &self.delegate {
1287                    match delegate.tag_image(source, target).await {
1288                        Ok(()) => Ok(()),
1289                        Err(delegate_err) => {
1290                            tracing::debug!(
1291                                source,
1292                                target,
1293                                %delegate_err,
1294                                "delegate tag_image also failed; returning primary error",
1295                            );
1296                            Err(primary_err)
1297                        }
1298                    }
1299                } else {
1300                    Err(primary_err)
1301                }
1302            }
1303        }
1304    }
1305
1306    async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
1307        let rt = self.lookup(id).await?;
1308        rt.inspect_detailed(id).await
1309    }
1310}
1311
1312#[cfg(test)]
1313mod tests {
1314    use super::*;
1315    use crate::cgroups_stats::ContainerStats;
1316    use std::sync::Mutex as StdMutex;
1317    use zlayer_spec::{ArchKind, DeploymentSpec, TargetPlatform};
1318
1319    /// Which runtime a mock represents. Only used for labelling invocation
1320    /// records in tests.
1321    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1322    enum Role {
1323        Primary,
1324        Delegate,
1325        Vz,
1326        VzLinux,
1327    }
1328
    /// One recorded invocation: (runtime role, method name, container id).
    /// The container id is `None` for calls that are not tied to a single
    /// container (the cross-cutting image operations).
    type CallRecord = (Role, String, Option<ContainerId>);
    /// Shared, thread-safe log of every mock call made in a single test. The
    /// same log is handed to each mock so calls from all roles end up in one
    /// sequence.
    type CallLog = Arc<StdMutex<Vec<CallRecord>>>;
1333
    /// Mock runtime that records every method call it receives.
    ///
    /// This is intentionally minimal — just enough trait surface to exercise
    /// the composite's dispatch logic. Every recorded call includes the role
    /// (see [`Role`]), the method name, and the container id (or `None` for
    /// cross-cutting image operations). The remaining fields are knobs that
    /// make individual methods fail in specific ways.
    struct MockRuntime {
        /// Which composite slot this mock stands in for; stamped on every
        /// record it writes.
        role: Role,
        /// Shared log the mock appends its invocation records to.
        calls: CallLog,
        /// Images returned by `list_images` (unless `list_images_error` is
        /// set).
        list_images_response: Vec<ImageInfo>,
        /// When set, `list_images` returns `AgentError::Unsupported(msg)`
        /// instead of `list_images_response`. Models a backend (e.g. the macOS
        /// `SandboxRuntime` primary) that does not implement image listing.
        list_images_error: Option<String>,
        /// Presumably the message for a generic pull failure — the consuming
        /// code is outside this view; confirm against the `Runtime` impl.
        /// `pull_image_wrong_platform` takes precedence when both are set.
        pull_image_error: Option<String>,
        /// When set, both `pull_image` and `pull_image_with_policy` return a
        /// freshly-built [`AgentError::WrongPlatform`] using these fields
        /// (`expected`, `actual`). Takes precedence over `pull_image_error`
        /// so tests can simulate a wrong-platform soft skip end-to-end.
        pull_image_wrong_platform: Option<(&'static str, &'static str)>,
        /// When `true`, the *streaming* reads (`logs_stream` / `stats_stream`)
        /// return `AgentError::Unsupported`, modelling a backend (e.g. the macOS
        /// `SandboxRuntime` primary) that implements the snapshot reads
        /// (`container_logs` / `get_container_stats`) but not the streaming
        /// ones — exactly the case that used to surface as a swallowed 500.
        stream_unsupported: bool,
        /// When `true`, *every* per-container read (`container_logs`,
        /// `get_logs`, `get_container_stats`, `logs_stream`, `stats_stream`)
        /// returns `AgentError::NotFound`, modelling a backend that does not own
        /// the container at all. The composite must NOT mask this as success,
        /// and a genuine all-not-found must propagate as `NotFound` (404).
        reads_not_found: bool,
        /// Captured-log snapshot returned by `container_logs` / `get_logs`
        /// (unless `reads_not_found`). Lets a delegate model real workload
        /// output the composite's snapshot fallback should surface.
        logs_response: Vec<LogEntry>,
        /// When `true`, the snapshot `get_container_stats` returns
        /// `AgentError::Unsupported` (a soft miss), modelling a backend that
        /// owns the container but cannot report stats at all. Forces the
        /// composite to fall back to another backend.
        stats_snapshot_unsupported: bool,
    }
1376
1377    impl MockRuntime {
1378        fn new(role: Role, calls: CallLog) -> Self {
1379            Self {
1380                role,
1381                calls,
1382                list_images_response: Vec::new(),
1383                list_images_error: None,
1384                pull_image_error: None,
1385                pull_image_wrong_platform: None,
1386                stream_unsupported: false,
1387                reads_not_found: false,
1388                logs_response: Vec::new(),
1389                stats_snapshot_unsupported: false,
1390            }
1391        }
1392
1393        /// Streaming reads return `Unsupported`; snapshot reads still work.
1394        fn with_stream_unsupported(mut self) -> Self {
1395            self.stream_unsupported = true;
1396            self
1397        }
1398
1399        /// Every per-container read returns `NotFound`.
1400        fn with_reads_not_found(mut self) -> Self {
1401            self.reads_not_found = true;
1402            self
1403        }
1404
1405        /// Set the captured-log snapshot returned by the snapshot reads.
1406        fn with_logs(mut self, logs: Vec<LogEntry>) -> Self {
1407            self.logs_response = logs;
1408            self
1409        }
1410
1411        /// Snapshot `get_container_stats` returns `Unsupported` (a soft miss).
1412        fn with_stats_snapshot_unsupported(mut self) -> Self {
1413            self.stats_snapshot_unsupported = true;
1414            self
1415        }
1416
1417        fn build_wrong_platform_error(&self, image: &str) -> Option<AgentError> {
1418            self.pull_image_wrong_platform
1419                .map(|(expected, actual)| AgentError::WrongPlatform {
1420                    runtime: match self.role {
1421                        Role::Primary => "primary-mock".to_string(),
1422                        Role::Delegate => "delegate-mock".to_string(),
1423                        Role::Vz => "vz-mock".to_string(),
1424                        Role::VzLinux => "vz-linux-mock".to_string(),
1425                    },
1426                    expected: expected.to_string(),
1427                    actual: actual.to_string(),
1428                    image: image.to_string(),
1429                })
1430        }
1431
1432        fn record(&self, method: &str, id: Option<&ContainerId>) {
1433            self.calls
1434                .lock()
1435                .expect("mock call-log mutex poisoned")
1436                .push((self.role, method.to_string(), id.cloned()));
1437        }
1438    }
1439
1440    #[async_trait]
1441    impl Runtime for MockRuntime {
1442        async fn pull_image(&self, image: &str) -> Result<()> {
1443            self.record("pull_image", None);
1444            if let Some(err) = self.build_wrong_platform_error(image) {
1445                return Err(err);
1446            }
1447            if let Some(msg) = &self.pull_image_error {
1448                return Err(AgentError::Internal(msg.clone()));
1449            }
1450            Ok(())
1451        }
1452
1453        async fn pull_image_with_policy(
1454            &self,
1455            image: &str,
1456            _policy: PullPolicy,
1457            _auth: Option<&RegistryAuth>,
1458            _source: zlayer_spec::SourcePolicy,
1459        ) -> Result<()> {
1460            self.record("pull_image_with_policy", None);
1461            if let Some(err) = self.build_wrong_platform_error(image) {
1462                return Err(err);
1463            }
1464            if let Some(msg) = &self.pull_image_error {
1465                return Err(AgentError::Internal(msg.clone()));
1466            }
1467            Ok(())
1468        }
1469
1470        async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
1471            self.record("create_container", Some(id));
1472            Ok(())
1473        }
1474
1475        async fn start_container(&self, id: &ContainerId) -> Result<()> {
1476            self.record("start_container", Some(id));
1477            Ok(())
1478        }
1479
1480        async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
1481            self.record("stop_container", Some(id));
1482            Ok(())
1483        }
1484
1485        async fn remove_container(&self, id: &ContainerId) -> Result<()> {
1486            self.record("remove_container", Some(id));
1487            Ok(())
1488        }
1489
1490        async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
1491            self.record("container_state", Some(id));
1492            Ok(ContainerState::Running)
1493        }
1494
1495        async fn container_logs(&self, id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
1496            self.record("container_logs", Some(id));
1497            if self.reads_not_found {
1498                return Err(mock_not_found());
1499            }
1500            Ok(self.logs_response.clone())
1501        }
1502
1503        async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
1504            self.record("exec", Some(id));
1505            Ok((0, String::new(), String::new()))
1506        }
1507
1508        async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
1509            self.record("get_container_stats", Some(id));
1510            if self.reads_not_found {
1511                return Err(mock_not_found());
1512            }
1513            if self.stats_snapshot_unsupported {
1514                return Err(AgentError::Unsupported("mock has no snapshot stats".into()));
1515            }
1516            Ok(ContainerStats {
1517                cpu_usage_usec: 1_000,
1518                memory_bytes: 4096,
1519                memory_limit: 8192,
1520                timestamp: std::time::Instant::now(),
1521            })
1522        }
1523
1524        async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
1525            self.record("wait_container", Some(id));
1526            Ok(0)
1527        }
1528
1529        async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
1530            self.record("get_logs", Some(id));
1531            if self.reads_not_found {
1532                return Err(mock_not_found());
1533            }
1534            Ok(self.logs_response.clone())
1535        }
1536
1537        async fn logs_stream(
1538            &self,
1539            id: &ContainerId,
1540            _opts: LogsStreamOptions,
1541        ) -> Result<LogsStream> {
1542            self.record("logs_stream", Some(id));
1543            if self.reads_not_found {
1544                return Err(mock_not_found());
1545            }
1546            if self.stream_unsupported {
1547                return Err(AgentError::Unsupported("mock has no log stream".into()));
1548            }
1549            // A backend that owns a native stream replays its captured logs.
1550            Ok(one_shot_logs_stream(
1551                self.logs_response.clone(),
1552                &LogsStreamOptions::default(),
1553            ))
1554        }
1555
1556        async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
1557            use futures_util::stream;
1558            self.record("stats_stream", Some(id));
1559            if self.reads_not_found {
1560                return Err(mock_not_found());
1561            }
1562            if self.stream_unsupported {
1563                return Err(AgentError::Unsupported("mock has no stats stream".into()));
1564            }
1565            Ok(Box::pin(stream::iter(vec![Ok(StatsSample {
1566                cpu_total_ns: 0,
1567                cpu_system_ns: 0,
1568                online_cpus: 1,
1569                mem_used_bytes: 4096,
1570                mem_limit_bytes: 8192,
1571                net_rx_bytes: 0,
1572                net_tx_bytes: 0,
1573                blkio_read_bytes: 0,
1574                blkio_write_bytes: 0,
1575                pids_current: 0,
1576                pids_limit: None,
1577                timestamp: chrono::Utc::now(),
1578            })])))
1579        }
1580
1581        async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
1582            self.record("get_container_pid", Some(id));
1583            Ok(None)
1584        }
1585
1586        async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
1587            self.record("get_container_ip", Some(id));
1588            Ok(None)
1589        }
1590
1591        async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1592            self.record("list_images", None);
1593            if let Some(msg) = &self.list_images_error {
1594                return Err(AgentError::Unsupported(msg.clone()));
1595            }
1596            Ok(self.list_images_response.clone())
1597        }
1598    }
1599
1600    /// Build a [`ServiceSpec`] (with the given image name) from the minimal
1601    /// inline YAML the existing runtime tests use, then optionally set a
1602    /// target platform on it.
1603    fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
1604        let yaml = format!(
1605            r"
1606version: v1
1607deployment: test
1608services:
1609  test:
1610    rtype: service
1611    image:
1612      name: {image}
1613    endpoints:
1614      - name: http
1615        protocol: http
1616        port: 8080
1617"
1618        );
1619        let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
1620            .expect("valid deployment yaml")
1621            .services
1622            .remove("test")
1623            .expect("service 'test' present");
1624        spec.platform = platform;
1625        spec
1626    }
1627
1628    fn cid(service: &str, replica: u32) -> ContainerId {
1629        ContainerId::new(service.to_string(), replica)
1630    }
1631
1632    fn make_composite(with_delegate: bool) -> (CompositeRuntime, CallLog) {
1633        let calls = Arc::new(StdMutex::new(Vec::new()));
1634        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
1635        let delegate = if with_delegate {
1636            Some(Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>)
1637        } else {
1638            None
1639        };
1640        (
1641            CompositeRuntime::new(primary as Arc<dyn Runtime>, delegate),
1642            calls,
1643        )
1644    }
1645
1646    fn role_for(calls: &[CallRecord], method: &str) -> Option<Role> {
1647        calls
1648            .iter()
1649            .find(|(_, m, _)| m == method)
1650            .map(|(role, _, _)| *role)
1651    }
1652
1653    /// The `NotFound` a `MockRuntime` returns when it does not own a container.
1654    fn mock_not_found() -> AgentError {
1655        AgentError::NotFound {
1656            container: "mock".to_string(),
1657            reason: "mock backend does not own this container".to_string(),
1658        }
1659    }
1660
1661    #[tokio::test]
1662    async fn dispatch_windows_spec_goes_to_primary() {
1663        let (rt, calls) = make_composite(true);
1664        let id = cid("win-svc", 0);
1665        let spec = make_spec(
1666            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
1667            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
1668        );
1669
1670        rt.create_container(&id, &spec).await.unwrap();
1671        rt.start_container(&id).await.unwrap();
1672
1673        let calls = calls.lock().unwrap();
1674        assert_eq!(
1675            role_for(&calls, "create_container"),
1676            Some(Role::Primary),
1677            "create_container should hit primary for Windows spec"
1678        );
1679        assert_eq!(
1680            role_for(&calls, "start_container"),
1681            Some(Role::Primary),
1682            "start_container should hit primary for Windows spec"
1683        );
1684    }
1685
1686    #[tokio::test]
1687    async fn dispatch_linux_spec_goes_to_delegate() {
1688        let (rt, calls) = make_composite(true);
1689        let id = cid("lin-svc", 0);
1690        let spec = make_spec(
1691            "docker.io/library/alpine:3.19",
1692            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
1693        );
1694
1695        rt.create_container(&id, &spec).await.unwrap();
1696        rt.start_container(&id).await.unwrap();
1697
1698        let calls = calls.lock().unwrap();
1699        assert_eq!(
1700            role_for(&calls, "create_container"),
1701            Some(Role::Delegate),
1702            "create_container should hit delegate for Linux spec"
1703        );
1704        assert_eq!(
1705            role_for(&calls, "start_container"),
1706            Some(Role::Delegate),
1707            "start_container should hit delegate for Linux spec"
1708        );
1709    }
1710
1711    #[tokio::test]
1712    async fn dispatch_linux_without_delegate_errors() {
1713        // H-7 policy: a Linux spec on a node without a delegate must return
1714        // `RouteToPeer` (not `Unsupported`, not a silent primary fall-through)
1715        // so the scheduler can re-place the workload on a capable peer.
1716        let (rt, _calls) = make_composite(false);
1717        let id = cid("lin-svc", 0);
1718        let spec = make_spec(
1719            "docker.io/library/alpine:3.19",
1720            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
1721        );
1722
1723        let err = rt.create_container(&id, &spec).await.unwrap_err();
1724        match err {
1725            AgentError::RouteToPeer {
1726                service,
1727                required_os,
1728                reason,
1729            } => {
1730                assert_eq!(service, "lin-svc");
1731                assert_eq!(required_os, "linux");
1732                assert!(
1733                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
1734                    "reason must name both remediations, got: {reason}"
1735                );
1736            }
1737            other => panic!("expected RouteToPeer, got {other:?}"),
1738        }
1739    }
1740
1741    #[tokio::test]
1742    async fn dispatch_linux_image_cache_without_delegate_routes_to_peer() {
1743        // H-7 policy: even when `spec.platform` is unset, a Linux image in the
1744        // OS cache must route to a peer instead of falling through to primary.
1745        // This is the old permissive-fallthrough path the comment at lines
1746        // 172-178 used to describe; the behavior is now strict.
1747        let (rt, _calls) = make_composite(false);
1748        let id = cid("svc", 0);
1749        let image = "docker.io/library/nginx:1.25";
1750        rt.record_image_os(image, OsKind::Linux).await;
1751
1752        let spec = make_spec(image, None);
1753        let err = rt.create_container(&id, &spec).await.unwrap_err();
1754        match err {
1755            AgentError::RouteToPeer {
1756                service,
1757                required_os,
1758                reason,
1759            } => {
1760                assert_eq!(service, "svc");
1761                assert_eq!(required_os, "linux");
1762                assert!(
1763                    reason.contains(image),
1764                    "reason should mention the image name, got: {reason}"
1765                );
1766                assert!(
1767                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
1768                    "reason must name both remediations, got: {reason}"
1769                );
1770            }
1771            other => panic!("expected RouteToPeer, got {other:?}"),
1772        }
1773    }
1774
1775    #[tokio::test]
1776    async fn dispatch_macos_spec_goes_to_primary() {
1777        let (rt, calls) = make_composite(true);
1778        let id = cid("mac-svc", 0);
1779        let spec = make_spec(
1780            "ghcr.io/zlayer/macos:latest",
1781            Some(TargetPlatform::new(OsKind::Macos, ArchKind::Arm64)),
1782        );
1783
1784        rt.create_container(&id, &spec).await.unwrap();
1785
1786        let calls = calls.lock().unwrap();
1787        assert_eq!(
1788            role_for(&calls, "create_container"),
1789            Some(Role::Primary),
1790            "create_container should hit primary for Macos spec"
1791        );
1792    }
1793
1794    #[tokio::test]
1795    async fn dispatch_no_platform_no_image_os_falls_through_to_primary() {
1796        let (rt, calls) = make_composite(true);
1797        let id = cid("svc", 0);
1798        let spec = make_spec("docker.io/library/nginx:1.25", None);
1799
1800        rt.create_container(&id, &spec).await.unwrap();
1801
1802        let calls = calls.lock().unwrap();
1803        assert_eq!(
1804            role_for(&calls, "create_container"),
1805            Some(Role::Primary),
1806            "fall-through should pick primary when both platform and image-OS cache are unknown"
1807        );
1808    }
1809
1810    #[tokio::test]
1811    async fn dispatch_uses_image_os_cache_when_platform_missing() {
1812        let (rt, calls) = make_composite(true);
1813        let id = cid("svc", 0);
1814        let image = "docker.io/library/nginx:1.25";
1815        rt.record_image_os(image, OsKind::Linux).await;
1816
1817        let spec = make_spec(image, None);
1818        rt.create_container(&id, &spec).await.unwrap();
1819
1820        let calls = calls.lock().unwrap();
1821        assert_eq!(
1822            role_for(&calls, "create_container"),
1823            Some(Role::Delegate),
1824            "image-OS cache should route Linux images to the delegate"
1825        );
1826    }
1827
1828    /// Composite with primary + delegate + an attached VZ delegate, all sharing
1829    /// one call log.
1830    fn make_composite_with_vz() -> (CompositeRuntime, CallLog) {
1831        let calls = Arc::new(StdMutex::new(Vec::new()));
1832        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
1833        let delegate =
1834            Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
1835        let vz = Arc::new(MockRuntime::new(Role::Vz, Arc::clone(&calls))) as Arc<dyn Runtime>;
1836        let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
1837            .with_vz_delegate(Some(vz));
1838        (rt, calls)
1839    }
1840
1841    #[tokio::test]
1842    async fn dispatch_vz_bundle_annotation_auto_routes_to_vz() {
1843        let (rt, calls) = make_composite_with_vz();
1844        let id = cid("mac-svc", 0);
1845        let image = "ghcr.io/org/macos-vz:sequoia";
1846        // Simulate the manifest inspection having cached `com.zlayer.runtime=vz`.
1847        rt.record_image_runtime(image, "vz".to_string()).await;
1848
1849        let spec = make_spec(image, None);
1850        rt.create_container(&id, &spec).await.unwrap();
1851
1852        let calls = calls.lock().unwrap();
1853        assert_eq!(
1854            role_for(&calls, "create_container"),
1855            Some(Role::Vz),
1856            "a com.zlayer.runtime=vz bundle should auto-route to the VZ runtime"
1857        );
1858    }
1859
1860    #[tokio::test]
1861    async fn dispatch_vz_label_forces_vz() {
1862        let (rt, calls) = make_composite_with_vz();
1863        let id = cid("mac-svc", 0);
1864        let mut spec = make_spec("ghcr.io/org/whatever:1", None);
1865        spec.labels
1866            .insert("com.zlayer.isolation".to_string(), "vz".to_string());
1867
1868        rt.create_container(&id, &spec).await.unwrap();
1869
1870        let calls = calls.lock().unwrap();
1871        assert_eq!(
1872            role_for(&calls, "create_container"),
1873            Some(Role::Vz),
1874            "an explicit com.zlayer.isolation=vz label should force the VZ runtime"
1875        );
1876    }
1877
1878    #[tokio::test]
1879    async fn dispatch_sandbox_label_overrides_vz_bundle() {
1880        let (rt, calls) = make_composite_with_vz();
1881        let id = cid("mac-svc", 0);
1882        let image = "ghcr.io/org/macos-vz:sequoia";
1883        rt.record_image_runtime(image, "vz".to_string()).await;
1884
1885        let mut spec = make_spec(image, None);
1886        spec.labels
1887            .insert("com.zlayer.isolation".to_string(), "sandbox".to_string());
1888        rt.create_container(&id, &spec).await.unwrap();
1889
1890        let calls = calls.lock().unwrap();
1891        assert_eq!(
1892            role_for(&calls, "create_container"),
1893            Some(Role::Primary),
1894            "com.zlayer.isolation=sandbox should opt out of VZ auto-detect (force the sandbox)"
1895        );
1896    }
1897
1898    /// Composite with primary + delegate (libkrun) + a VZ Linux-guest delegate,
1899    /// all sharing one call log. Mirrors `make_composite_with_vz`.
1900    fn make_composite_with_vz_linux() -> (CompositeRuntime, CallLog) {
1901        let calls = Arc::new(StdMutex::new(Vec::new()));
1902        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
1903        let delegate =
1904            Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
1905        let vz_linux =
1906            Arc::new(MockRuntime::new(Role::VzLinux, Arc::clone(&calls))) as Arc<dyn Runtime>;
1907        let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
1908            .with_vz_linux_delegate(Some(vz_linux));
1909        (rt, calls)
1910    }
1911
1912    #[tokio::test]
1913    async fn dispatch_vz_linux_label_forces_vz_linux() {
1914        let (rt, calls) = make_composite_with_vz_linux();
1915        let id = cid("lin-svc", 0);
1916        let mut spec = make_spec("docker.io/library/alpine:3.19", None);
1917        spec.labels
1918            .insert("com.zlayer.isolation".to_string(), "vz-linux".to_string());
1919
1920        rt.create_container(&id, &spec).await.unwrap();
1921
1922        let calls = calls.lock().unwrap();
1923        assert_eq!(
1924            role_for(&calls, "create_container"),
1925            Some(Role::VzLinux),
1926            "com.zlayer.isolation=vz-linux must force the VZ Linux runtime"
1927        );
1928    }
1929
1930    #[tokio::test]
1931    async fn dispatch_vz_linux_marker_auto_routes_to_vz_linux() {
1932        let (rt, calls) = make_composite_with_vz_linux();
1933        let id = cid("lin-svc", 0);
1934        let image = "ghcr.io/org/linux-vz:bookworm";
1935        rt.record_image_runtime(image, "vz-linux".to_string()).await;
1936
1937        let spec = make_spec(image, None);
1938        rt.create_container(&id, &spec).await.unwrap();
1939
1940        let calls = calls.lock().unwrap();
1941        assert_eq!(
1942            role_for(&calls, "create_container"),
1943            Some(Role::VzLinux),
1944            "a com.zlayer.runtime=vz-linux marker should auto-route to the VZ Linux runtime"
1945        );
1946    }
1947
1948    #[tokio::test]
1949    async fn dispatch_linux_platform_with_vz_linux_routes_to_vz_linux() {
1950        let (rt, calls) = make_composite_with_vz_linux();
1951        let id = cid("lin-svc", 0);
1952        // platform.os = linux: with a VZ Linux delegate present this is the
1953        // default Linux path, NOT the libkrun delegate.
1954        let spec = make_spec(
1955            "docker.io/library/alpine:3.19",
1956            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Arm64)),
1957        );
1958
1959        rt.create_container(&id, &spec).await.unwrap();
1960
1961        let calls = calls.lock().unwrap();
1962        assert_eq!(
1963            role_for(&calls, "create_container"),
1964            Some(Role::VzLinux),
1965            "a Linux platform spec must default to the VZ Linux runtime when present"
1966        );
1967    }
1968
1969    #[tokio::test]
1970    async fn dispatch_linux_image_os_with_vz_linux_routes_to_vz_linux() {
1971        let (rt, calls) = make_composite_with_vz_linux();
1972        let id = cid("lin-svc", 0);
1973        let image = "docker.io/library/nginx:1.25";
1974        rt.record_image_os(image, OsKind::Linux).await;
1975
1976        let spec = make_spec(image, None);
1977        rt.create_container(&id, &spec).await.unwrap();
1978
1979        let calls = calls.lock().unwrap();
1980        assert_eq!(
1981            role_for(&calls, "create_container"),
1982            Some(Role::VzLinux),
1983            "a Linux image-OS cache hit must default to the VZ Linux runtime when present"
1984        );
1985    }
1986
1987    #[tokio::test]
1988    async fn dispatch_macos_image_os_with_vz_linux_routes_to_primary() {
1989        // A macOS-native rootfs must NEVER go to the Linux VM. Even with a
1990        // VZ-Linux delegate present (the default Linux path), an image whose
1991        // locally-known OS is macOS routes to the primary (Seatbelt sandbox).
1992        let (rt, calls) = make_composite_with_vz_linux();
1993        let id = cid("mac-svc", 0);
1994        let image = "ghcr.io/zlayer/macos-native:latest";
1995        rt.record_image_os(image, OsKind::Macos).await;
1996
1997        let spec = make_spec(image, None);
1998        rt.create_container(&id, &spec).await.unwrap();
1999
2000        let calls = calls.lock().unwrap();
2001        assert_eq!(
2002            role_for(&calls, "create_container"),
2003            Some(Role::Primary),
2004            "image_os == Macos must route to primary even when VZ-Linux is the default",
2005        );
2006    }
2007
2008    #[tokio::test]
2009    async fn dispatch_unknown_os_with_vz_linux_defaults_to_vz_linux() {
2010        // OS genuinely unknown (no isolation label, no runtime marker, no
2011        // platform, no image-OS cache hit) on a macOS host with a VZ-Linux
2012        // delegate: default to VZ-Linux. Sending an unknown (overwhelmingly
2013        // Linux) image to the Seatbelt sandbox is the exit-127 failure this fix
2014        // exists to prevent.
2015        let (rt, calls) = make_composite_with_vz_linux();
2016        let id = cid("svc", 0);
2017        let spec = make_spec("docker.io/library/whatever:latest", None);
2018
2019        rt.create_container(&id, &spec).await.unwrap();
2020
2021        let calls = calls.lock().unwrap();
2022        assert_eq!(
2023            role_for(&calls, "create_container"),
2024            Some(Role::VzLinux),
2025            "an unknown-OS image must default to VZ-Linux when the delegate is present",
2026        );
2027    }
2028
2029    #[tokio::test]
2030    async fn dispatch_unknown_os_without_vz_linux_falls_through_to_primary() {
2031        // The unknown-OS default to VZ-Linux is keyed on the delegate's
2032        // presence (a proxy for "macOS host"). Without a VZ-Linux delegate the
2033        // historical primary fallthrough is preserved for non-macOS hosts.
2034        let (rt, calls) = make_composite(true);
2035        let id = cid("svc", 0);
2036        let spec = make_spec("docker.io/library/whatever:latest", None);
2037
2038        rt.create_container(&id, &spec).await.unwrap();
2039
2040        let calls = calls.lock().unwrap();
2041        assert_eq!(
2042            role_for(&calls, "create_container"),
2043            Some(Role::Primary),
2044            "without a VZ-Linux delegate an unknown-OS image keeps the primary fallthrough",
2045        );
2046    }
2047
2048    /// Seed a persistent blob cache at `path` with a manifest + config blob for
2049    /// `image` whose config declares `os = linux`, mirroring what a real
2050    /// VZ-Linux pull writes to `{data_dir}/vz/linux/images/blobs.redb`.
2051    async fn seed_persistent_linux_cache(path: &std::path::Path, image: &str) {
2052        seed_persistent_cache_with_os(path, image, "linux").await;
2053    }
2054
2055    /// Like [`seed_persistent_linux_cache`] but lets the test pick the config
2056    /// `os` value (e.g. `"darwin"` for a macOS-native bundle).
2057    async fn seed_persistent_cache_with_os(path: &std::path::Path, image: &str, os: &str) {
2058        let cache = zlayer_registry::CacheType::persistent_at(path)
2059            .build()
2060            .await
2061            .expect("open persistent blob cache");
2062
2063        let config_json = serde_json::json!({
2064            "architecture": "arm64",
2065            "os": os,
2066            "config": {},
2067        });
2068        let config_bytes = serde_json::to_vec(&config_json).unwrap();
2069        let config_digest = zlayer_registry::compute_digest(&config_bytes);
2070        cache.put(&config_digest, &config_bytes).await.unwrap();
2071
2072        let manifest = zlayer_registry::OciImageManifest {
2073            schema_version: 2,
2074            media_type: Some("application/vnd.oci.image.manifest.v1+json".to_string()),
2075            artifact_type: None,
2076            config: oci_client::manifest::OciDescriptor {
2077                media_type: "application/vnd.oci.image.config.v1+json".to_string(),
2078                digest: config_digest.clone(),
2079                size: i64::try_from(config_bytes.len()).unwrap(),
2080                urls: None,
2081                annotations: None,
2082            },
2083            layers: vec![],
2084            annotations: None,
2085            subject: None,
2086        };
2087        let manifest_bytes = serde_json::to_vec(&manifest).unwrap();
2088        let manifest_digest = zlayer_registry::compute_digest(&manifest_bytes);
2089        cache
2090            .put(&zlayer_registry::manifest_cache_key(image), &manifest_bytes)
2091            .await
2092            .unwrap();
2093        cache
2094            .put(
2095                &zlayer_registry::manifest_digest_cache_key(image),
2096                manifest_digest.as_bytes(),
2097            )
2098            .await
2099            .unwrap();
2100    }
2101
2102    /// End-to-end of the macOS rate-limit routing fix: a Linux image whose OS
2103    /// lives ONLY in the local persistent blob cache (no network) must be
2104    /// inspected at `pull_image` time and then routed to the VZ-Linux runtime
2105    /// by `select_for` — exactly the path that breaks under a Docker Hub 429
2106    /// when inspection goes to the wire.
2107    #[tokio::test]
2108    async fn pull_then_dispatch_resolves_linux_os_from_local_cache_routes_to_vz_linux() {
2109        let tmp = tempfile::tempdir().unwrap();
2110        let cache_path = tmp.path().join("blobs.redb");
2111        let image = "docker.io/library/alpine:latest";
2112        seed_persistent_linux_cache(&cache_path, image).await;
2113
2114        let (rt, calls) = make_composite_with_vz_linux();
2115        let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2116
2117        // pull_image drives the real local-first OS inspection; no network.
2118        rt.pull_image(image).await.unwrap();
2119
2120        // The OS must now be cached as Linux purely from the local store.
2121        assert_eq!(
2122            rt.image_os.read().await.get(image).copied(),
2123            Some(OsKind::Linux),
2124            "pull_image must resolve Linux OS from the local persistent cache",
2125        );
2126
2127        // And select_for must route the (platform-less) spec to VZ-Linux.
2128        let id = cid("lin-svc", 0);
2129        let spec = make_spec(image, None);
2130        rt.create_container(&id, &spec).await.unwrap();
2131
2132        let calls = calls.lock().unwrap();
2133        assert_eq!(
2134            role_for(&calls, "create_container"),
2135            Some(Role::VzLinux),
2136            "a Linux image whose OS came from the local cache must route to VZ-Linux",
2137        );
2138    }
2139
2140    /// LIVE BUG #1, end-to-end: the cache is seeded under the QUALIFIED ref
2141    /// (`docker.io/library/alpine:latest`, as the pull writes it) but the spec —
2142    /// and therefore every `pull_image` / `inspect_image_os` / `select_for`
2143    /// lookup — uses the BARE `alpine:latest`. With the canonical manifest-key
2144    /// normalization, the bare-ref inspect hits the qualified-seeded cache with
2145    /// NO network call, so the Linux image still routes to VZ-Linux.
2146    #[tokio::test]
2147    async fn bare_ref_spec_resolves_os_from_qualified_seeded_cache_routes_to_vz_linux() {
2148        let tmp = tempfile::tempdir().unwrap();
2149        let cache_path = tmp.path().join("blobs.redb");
2150        // Seed under the QUALIFIED ref, exactly as a real pull persists it.
2151        seed_persistent_linux_cache(&cache_path, "docker.io/library/alpine:latest").await;
2152
2153        let (rt, calls) = make_composite_with_vz_linux();
2154        let rt = rt.with_os_inspect_cache_paths(vec![cache_path]);
2155
2156        // Everything below uses the BARE ref, exactly as the live daemon does
2157        // (`ImageRef::Display` yields the user-original string).
2158        let bare = "alpine:latest";
2159        rt.pull_image(bare).await.unwrap();
2160
2161        assert_eq!(
2162            rt.image_os.read().await.get(bare).copied(),
2163            Some(OsKind::Linux),
2164            "bare-ref inspect must resolve Linux from the qualified-seeded cache",
2165        );
2166
2167        let id = cid("lin-svc", 0);
2168        let spec = make_spec(bare, None);
2169        rt.create_container(&id, &spec).await.unwrap();
2170
2171        let calls = calls.lock().unwrap();
2172        assert_eq!(
2173            role_for(&calls, "create_container"),
2174            Some(Role::VzLinux),
2175            "bare-ref Linux image routes to VZ-Linux via the canonical-key cache hit",
2176        );
2177    }
2178
2179    /// LIVE BUG #2 / multi-cache fallback: the manifest+config live ONLY in the
2180    /// SECOND configured cache (the primary Sandbox store), because the
2181    /// VZ-Linux pull short-circuited under `IfNotPresent`. Inspection must probe
2182    /// the empty first cache (no network), then resolve from the second — still
2183    /// with NO network — and route to VZ-Linux.
2184    #[tokio::test]
2185    async fn os_resolves_from_second_cache_when_first_is_empty() {
2186        let tmp = tempfile::tempdir().unwrap();
2187        let empty_cache = tmp.path().join("vz-linux-blobs.redb");
2188        let primary_cache = tmp.path().join("primary-blobs.redb");
2189        // Create the first cache empty (so opening it succeeds but it misses).
2190        zlayer_registry::CacheType::persistent_at(&empty_cache)
2191            .build()
2192            .await
2193            .unwrap();
2194        // Only the SECOND cache has the image.
2195        seed_persistent_linux_cache(&primary_cache, "docker.io/library/alpine:latest").await;
2196
2197        let (rt, calls) = make_composite_with_vz_linux();
2198        let rt = rt.with_os_inspect_cache_paths(vec![empty_cache, primary_cache]);
2199
2200        let bare = "alpine:latest";
2201        rt.pull_image(bare).await.unwrap();
2202
2203        assert_eq!(
2204            rt.image_os.read().await.get(bare).copied(),
2205            Some(OsKind::Linux),
2206            "OS must resolve from the second cache after the first misses (no network)",
2207        );
2208
2209        let id = cid("lin-svc", 0);
2210        let spec = make_spec(bare, None);
2211        rt.create_container(&id, &spec).await.unwrap();
2212
2213        let calls = calls.lock().unwrap();
2214        assert_eq!(role_for(&calls, "create_container"), Some(Role::VzLinux),);
2215    }
2216
2217    /// The exact LIVE bug, simulated end-to-end: a `pull_image` whose network OS
2218    /// re-inspection WOULD 429 still leaves dispatch fully working, because the
2219    /// image's OS is resolved purely from the local persistent blob cache the
2220    /// runtime already populated during extract — with NO network call at all.
2221    ///
2222    /// We model the 429 by pointing `os_inspect_cache_paths` at a real seeded
2223    /// cache (so the local resolver succeeds) while using a synthetic
2224    /// `*.invalid` registry host: if the dispatch-population path ever reached
2225    /// the network it would fail to resolve, leaving the cache empty and routing
2226    /// the Linux image to the Seatbelt primary (exit 127). It must not — the
2227    /// local cache hit is authoritative and the image routes to VZ-Linux.
2228    #[tokio::test]
2229    async fn pull_with_network_429_still_dispatches_via_local_cache() {
2230        let tmp = tempfile::tempdir().unwrap();
2231        let cache_path = tmp.path().join("blobs.redb");
2232        // The image ref uses a host that cannot be resolved on the wire; only
2233        // the LOCAL cache knows its OS.
2234        let image = "registry.invalid.example/library/alpine:latest";
2235        seed_persistent_linux_cache(&cache_path, image).await;
2236
2237        let (rt, calls) = make_composite_with_vz_linux();
2238        let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2239
2240        // `pull_image` drives the dispatch-population inspection. Even though a
2241        // real registry inspection of `*.invalid` would fail (our stand-in for a
2242        // 429), the local-only path resolves Linux and the call succeeds.
2243        rt.pull_image(image).await.unwrap();
2244        assert_eq!(
2245            rt.image_os.read().await.get(image).copied(),
2246            Some(OsKind::Linux),
2247            "OS must be resolved from the local cache with no network call",
2248        );
2249
2250        // And dispatch routes the Linux image to VZ-Linux, not the primary.
2251        let id = cid("lin-svc", 0);
2252        let spec = make_spec(image, None);
2253        rt.create_container(&id, &spec).await.unwrap();
2254
2255        let calls = calls.lock().unwrap();
2256        assert_eq!(
2257            role_for(&calls, "create_container"),
2258            Some(Role::VzLinux),
2259            "a would-be-429 pull must still route the cached Linux image to VZ-Linux",
2260        );
2261    }
2262
2263    /// Companion to the macOS-native dispatch guard, but driving the resolution
2264    /// through the real local-cache inspection at `pull_image` time: a bundle
2265    /// whose config declares `os = darwin` in the local cache must route to the
2266    /// primary, never the Linux VM.
2267    #[tokio::test]
2268    async fn pull_then_dispatch_resolves_macos_os_from_local_cache_routes_to_primary() {
2269        let tmp = tempfile::tempdir().unwrap();
2270        let cache_path = tmp.path().join("blobs.redb");
2271        let image = "ghcr.io/zlayer/macos-native:latest";
2272        seed_persistent_cache_with_os(&cache_path, image, "darwin").await;
2273
2274        let (rt, calls) = make_composite_with_vz_linux();
2275        let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2276
2277        rt.pull_image(image).await.unwrap();
2278        assert_eq!(
2279            rt.image_os.read().await.get(image).copied(),
2280            Some(OsKind::Macos),
2281            "pull_image must resolve macOS OS from the local persistent cache",
2282        );
2283
2284        let id = cid("mac-svc", 0);
2285        let spec = make_spec(image, None);
2286        rt.create_container(&id, &spec).await.unwrap();
2287
2288        let calls = calls.lock().unwrap();
2289        assert_eq!(
2290            role_for(&calls, "create_container"),
2291            Some(Role::Primary),
2292            "a macOS-native rootfs must route to primary even with VZ-Linux as default",
2293        );
2294    }
2295
2296    #[tokio::test]
2297    async fn dispatch_vm_label_forces_libkrun_delegate() {
2298        let (rt, calls) = make_composite_with_vz_linux();
2299        let id = cid("lin-svc", 0);
2300        // Even with a VZ Linux delegate as the default, an explicit
2301        // `com.zlayer.isolation=vm` label forces the libkrun delegate.
2302        let mut spec = make_spec(
2303            "docker.io/library/alpine:3.19",
2304            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Arm64)),
2305        );
2306        spec.labels
2307            .insert("com.zlayer.isolation".to_string(), "vm".to_string());
2308
2309        rt.create_container(&id, &spec).await.unwrap();
2310
2311        let calls = calls.lock().unwrap();
2312        assert_eq!(
2313            role_for(&calls, "create_container"),
2314            Some(Role::Delegate),
2315            "com.zlayer.isolation=vm must force the libkrun delegate even when VZ Linux is default"
2316        );
2317    }
2318
2319    #[tokio::test]
2320    async fn dispatch_unmarked_image_with_vz_delegate_falls_through_to_primary() {
2321        let (rt, calls) = make_composite_with_vz();
2322        let id = cid("mac-svc", 0);
2323        // No runtime marker, no platform, no image-OS cache: VZ must NOT capture
2324        // ordinary images just because the delegate exists.
2325        let spec = make_spec("ghcr.io/org/plain:1", None);
2326        rt.create_container(&id, &spec).await.unwrap();
2327
2328        let calls = calls.lock().unwrap();
2329        assert_eq!(
2330            role_for(&calls, "create_container"),
2331            Some(Role::Primary),
2332            "an unmarked image must fall through to primary even when a VZ delegate is attached"
2333        );
2334    }
2335
2336    #[tokio::test]
2337    async fn per_container_dispatch_cache_persists_through_start_stop() {
2338        let (rt, calls) = make_composite(true);
2339        let id = cid("win-svc", 0);
2340        let spec = make_spec(
2341            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
2342            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
2343        );
2344
2345        rt.create_container(&id, &spec).await.unwrap();
2346        rt.start_container(&id).await.unwrap();
2347        rt.stop_container(&id, Duration::from_secs(1))
2348            .await
2349            .unwrap();
2350        rt.remove_container(&id).await.unwrap();
2351
2352        let recorded = calls.lock().unwrap().clone();
2353        for method in [
2354            "create_container",
2355            "start_container",
2356            "stop_container",
2357            "remove_container",
2358        ] {
2359            assert_eq!(
2360                role_for(&recorded, method),
2361                Some(Role::Primary),
2362                "{method} should have dispatched to primary"
2363            );
2364        }
2365
2366        // After remove, the dispatch cache entry should be gone.
2367        let after = rt
2368            .start_container(&id)
2369            .await
2370            .expect_err("lookup after remove should fail");
2371        assert!(
2372            matches!(after, AgentError::NotFound { .. }),
2373            "expected NotFound after remove, got {after:?}"
2374        );
2375    }
2376
2377    #[tokio::test]
2378    async fn pull_image_calls_both_runtimes() {
2379        let (rt, calls) = make_composite(true);
2380        rt.pull_image("docker.io/library/alpine:3.19")
2381            .await
2382            .unwrap();
2383
2384        let recorded = calls.lock().unwrap();
2385        let pull_calls: Vec<Role> = recorded
2386            .iter()
2387            .filter(|(_, m, _)| m == "pull_image")
2388            .map(|(r, _, _)| *r)
2389            .collect();
2390        assert!(
2391            pull_calls.contains(&Role::Primary),
2392            "primary should have been pulled: {pull_calls:?}",
2393        );
2394        assert!(
2395            pull_calls.contains(&Role::Delegate),
2396            "delegate should have been pulled: {pull_calls:?}",
2397        );
2398    }
2399
2400    #[tokio::test]
2401    async fn pull_image_delegate_error_does_not_fail() {
2402        // Build the composite by hand so we can flip the delegate's
2403        // pull_image_error before wrapping it in an Arc<dyn Runtime>.
2404        let calls = Arc::new(StdMutex::new(Vec::new()));
2405        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
2406        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2407        delegate.pull_image_error = Some("simulated delegate pull failure".to_string());
2408        let rt = CompositeRuntime::new(
2409            primary as Arc<dyn Runtime>,
2410            Some(Arc::new(delegate) as Arc<dyn Runtime>),
2411        );
2412
2413        // Top-level call must succeed despite the delegate error.
2414        rt.pull_image("docker.io/library/alpine:3.19")
2415            .await
2416            .unwrap();
2417
2418        let recorded = calls.lock().unwrap();
2419        let pull_calls: Vec<Role> = recorded
2420            .iter()
2421            .filter(|(_, m, _)| m == "pull_image")
2422            .map(|(r, _, _)| *r)
2423            .collect();
2424        assert!(
2425            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
2426            "both runtimes should have been called: {pull_calls:?}",
2427        );
2428    }
2429
2430    #[tokio::test]
2431    async fn pull_image_primary_wrong_platform_does_not_fail() {
2432        // The HCS runtime returns `AgentError::WrongPlatform` when the image's
2433        // OCI config reports a non-Windows OS (calling `ProcessBaseImage` on a
2434        // Linux base layer is guaranteed to fail with 0x80070003). The
2435        // composite must treat that as a soft skip and let the delegate's
2436        // pull own the image — the overall pull must NOT fail.
2437        let calls = Arc::new(StdMutex::new(Vec::new()));
2438        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2439        primary.pull_image_wrong_platform = Some(("windows", "linux"));
2440        let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2441        let rt = CompositeRuntime::new(
2442            Arc::new(primary) as Arc<dyn Runtime>,
2443            Some(Arc::new(delegate) as Arc<dyn Runtime>),
2444        );
2445
2446        // Top-level call must succeed despite the primary's wrong-platform err.
2447        rt.pull_image("docker.io/library/alpine:3.19")
2448            .await
2449            .expect("composite pull must tolerate WrongPlatform from primary");
2450
2451        let recorded = calls.lock().unwrap();
2452        let pull_calls: Vec<Role> = recorded
2453            .iter()
2454            .filter(|(_, m, _)| m == "pull_image")
2455            .map(|(r, _, _)| *r)
2456            .collect();
2457        assert!(
2458            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
2459            "delegate must still be called when primary soft-skips: {pull_calls:?}",
2460        );
2461    }
2462
2463    #[tokio::test]
2464    async fn pull_image_with_policy_primary_wrong_platform_does_not_fail() {
2465        // Same contract as `pull_image_primary_wrong_platform_does_not_fail`
2466        // but exercising the `pull_image_with_policy` entry point. The
2467        // policy/auth path is what the daemon's create-container hot loop
2468        // actually invokes, so it has to honour the same soft-skip rule.
2469        let calls = Arc::new(StdMutex::new(Vec::new()));
2470        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2471        primary.pull_image_wrong_platform = Some(("windows", "linux"));
2472        let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2473        let rt = CompositeRuntime::new(
2474            Arc::new(primary) as Arc<dyn Runtime>,
2475            Some(Arc::new(delegate) as Arc<dyn Runtime>),
2476        );
2477
2478        rt.pull_image_with_policy(
2479            "docker.io/library/alpine:3.19",
2480            PullPolicy::IfNotPresent,
2481            None,
2482            zlayer_spec::SourcePolicy::default(),
2483        )
2484        .await
2485        .expect("composite pull_image_with_policy must tolerate WrongPlatform from primary");
2486
2487        let recorded = calls.lock().unwrap();
2488        let pull_calls: Vec<Role> = recorded
2489            .iter()
2490            .filter(|(_, m, _)| m == "pull_image_with_policy")
2491            .map(|(r, _, _)| *r)
2492            .collect();
2493        assert!(
2494            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
2495            "delegate must still be called when primary soft-skips: {pull_calls:?}",
2496        );
2497    }
2498
2499    #[tokio::test]
2500    async fn pull_image_primary_non_wrong_platform_error_still_fails() {
2501        // Sanity check: only `WrongPlatform` is soft-skipped. Any other error
2502        // from the primary must still bubble up so real pull failures aren't
2503        // silently swallowed.
2504        let calls = Arc::new(StdMutex::new(Vec::new()));
2505        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2506        primary.pull_image_error = Some("simulated real failure".to_string());
2507        let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2508        let rt = CompositeRuntime::new(
2509            Arc::new(primary) as Arc<dyn Runtime>,
2510            Some(Arc::new(delegate) as Arc<dyn Runtime>),
2511        );
2512
2513        let err = rt
2514            .pull_image("docker.io/library/alpine:3.19")
2515            .await
2516            .expect_err("real primary error must propagate");
2517        assert!(
2518            matches!(err, AgentError::Internal(_)),
2519            "expected Internal, got {err:?}",
2520        );
2521    }
2522
2523    #[tokio::test]
2524    async fn list_images_merges_both() {
2525        // Hand-build so we can seed each mock's list_images_response.
2526        let calls = Arc::new(StdMutex::new(Vec::new()));
2527        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2528        primary.list_images_response = vec![ImageInfo {
2529            reference: "primary/image:1".to_string(),
2530            digest: None,
2531            size_bytes: None,
2532        }];
2533        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2534        delegate.list_images_response = vec![ImageInfo {
2535            reference: "delegate/image:1".to_string(),
2536            digest: None,
2537            size_bytes: None,
2538        }];
2539        let rt = CompositeRuntime::new(
2540            Arc::new(primary) as Arc<dyn Runtime>,
2541            Some(Arc::new(delegate) as Arc<dyn Runtime>),
2542        );
2543
2544        let merged = rt.list_images().await.unwrap();
2545        let refs: Vec<&str> = merged.iter().map(|i| i.reference.as_str()).collect();
2546        assert!(
2547            refs.contains(&"primary/image:1") && refs.contains(&"delegate/image:1"),
2548            "merged list should contain both entries, got {refs:?}",
2549        );
2550    }
2551
2552    /// Regression (macOS `GET /images/json` 500): when the *primary* runtime
2553    /// does not implement `list_images` (the `SandboxRuntime` returns
2554    /// `Unsupported`), the composite must NOT propagate that error. It must
2555    /// fall back to the other backends — in particular the VZ-Linux delegate
2556    /// that actually owns pulled Linux images — and return their list. Before
2557    /// the fix the composite used `self.primary.list_images().await?`, which
2558    /// surfaced as a 500 and (via the inspect fallback) broke `docker pull`.
2559    #[tokio::test]
2560    async fn list_images_tolerates_primary_unsupported_and_uses_vz_linux() {
2561        let calls = Arc::new(StdMutex::new(Vec::new()));
2562        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2563        primary.list_images_error = Some("list_images is not supported".to_string());
2564        let mut vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2565        vz_linux.list_images_response = vec![ImageInfo {
2566            reference: "docker.io/library/alpine:latest".to_string(),
2567            digest: None,
2568            size_bytes: None,
2569        }];
2570
2571        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2572            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2573
2574        let images = rt
2575            .list_images()
2576            .await
2577            .expect("primary Unsupported must not fail the composite list_images");
2578        let refs: Vec<&str> = images.iter().map(|i| i.reference.as_str()).collect();
2579        assert_eq!(
2580            refs,
2581            vec!["docker.io/library/alpine:latest"],
2582            "should return the VZ-Linux delegate's images, got {refs:?}",
2583        );
2584    }
2585
2586    /// When EVERY backend fails `list_images`, the composite surfaces an error
2587    /// (rather than silently returning an empty list, which would mask a total
2588    /// backend outage).
2589    #[tokio::test]
2590    async fn list_images_errors_only_when_all_backends_fail() {
2591        let calls = Arc::new(StdMutex::new(Vec::new()));
2592        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2593        primary.list_images_error = Some("unsupported".to_string());
2594        let mut vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2595        vz_linux.list_images_error = Some("also unsupported".to_string());
2596
2597        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2598            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2599
2600        let err = rt.list_images().await.unwrap_err();
2601        assert!(
2602            matches!(err, AgentError::Unsupported(_)),
2603            "all-backends-fail should surface Unsupported, got {err:?}",
2604        );
2605    }
2606
2607    // ----------------------------------------------------------------------
2608    // Per-container read routing (logs / stats).
2609    //
2610    // These guard the macOS Docker-compat `/logs` and `/stats` 500 fix: when
2611    // the owning backend cannot serve a particular read (the primary
2612    // `SandboxRuntime` implements snapshot reads but returns `Unsupported` for
2613    // the *streaming* ones, or a different backend owns the container), the
2614    // composite must route to / fall back across backends and return real data
2615    // instead of propagating `Unsupported` as a swallowed 500. Only a genuine
2616    // all-not-found is a 404.
2617    // ----------------------------------------------------------------------
2618
2619    /// Build a `LogEntry` with the given stream + message for read tests.
2620    fn log_entry(stream: LogStream, message: &str) -> LogEntry {
2621        LogEntry {
2622            timestamp: chrono::Utc::now(),
2623            stream,
2624            source: zlayer_observability::logs::LogSource::Container("test".to_string()),
2625            message: message.to_string(),
2626            service: None,
2627            deployment: None,
2628        }
2629    }
2630
2631    /// Drain a `LogsStream` into the concatenated UTF-8 body bytes.
2632    async fn drain_logs(stream: LogsStream) -> String {
2633        use futures_util::StreamExt as _;
2634        let mut out = Vec::new();
2635        let mut s = stream;
2636        while let Some(item) = s.next().await {
2637            out.extend_from_slice(&item.expect("log chunk ok").bytes);
2638        }
2639        String::from_utf8(out).expect("utf8 log body")
2640    }
2641
2642    /// Collect a `StatsStream` into a Vec of samples.
2643    async fn drain_stats(stream: StatsStream) -> Vec<StatsSample> {
2644        use futures_util::StreamExt as _;
2645        let mut out = Vec::new();
2646        let mut s = stream;
2647        while let Some(item) = s.next().await {
2648            out.push(item.expect("stats sample ok"));
2649        }
2650        out
2651    }
2652
2653    /// Build a composite whose primary models the macOS `SandboxRuntime`
2654    /// (snapshot reads work, streaming reads return `Unsupported`) and whose
2655    /// VZ-Linux delegate owns the container with working native streams.
2656    /// Returns (composite, call-log) with a container already dispatched to the
2657    /// chosen owner.
2658    async fn make_read_composite(owner: Role) -> (CompositeRuntime, ContainerId, CallLog) {
2659        let calls = Arc::new(StdMutex::new(Vec::new()));
2660        let logs = vec![
2661            log_entry(LogStream::Stdout, "hello stdout"),
2662            log_entry(LogStream::Stderr, "hello stderr"),
2663        ];
2664        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls))
2665            .with_stream_unsupported()
2666            .with_logs(logs.clone());
2667        let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls)).with_logs(logs);
2668        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2669            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2670
2671        let id = cid("read-svc", 0);
2672        // Dispatch the container to the chosen owner without going through the
2673        // (platform-dependent) `select_for` path.
2674        let target = match owner {
2675            Role::Primary => DispatchTarget::Primary,
2676            Role::VzLinux => DispatchTarget::VzLinux,
2677            other => panic!("make_read_composite supports Primary/VzLinux, not {other:?}"),
2678        };
2679        rt.dispatch.write().await.insert(id.clone(), target);
2680        (rt, id, calls)
2681    }
2682
2683    #[tokio::test]
2684    async fn logs_stream_falls_back_to_snapshot_when_owner_has_no_stream() {
2685        // Sole backend = primary (SandboxRuntime model): `logs_stream` is
2686        // Unsupported, but `container_logs` works. With no other backend the
2687        // composite must synthesise a stream from the snapshot rather than 500.
2688        let calls = Arc::new(StdMutex::new(Vec::new()));
2689        let logs = vec![
2690            log_entry(LogStream::Stdout, "hello stdout"),
2691            log_entry(LogStream::Stderr, "hello stderr"),
2692        ];
2693        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls))
2694            .with_stream_unsupported()
2695            .with_logs(logs);
2696        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None);
2697        let id = cid("read-svc", 0);
2698        rt.dispatch
2699            .write()
2700            .await
2701            .insert(id.clone(), DispatchTarget::Primary);
2702
2703        let stream = rt
2704            .logs_stream(&id, LogsStreamOptions::default())
2705            .await
2706            .expect("logs_stream must not 500 when snapshot reads work");
2707        let body = drain_logs(stream).await;
2708        assert!(
2709            body.contains("hello stdout") && body.contains("hello stderr"),
2710            "synthesised stream must carry the captured logs, got: {body:?}",
2711        );
2712    }
2713
2714    #[tokio::test]
2715    async fn logs_stream_routes_to_delegate_owner_native_stream() {
2716        // Owner = VZ-Linux delegate with a working native stream; the primary's
2717        // streaming read is Unsupported but must not be consulted first.
2718        let (rt, id, calls) = make_read_composite(Role::VzLinux).await;
2719        let stream = rt
2720            .logs_stream(&id, LogsStreamOptions::default())
2721            .await
2722            .expect("delegate-owned logs_stream must succeed");
2723        let body = drain_logs(stream).await;
2724        assert!(body.contains("hello stdout"), "got: {body:?}");
2725
2726        let log = calls.lock().expect("call-log mutex poisoned");
2727        assert_eq!(
2728            role_for(&log, "logs_stream"),
2729            Some(Role::VzLinux),
2730            "logs_stream must hit the owning delegate first, calls: {log:?}",
2731        );
2732    }
2733
2734    #[tokio::test]
2735    async fn get_logs_falls_back_across_backends() {
2736        // Owner = primary; here snapshot `get_logs` works on primary directly,
2737        // so it should succeed on the owner without ever consulting the
2738        // delegate. (Soft-miss fallback is exercised by the stats test below.)
2739        let (rt, id, _calls) = make_read_composite(Role::Primary).await;
2740        let logs = rt.get_logs(&id).await.expect("get_logs must succeed");
2741        assert_eq!(logs.len(), 2, "owner snapshot logs should be returned");
2742    }
2743
2744    #[tokio::test]
2745    async fn stats_stream_falls_back_to_snapshot_when_owner_has_no_stream() {
2746        // Sole backend = primary (SandboxRuntime model): `stats_stream` is
2747        // Unsupported but `get_container_stats` works. With no other backend
2748        // offering a native stream, the composite must synthesise a single
2749        // non-empty sample from the snapshot rather than 500.
2750        let calls = Arc::new(StdMutex::new(Vec::new()));
2751        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_stream_unsupported();
2752        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None);
2753        let id = cid("read-svc", 0);
2754        rt.dispatch
2755            .write()
2756            .await
2757            .insert(id.clone(), DispatchTarget::Primary);
2758
2759        let stream = rt
2760            .stats_stream(&id)
2761            .await
2762            .expect("stats_stream must not 500 when get_container_stats works");
2763        let samples = drain_stats(stream).await;
2764        assert_eq!(samples.len(), 1, "snapshot fallback yields one sample");
2765        assert!(
2766            samples[0].mem_used_bytes > 0,
2767            "synthesised sample must carry non-zero memory, got {:?}",
2768            samples[0],
2769        );
2770        assert_eq!(
2771            samples[0].cpu_total_ns, 1_000_000,
2772            "cpu microseconds must be scaled to nanoseconds in the synthesised sample",
2773        );
2774    }
2775
2776    #[tokio::test]
2777    async fn get_container_stats_tolerates_owner_miss_and_uses_other_backend() {
2778        // Owner = primary whose snapshot `get_container_stats` returns
2779        // `Unsupported` (a soft miss); the delegate that follows in the fallback
2780        // chain serves it. The composite must NOT propagate the owner's
2781        // Unsupported as a 500.
2782        let calls = Arc::new(StdMutex::new(Vec::new()));
2783        let primary =
2784            MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_stats_snapshot_unsupported();
2785        let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2786        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2787            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2788        let id = cid("read-svc", 0);
2789        rt.dispatch
2790            .write()
2791            .await
2792            .insert(id.clone(), DispatchTarget::Primary);
2793
2794        let stats = rt
2795            .get_container_stats(&id)
2796            .await
2797            .expect("owner Unsupported must fall back to the delegate, not 500");
2798        assert!(stats.memory_bytes > 0, "delegate stats should be returned");
2799
2800        let log = calls.lock().expect("call-log mutex poisoned");
2801        assert!(
2802            log.iter()
2803                .any(|(role, method, _)| *role == Role::Primary && method == "get_container_stats"),
2804            "primary must have been tried first, calls: {log:?}",
2805        );
2806        assert!(
2807            log.iter()
2808                .any(|(role, method, _)| *role == Role::VzLinux && method == "get_container_stats"),
2809            "delegate must have served the fallback, calls: {log:?}",
2810        );
2811    }
2812
2813    #[tokio::test]
2814    async fn reads_propagate_not_found_when_no_backend_owns_container() {
2815        // Every backend returns NotFound for the dispatched container: the
2816        // composite must surface NotFound (→ 404), NOT mask it as Unsupported
2817        // or empty success.
2818        let calls = Arc::new(StdMutex::new(Vec::new()));
2819        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_reads_not_found();
2820        let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls)).with_reads_not_found();
2821        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2822            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2823        let id = cid("read-svc", 0);
2824        rt.dispatch
2825            .write()
2826            .await
2827            .insert(id.clone(), DispatchTarget::Primary);
2828
2829        // `LogsStream`/`StatsStream` are not `Debug`, so match instead of
2830        // `unwrap_err()`.
2831        match rt.logs_stream(&id, LogsStreamOptions::default()).await {
2832            Err(AgentError::NotFound { .. }) => {}
2833            other => panic!(
2834                "all-not-found logs_stream must be NotFound (404), got {:?}",
2835                other.err(),
2836            ),
2837        }
2838        match rt.stats_stream(&id).await {
2839            Err(AgentError::NotFound { .. }) => {}
2840            other => panic!(
2841                "all-not-found stats_stream must be NotFound (404), got {:?}",
2842                other.err(),
2843            ),
2844        }
2845        let cl_err = rt.container_logs(&id, 10).await.unwrap_err();
2846        assert!(
2847            matches!(cl_err, AgentError::NotFound { .. }),
2848            "all-not-found container_logs must be NotFound (404), got {cl_err:?}",
2849        );
2850    }
2851
2852    #[tokio::test]
2853    async fn reads_on_undispatched_container_are_not_found() {
2854        // No dispatch record at all → NotFound (the id was never created here).
2855        let (rt, _calls) = make_composite(false);
2856        let id = cid("ghost", 0);
2857        match rt.logs_stream(&id, LogsStreamOptions::default()).await {
2858            Err(AgentError::NotFound { .. }) => {}
2859            other => panic!(
2860                "undispatched logs_stream must be NotFound, got {:?}",
2861                other.err()
2862            ),
2863        }
2864    }
2865
2866    /// Regression: `pull_image` must fan out to the VZ-Linux delegate so the
2867    /// image lands in the store where Linux containers actually execute on
2868    /// macOS (and so it becomes listable/inspectable). Before the fix the
2869    /// composite only pulled into `primary` + `delegate`, leaving the
2870    /// VZ-Linux `image_rootfs` empty.
2871    #[tokio::test]
2872    async fn pull_image_fans_out_to_vz_linux() {
2873        let calls = Arc::new(StdMutex::new(Vec::new()));
2874        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2875        let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2876
2877        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2878            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2879
2880        rt.pull_image("docker.io/library/alpine:latest")
2881            .await
2882            .expect("pull should succeed");
2883
2884        let log = calls.lock().expect("call-log mutex poisoned");
2885        assert!(
2886            log.iter()
2887                .any(|(role, method, _)| *role == Role::VzLinux && method == "pull_image"),
2888            "pull_image must reach the VZ-Linux delegate, recorded calls: {log:?}",
2889        );
2890    }
2891
2892    #[tokio::test]
2893    async fn dispatch_lookup_unknown_container_errors() {
2894        let (rt, _calls) = make_composite(true);
2895        let id = cid("ghost", 0);
2896
2897        let err = rt.start_container(&id).await.unwrap_err();
2898        assert!(
2899            matches!(err, AgentError::NotFound { .. }),
2900            "expected NotFound for unknown container, got {err:?}"
2901        );
2902    }
2903
2904    /// Helper: read the internal image-OS cache for test assertions.
2905    async fn cached_os(rt: &CompositeRuntime, image: &str) -> Option<OsKind> {
2906        rt.image_os.read().await.get(image).copied()
2907    }
2908
2909    #[tokio::test]
2910    async fn apply_image_os_inspection_populates_cache_on_ok_some() {
2911        // Contract: when `fetch_image_os` resolves to a recognized OS, the
2912        // cache is populated so subsequent `select_for` calls for specs
2913        // without `platform` dispatch correctly.
2914        let (rt, _calls) = make_composite(true);
2915        let image = "docker.io/library/alpine:3.19";
2916
2917        rt.apply_image_os_inspection(image, Ok(Some(OsKind::Linux)))
2918            .await;
2919
2920        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
2921    }
2922
2923    #[tokio::test]
2924    async fn apply_image_os_inspection_leaves_cache_untouched_on_ok_none() {
2925        // Contract: when the manifest carries no (or an unrecognized) `os`
2926        // field the cache is left alone. Dispatch will fall through to the
2927        // primary on `create_container`.
2928        let (rt, _calls) = make_composite(true);
2929        let image = "docker.io/library/nginx:1.25";
2930
2931        rt.apply_image_os_inspection(image, Ok(None)).await;
2932
2933        assert_eq!(cached_os(&rt, image).await, None);
2934    }
2935
2936    #[tokio::test]
2937    async fn apply_image_os_inspection_leaves_cache_untouched_on_err() {
2938        // Contract: a registry error during inspection is non-fatal and must
2939        // not poison the cache. Dispatch falls through to primary on lookup.
2940        let (rt, _calls) = make_composite(true);
2941        let image = "docker.io/library/nginx:1.25";
2942
2943        // Pre-seed the cache so we can assert the error path doesn't
2944        // overwrite or clear an existing entry.
2945        rt.record_image_os(image, OsKind::Linux).await;
2946
2947        let err = zlayer_registry::RegistryError::NotFound {
2948            registry: "docker.io".to_string(),
2949            image: image.to_string(),
2950        };
2951        rt.apply_image_os_inspection(image, Err(err)).await;
2952
2953        // Cache is still whatever it was before the failed inspection.
2954        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
2955    }
2956
2957    #[tokio::test]
2958    async fn pull_image_inspection_failure_does_not_fail_pull() {
2959        // End-to-end: even when the registry fetch fails (inevitable for the
2960        // synthetic image refs used in unit tests), `pull_image` still
2961        // returns `Ok`. The mock primary/delegate both succeed; the
2962        // inspection step logs and moves on. The cache must remain empty
2963        // because there was no successful inspection to record.
2964        let (rt, _calls) = make_composite(true);
2965        let image = "invalid.example.invalid/ghost:v1";
2966
2967        rt.pull_image(image).await.unwrap();
2968
2969        assert_eq!(
2970            cached_os(&rt, image).await,
2971            None,
2972            "failed inspection must not populate the image-OS cache"
2973        );
2974    }
2975
2976    #[tokio::test]
2977    async fn pull_image_with_policy_inspection_failure_does_not_fail_pull() {
2978        // Same contract as `pull_image_inspection_failure_does_not_fail_pull`
2979        // but exercising the policy-aware entry point.
2980        let (rt, _calls) = make_composite(true);
2981        let image = "invalid.example.invalid/ghost:v1";
2982
2983        rt.pull_image_with_policy(
2984            image,
2985            PullPolicy::IfNotPresent,
2986            None,
2987            zlayer_spec::SourcePolicy::default(),
2988        )
2989        .await
2990        .unwrap();
2991
2992        assert_eq!(cached_os(&rt, image).await, None);
2993    }
2994
2995    #[test]
2996    fn os_kind_from_oci_str_roundtrip() {
2997        // Guards the `as_oci_str` ↔ `from_oci_str` relationship used by the
2998        // inspection path. If a new variant is added to `OsKind` without
2999        // updating `from_oci_str` we want the miss here, not a silent
3000        // "dispatch to primary" regression in production.
3001        for os in [OsKind::Linux, OsKind::Windows, OsKind::Macos] {
3002            assert_eq!(OsKind::from_oci_str(os.as_oci_str()), Some(os));
3003        }
3004        assert_eq!(OsKind::from_oci_str(""), None);
3005        assert_eq!(OsKind::from_oci_str("freebsd"), None);
3006    }
3007}