Skip to main content

zlayer_agent/runtimes/
composite.rs

1//! Composite runtime that dispatches per-container to a primary + optional delegate.
2//!
//! The [`CompositeRuntime`] owns a *primary* runtime (the node-native runtime —
//! e.g. `HcsRuntime` on Windows, `YoukiRuntime` on Linux, Docker elsewhere), an
//! optional *delegate* runtime used for foreign-OS workloads (e.g. a WSL2
//! delegate on Windows that runs Linux containers), and — on macOS — optional
//! Apple-Virtualization delegates (`vz` for native-macOS guests, `vz-linux`
//! for Linux guests). Each call is routed based on the container's identity:
8//!
//! * **[`Runtime::create_container`]** picks a target by precedence: an
//!   explicit `com.zlayer.isolation` label first, then the image's
//!   `com.zlayer.runtime` manifest marker (`vz` / `vz-linux`), then
//!   [`ServiceSpec::platform`](zlayer_spec::ServiceSpec), and finally — when
//!   `platform` is `None` — a secondary **image-OS cache** (populated by
//!   [`CompositeRuntime::record_image_os`] from local OCI manifest inspection
//!   at pull time). When the OS is still unknown we fall through to the
//!   VZ-Linux delegate if one is attached (macOS), otherwise to the primary.
//!   **Strict policy (H-7):** if either source identifies the workload as
//!   Linux and this node has no delegate that can run it, dispatch returns
//!   [`AgentError::RouteToPeer`] so the scheduler can re-place the workload
//!   on a Linux peer — the old permissive "fall through to primary" behavior
//!   is gone.
20//! * All subsequent per-container operations (start/stop/remove/logs/exec/…)
21//!   look up the container in an internal **dispatch cache** that records
22//!   which runtime created it. This guarantees the same runtime sees the
23//!   container for its whole lifecycle, even after daemon restarts within
24//!   the same process.
25//! * Cross-cutting image operations (`pull_image`, `pull_image_with_policy`,
26//!   `list_images`, `prune_images`) fan out to both runtimes — we cannot know
27//!   in advance which runtime will execute a pulled image, and merged image
28//!   listings give users a single coherent view. `remove_image` / `tag_image`
29//!   try primary first and fall back to delegate.
30//!
31//! The dispatch cache is populated on `create_container` and cleared on
32//! `remove_container`. Looking up an unknown id yields
33//! [`AgentError::NotFound`], which surfaces as a clean 404 at the API layer
34//! rather than silently forwarding to the wrong runtime.
35
36use std::collections::HashMap;
37use std::net::IpAddr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use tokio::sync::RwLock;
43use zlayer_observability::logs::{LogEntry, LogStream};
44use zlayer_spec::{OsKind, PullPolicy, RegistryAuth, ServiceSpec};
45
46use crate::cgroups_stats::ContainerStats;
47use crate::error::{AgentError, Result};
48use crate::runtime::{
49    ContainerId, ContainerInspectDetails, ContainerState, ExecEventStream, ImageInfo, LogChannel,
50    LogChunk, LogsStream, LogsStreamOptions, OverlayAttachKind, PruneResult, Runtime, StatsSample,
51    StatsStream, WaitCondition, WaitOutcome,
52};
53
/// Which underlying runtime a given container was dispatched to.
///
/// Recorded per container at `create_container` time so that every later
/// lifecycle call for that container reaches the same runtime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DispatchTarget {
    /// The node-native primary runtime (e.g. HCS on Windows, the Seatbelt
    /// sandbox on macOS).
    Primary,
    /// The foreign-OS delegate (e.g. the WSL2 delegate on Windows, or the
    /// libkrun VM delegate on macOS, forced there via
    /// `com.zlayer.isolation=vm|libkrun`).
    Delegate,
    /// The Apple-Virtualization (VZ) delegate (macOS only). Selected
    /// automatically for `com.zlayer.runtime=vz` base bundles, or per-service
    /// via the `com.zlayer.isolation=vz` label.
    Vz,
    /// The Apple-Virtualization **Linux-guest** delegate (macOS only). The
    /// default Linux path on macOS: selected for Linux images, the
    /// `com.zlayer.runtime=vz-linux` marker, or the
    /// `com.zlayer.isolation=vz-linux` label.
    VzLinux,
}
69
70/// Routes each container to either the primary runtime or an optional delegate.
71///
72/// See the module-level documentation for the dispatch rules.
73pub struct CompositeRuntime {
74    primary: Arc<dyn Runtime>,
75    delegate: Option<Arc<dyn Runtime>>,
76    /// Opt-in Apple-Virtualization delegate (macOS). Selected only when a
77    /// service carries `com.zlayer.isolation=vz`.
78    vz: Option<Arc<dyn Runtime>>,
79    /// Apple-Virtualization Linux-guest delegate (macOS). When present, it is
80    /// the default runtime for Linux images on this node; libkrun
81    /// (`delegate`) is then reachable only via `com.zlayer.isolation=vm`.
82    vz_linux: Option<Arc<dyn Runtime>>,
83    /// Per-container dispatch cache. Populated on `create_container`, removed
84    /// on `remove_container`.
85    dispatch: Arc<RwLock<HashMap<ContainerId, DispatchTarget>>>,
86    /// Image-OS cache consulted when a spec has no explicit `platform`.
87    /// Populated by [`CompositeRuntime::record_image_os`], which is driven
88    /// from [`zlayer_registry::fetch_image_os`] during `pull_image*`.
89    image_os: Arc<RwLock<HashMap<String, OsKind>>>,
90    /// Image runtime-marker cache (the `com.zlayer.runtime` manifest
91    /// annotation, e.g. `"vz"`). Populated from
92    /// [`zlayer_registry::fetch_image_runtime_marker`] during `pull_image*` so
93    /// `select_for` can auto-detect a VZ base bundle and prefer the VZ runtime
94    /// for it without requiring a per-service label.
95    image_runtime: Arc<RwLock<HashMap<String, String>>>,
96    /// Filesystem paths of the persistent blob caches that the runtimes pull
97    /// into, tried IN ORDER for image-OS / runtime-marker inspection. Typically:
98    ///
99    /// 1. the VZ-Linux runtime's `{data_dir}/vz/linux/images/blobs.redb` (the
100    ///    delegate that actually runs the Linux workload), and
101    /// 2. the primary Sandbox runtime's `{data_dir}/images/blobs.redb`.
102    ///
103    /// Both stores matter because `pull_image` pulls into BOTH (primary first,
104    /// then VZ-Linux), and either pull may short-circuit under
105    /// `PullPolicy::IfNotPresent` when its rootfs already exists — leaving the
106    /// manifest/config in only ONE of the two caches. Inspection therefore
107    /// probes them in order and stops at the first store that resolves the OS,
108    /// LOCAL-ONLY via [`zlayer_registry::fetch_image_os_in_cache_only`] — so an
109    /// already-pulled Linux image is detected as Linux (and routed to VZ-Linux)
110    /// with NO network call, even under a Docker Hub rate-limit. For the OS
111    /// dispatch path there is intentionally **no** network fallback: a local
112    /// miss yields "OS unknown" and dispatch uses its safe macOS default rather
113    /// than risking a 429 (see [`CompositeRuntime::inspect_image_os`]).
114    os_inspect_cache_paths: Vec<std::path::PathBuf>,
115}
116
117impl CompositeRuntime {
118    /// Construct a new composite runtime.
119    ///
120    /// `primary` handles containers whose platform matches the host node.
121    /// `delegate`, when present, handles foreign-OS containers (currently:
122    /// Linux containers on a Windows host via the WSL2 delegate runtime).
123    #[must_use]
124    pub fn new(primary: Arc<dyn Runtime>, delegate: Option<Arc<dyn Runtime>>) -> Self {
125        Self {
126            primary,
127            delegate,
128            vz: None,
129            vz_linux: None,
130            dispatch: Arc::new(RwLock::new(HashMap::new())),
131            image_os: Arc::new(RwLock::new(HashMap::new())),
132            image_runtime: Arc::new(RwLock::new(HashMap::new())),
133            os_inspect_cache_paths: Vec::new(),
134        }
135    }
136
137    /// Point image-OS / runtime-marker inspection at a single persistent blob
138    /// cache the runtimes pull into, so the OS of an already-pulled image
139    /// resolves from the LOCAL config blob with no network round-trip.
140    ///
141    /// Convenience wrapper over [`CompositeRuntime::with_os_inspect_cache_paths`]
142    /// for callers that only have one store. `path` is the on-disk blob-cache
143    /// file (e.g. the VZ-Linux runtime's `{data_dir}/vz/linux/images/blobs.redb`).
144    #[must_use]
145    pub fn with_os_inspect_cache_path(self, path: Option<std::path::PathBuf>) -> Self {
146        self.with_os_inspect_cache_paths(path.into_iter().collect())
147    }
148
149    /// Point image-OS / runtime-marker inspection at an ORDERED list of
150    /// persistent blob caches the runtimes pull into.
151    ///
152    /// Inspection probes each store LOCAL-ONLY (no network) in order and stops
153    /// at the first that resolves the image's OS / marker. This matters because
154    /// `pull_image` pulls into BOTH the VZ-Linux store and the primary Sandbox
155    /// store, and either pull may short-circuit under `PullPolicy::IfNotPresent`
156    /// when its rootfs already exists — leaving the manifest/config in only ONE
157    /// of the two caches. Probing both (VZ-Linux first, then primary) is what
158    /// lets a locally-cached Linux image route to VZ-Linux under a Docker Hub
159    /// rate-limit (see [`zlayer_registry::fetch_image_os_in_cache_only`]).
160    #[must_use]
161    pub fn with_os_inspect_cache_paths(mut self, paths: Vec<std::path::PathBuf>) -> Self {
162        self.os_inspect_cache_paths = paths;
163        self
164    }
165
166    /// Resolve `image`'s OS for **dispatch**, probing each configured local blob
167    /// cache in order, **LOCAL-ONLY — never a network call**.
168    ///
169    /// This is the dispatch-population path: it runs inside `pull_image*` purely
170    /// to fill the image-OS cache that [`CompositeRuntime::select_for`] consults.
171    /// It MUST NOT touch the wire. The image's layers have already been pulled
172    /// and extracted by the time we get here, and the runtimes that did the pull
173    /// (VZ-Linux / Sandbox) wrote the manifest+config into the very blob caches
174    /// `os_inspect_cache_paths` points at — so the OS is knowable with zero
175    /// network round-trips.
176    ///
177    /// The old code fell back to a network inspection (`fetch_image_os`) when no
178    /// local cache resolved the OS. That network call was reachable on a Docker
179    /// Hub 429, and a failed inspection left the cache empty → a cached Linux
180    /// image (e.g. `alpine`) fell through to the Seatbelt sandbox (`Primary`),
181    /// which cannot exec a Linux ELF (exit 127). The network fallback is gone:
182    /// a registry rate-limit can no longer break dispatch here. A genuine local
183    /// miss simply returns `Ok(None)` (dispatch then uses its safe macOS
184    /// fallthrough), and it never errors or blocks.
185    async fn inspect_image_os(
186        &self,
187        image: &str,
188    ) -> std::result::Result<Option<OsKind>, zlayer_registry::RegistryError> {
189        for path in &self.os_inspect_cache_paths {
190            match zlayer_registry::CacheType::persistent_at(path)
191                .build()
192                .await
193            {
194                Ok(cache) => {
195                    match zlayer_registry::fetch_image_os_in_cache_only(image, cache, None).await {
196                        Ok(Some(os)) => return Ok(Some(os)),
197                        Ok(None) => {
198                            tracing::trace!(
199                                image,
200                                cache = %path.display(),
201                                "image OS not resolvable from this local cache; trying next",
202                            );
203                        }
204                        Err(e) => return Err(e),
205                    }
206                }
207                Err(e) => {
208                    tracing::debug!(
209                        image,
210                        cache = %path.display(),
211                        error = %e,
212                        "failed to open OS-inspect blob cache; trying next",
213                    );
214                }
215            }
216        }
217        // No local cache resolved it. We deliberately do NOT fall back to a
218        // network inspection: a Docker Hub 429 must never reach this
219        // dispatch-population path (see the doc comment above). A clean local
220        // miss is `Ok(None)` — dispatch falls through to its safe macOS default.
221        Ok(None)
222    }
223
224    /// Resolve `image`'s `com.zlayer.runtime` marker, probing each configured
225    /// local blob cache in order (no network per cache) before any network call.
226    async fn inspect_image_runtime_marker(
227        &self,
228        image: &str,
229        auth: Option<&RegistryAuth>,
230    ) -> std::result::Result<Option<String>, zlayer_registry::RegistryError> {
231        for path in &self.os_inspect_cache_paths {
232            match zlayer_registry::CacheType::persistent_at(path)
233                .build()
234                .await
235            {
236                Ok(cache) => {
237                    match zlayer_registry::fetch_image_runtime_marker_in_cache_only(
238                        image, cache, None,
239                    )
240                    .await
241                    {
242                        Ok(Some(marker)) => return Ok(Some(marker)),
243                        Ok(None) => {
244                            tracing::trace!(
245                                image,
246                                cache = %path.display(),
247                                "runtime marker not resolvable from this local cache; trying next",
248                            );
249                        }
250                        Err(e) => return Err(e),
251                    }
252                }
253                Err(e) => {
254                    tracing::debug!(
255                        image,
256                        cache = %path.display(),
257                        error = %e,
258                        "failed to open marker-inspect blob cache; trying next",
259                    );
260                }
261            }
262        }
263        zlayer_registry::fetch_image_runtime_marker(image, auth).await
264    }
265
266    /// Attach an opt-in Apple-Virtualization delegate. Services labelled
267    /// `com.zlayer.isolation=vz` route to it; everything else is unaffected.
268    #[must_use]
269    pub fn with_vz_delegate(mut self, vz: Option<Arc<dyn Runtime>>) -> Self {
270        self.vz = vz;
271        self
272    }
273
274    /// Attach the Apple-Virtualization Linux-guest delegate. When present it
275    /// becomes the **default** runtime for Linux images on this node (libkrun
276    /// is then reachable only via the explicit `com.zlayer.isolation=vm`
277    /// label); when `None`, Linux dispatch falls back to the libkrun delegate
278    /// or `RouteToPeer` as before.
279    #[must_use]
280    pub fn with_vz_linux_delegate(mut self, vz_linux: Option<Arc<dyn Runtime>>) -> Self {
281        self.vz_linux = vz_linux;
282        self
283    }
284
285    /// Access the primary runtime (for introspection / tests).
286    #[must_use]
287    pub fn primary(&self) -> &Arc<dyn Runtime> {
288        &self.primary
289    }
290
291    /// Access the delegate runtime, if one is configured.
292    #[must_use]
293    pub fn delegate(&self) -> Option<&Arc<dyn Runtime>> {
294        self.delegate.as_ref()
295    }
296
297    /// Record that `image` is known to target operating system `os`.
298    ///
299    /// Wired from [`zlayer_registry::fetch_image_os`] during `pull_image*`
300    /// (see [`CompositeRuntime::apply_image_os_inspection`]) so that specs
301    /// without an explicit `platform` still dispatch correctly.
302    pub(crate) async fn record_image_os(&self, image: &str, os: OsKind) {
303        self.image_os.write().await.insert(image.to_string(), os);
304    }
305
306    /// Record an image's `com.zlayer.runtime` marker (e.g. `"vz"`), used by
307    /// [`CompositeRuntime::select_for`] to auto-detect a runtime-specific bundle.
308    pub(crate) async fn record_image_runtime(&self, image: &str, marker: String) {
309        self.image_runtime
310            .write()
311            .await
312            .insert(image.to_string(), marker);
313    }
314
315    /// Apply a manifest runtime-marker inspection to the cache. Mirrors
316    /// [`CompositeRuntime::apply_image_os_inspection`]'s non-fatal contract:
317    /// only a present marker updates the cache; absence or error leaves it
318    /// untouched (dispatch falls through to the OS/platform rules).
319    async fn apply_image_runtime_inspection(
320        &self,
321        image: &str,
322        result: std::result::Result<Option<String>, zlayer_registry::RegistryError>,
323    ) {
324        match result {
325            Ok(Some(marker)) => {
326                tracing::debug!(image, marker, "cached image runtime marker for dispatch");
327                self.record_image_runtime(image, marker).await;
328            }
329            Ok(None) => {}
330            Err(e) => {
331                tracing::trace!(
332                    image,
333                    error = %e,
334                    "failed to inspect image runtime marker — dispatch unaffected",
335                );
336            }
337        }
338    }
339
340    /// Apply the result of a manifest OS inspection to the image-OS cache.
341    ///
342    /// Factored out of [`Runtime::pull_image`] and
343    /// [`Runtime::pull_image_with_policy`] so the cache-update policy can be
344    /// unit-tested without depending on a live registry. The three branches
345    /// mirror the contract of [`zlayer_registry::fetch_image_os`]:
346    ///
347    /// * `Ok(Some(os))` — populate the cache so future `create_container`
348    ///   calls without an explicit `spec.platform` dispatch to the right
349    ///   runtime.
350    /// * `Ok(None)` — the config blob had no (or an unrecognized) `os`
351    ///   field. Leave the cache untouched; dispatch falls through to primary.
352    /// * `Err(_)` — transient or registry error. Log at warn and leave the
353    ///   cache untouched. We never fail the overall `pull_image*` call on
354    ///   inspection failure: the primary runtime's own pull already
355    ///   succeeded, and the safe fall-through is "primary".
356    async fn apply_image_os_inspection(
357        &self,
358        image: &str,
359        result: std::result::Result<Option<OsKind>, zlayer_registry::RegistryError>,
360    ) {
361        match result {
362            Ok(Some(os)) => {
363                self.record_image_os(image, os).await;
364                tracing::debug!(image, ?os, "cached image OS for dispatch");
365            }
366            Ok(None) => {
367                tracing::trace!(
368                    image,
369                    "image manifest has no OS field — dispatch will fall through to primary",
370                );
371            }
372            Err(e) => {
373                tracing::warn!(
374                    image,
375                    error = %e,
376                    "failed to inspect image manifest OS — dispatch will fall through to primary",
377                );
378            }
379        }
380    }
381
382    /// Decide which runtime should handle a `create_container` call for `spec`.
383    ///
384    /// The `service` argument is the originating service name, used to build an
385    /// actionable [`AgentError::RouteToPeer`] when a Linux workload lands on
386    /// this node without a local delegate so the scheduler can re-place it on
387    /// a capable peer.
388    ///
389    /// Policy (H-7): Linux workloads are never silently routed to the primary
390    /// on nodes without a delegate. The old "permissive" fall-through (primary
391    /// handles everything) returned an `Unsupported` error only when
392    /// `spec.platform` was explicitly set, but fell through to primary for
393    /// specs without a platform — producing cryptic downstream errors when the
394    /// image-OS cache said `Linux`. We now return `RouteToPeer` in both cases.
395    ///
396    /// Routing precedence, locally-known OS only (NO network call ever happens
397    /// here — the image-OS cache is filled local-only at pull time):
398    /// 1. explicit `com.zlayer.isolation` label,
399    /// 2. `com.zlayer.runtime` manifest marker (`vz` / `vz-linux`),
400    /// 3. `spec.platform.os`,
401    /// 4. the image-OS cache: `Linux` -> `VzLinux` (when present), `Macos` /
402    ///    `Windows` -> `Primary`,
403    /// 5. FINAL fallthrough — OS genuinely unknown: on a macOS host (proxied by
404    ///    the presence of a `vz_linux` delegate) default to `VzLinux`, because
405    ///    almost every registry image is Linux and the Seatbelt sandbox cannot
406    ///    exec a Linux ELF. A macOS-native rootfs never reaches this branch: it
407    ///    resolves `os == Macos` at step 4 and routes to `Primary`.
408    async fn select_for(&self, service: &str, spec: &ServiceSpec) -> Result<DispatchTarget> {
409        // Explicit per-service isolation label wins over everything below.
410        //   `com.zlayer.isolation=vz`               -> VZ (native-macOS guest VM)
411        //   `com.zlayer.isolation=vz-linux`         -> VZ Linux-guest VM
412        //   `com.zlayer.isolation=vm|libkrun`       -> libkrun delegate (force VM)
413        //   `com.zlayer.isolation=sandbox|seatbelt` -> Seatbelt sandbox (primary),
414        //                                              opting OUT of VZ auto-detect.
415        if let Some(label) = spec.labels.get("com.zlayer.isolation") {
416            if self.vz.is_some() && label.eq_ignore_ascii_case("vz") {
417                return Ok(DispatchTarget::Vz);
418            }
419            if self.vz_linux.is_some() && label.eq_ignore_ascii_case("vz-linux") {
420                return Ok(DispatchTarget::VzLinux);
421            }
422            if label.eq_ignore_ascii_case("vm") || label.eq_ignore_ascii_case("libkrun") {
423                // Force the libkrun delegate. If no delegate exists the
424                // platform/image-OS rules below produce the appropriate
425                // `RouteToPeer`, so only short-circuit when one is present.
426                if self.delegate.is_some() {
427                    return Ok(DispatchTarget::Delegate);
428                }
429            }
430            if label.eq_ignore_ascii_case("sandbox") || label.eq_ignore_ascii_case("seatbelt") {
431                return Ok(DispatchTarget::Primary);
432            }
433        }
434
435        // Auto-detect a VZ base bundle: when the image's manifest carries
436        // `com.zlayer.runtime=vz` (stamped by `zlayer vz build-base`), prefer the
437        // VZ runtime — it is the only runtime that can boot such a bundle. This
438        // is the "prefer VZ by default" behaviour: it only fires for genuine VZ
439        // bundles, so Seatbelt-rootfs and Linux images are unaffected.
440        if self.vz.is_some()
441            && self
442                .image_runtime
443                .read()
444                .await
445                .get(&spec.image.name.to_string())
446                .is_some_and(|m| m.eq_ignore_ascii_case(zlayer_registry::ZLAYER_RUNTIME_VZ))
447        {
448            return Ok(DispatchTarget::Vz);
449        }
450
451        // Auto-detect a VZ Linux-guest image: when the manifest carries
452        // `com.zlayer.runtime=vz-linux`, prefer the VZ Linux runtime.
453        if self.vz_linux.is_some()
454            && self
455                .image_runtime
456                .read()
457                .await
458                .get(&spec.image.name.to_string())
459                .is_some_and(|m| m.eq_ignore_ascii_case(zlayer_registry::ZLAYER_RUNTIME_LINUX_VZ))
460        {
461            return Ok(DispatchTarget::VzLinux);
462        }
463
464        if let Some(platform) = &spec.platform {
465            let target = match platform.os {
466                OsKind::Windows | OsKind::Macos => DispatchTarget::Primary,
467                // On macOS the VZ Linux-guest runtime is the default Linux path;
468                // only fall back to the libkrun delegate when it is absent.
469                OsKind::Linux if self.vz_linux.is_some() => DispatchTarget::VzLinux,
470                OsKind::Linux => DispatchTarget::Delegate,
471            };
472            if matches!(target, DispatchTarget::Delegate) && self.delegate.is_none() {
473                return Err(AgentError::RouteToPeer {
474                    service: service.to_string(),
475                    required_os: OsKind::Linux.as_oci_str().to_string(),
476                    reason: "spec.platform.os = linux but this node has no WSL2 delegate \
477                            configured; enable `--install-wsl yes` on this node or add a Linux \
478                            peer to the cluster"
479                        .to_string(),
480                });
481            }
482            return Ok(target);
483        }
484
485        if let Some(os) = self
486            .image_os
487            .read()
488            .await
489            .get(&spec.image.name.to_string())
490            .copied()
491        {
492            return match os {
493                OsKind::Linux => {
494                    if self.vz_linux.is_some() {
495                        // VZ Linux-guest is the default Linux path on macOS.
496                        Ok(DispatchTarget::VzLinux)
497                    } else if self.delegate.is_some() {
498                        Ok(DispatchTarget::Delegate)
499                    } else {
500                        // No delegate and the image manifest says Linux —
501                        // refuse at the composite layer so the scheduler can
502                        // re-place on a Linux peer instead of the primary
503                        // failing with a cryptic HCS error.
504                        Err(AgentError::RouteToPeer {
505                            service: service.to_string(),
506                            required_os: OsKind::Linux.as_oci_str().to_string(),
507                            reason: format!(
508                                "image '{}' manifest reports os=linux but this node has no WSL2 \
509                                 delegate configured; enable `--install-wsl yes` on this node or \
510                                 add a Linux peer to the cluster",
511                                spec.image.name
512                            ),
513                        })
514                    }
515                }
516                OsKind::Windows | OsKind::Macos => Ok(DispatchTarget::Primary),
517            };
518        }
519
520        // OS genuinely unknown (no isolation label, no runtime marker, no
521        // `spec.platform`, no image-OS cache hit). On a macOS host with a
522        // VZ-Linux delegate, default to VZ-Linux: the overwhelming majority of
523        // images pulled from public registries are Linux, and the Seatbelt
524        // sandbox (the primary) cannot exec a Linux ELF — sending an unknown
525        // image there is the exit-127 failure this fix exists to prevent. The
526        // user is fine with VZ-Linux as the default; the only hard rule is that
527        // a macOS-native rootfs must never go to the Linux VM, and that is
528        // already guaranteed above by the `image_os == Macos -> Primary` branch
529        // (a native bundle resolves its OS locally and never reaches here).
530        //
531        // The `vz_linux` delegate is only ever attached on a macOS host, so its
532        // presence is a sufficient proxy for "macOS host" — non-macOS hosts
533        // (Windows HCS, Linux) keep the historical primary fallthrough.
534        if self.vz_linux.is_some() {
535            return Ok(DispatchTarget::VzLinux);
536        }
537
538        Ok(DispatchTarget::Primary)
539    }
540
541    /// Look up an existing dispatch decision for `id`, or return `NotFound`.
542    async fn lookup(&self, id: &ContainerId) -> Result<Arc<dyn Runtime>> {
543        let target =
544            self.dispatch
545                .read()
546                .await
547                .get(id)
548                .copied()
549                .ok_or_else(|| AgentError::NotFound {
550                    container: id.to_string(),
551                    reason: "no dispatch record in CompositeRuntime".to_string(),
552                })?;
553        Ok(self.runtime_for(target).clone())
554    }
555
556    /// Resolve a [`DispatchTarget`] to the concrete runtime reference.
557    ///
558    /// Unwrapping the delegate is safe because [`Self::select_for`] returns
559    /// `Err` whenever a delegate would be required but is missing, so a
560    /// `DispatchTarget::Delegate` can never end up in the dispatch map
561    /// without a delegate being present.
562    fn runtime_for(&self, t: DispatchTarget) -> &Arc<dyn Runtime> {
563        match t {
564            DispatchTarget::Primary => &self.primary,
565            DispatchTarget::Delegate => self
566                .delegate
567                .as_ref()
568                .expect("delegate target requires delegate to exist"),
569            // `select_for` only returns `Vz` when a vz delegate is present;
570            // fall back to primary defensively.
571            DispatchTarget::Vz => self.vz.as_ref().unwrap_or(&self.primary),
572            // `select_for` only returns `VzLinux` when a vz-linux delegate is
573            // present; fall back to primary defensively.
574            DispatchTarget::VzLinux => self.vz_linux.as_ref().unwrap_or(&self.primary),
575        }
576    }
577
578    /// Build the ordered list of backends to try for a per-container read
579    /// (logs / stats), owning backend first.
580    ///
581    /// The container's dispatch record (recorded at `create_container`) names
582    /// the runtime that actually ran it, so we try that one first. The other
583    /// configured backends follow as a defensive fallback for the case where
584    /// the owning backend can answer container lifecycle calls but not a
585    /// particular read (e.g. the macOS `SandboxRuntime` primary implements
586    /// `container_logs`/`get_container_stats` but not the *streaming*
587    /// `logs_stream`/`stats_stream`, so it returns `Unsupported` for the
588    /// latter). Returns `NotFound` when the id was never dispatched.
589    async fn read_backends(
590        &self,
591        id: &ContainerId,
592    ) -> Result<Vec<(&'static str, Arc<dyn Runtime>)>> {
593        let owner =
594            self.dispatch
595                .read()
596                .await
597                .get(id)
598                .copied()
599                .ok_or_else(|| AgentError::NotFound {
600                    container: id.to_string(),
601                    reason: "no dispatch record in CompositeRuntime".to_string(),
602                })?;
603
604        // Owning backend first, then every other configured backend (de-duped
605        // against the owner) so a read the owner can't serve can still be
606        // satisfied elsewhere instead of 500-ing.
607        let all: [(DispatchTarget, Option<&Arc<dyn Runtime>>); 4] = [
608            (DispatchTarget::Primary, Some(&self.primary)),
609            (DispatchTarget::Delegate, self.delegate.as_ref()),
610            (DispatchTarget::Vz, self.vz.as_ref()),
611            (DispatchTarget::VzLinux, self.vz_linux.as_ref()),
612        ];
613
614        let label_for = |t: DispatchTarget| match t {
615            DispatchTarget::Primary => "primary",
616            DispatchTarget::Delegate => "delegate",
617            DispatchTarget::Vz => "vz",
618            DispatchTarget::VzLinux => "vz_linux",
619        };
620
621        let mut out: Vec<(&'static str, Arc<dyn Runtime>)> =
622            vec![(label_for(owner), self.runtime_for(owner).clone())];
623        for (target, rt) in all {
624            if target != owner {
625                if let Some(rt) = rt {
626                    out.push((label_for(target), rt.clone()));
627                }
628            }
629        }
630        Ok(out)
631    }
632}
633
634/// Accumulates per-backend errors while a read fans out across the
635/// owner-first fallback chain, so the *final* error reflects the right HTTP
636/// status.
637///
638/// Every backend in the chain is tried; a backend that does not own the
639/// container returns [`AgentError::NotFound`] (a *skip*, not authoritative),
640/// while a backend that owns it but cannot serve this particular read returns
641/// some other error (notably the `Unsupported` default for an unimplemented
642/// streaming read) — a *soft miss* we fall back from. The distinction matters
643/// for the final error: if **every** backend returned `NotFound`, the container
644/// genuinely does not exist here and we surface `NotFound` (→ 404); if any
645/// backend produced a non-`NotFound` error, that is the more informative
646/// failure to report (→ 500) once no backend could serve the read.
647#[derive(Default)]
648struct ReadMissAccumulator {
649    /// The most recent non-`NotFound` error, if any backend produced one.
650    soft_err: Option<AgentError>,
651    /// The most recent `NotFound`, used only when *no* soft error occurred.
652    not_found: Option<AgentError>,
653}
654
655impl ReadMissAccumulator {
656    fn record(&mut self, e: AgentError) {
657        if matches!(e, AgentError::NotFound { .. }) {
658            self.not_found = Some(e);
659        } else {
660            self.soft_err = Some(e);
661        }
662    }
663
664    /// Resolve the accumulated misses into the final error for a read where no
665    /// backend succeeded. Prefers a soft error (more informative → 500) over a
666    /// bare `NotFound`; falls back to a synthesised `Unsupported` only if
667    /// nothing was recorded at all (an empty backend list, which cannot happen
668    /// in practice since the owner is always present).
669    fn into_error(self, what: &str) -> AgentError {
670        self.soft_err
671            .or(self.not_found)
672            .unwrap_or_else(|| AgentError::Unsupported(format!("no backend could serve {what}")))
673    }
674}
675
676/// Build a bounded one-shot [`LogsStream`] from a captured-log snapshot.
677///
678/// Used by [`CompositeRuntime::logs_stream`] when no backend offers a native
679/// log stream but one can produce a `container_logs` snapshot (e.g. the macOS
680/// `SandboxRuntime`). Mirrors the VZ-Linux runtime's own snapshot-to-stream
681/// translation so the wire shape is identical regardless of which backend
682/// served the data: honour the per-channel filters and re-attach the newline
683/// the line-splitter stripped.
684fn one_shot_logs_stream(entries: Vec<LogEntry>, opts: &LogsStreamOptions) -> LogsStream {
685    use futures_util::stream;
686
687    // Docker's default (neither stdout nor stderr explicitly requested) means
688    // "both"; equivalently, keep stdout unless stderr was the *only* channel
689    // requested, and vice-versa.
690    let want_stdout = opts.stdout || !opts.stderr;
691    let want_stderr = opts.stderr || !opts.stdout;
692    let timestamps = opts.timestamps;
693
694    let chunks: Vec<Result<LogChunk>> = entries
695        .into_iter()
696        .filter_map(|e| {
697            let channel = match e.stream {
698                LogStream::Stdout => LogChannel::Stdout,
699                LogStream::Stderr => LogChannel::Stderr,
700            };
701            let keep = match channel {
702                LogChannel::Stdout => want_stdout,
703                LogChannel::Stderr => want_stderr,
704                LogChannel::Stdin => false,
705            };
706            if !keep {
707                return None;
708            }
709            let mut bytes = e.message.into_bytes();
710            bytes.push(b'\n');
711            Some(Ok(LogChunk {
712                stream: channel,
713                bytes: bytes::Bytes::from(bytes),
714                timestamp: timestamps.then_some(e.timestamp),
715            }))
716        })
717        .collect();
718
719    Box::pin(stream::iter(chunks))
720}
721
722/// Build a bounded one-shot [`StatsStream`] from a single [`ContainerStats`]
723/// snapshot.
724///
725/// Used by [`CompositeRuntime::stats_stream`] when no backend offers a native
726/// stats stream but one can produce a `get_container_stats` snapshot. The
727/// [`ContainerStats`] CPU figure is microseconds; [`StatsSample::cpu_total_ns`]
728/// is nanoseconds, so we scale. `online_cpus` is unknown from this coarse
729/// snapshot (the non-streaming API does not carry it) and is reported as `1`
730/// so the Docker-compat CPU-percent math has a sane divisor.
731fn one_shot_stats_stream(stats: &ContainerStats) -> StatsStream {
732    use futures_util::stream;
733
734    let sample = StatsSample {
735        cpu_total_ns: stats.cpu_usage_usec.saturating_mul(1_000),
736        cpu_system_ns: 0,
737        online_cpus: 1,
738        mem_used_bytes: stats.memory_bytes,
739        mem_limit_bytes: stats.memory_limit,
740        net_rx_bytes: 0,
741        net_tx_bytes: 0,
742        blkio_read_bytes: 0,
743        blkio_write_bytes: 0,
744        pids_current: 0,
745        pids_limit: None,
746        timestamp: chrono::Utc::now(),
747    };
748    Box::pin(stream::iter(vec![Ok(sample)]))
749}
750
751#[async_trait]
752impl Runtime for CompositeRuntime {
753    async fn pull_image(&self, image: &str) -> Result<()> {
754        // Primary pull. `WrongPlatform` here means the image's OCI config
755        // reports an OS the primary cannot service (e.g. a Linux image on the
756        // Windows HCS runtime). That is a *soft* failure: the delegate's pull
757        // below owns the image, so we log and continue rather than failing
758        // the whole composite call. Any other error is a real pull failure
759        // and must bubble.
760        if let Err(e) = self.primary.pull_image(image).await {
761            if matches!(e, AgentError::WrongPlatform { .. }) {
762                tracing::debug!(
763                    image,
764                    error = %e,
765                    "primary runtime cannot service image (wrong platform); delegating",
766                );
767            } else {
768                return Err(e);
769            }
770        }
771        if let Some(delegate) = &self.delegate {
772            if let Err(e) = delegate.pull_image(image).await {
773                // Foreign-OS images will reliably fail one of the two pulls
774                // (primary can't store a Linux image's config on Windows, or
775                // vice versa). That's expected — the successful side owns the
776                // layers we'll actually use — so we keep this at debug.
777                tracing::debug!(
778                    image,
779                    error = %e,
780                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
781                );
782            }
783        }
784        // VZ + VZ-Linux delegates (macOS). The VZ-Linux runtime is the default
785        // execution path for Linux images on macOS and owns its OWN image store
786        // (`image_rootfs`); if we never pull into it, the image is absent both
787        // when `create_container` dispatches there AND from `list_images` /
788        // `inspect_image` (which is what `docker pull` verifies). Pulling here
789        // makes the image actually present where it runs and listable. Errors
790        // are non-fatal for the same wrong-OS reason as the delegate above.
791        for (label, rt) in [
792            self.vz.as_ref().map(|r| ("vz", r)),
793            self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
794        ]
795        .into_iter()
796        .flatten()
797        {
798            if let Err(e) = rt.pull_image(image).await {
799                tracing::debug!(
800                    image,
801                    runtime = label,
802                    error = %e,
803                    "vz delegate failed to pull image (likely wrong OS); continuing",
804                );
805            }
806        }
807
808        // Inspect the OCI manifest's `config.os` so `select_for(spec)` can
809        // dispatch correctly when `spec.platform` is `None`. Non-fatal: any
810        // failure here just means dispatch falls through to primary.
811        let os_result = self.inspect_image_os(image).await;
812        self.apply_image_os_inspection(image, os_result).await;
813        let marker_result = self.inspect_image_runtime_marker(image, None).await;
814        self.apply_image_runtime_inspection(image, marker_result)
815            .await;
816
817        Ok(())
818    }
819
820    async fn pull_image_with_policy(
821        &self,
822        image: &str,
823        policy: PullPolicy,
824        auth: Option<&RegistryAuth>,
825        source: zlayer_spec::SourcePolicy,
826    ) -> Result<()> {
827        // See `pull_image` above for the `WrongPlatform` soft-skip rationale.
828        if let Err(e) = self
829            .primary
830            .pull_image_with_policy(image, policy, auth, source)
831            .await
832        {
833            if matches!(e, AgentError::WrongPlatform { .. }) {
834                tracing::debug!(
835                    image,
836                    error = %e,
837                    "primary runtime cannot service image (wrong platform); delegating",
838                );
839            } else {
840                return Err(e);
841            }
842        }
843        if let Some(delegate) = &self.delegate {
844            if let Err(e) = delegate
845                .pull_image_with_policy(image, policy, auth, source)
846                .await
847            {
848                tracing::debug!(
849                    image,
850                    error = %e,
851                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
852                );
853            }
854        }
855        // See `pull_image` above: the VZ-Linux runtime owns its own image store
856        // and is the default Linux execution path on macOS, so pull into it (and
857        // the opt-in VZ delegate) too. Non-fatal per-backend errors.
858        for (label, rt) in [
859            self.vz.as_ref().map(|r| ("vz", r)),
860            self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
861        ]
862        .into_iter()
863        .flatten()
864        {
865            if let Err(e) = rt.pull_image_with_policy(image, policy, auth, source).await {
866                tracing::debug!(
867                    image,
868                    runtime = label,
869                    error = %e,
870                    "vz delegate failed to pull image (likely wrong OS); continuing",
871                );
872            }
873        }
874
875        let os_result = self.inspect_image_os(image).await;
876        self.apply_image_os_inspection(image, os_result).await;
877        let marker_result = self.inspect_image_runtime_marker(image, auth).await;
878        self.apply_image_runtime_inspection(image, marker_result)
879            .await;
880
881        Ok(())
882    }
883
884    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
885        let target = self.select_for(&id.service, spec).await?;
886        {
887            let mut dispatch = self.dispatch.write().await;
888            dispatch.insert(id.clone(), target);
889        }
890        let rt = self.runtime_for(target).clone();
891        match rt.create_container(id, spec).await {
892            Ok(()) => Ok(()),
893            Err(e) => {
894                // Roll back the cache insert on failure so subsequent lookups
895                // don't find a dangling entry.
896                self.dispatch.write().await.remove(id);
897                Err(e)
898            }
899        }
900    }
901
902    async fn start_container(&self, id: &ContainerId) -> Result<()> {
903        let rt = self.lookup(id).await?;
904        rt.start_container(id).await
905    }
906
907    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
908        let rt = self.lookup(id).await?;
909        rt.stop_container(id, timeout).await
910    }
911
912    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
913        let rt = self.lookup(id).await?;
914        let res = rt.remove_container(id).await;
915        self.dispatch.write().await.remove(id);
916        res
917    }
918
919    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
920        let rt = self.lookup(id).await?;
921        rt.container_state(id).await
922    }
923
924    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
925        let backends = self.read_backends(id).await?;
926        let mut misses = ReadMissAccumulator::default();
927        for (label, rt) in backends {
928            match rt.container_logs(id, tail).await {
929                Ok(logs) => return Ok(logs),
930                Err(e) => {
931                    tracing::warn!(
932                        container = %id,
933                        runtime = label,
934                        error = %e,
935                        "composite container_logs: backend could not serve logs; trying next backend",
936                    );
937                    misses.record(e);
938                }
939            }
940        }
941        Err(misses.into_error("container_logs"))
942    }
943
944    async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
945        let rt = self.lookup(id).await?;
946        rt.exec(id, cmd).await
947    }
948
949    async fn exec_with_opts(
950        &self,
951        id: &ContainerId,
952        opts: &crate::runtime::ExecOptions,
953    ) -> Result<(i32, String, String)> {
954        // Forward to the resolved backend's `exec_with_opts` so Docker exec
955        // options (`--user`, `-w`, `-e`) reach the runtime that actually owns
956        // the container. Without this override the trait default would call
957        // `self.exec(opts.command)` and silently drop user/cwd/env.
958        let rt = self.lookup(id).await?;
959        rt.exec_with_opts(id, opts).await
960    }
961
962    async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
963        let rt = self.lookup(id).await?;
964        rt.exec_stream(id, cmd).await
965    }
966
967    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
968        let backends = self.read_backends(id).await?;
969        let mut misses = ReadMissAccumulator::default();
970        for (label, rt) in backends {
971            match rt.get_container_stats(id).await {
972                Ok(stats) => return Ok(stats),
973                Err(e) => {
974                    tracing::warn!(
975                        container = %id,
976                        runtime = label,
977                        error = %e,
978                        "composite get_container_stats: backend could not serve stats; \
979                         trying next backend",
980                    );
981                    misses.record(e);
982                }
983            }
984        }
985        Err(misses.into_error("get_container_stats"))
986    }
987
988    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
989        let rt = self.lookup(id).await?;
990        rt.wait_container(id).await
991    }
992
993    async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
994        let rt = self.lookup(id).await?;
995        rt.wait_outcome(id).await
996    }
997
998    async fn wait_outcome_with_condition(
999        &self,
1000        id: &ContainerId,
1001        condition: WaitCondition,
1002    ) -> Result<WaitOutcome> {
1003        let rt = self.lookup(id).await?;
1004        rt.wait_outcome_with_condition(id, condition).await
1005    }
1006
1007    async fn rename_container(&self, id: &ContainerId, new_name: &str) -> Result<()> {
1008        let rt = self.lookup(id).await?;
1009        rt.rename_container(id, new_name).await
1010    }
1011
1012    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
1013        let backends = self.read_backends(id).await?;
1014        let mut misses = ReadMissAccumulator::default();
1015        for (label, rt) in backends {
1016            match rt.get_logs(id).await {
1017                Ok(logs) => return Ok(logs),
1018                Err(e) => {
1019                    tracing::warn!(
1020                        container = %id,
1021                        runtime = label,
1022                        error = %e,
1023                        "composite get_logs: backend could not serve logs; trying next backend",
1024                    );
1025                    misses.record(e);
1026                }
1027            }
1028        }
1029        Err(misses.into_error("get_logs"))
1030    }
1031
1032    async fn logs_stream(&self, id: &ContainerId, opts: LogsStreamOptions) -> Result<LogsStream> {
1033        // Route to the backend that actually created the container. The default
1034        // trait impl returns `Unsupported`, which surfaced as a swallowed 500 on
1035        // `GET /containers/{id}/logs` whenever the owning backend did not
1036        // implement streaming (e.g. the macOS `SandboxRuntime` primary, which
1037        // implements `container_logs` but not `logs_stream`).
1038        //
1039        // Try each backend's `logs_stream` (owner first); on a soft miss
1040        // (`Unsupported`/error that is not `NotFound`) fall back to the same
1041        // backend's non-streaming `container_logs` and SYNTHESISE a one-shot
1042        // stream from it. Only a genuine `NotFound` propagates (→ 404).
1043        let backends = self.read_backends(id).await?;
1044        let mut misses = ReadMissAccumulator::default();
1045        for (label, rt) in &backends {
1046            match rt.logs_stream(id, opts.clone()).await {
1047                Ok(stream) => return Ok(stream),
1048                Err(e) => {
1049                    tracing::warn!(
1050                        container = %id,
1051                        runtime = label,
1052                        error = %e,
1053                        "composite logs_stream: backend has no native log stream; \
1054                         falling back to a one-shot snapshot",
1055                    );
1056                    misses.record(e);
1057                }
1058            }
1059        }
1060
1061        // No backend offered a native stream. Synthesise one from whichever
1062        // backend can produce a captured-log snapshot (`container_logs`).
1063        let tail = opts
1064            .tail
1065            .map_or(1000, |n| usize::try_from(n).unwrap_or(1000));
1066        for (label, rt) in &backends {
1067            match rt.container_logs(id, tail).await {
1068                Ok(entries) => {
1069                    return Ok(one_shot_logs_stream(entries, &opts));
1070                }
1071                Err(e) => {
1072                    tracing::warn!(
1073                        container = %id,
1074                        runtime = label,
1075                        error = %e,
1076                        "composite logs_stream: backend snapshot fallback failed; trying next",
1077                    );
1078                    misses.record(e);
1079                }
1080            }
1081        }
1082        Err(misses.into_error("container logs"))
1083    }
1084
1085    async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
1086        // Same rationale as `logs_stream`: forward to the owning backend so
1087        // `GET /containers/{id}/stats` reaches the runtime that ran the
1088        // container instead of hitting the `Unsupported` default (→ swallowed
1089        // 500). On a soft miss, fall back to the non-streaming
1090        // `get_container_stats` and synthesise a one-shot sample.
1091        let backends = self.read_backends(id).await?;
1092        let mut misses = ReadMissAccumulator::default();
1093        for (label, rt) in &backends {
1094            match rt.stats_stream(id).await {
1095                Ok(stream) => return Ok(stream),
1096                Err(e) => {
1097                    tracing::warn!(
1098                        container = %id,
1099                        runtime = label,
1100                        error = %e,
1101                        "composite stats_stream: backend has no native stats stream; \
1102                         falling back to a one-shot sample",
1103                    );
1104                    misses.record(e);
1105                }
1106            }
1107        }
1108
1109        for (label, rt) in &backends {
1110            match rt.get_container_stats(id).await {
1111                Ok(stats) => return Ok(one_shot_stats_stream(&stats)),
1112                Err(e) => {
1113                    tracing::warn!(
1114                        container = %id,
1115                        runtime = label,
1116                        error = %e,
1117                        "composite stats_stream: backend sample fallback failed; trying next",
1118                    );
1119                    misses.record(e);
1120                }
1121            }
1122        }
1123        Err(misses.into_error("container stats"))
1124    }
1125
1126    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
1127        let rt = self.lookup(id).await?;
1128        rt.get_container_pid(id).await
1129    }
1130
1131    fn overlay_attach_kind(&self) -> OverlayAttachKind {
1132        // Linux workloads on macOS execute in the VZ-Linux delegate, which joins
1133        // the overlay from inside the guest (`InGuestVsock`). Defer to it when
1134        // present so the service layer takes the guest-managed attach path and
1135        // calls `push_overlay_config` (routed per-container below); otherwise use
1136        // the primary's kind. Non-VZ containers route to a runtime whose
1137        // `push_overlay_config` is unsupported and degrade gracefully.
1138        self.vz_linux.as_ref().map_or_else(
1139            || self.primary.overlay_attach_kind(),
1140            |vz| vz.overlay_attach_kind(),
1141        )
1142    }
1143
1144    async fn push_overlay_config(
1145        &self,
1146        id: &ContainerId,
1147        config: &zlayer_types::overlayd::GuestOverlayConfig,
1148    ) -> Result<()> {
1149        let rt = self.lookup(id).await?;
1150        rt.push_overlay_config(id, config).await
1151    }
1152
1153    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
1154        let rt = self.lookup(id).await?;
1155        rt.get_container_ip(id).await
1156    }
1157
1158    async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
1159        let rt = self.lookup(id).await?;
1160        rt.get_container_port_override(id).await
1161    }
1162
1163    #[cfg(target_os = "windows")]
1164    async fn get_container_namespace_id(
1165        &self,
1166        id: &ContainerId,
1167    ) -> Result<Option<windows::core::GUID>> {
1168        let rt = self.lookup(id).await?;
1169        rt.get_container_namespace_id(id).await
1170    }
1171
1172    async fn sync_container_volumes(&self, id: &ContainerId) -> Result<()> {
1173        let rt = self.lookup(id).await?;
1174        rt.sync_container_volumes(id).await
1175    }
1176
1177    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1178        // Fan out over every configured runtime and merge their image lists.
1179        // Crucially, a *single* backend's failure must not fail the whole
1180        // call: on macOS the `primary` (SandboxRuntime) does not implement
1181        // `list_images` at all (it returns `Unsupported`), yet pulled Linux
1182        // images live in the `vz_linux` delegate's store. Propagating the
1183        // primary's error via `?` here used to surface as a 500 on
1184        // `GET /images/json` (and, via the inspect fallback, broke every
1185        // `docker pull` verification). Tolerate per-backend errors the same
1186        // way we already tolerate the delegate's, and include the VZ +
1187        // VZ-Linux delegates so their images are actually listable.
1188        let mut out: Vec<ImageInfo> = Vec::new();
1189        let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1190        let mut any_ok = false;
1191        let mut last_err: Option<AgentError> = None;
1192
1193        for (label, rt) in [
1194            Some(("primary", &self.primary)),
1195            self.delegate.as_ref().map(|d| ("delegate", d)),
1196            self.vz.as_ref().map(|d| ("vz", d)),
1197            self.vz_linux.as_ref().map(|d| ("vz_linux", d)),
1198        ]
1199        .into_iter()
1200        .flatten()
1201        {
1202            match rt.list_images().await {
1203                Ok(images) => {
1204                    any_ok = true;
1205                    for img in images {
1206                        // De-dup by reference so an image registered in more
1207                        // than one backend isn't reported twice.
1208                        if seen.insert(img.reference.clone()) {
1209                            out.push(img);
1210                        }
1211                    }
1212                }
1213                Err(e) => {
1214                    tracing::debug!(
1215                        runtime = label,
1216                        error = %e,
1217                        "composite list_images: backend returned an error; skipping it",
1218                    );
1219                    last_err = Some(e);
1220                }
1221            }
1222        }
1223
1224        // Only fail if *every* backend errored. With at least one success we
1225        // return the merged (possibly empty) list — an empty image set is a
1226        // valid response, not an error.
1227        if any_ok {
1228            Ok(out)
1229        } else {
1230            Err(last_err.unwrap_or_else(|| {
1231                AgentError::Unsupported("no runtime implements list_images".into())
1232            }))
1233        }
1234    }
1235
1236    async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
1237        match self.primary.remove_image(image, force).await {
1238            Ok(()) => Ok(()),
1239            Err(primary_err) => {
1240                if let Some(delegate) = &self.delegate {
1241                    match delegate.remove_image(image, force).await {
1242                        Ok(()) => Ok(()),
1243                        Err(delegate_err) => {
1244                            tracing::debug!(
1245                                image,
1246                                %delegate_err,
1247                                "delegate remove_image also failed; returning primary error",
1248                            );
1249                            Err(primary_err)
1250                        }
1251                    }
1252                } else {
1253                    Err(primary_err)
1254                }
1255            }
1256        }
1257    }
1258
1259    async fn prune_images(&self) -> Result<PruneResult> {
1260        // Symmetric with `remove_image` / `tag_image`: a primary that does not
1261        // implement pruning (e.g. a cache-less backend that returns
1262        // `Unsupported`) must not 501 the whole call when a delegate exists and
1263        // could still reclaim space. Only a primary `Unsupported` is tolerated;
1264        // any other primary error still propagates.
1265        let mut result = match self.primary.prune_images().await {
1266            Ok(r) => r,
1267            Err(AgentError::Unsupported(reason)) if self.delegate.is_some() => {
1268                tracing::debug!(
1269                    %reason,
1270                    "primary runtime does not support prune_images; relying on delegate",
1271                );
1272                PruneResult::default()
1273            }
1274            Err(e) => return Err(e),
1275        };
1276        if let Some(delegate) = &self.delegate {
1277            match delegate.prune_images().await {
1278                Ok(extra) => {
1279                    result.deleted.extend(extra.deleted);
1280                    result.space_reclaimed =
1281                        result.space_reclaimed.saturating_add(extra.space_reclaimed);
1282                }
1283                Err(e) => tracing::warn!(
1284                    error = %e,
1285                    "delegate runtime prune_images failed; returning primary result only",
1286                ),
1287            }
1288        }
1289        Ok(result)
1290    }
1291
1292    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
1293        let rt = self.lookup(id).await?;
1294        rt.kill_container(id, signal).await
1295    }
1296
1297    async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
1298        match self.primary.tag_image(source, target).await {
1299            Ok(()) => Ok(()),
1300            Err(primary_err) => {
1301                if let Some(delegate) = &self.delegate {
1302                    match delegate.tag_image(source, target).await {
1303                        Ok(()) => Ok(()),
1304                        Err(delegate_err) => {
1305                            tracing::debug!(
1306                                source,
1307                                target,
1308                                %delegate_err,
1309                                "delegate tag_image also failed; returning primary error",
1310                            );
1311                            Err(primary_err)
1312                        }
1313                    }
1314                } else {
1315                    Err(primary_err)
1316                }
1317            }
1318        }
1319    }
1320
1321    async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
1322        let rt = self.lookup(id).await?;
1323        rt.inspect_detailed(id).await
1324    }
1325}
1326
1327#[cfg(test)]
1328mod tests {
1329    use super::*;
1330    use crate::cgroups_stats::ContainerStats;
1331    use std::sync::Mutex as StdMutex;
1332    use zlayer_spec::{ArchKind, DeploymentSpec, TargetPlatform};
1333
1334    /// Which runtime a mock represents. Only used for labelling invocation
1335    /// records in tests.
1336    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1337    enum Role {
1338        Primary,
1339        Delegate,
1340        Vz,
1341        VzLinux,
1342    }
1343
1344    /// One recorded invocation: (runtime role, method name, container id).
1345    type CallRecord = (Role, String, Option<ContainerId>);
1346    /// Shared, thread-safe log of every mock call made in a single test.
1347    type CallLog = Arc<StdMutex<Vec<CallRecord>>>;
1348
1349    /// Mock runtime that records every method call it receives.
1350    ///
1351    /// This is intentionally minimal — just enough trait surface to exercise
1352    /// the composite's dispatch logic. Every recorded call includes the role
1353    /// (primary vs delegate), the method name, and the container id (or
1354    /// `None` for cross-cutting image operations).
1355    struct MockRuntime {
1356        role: Role,
1357        calls: CallLog,
1358        list_images_response: Vec<ImageInfo>,
1359        /// When set, `list_images` returns `AgentError::Unsupported(msg)`
1360        /// instead of `list_images_response`. Models a backend (e.g. the macOS
1361        /// `SandboxRuntime` primary) that does not implement image listing.
1362        list_images_error: Option<String>,
1363        pull_image_error: Option<String>,
1364        /// When set, both `pull_image` and `pull_image_with_policy` return a
1365        /// freshly-built [`AgentError::WrongPlatform`] using these fields
1366        /// (`expected`, `actual`). Takes precedence over `pull_image_error`
1367        /// so tests can simulate a wrong-platform soft skip end-to-end.
1368        pull_image_wrong_platform: Option<(&'static str, &'static str)>,
1369        /// When `true`, the *streaming* reads (`logs_stream` / `stats_stream`)
1370        /// return `AgentError::Unsupported`, modelling a backend (e.g. the macOS
1371        /// `SandboxRuntime` primary) that implements the snapshot reads
1372        /// (`container_logs` / `get_container_stats`) but not the streaming
1373        /// ones — exactly the case that used to surface as a swallowed 500.
1374        stream_unsupported: bool,
1375        /// When `true`, *every* per-container read (`container_logs`,
1376        /// `get_logs`, `get_container_stats`, `logs_stream`, `stats_stream`)
1377        /// returns `AgentError::NotFound`, modelling a backend that does not own
1378        /// the container at all. The composite must NOT mask this as success,
1379        /// and a genuine all-not-found must propagate as `NotFound` (404).
1380        reads_not_found: bool,
1381        /// Captured-log snapshot returned by `container_logs` / `get_logs`
1382        /// (unless `reads_not_found`). Lets a delegate model real workload
1383        /// output the composite's snapshot fallback should surface.
1384        logs_response: Vec<LogEntry>,
1385        /// When `true`, the snapshot `get_container_stats` returns
1386        /// `AgentError::Unsupported` (a soft miss), modelling a backend that
1387        /// owns the container but cannot report stats at all. Forces the
1388        /// composite to fall back to another backend.
1389        stats_snapshot_unsupported: bool,
1390        /// `prune_images` response. `None` models a backend that does not
1391        /// implement pruning (returns `AgentError::Unsupported`, like the trait
1392        /// default); `Some(result)` models a backend that prunes and reports
1393        /// the given [`PruneResult`].
1394        prune_images_response: Option<PruneResult>,
1395    }
1396
1397    impl MockRuntime {
1398        fn new(role: Role, calls: CallLog) -> Self {
1399            Self {
1400                role,
1401                calls,
1402                list_images_response: Vec::new(),
1403                list_images_error: None,
1404                pull_image_error: None,
1405                pull_image_wrong_platform: None,
1406                stream_unsupported: false,
1407                reads_not_found: false,
1408                logs_response: Vec::new(),
1409                stats_snapshot_unsupported: false,
1410                prune_images_response: None,
1411            }
1412        }
1413
1414        /// Streaming reads return `Unsupported`; snapshot reads still work.
1415        fn with_stream_unsupported(mut self) -> Self {
1416            self.stream_unsupported = true;
1417            self
1418        }
1419
1420        /// Every per-container read returns `NotFound`.
1421        fn with_reads_not_found(mut self) -> Self {
1422            self.reads_not_found = true;
1423            self
1424        }
1425
1426        /// Set the captured-log snapshot returned by the snapshot reads.
1427        fn with_logs(mut self, logs: Vec<LogEntry>) -> Self {
1428            self.logs_response = logs;
1429            self
1430        }
1431
1432        /// Snapshot `get_container_stats` returns `Unsupported` (a soft miss).
1433        fn with_stats_snapshot_unsupported(mut self) -> Self {
1434            self.stats_snapshot_unsupported = true;
1435            self
1436        }
1437
1438        /// `prune_images` succeeds and reports the given [`PruneResult`].
1439        fn with_prune_result(mut self, result: PruneResult) -> Self {
1440            self.prune_images_response = Some(result);
1441            self
1442        }
1443
1444        fn build_wrong_platform_error(&self, image: &str) -> Option<AgentError> {
1445            self.pull_image_wrong_platform
1446                .map(|(expected, actual)| AgentError::WrongPlatform {
1447                    runtime: match self.role {
1448                        Role::Primary => "primary-mock".to_string(),
1449                        Role::Delegate => "delegate-mock".to_string(),
1450                        Role::Vz => "vz-mock".to_string(),
1451                        Role::VzLinux => "vz-linux-mock".to_string(),
1452                    },
1453                    expected: expected.to_string(),
1454                    actual: actual.to_string(),
1455                    image: image.to_string(),
1456                })
1457        }
1458
1459        fn record(&self, method: &str, id: Option<&ContainerId>) {
1460            self.calls
1461                .lock()
1462                .expect("mock call-log mutex poisoned")
1463                .push((self.role, method.to_string(), id.cloned()));
1464        }
1465    }
1466
1467    #[async_trait]
1468    impl Runtime for MockRuntime {
1469        async fn pull_image(&self, image: &str) -> Result<()> {
1470            self.record("pull_image", None);
1471            if let Some(err) = self.build_wrong_platform_error(image) {
1472                return Err(err);
1473            }
1474            if let Some(msg) = &self.pull_image_error {
1475                return Err(AgentError::Internal(msg.clone()));
1476            }
1477            Ok(())
1478        }
1479
1480        async fn pull_image_with_policy(
1481            &self,
1482            image: &str,
1483            _policy: PullPolicy,
1484            _auth: Option<&RegistryAuth>,
1485            _source: zlayer_spec::SourcePolicy,
1486        ) -> Result<()> {
1487            self.record("pull_image_with_policy", None);
1488            if let Some(err) = self.build_wrong_platform_error(image) {
1489                return Err(err);
1490            }
1491            if let Some(msg) = &self.pull_image_error {
1492                return Err(AgentError::Internal(msg.clone()));
1493            }
1494            Ok(())
1495        }
1496
1497        async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
1498            self.record("create_container", Some(id));
1499            Ok(())
1500        }
1501
1502        async fn start_container(&self, id: &ContainerId) -> Result<()> {
1503            self.record("start_container", Some(id));
1504            Ok(())
1505        }
1506
1507        async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
1508            self.record("stop_container", Some(id));
1509            Ok(())
1510        }
1511
1512        async fn remove_container(&self, id: &ContainerId) -> Result<()> {
1513            self.record("remove_container", Some(id));
1514            Ok(())
1515        }
1516
1517        async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
1518            self.record("container_state", Some(id));
1519            Ok(ContainerState::Running)
1520        }
1521
1522        async fn container_logs(&self, id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
1523            self.record("container_logs", Some(id));
1524            if self.reads_not_found {
1525                return Err(mock_not_found());
1526            }
1527            Ok(self.logs_response.clone())
1528        }
1529
1530        async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
1531            self.record("exec", Some(id));
1532            Ok((0, String::new(), String::new()))
1533        }
1534
1535        async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
1536            self.record("get_container_stats", Some(id));
1537            if self.reads_not_found {
1538                return Err(mock_not_found());
1539            }
1540            if self.stats_snapshot_unsupported {
1541                return Err(AgentError::Unsupported("mock has no snapshot stats".into()));
1542            }
1543            Ok(ContainerStats {
1544                cpu_usage_usec: 1_000,
1545                memory_bytes: 4096,
1546                memory_limit: 8192,
1547                timestamp: std::time::Instant::now(),
1548            })
1549        }
1550
1551        async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
1552            self.record("wait_container", Some(id));
1553            Ok(0)
1554        }
1555
1556        async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
1557            self.record("get_logs", Some(id));
1558            if self.reads_not_found {
1559                return Err(mock_not_found());
1560            }
1561            Ok(self.logs_response.clone())
1562        }
1563
1564        async fn logs_stream(
1565            &self,
1566            id: &ContainerId,
1567            _opts: LogsStreamOptions,
1568        ) -> Result<LogsStream> {
1569            self.record("logs_stream", Some(id));
1570            if self.reads_not_found {
1571                return Err(mock_not_found());
1572            }
1573            if self.stream_unsupported {
1574                return Err(AgentError::Unsupported("mock has no log stream".into()));
1575            }
1576            // A backend that owns a native stream replays its captured logs.
1577            Ok(one_shot_logs_stream(
1578                self.logs_response.clone(),
1579                &LogsStreamOptions::default(),
1580            ))
1581        }
1582
1583        async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
1584            use futures_util::stream;
1585            self.record("stats_stream", Some(id));
1586            if self.reads_not_found {
1587                return Err(mock_not_found());
1588            }
1589            if self.stream_unsupported {
1590                return Err(AgentError::Unsupported("mock has no stats stream".into()));
1591            }
1592            Ok(Box::pin(stream::iter(vec![Ok(StatsSample {
1593                cpu_total_ns: 0,
1594                cpu_system_ns: 0,
1595                online_cpus: 1,
1596                mem_used_bytes: 4096,
1597                mem_limit_bytes: 8192,
1598                net_rx_bytes: 0,
1599                net_tx_bytes: 0,
1600                blkio_read_bytes: 0,
1601                blkio_write_bytes: 0,
1602                pids_current: 0,
1603                pids_limit: None,
1604                timestamp: chrono::Utc::now(),
1605            })])))
1606        }
1607
1608        async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
1609            self.record("get_container_pid", Some(id));
1610            Ok(None)
1611        }
1612
1613        async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
1614            self.record("get_container_ip", Some(id));
1615            Ok(None)
1616        }
1617
1618        async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1619            self.record("list_images", None);
1620            if let Some(msg) = &self.list_images_error {
1621                return Err(AgentError::Unsupported(msg.clone()));
1622            }
1623            Ok(self.list_images_response.clone())
1624        }
1625
1626        async fn prune_images(&self) -> Result<PruneResult> {
1627            self.record("prune_images", None);
1628            match &self.prune_images_response {
1629                Some(result) => Ok(result.clone()),
1630                None => Err(AgentError::Unsupported(
1631                    "mock runtime does not support prune_images".into(),
1632                )),
1633            }
1634        }
1635    }
1636
1637    /// Build a [`ServiceSpec`] (with the given image name) from the minimal
1638    /// inline YAML the existing runtime tests use, then optionally set a
1639    /// target platform on it.
1640    fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
1641        let yaml = format!(
1642            r"
1643version: v1
1644deployment: test
1645services:
1646  test:
1647    rtype: service
1648    image:
1649      name: {image}
1650    endpoints:
1651      - name: http
1652        protocol: http
1653        port: 8080
1654"
1655        );
1656        let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
1657            .expect("valid deployment yaml")
1658            .services
1659            .remove("test")
1660            .expect("service 'test' present");
1661        spec.platform = platform;
1662        spec
1663    }
1664
1665    fn cid(service: &str, replica: u32) -> ContainerId {
1666        ContainerId::new(service.to_string(), replica)
1667    }
1668
1669    fn make_composite(with_delegate: bool) -> (CompositeRuntime, CallLog) {
1670        let calls = Arc::new(StdMutex::new(Vec::new()));
1671        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
1672        let delegate = if with_delegate {
1673            Some(Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>)
1674        } else {
1675            None
1676        };
1677        (
1678            CompositeRuntime::new(primary as Arc<dyn Runtime>, delegate),
1679            calls,
1680        )
1681    }
1682
1683    fn role_for(calls: &[CallRecord], method: &str) -> Option<Role> {
1684        calls
1685            .iter()
1686            .find(|(_, m, _)| m == method)
1687            .map(|(role, _, _)| *role)
1688    }
1689
1690    /// The `NotFound` a `MockRuntime` returns when it does not own a container.
1691    fn mock_not_found() -> AgentError {
1692        AgentError::NotFound {
1693            container: "mock".to_string(),
1694            reason: "mock backend does not own this container".to_string(),
1695        }
1696    }
1697
1698    #[tokio::test]
1699    async fn dispatch_windows_spec_goes_to_primary() {
1700        let (rt, calls) = make_composite(true);
1701        let id = cid("win-svc", 0);
1702        let spec = make_spec(
1703            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
1704            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
1705        );
1706
1707        rt.create_container(&id, &spec).await.unwrap();
1708        rt.start_container(&id).await.unwrap();
1709
1710        let calls = calls.lock().unwrap();
1711        assert_eq!(
1712            role_for(&calls, "create_container"),
1713            Some(Role::Primary),
1714            "create_container should hit primary for Windows spec"
1715        );
1716        assert_eq!(
1717            role_for(&calls, "start_container"),
1718            Some(Role::Primary),
1719            "start_container should hit primary for Windows spec"
1720        );
1721    }
1722
1723    #[tokio::test]
1724    async fn dispatch_linux_spec_goes_to_delegate() {
1725        let (rt, calls) = make_composite(true);
1726        let id = cid("lin-svc", 0);
1727        let spec = make_spec(
1728            "docker.io/library/alpine:3.19",
1729            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
1730        );
1731
1732        rt.create_container(&id, &spec).await.unwrap();
1733        rt.start_container(&id).await.unwrap();
1734
1735        let calls = calls.lock().unwrap();
1736        assert_eq!(
1737            role_for(&calls, "create_container"),
1738            Some(Role::Delegate),
1739            "create_container should hit delegate for Linux spec"
1740        );
1741        assert_eq!(
1742            role_for(&calls, "start_container"),
1743            Some(Role::Delegate),
1744            "start_container should hit delegate for Linux spec"
1745        );
1746    }
1747
1748    #[tokio::test]
1749    async fn dispatch_linux_without_delegate_errors() {
1750        // H-7 policy: a Linux spec on a node without a delegate must return
1751        // `RouteToPeer` (not `Unsupported`, not a silent primary fall-through)
1752        // so the scheduler can re-place the workload on a capable peer.
1753        let (rt, _calls) = make_composite(false);
1754        let id = cid("lin-svc", 0);
1755        let spec = make_spec(
1756            "docker.io/library/alpine:3.19",
1757            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
1758        );
1759
1760        let err = rt.create_container(&id, &spec).await.unwrap_err();
1761        match err {
1762            AgentError::RouteToPeer {
1763                service,
1764                required_os,
1765                reason,
1766            } => {
1767                assert_eq!(service, "lin-svc");
1768                assert_eq!(required_os, "linux");
1769                assert!(
1770                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
1771                    "reason must name both remediations, got: {reason}"
1772                );
1773            }
1774            other => panic!("expected RouteToPeer, got {other:?}"),
1775        }
1776    }
1777
1778    #[tokio::test]
1779    async fn dispatch_linux_image_cache_without_delegate_routes_to_peer() {
1780        // H-7 policy: even when `spec.platform` is unset, a Linux image in the
1781        // OS cache must route to a peer instead of falling through to primary.
1782        // This is the old permissive-fallthrough path the comment at lines
1783        // 172-178 used to describe; the behavior is now strict.
1784        let (rt, _calls) = make_composite(false);
1785        let id = cid("svc", 0);
1786        let image = "docker.io/library/nginx:1.25";
1787        rt.record_image_os(image, OsKind::Linux).await;
1788
1789        let spec = make_spec(image, None);
1790        let err = rt.create_container(&id, &spec).await.unwrap_err();
1791        match err {
1792            AgentError::RouteToPeer {
1793                service,
1794                required_os,
1795                reason,
1796            } => {
1797                assert_eq!(service, "svc");
1798                assert_eq!(required_os, "linux");
1799                assert!(
1800                    reason.contains(image),
1801                    "reason should mention the image name, got: {reason}"
1802                );
1803                assert!(
1804                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
1805                    "reason must name both remediations, got: {reason}"
1806                );
1807            }
1808            other => panic!("expected RouteToPeer, got {other:?}"),
1809        }
1810    }
1811
1812    #[tokio::test]
1813    async fn dispatch_macos_spec_goes_to_primary() {
1814        let (rt, calls) = make_composite(true);
1815        let id = cid("mac-svc", 0);
1816        let spec = make_spec(
1817            "ghcr.io/zlayer/macos:latest",
1818            Some(TargetPlatform::new(OsKind::Macos, ArchKind::Arm64)),
1819        );
1820
1821        rt.create_container(&id, &spec).await.unwrap();
1822
1823        let calls = calls.lock().unwrap();
1824        assert_eq!(
1825            role_for(&calls, "create_container"),
1826            Some(Role::Primary),
1827            "create_container should hit primary for Macos spec"
1828        );
1829    }
1830
1831    #[tokio::test]
1832    async fn dispatch_no_platform_no_image_os_falls_through_to_primary() {
1833        let (rt, calls) = make_composite(true);
1834        let id = cid("svc", 0);
1835        let spec = make_spec("docker.io/library/nginx:1.25", None);
1836
1837        rt.create_container(&id, &spec).await.unwrap();
1838
1839        let calls = calls.lock().unwrap();
1840        assert_eq!(
1841            role_for(&calls, "create_container"),
1842            Some(Role::Primary),
1843            "fall-through should pick primary when both platform and image-OS cache are unknown"
1844        );
1845    }
1846
1847    #[tokio::test]
1848    async fn dispatch_uses_image_os_cache_when_platform_missing() {
1849        let (rt, calls) = make_composite(true);
1850        let id = cid("svc", 0);
1851        let image = "docker.io/library/nginx:1.25";
1852        rt.record_image_os(image, OsKind::Linux).await;
1853
1854        let spec = make_spec(image, None);
1855        rt.create_container(&id, &spec).await.unwrap();
1856
1857        let calls = calls.lock().unwrap();
1858        assert_eq!(
1859            role_for(&calls, "create_container"),
1860            Some(Role::Delegate),
1861            "image-OS cache should route Linux images to the delegate"
1862        );
1863    }
1864
1865    /// Composite with primary + delegate + an attached VZ delegate, all sharing
1866    /// one call log.
1867    fn make_composite_with_vz() -> (CompositeRuntime, CallLog) {
1868        let calls = Arc::new(StdMutex::new(Vec::new()));
1869        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
1870        let delegate =
1871            Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
1872        let vz = Arc::new(MockRuntime::new(Role::Vz, Arc::clone(&calls))) as Arc<dyn Runtime>;
1873        let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
1874            .with_vz_delegate(Some(vz));
1875        (rt, calls)
1876    }
1877
1878    #[tokio::test]
1879    async fn dispatch_vz_bundle_annotation_auto_routes_to_vz() {
1880        let (rt, calls) = make_composite_with_vz();
1881        let id = cid("mac-svc", 0);
1882        let image = "ghcr.io/org/macos-vz:sequoia";
1883        // Simulate the manifest inspection having cached `com.zlayer.runtime=vz`.
1884        rt.record_image_runtime(image, "vz".to_string()).await;
1885
1886        let spec = make_spec(image, None);
1887        rt.create_container(&id, &spec).await.unwrap();
1888
1889        let calls = calls.lock().unwrap();
1890        assert_eq!(
1891            role_for(&calls, "create_container"),
1892            Some(Role::Vz),
1893            "a com.zlayer.runtime=vz bundle should auto-route to the VZ runtime"
1894        );
1895    }
1896
1897    #[tokio::test]
1898    async fn dispatch_vz_label_forces_vz() {
1899        let (rt, calls) = make_composite_with_vz();
1900        let id = cid("mac-svc", 0);
1901        let mut spec = make_spec("ghcr.io/org/whatever:1", None);
1902        spec.labels
1903            .insert("com.zlayer.isolation".to_string(), "vz".to_string());
1904
1905        rt.create_container(&id, &spec).await.unwrap();
1906
1907        let calls = calls.lock().unwrap();
1908        assert_eq!(
1909            role_for(&calls, "create_container"),
1910            Some(Role::Vz),
1911            "an explicit com.zlayer.isolation=vz label should force the VZ runtime"
1912        );
1913    }
1914
1915    #[tokio::test]
1916    async fn dispatch_sandbox_label_overrides_vz_bundle() {
1917        let (rt, calls) = make_composite_with_vz();
1918        let id = cid("mac-svc", 0);
1919        let image = "ghcr.io/org/macos-vz:sequoia";
1920        rt.record_image_runtime(image, "vz".to_string()).await;
1921
1922        let mut spec = make_spec(image, None);
1923        spec.labels
1924            .insert("com.zlayer.isolation".to_string(), "sandbox".to_string());
1925        rt.create_container(&id, &spec).await.unwrap();
1926
1927        let calls = calls.lock().unwrap();
1928        assert_eq!(
1929            role_for(&calls, "create_container"),
1930            Some(Role::Primary),
1931            "com.zlayer.isolation=sandbox should opt out of VZ auto-detect (force the sandbox)"
1932        );
1933    }
1934
1935    /// Composite with primary + delegate (libkrun) + a VZ Linux-guest delegate,
1936    /// all sharing one call log. Mirrors `make_composite_with_vz`.
1937    fn make_composite_with_vz_linux() -> (CompositeRuntime, CallLog) {
1938        let calls = Arc::new(StdMutex::new(Vec::new()));
1939        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
1940        let delegate =
1941            Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
1942        let vz_linux =
1943            Arc::new(MockRuntime::new(Role::VzLinux, Arc::clone(&calls))) as Arc<dyn Runtime>;
1944        let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
1945            .with_vz_linux_delegate(Some(vz_linux));
1946        (rt, calls)
1947    }
1948
1949    #[tokio::test]
1950    async fn dispatch_vz_linux_label_forces_vz_linux() {
1951        let (rt, calls) = make_composite_with_vz_linux();
1952        let id = cid("lin-svc", 0);
1953        let mut spec = make_spec("docker.io/library/alpine:3.19", None);
1954        spec.labels
1955            .insert("com.zlayer.isolation".to_string(), "vz-linux".to_string());
1956
1957        rt.create_container(&id, &spec).await.unwrap();
1958
1959        let calls = calls.lock().unwrap();
1960        assert_eq!(
1961            role_for(&calls, "create_container"),
1962            Some(Role::VzLinux),
1963            "com.zlayer.isolation=vz-linux must force the VZ Linux runtime"
1964        );
1965    }
1966
1967    #[tokio::test]
1968    async fn dispatch_vz_linux_marker_auto_routes_to_vz_linux() {
1969        let (rt, calls) = make_composite_with_vz_linux();
1970        let id = cid("lin-svc", 0);
1971        let image = "ghcr.io/org/linux-vz:bookworm";
1972        rt.record_image_runtime(image, "vz-linux".to_string()).await;
1973
1974        let spec = make_spec(image, None);
1975        rt.create_container(&id, &spec).await.unwrap();
1976
1977        let calls = calls.lock().unwrap();
1978        assert_eq!(
1979            role_for(&calls, "create_container"),
1980            Some(Role::VzLinux),
1981            "a com.zlayer.runtime=vz-linux marker should auto-route to the VZ Linux runtime"
1982        );
1983    }
1984
1985    #[tokio::test]
1986    async fn dispatch_linux_platform_with_vz_linux_routes_to_vz_linux() {
1987        let (rt, calls) = make_composite_with_vz_linux();
1988        let id = cid("lin-svc", 0);
1989        // platform.os = linux: with a VZ Linux delegate present this is the
1990        // default Linux path, NOT the libkrun delegate.
1991        let spec = make_spec(
1992            "docker.io/library/alpine:3.19",
1993            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Arm64)),
1994        );
1995
1996        rt.create_container(&id, &spec).await.unwrap();
1997
1998        let calls = calls.lock().unwrap();
1999        assert_eq!(
2000            role_for(&calls, "create_container"),
2001            Some(Role::VzLinux),
2002            "a Linux platform spec must default to the VZ Linux runtime when present"
2003        );
2004    }
2005
2006    #[tokio::test]
2007    async fn dispatch_linux_image_os_with_vz_linux_routes_to_vz_linux() {
2008        let (rt, calls) = make_composite_with_vz_linux();
2009        let id = cid("lin-svc", 0);
2010        let image = "docker.io/library/nginx:1.25";
2011        rt.record_image_os(image, OsKind::Linux).await;
2012
2013        let spec = make_spec(image, None);
2014        rt.create_container(&id, &spec).await.unwrap();
2015
2016        let calls = calls.lock().unwrap();
2017        assert_eq!(
2018            role_for(&calls, "create_container"),
2019            Some(Role::VzLinux),
2020            "a Linux image-OS cache hit must default to the VZ Linux runtime when present"
2021        );
2022    }
2023
2024    #[tokio::test]
2025    async fn dispatch_macos_image_os_with_vz_linux_routes_to_primary() {
2026        // A macOS-native rootfs must NEVER go to the Linux VM. Even with a
2027        // VZ-Linux delegate present (the default Linux path), an image whose
2028        // locally-known OS is macOS routes to the primary (Seatbelt sandbox).
2029        let (rt, calls) = make_composite_with_vz_linux();
2030        let id = cid("mac-svc", 0);
2031        let image = "ghcr.io/zlayer/macos-native:latest";
2032        rt.record_image_os(image, OsKind::Macos).await;
2033
2034        let spec = make_spec(image, None);
2035        rt.create_container(&id, &spec).await.unwrap();
2036
2037        let calls = calls.lock().unwrap();
2038        assert_eq!(
2039            role_for(&calls, "create_container"),
2040            Some(Role::Primary),
2041            "image_os == Macos must route to primary even when VZ-Linux is the default",
2042        );
2043    }
2044
2045    #[tokio::test]
2046    async fn dispatch_unknown_os_with_vz_linux_defaults_to_vz_linux() {
2047        // OS genuinely unknown (no isolation label, no runtime marker, no
2048        // platform, no image-OS cache hit) on a macOS host with a VZ-Linux
2049        // delegate: default to VZ-Linux. Sending an unknown (overwhelmingly
2050        // Linux) image to the Seatbelt sandbox is the exit-127 failure this fix
2051        // exists to prevent.
2052        let (rt, calls) = make_composite_with_vz_linux();
2053        let id = cid("svc", 0);
2054        let spec = make_spec("docker.io/library/whatever:latest", None);
2055
2056        rt.create_container(&id, &spec).await.unwrap();
2057
2058        let calls = calls.lock().unwrap();
2059        assert_eq!(
2060            role_for(&calls, "create_container"),
2061            Some(Role::VzLinux),
2062            "an unknown-OS image must default to VZ-Linux when the delegate is present",
2063        );
2064    }
2065
2066    #[tokio::test]
2067    async fn dispatch_unknown_os_without_vz_linux_falls_through_to_primary() {
2068        // The unknown-OS default to VZ-Linux is keyed on the delegate's
2069        // presence (a proxy for "macOS host"). Without a VZ-Linux delegate the
2070        // historical primary fallthrough is preserved for non-macOS hosts.
2071        let (rt, calls) = make_composite(true);
2072        let id = cid("svc", 0);
2073        let spec = make_spec("docker.io/library/whatever:latest", None);
2074
2075        rt.create_container(&id, &spec).await.unwrap();
2076
2077        let calls = calls.lock().unwrap();
2078        assert_eq!(
2079            role_for(&calls, "create_container"),
2080            Some(Role::Primary),
2081            "without a VZ-Linux delegate an unknown-OS image keeps the primary fallthrough",
2082        );
2083    }
2084
2085    /// Seed a persistent blob cache at `path` with a manifest + config blob for
2086    /// `image` whose config declares `os = linux`, mirroring what a real
2087    /// VZ-Linux pull writes to `{data_dir}/vz/linux/images/blobs.redb`.
2088    async fn seed_persistent_linux_cache(path: &std::path::Path, image: &str) {
2089        seed_persistent_cache_with_os(path, image, "linux").await;
2090    }
2091
2092    /// Like [`seed_persistent_linux_cache`] but lets the test pick the config
2093    /// `os` value (e.g. `"darwin"` for a macOS-native bundle).
2094    async fn seed_persistent_cache_with_os(path: &std::path::Path, image: &str, os: &str) {
2095        let cache = zlayer_registry::CacheType::persistent_at(path)
2096            .build()
2097            .await
2098            .expect("open persistent blob cache");
2099
2100        let config_json = serde_json::json!({
2101            "architecture": "arm64",
2102            "os": os,
2103            "config": {},
2104        });
2105        let config_bytes = serde_json::to_vec(&config_json).unwrap();
2106        let config_digest = zlayer_registry::compute_digest(&config_bytes);
2107        cache.put(&config_digest, &config_bytes).await.unwrap();
2108
2109        let manifest = zlayer_registry::OciImageManifest {
2110            schema_version: 2,
2111            media_type: Some("application/vnd.oci.image.manifest.v1+json".to_string()),
2112            artifact_type: None,
2113            config: oci_client::manifest::OciDescriptor {
2114                media_type: "application/vnd.oci.image.config.v1+json".to_string(),
2115                digest: config_digest.clone(),
2116                size: i64::try_from(config_bytes.len()).unwrap(),
2117                urls: None,
2118                annotations: None,
2119            },
2120            layers: vec![],
2121            annotations: None,
2122            subject: None,
2123        };
2124        let manifest_bytes = serde_json::to_vec(&manifest).unwrap();
2125        let manifest_digest = zlayer_registry::compute_digest(&manifest_bytes);
2126        cache
2127            .put(&zlayer_registry::manifest_cache_key(image), &manifest_bytes)
2128            .await
2129            .unwrap();
2130        cache
2131            .put(
2132                &zlayer_registry::manifest_digest_cache_key(image),
2133                manifest_digest.as_bytes(),
2134            )
2135            .await
2136            .unwrap();
2137    }
2138
2139    /// End-to-end of the macOS rate-limit routing fix: a Linux image whose OS
2140    /// lives ONLY in the local persistent blob cache (no network) must be
2141    /// inspected at `pull_image` time and then routed to the VZ-Linux runtime
2142    /// by `select_for` — exactly the path that breaks under a Docker Hub 429
2143    /// when inspection goes to the wire.
2144    #[tokio::test]
2145    async fn pull_then_dispatch_resolves_linux_os_from_local_cache_routes_to_vz_linux() {
2146        let tmp = tempfile::tempdir().unwrap();
2147        let cache_path = tmp.path().join("blobs.redb");
2148        let image = "docker.io/library/alpine:latest";
2149        seed_persistent_linux_cache(&cache_path, image).await;
2150
2151        let (rt, calls) = make_composite_with_vz_linux();
2152        let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2153
2154        // pull_image drives the real local-first OS inspection; no network.
2155        rt.pull_image(image).await.unwrap();
2156
2157        // The OS must now be cached as Linux purely from the local store.
2158        assert_eq!(
2159            rt.image_os.read().await.get(image).copied(),
2160            Some(OsKind::Linux),
2161            "pull_image must resolve Linux OS from the local persistent cache",
2162        );
2163
2164        // And select_for must route the (platform-less) spec to VZ-Linux.
2165        let id = cid("lin-svc", 0);
2166        let spec = make_spec(image, None);
2167        rt.create_container(&id, &spec).await.unwrap();
2168
2169        let calls = calls.lock().unwrap();
2170        assert_eq!(
2171            role_for(&calls, "create_container"),
2172            Some(Role::VzLinux),
2173            "a Linux image whose OS came from the local cache must route to VZ-Linux",
2174        );
2175    }
2176
2177    /// LIVE BUG #1, end-to-end: the cache is seeded under the QUALIFIED ref
2178    /// (`docker.io/library/alpine:latest`, as the pull writes it) but the spec —
2179    /// and therefore every `pull_image` / `inspect_image_os` / `select_for`
2180    /// lookup — uses the BARE `alpine:latest`. With the canonical manifest-key
2181    /// normalization, the bare-ref inspect hits the qualified-seeded cache with
2182    /// NO network call, so the Linux image still routes to VZ-Linux.
2183    #[tokio::test]
2184    async fn bare_ref_spec_resolves_os_from_qualified_seeded_cache_routes_to_vz_linux() {
2185        let tmp = tempfile::tempdir().unwrap();
2186        let cache_path = tmp.path().join("blobs.redb");
2187        // Seed under the QUALIFIED ref, exactly as a real pull persists it.
2188        seed_persistent_linux_cache(&cache_path, "docker.io/library/alpine:latest").await;
2189
2190        let (rt, calls) = make_composite_with_vz_linux();
2191        let rt = rt.with_os_inspect_cache_paths(vec![cache_path]);
2192
2193        // Everything below uses the BARE ref, exactly as the live daemon does
2194        // (`ImageRef::Display` yields the user-original string).
2195        let bare = "alpine:latest";
2196        rt.pull_image(bare).await.unwrap();
2197
2198        assert_eq!(
2199            rt.image_os.read().await.get(bare).copied(),
2200            Some(OsKind::Linux),
2201            "bare-ref inspect must resolve Linux from the qualified-seeded cache",
2202        );
2203
2204        let id = cid("lin-svc", 0);
2205        let spec = make_spec(bare, None);
2206        rt.create_container(&id, &spec).await.unwrap();
2207
2208        let calls = calls.lock().unwrap();
2209        assert_eq!(
2210            role_for(&calls, "create_container"),
2211            Some(Role::VzLinux),
2212            "bare-ref Linux image routes to VZ-Linux via the canonical-key cache hit",
2213        );
2214    }
2215
2216    /// LIVE BUG #2 / multi-cache fallback: the manifest+config live ONLY in the
2217    /// SECOND configured cache (the primary Sandbox store), because the
2218    /// VZ-Linux pull short-circuited under `IfNotPresent`. Inspection must probe
2219    /// the empty first cache (no network), then resolve from the second — still
2220    /// with NO network — and route to VZ-Linux.
2221    #[tokio::test]
2222    async fn os_resolves_from_second_cache_when_first_is_empty() {
2223        let tmp = tempfile::tempdir().unwrap();
2224        let empty_cache = tmp.path().join("vz-linux-blobs.redb");
2225        let primary_cache = tmp.path().join("primary-blobs.redb");
2226        // Create the first cache empty (so opening it succeeds but it misses).
2227        zlayer_registry::CacheType::persistent_at(&empty_cache)
2228            .build()
2229            .await
2230            .unwrap();
2231        // Only the SECOND cache has the image.
2232        seed_persistent_linux_cache(&primary_cache, "docker.io/library/alpine:latest").await;
2233
2234        let (rt, calls) = make_composite_with_vz_linux();
2235        let rt = rt.with_os_inspect_cache_paths(vec![empty_cache, primary_cache]);
2236
2237        let bare = "alpine:latest";
2238        rt.pull_image(bare).await.unwrap();
2239
2240        assert_eq!(
2241            rt.image_os.read().await.get(bare).copied(),
2242            Some(OsKind::Linux),
2243            "OS must resolve from the second cache after the first misses (no network)",
2244        );
2245
2246        let id = cid("lin-svc", 0);
2247        let spec = make_spec(bare, None);
2248        rt.create_container(&id, &spec).await.unwrap();
2249
2250        let calls = calls.lock().unwrap();
2251        assert_eq!(role_for(&calls, "create_container"), Some(Role::VzLinux),);
2252    }
2253
2254    /// The exact LIVE bug, simulated end-to-end: a `pull_image` whose network OS
2255    /// re-inspection WOULD 429 still leaves dispatch fully working, because the
2256    /// image's OS is resolved purely from the local persistent blob cache the
2257    /// runtime already populated during extract — with NO network call at all.
2258    ///
2259    /// We model the 429 by pointing `os_inspect_cache_paths` at a real seeded
2260    /// cache (so the local resolver succeeds) while using a synthetic
2261    /// `*.invalid` registry host: if the dispatch-population path ever reached
2262    /// the network it would fail to resolve, leaving the cache empty and routing
2263    /// the Linux image to the Seatbelt primary (exit 127). It must not — the
2264    /// local cache hit is authoritative and the image routes to VZ-Linux.
2265    #[tokio::test]
2266    async fn pull_with_network_429_still_dispatches_via_local_cache() {
2267        let tmp = tempfile::tempdir().unwrap();
2268        let cache_path = tmp.path().join("blobs.redb");
2269        // The image ref uses a host that cannot be resolved on the wire; only
2270        // the LOCAL cache knows its OS.
2271        let image = "registry.invalid.example/library/alpine:latest";
2272        seed_persistent_linux_cache(&cache_path, image).await;
2273
2274        let (rt, calls) = make_composite_with_vz_linux();
2275        let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2276
2277        // `pull_image` drives the dispatch-population inspection. Even though a
2278        // real registry inspection of `*.invalid` would fail (our stand-in for a
2279        // 429), the local-only path resolves Linux and the call succeeds.
2280        rt.pull_image(image).await.unwrap();
2281        assert_eq!(
2282            rt.image_os.read().await.get(image).copied(),
2283            Some(OsKind::Linux),
2284            "OS must be resolved from the local cache with no network call",
2285        );
2286
2287        // And dispatch routes the Linux image to VZ-Linux, not the primary.
2288        let id = cid("lin-svc", 0);
2289        let spec = make_spec(image, None);
2290        rt.create_container(&id, &spec).await.unwrap();
2291
2292        let calls = calls.lock().unwrap();
2293        assert_eq!(
2294            role_for(&calls, "create_container"),
2295            Some(Role::VzLinux),
2296            "a would-be-429 pull must still route the cached Linux image to VZ-Linux",
2297        );
2298    }
2299
2300    /// Companion to the macOS-native dispatch guard, but driving the resolution
2301    /// through the real local-cache inspection at `pull_image` time: a bundle
2302    /// whose config declares `os = darwin` in the local cache must route to the
2303    /// primary, never the Linux VM.
2304    #[tokio::test]
2305    async fn pull_then_dispatch_resolves_macos_os_from_local_cache_routes_to_primary() {
2306        let tmp = tempfile::tempdir().unwrap();
2307        let cache_path = tmp.path().join("blobs.redb");
2308        let image = "ghcr.io/zlayer/macos-native:latest";
2309        seed_persistent_cache_with_os(&cache_path, image, "darwin").await;
2310
2311        let (rt, calls) = make_composite_with_vz_linux();
2312        let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2313
2314        rt.pull_image(image).await.unwrap();
2315        assert_eq!(
2316            rt.image_os.read().await.get(image).copied(),
2317            Some(OsKind::Macos),
2318            "pull_image must resolve macOS OS from the local persistent cache",
2319        );
2320
2321        let id = cid("mac-svc", 0);
2322        let spec = make_spec(image, None);
2323        rt.create_container(&id, &spec).await.unwrap();
2324
2325        let calls = calls.lock().unwrap();
2326        assert_eq!(
2327            role_for(&calls, "create_container"),
2328            Some(Role::Primary),
2329            "a macOS-native rootfs must route to primary even with VZ-Linux as default",
2330        );
2331    }
2332
2333    #[tokio::test]
2334    async fn dispatch_vm_label_forces_libkrun_delegate() {
2335        let (rt, calls) = make_composite_with_vz_linux();
2336        let id = cid("lin-svc", 0);
2337        // Even with a VZ Linux delegate as the default, an explicit
2338        // `com.zlayer.isolation=vm` label forces the libkrun delegate.
2339        let mut spec = make_spec(
2340            "docker.io/library/alpine:3.19",
2341            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Arm64)),
2342        );
2343        spec.labels
2344            .insert("com.zlayer.isolation".to_string(), "vm".to_string());
2345
2346        rt.create_container(&id, &spec).await.unwrap();
2347
2348        let calls = calls.lock().unwrap();
2349        assert_eq!(
2350            role_for(&calls, "create_container"),
2351            Some(Role::Delegate),
2352            "com.zlayer.isolation=vm must force the libkrun delegate even when VZ Linux is default"
2353        );
2354    }
2355
2356    #[tokio::test]
2357    async fn dispatch_unmarked_image_with_vz_delegate_falls_through_to_primary() {
2358        let (rt, calls) = make_composite_with_vz();
2359        let id = cid("mac-svc", 0);
2360        // No runtime marker, no platform, no image-OS cache: VZ must NOT capture
2361        // ordinary images just because the delegate exists.
2362        let spec = make_spec("ghcr.io/org/plain:1", None);
2363        rt.create_container(&id, &spec).await.unwrap();
2364
2365        let calls = calls.lock().unwrap();
2366        assert_eq!(
2367            role_for(&calls, "create_container"),
2368            Some(Role::Primary),
2369            "an unmarked image must fall through to primary even when a VZ delegate is attached"
2370        );
2371    }
2372
2373    #[tokio::test]
2374    async fn per_container_dispatch_cache_persists_through_start_stop() {
2375        let (rt, calls) = make_composite(true);
2376        let id = cid("win-svc", 0);
2377        let spec = make_spec(
2378            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
2379            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
2380        );
2381
2382        rt.create_container(&id, &spec).await.unwrap();
2383        rt.start_container(&id).await.unwrap();
2384        rt.stop_container(&id, Duration::from_secs(1))
2385            .await
2386            .unwrap();
2387        rt.remove_container(&id).await.unwrap();
2388
2389        let recorded = calls.lock().unwrap().clone();
2390        for method in [
2391            "create_container",
2392            "start_container",
2393            "stop_container",
2394            "remove_container",
2395        ] {
2396            assert_eq!(
2397                role_for(&recorded, method),
2398                Some(Role::Primary),
2399                "{method} should have dispatched to primary"
2400            );
2401        }
2402
2403        // After remove, the dispatch cache entry should be gone.
2404        let after = rt
2405            .start_container(&id)
2406            .await
2407            .expect_err("lookup after remove should fail");
2408        assert!(
2409            matches!(after, AgentError::NotFound { .. }),
2410            "expected NotFound after remove, got {after:?}"
2411        );
2412    }
2413
2414    #[tokio::test]
2415    async fn pull_image_calls_both_runtimes() {
2416        let (rt, calls) = make_composite(true);
2417        rt.pull_image("docker.io/library/alpine:3.19")
2418            .await
2419            .unwrap();
2420
2421        let recorded = calls.lock().unwrap();
2422        let pull_calls: Vec<Role> = recorded
2423            .iter()
2424            .filter(|(_, m, _)| m == "pull_image")
2425            .map(|(r, _, _)| *r)
2426            .collect();
2427        assert!(
2428            pull_calls.contains(&Role::Primary),
2429            "primary should have been pulled: {pull_calls:?}",
2430        );
2431        assert!(
2432            pull_calls.contains(&Role::Delegate),
2433            "delegate should have been pulled: {pull_calls:?}",
2434        );
2435    }
2436
2437    #[tokio::test]
2438    async fn pull_image_delegate_error_does_not_fail() {
2439        // Build the composite by hand so we can flip the delegate's
2440        // pull_image_error before wrapping it in an Arc<dyn Runtime>.
2441        let calls = Arc::new(StdMutex::new(Vec::new()));
2442        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
2443        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2444        delegate.pull_image_error = Some("simulated delegate pull failure".to_string());
2445        let rt = CompositeRuntime::new(
2446            primary as Arc<dyn Runtime>,
2447            Some(Arc::new(delegate) as Arc<dyn Runtime>),
2448        );
2449
2450        // Top-level call must succeed despite the delegate error.
2451        rt.pull_image("docker.io/library/alpine:3.19")
2452            .await
2453            .unwrap();
2454
2455        let recorded = calls.lock().unwrap();
2456        let pull_calls: Vec<Role> = recorded
2457            .iter()
2458            .filter(|(_, m, _)| m == "pull_image")
2459            .map(|(r, _, _)| *r)
2460            .collect();
2461        assert!(
2462            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
2463            "both runtimes should have been called: {pull_calls:?}",
2464        );
2465    }
2466
2467    #[tokio::test]
2468    async fn pull_image_primary_wrong_platform_does_not_fail() {
2469        // The HCS runtime returns `AgentError::WrongPlatform` when the image's
2470        // OCI config reports a non-Windows OS (calling `ProcessBaseImage` on a
2471        // Linux base layer is guaranteed to fail with 0x80070003). The
2472        // composite must treat that as a soft skip and let the delegate's
2473        // pull own the image — the overall pull must NOT fail.
2474        let calls = Arc::new(StdMutex::new(Vec::new()));
2475        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2476        primary.pull_image_wrong_platform = Some(("windows", "linux"));
2477        let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2478        let rt = CompositeRuntime::new(
2479            Arc::new(primary) as Arc<dyn Runtime>,
2480            Some(Arc::new(delegate) as Arc<dyn Runtime>),
2481        );
2482
2483        // Top-level call must succeed despite the primary's wrong-platform err.
2484        rt.pull_image("docker.io/library/alpine:3.19")
2485            .await
2486            .expect("composite pull must tolerate WrongPlatform from primary");
2487
2488        let recorded = calls.lock().unwrap();
2489        let pull_calls: Vec<Role> = recorded
2490            .iter()
2491            .filter(|(_, m, _)| m == "pull_image")
2492            .map(|(r, _, _)| *r)
2493            .collect();
2494        assert!(
2495            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
2496            "delegate must still be called when primary soft-skips: {pull_calls:?}",
2497        );
2498    }
2499
2500    #[tokio::test]
2501    async fn pull_image_with_policy_primary_wrong_platform_does_not_fail() {
2502        // Same contract as `pull_image_primary_wrong_platform_does_not_fail`
2503        // but exercising the `pull_image_with_policy` entry point. The
2504        // policy/auth path is what the daemon's create-container hot loop
2505        // actually invokes, so it has to honour the same soft-skip rule.
2506        let calls = Arc::new(StdMutex::new(Vec::new()));
2507        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2508        primary.pull_image_wrong_platform = Some(("windows", "linux"));
2509        let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2510        let rt = CompositeRuntime::new(
2511            Arc::new(primary) as Arc<dyn Runtime>,
2512            Some(Arc::new(delegate) as Arc<dyn Runtime>),
2513        );
2514
2515        rt.pull_image_with_policy(
2516            "docker.io/library/alpine:3.19",
2517            PullPolicy::IfNotPresent,
2518            None,
2519            zlayer_spec::SourcePolicy::default(),
2520        )
2521        .await
2522        .expect("composite pull_image_with_policy must tolerate WrongPlatform from primary");
2523
2524        let recorded = calls.lock().unwrap();
2525        let pull_calls: Vec<Role> = recorded
2526            .iter()
2527            .filter(|(_, m, _)| m == "pull_image_with_policy")
2528            .map(|(r, _, _)| *r)
2529            .collect();
2530        assert!(
2531            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
2532            "delegate must still be called when primary soft-skips: {pull_calls:?}",
2533        );
2534    }
2535
2536    #[tokio::test]
2537    async fn pull_image_primary_non_wrong_platform_error_still_fails() {
2538        // Sanity check: only `WrongPlatform` is soft-skipped. Any other error
2539        // from the primary must still bubble up so real pull failures aren't
2540        // silently swallowed.
2541        let calls = Arc::new(StdMutex::new(Vec::new()));
2542        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2543        primary.pull_image_error = Some("simulated real failure".to_string());
2544        let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2545        let rt = CompositeRuntime::new(
2546            Arc::new(primary) as Arc<dyn Runtime>,
2547            Some(Arc::new(delegate) as Arc<dyn Runtime>),
2548        );
2549
2550        let err = rt
2551            .pull_image("docker.io/library/alpine:3.19")
2552            .await
2553            .expect_err("real primary error must propagate");
2554        assert!(
2555            matches!(err, AgentError::Internal(_)),
2556            "expected Internal, got {err:?}",
2557        );
2558    }
2559
2560    #[tokio::test]
2561    async fn list_images_merges_both() {
2562        // Hand-build so we can seed each mock's list_images_response.
2563        let calls = Arc::new(StdMutex::new(Vec::new()));
2564        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2565        primary.list_images_response = vec![ImageInfo {
2566            reference: "primary/image:1".to_string(),
2567            digest: None,
2568            size_bytes: None,
2569        }];
2570        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2571        delegate.list_images_response = vec![ImageInfo {
2572            reference: "delegate/image:1".to_string(),
2573            digest: None,
2574            size_bytes: None,
2575        }];
2576        let rt = CompositeRuntime::new(
2577            Arc::new(primary) as Arc<dyn Runtime>,
2578            Some(Arc::new(delegate) as Arc<dyn Runtime>),
2579        );
2580
2581        let merged = rt.list_images().await.unwrap();
2582        let refs: Vec<&str> = merged.iter().map(|i| i.reference.as_str()).collect();
2583        assert!(
2584            refs.contains(&"primary/image:1") && refs.contains(&"delegate/image:1"),
2585            "merged list should contain both entries, got {refs:?}",
2586        );
2587    }
2588
2589    /// Regression (macOS `GET /images/json` 500): when the *primary* runtime
2590    /// does not implement `list_images` (the `SandboxRuntime` returns
2591    /// `Unsupported`), the composite must NOT propagate that error. It must
2592    /// fall back to the other backends — in particular the VZ-Linux delegate
2593    /// that actually owns pulled Linux images — and return their list. Before
2594    /// the fix the composite used `self.primary.list_images().await?`, which
2595    /// surfaced as a 500 and (via the inspect fallback) broke `docker pull`.
2596    #[tokio::test]
2597    async fn list_images_tolerates_primary_unsupported_and_uses_vz_linux() {
2598        let calls = Arc::new(StdMutex::new(Vec::new()));
2599        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2600        primary.list_images_error = Some("list_images is not supported".to_string());
2601        let mut vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2602        vz_linux.list_images_response = vec![ImageInfo {
2603            reference: "docker.io/library/alpine:latest".to_string(),
2604            digest: None,
2605            size_bytes: None,
2606        }];
2607
2608        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2609            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2610
2611        let images = rt
2612            .list_images()
2613            .await
2614            .expect("primary Unsupported must not fail the composite list_images");
2615        let refs: Vec<&str> = images.iter().map(|i| i.reference.as_str()).collect();
2616        assert_eq!(
2617            refs,
2618            vec!["docker.io/library/alpine:latest"],
2619            "should return the VZ-Linux delegate's images, got {refs:?}",
2620        );
2621    }
2622
2623    /// When EVERY backend fails `list_images`, the composite surfaces an error
2624    /// (rather than silently returning an empty list, which would mask a total
2625    /// backend outage).
2626    #[tokio::test]
2627    async fn list_images_errors_only_when_all_backends_fail() {
2628        let calls = Arc::new(StdMutex::new(Vec::new()));
2629        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2630        primary.list_images_error = Some("unsupported".to_string());
2631        let mut vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2632        vz_linux.list_images_error = Some("also unsupported".to_string());
2633
2634        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2635            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2636
2637        let err = rt.list_images().await.unwrap_err();
2638        assert!(
2639            matches!(err, AgentError::Unsupported(_)),
2640            "all-backends-fail should surface Unsupported, got {err:?}",
2641        );
2642    }
2643
2644    /// When the PRIMARY does not implement `prune_images` (returns
2645    /// `AgentError::Unsupported`) but a delegate does, the composite must
2646    /// tolerate the primary miss and return the delegate's result — symmetric
2647    /// with how `remove_image` / `tag_image` tolerate a primary failure when a
2648    /// delegate exists. A future cache-less primary must not 501 the whole call.
2649    #[tokio::test]
2650    async fn prune_images_tolerates_primary_unsupported_and_uses_delegate() {
2651        let calls = Arc::new(StdMutex::new(Vec::new()));
2652        // Primary leaves `prune_images_response` as `None` → returns Unsupported.
2653        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2654        let delegate =
2655            MockRuntime::new(Role::Delegate, Arc::clone(&calls)).with_prune_result(PruneResult {
2656                deleted: vec![
2657                    "docker.io/library/alpine:3.19".to_string(),
2658                    "docker.io/library/nginx:1.25".to_string(),
2659                ],
2660                space_reclaimed: 4096,
2661            });
2662
2663        let rt = CompositeRuntime::new(
2664            Arc::new(primary) as Arc<dyn Runtime>,
2665            Some(Arc::new(delegate) as Arc<dyn Runtime>),
2666        );
2667
2668        let result = rt
2669            .prune_images()
2670            .await
2671            .expect("primary Unsupported must not fail the composite prune_images");
2672        assert_eq!(
2673            result.deleted,
2674            vec![
2675                "docker.io/library/alpine:3.19".to_string(),
2676                "docker.io/library/nginx:1.25".to_string(),
2677            ],
2678            "should return the delegate's deleted images, got {:?}",
2679            result.deleted,
2680        );
2681        assert_eq!(
2682            result.space_reclaimed, 4096,
2683            "should return the delegate's reclaimed bytes",
2684        );
2685
2686        let calls = calls.lock().unwrap();
2687        assert_eq!(
2688            role_for(&calls, "prune_images"),
2689            Some(Role::Primary),
2690            "primary prune_images must still be attempted first",
2691        );
2692        assert!(
2693            calls
2694                .iter()
2695                .any(|(role, m, _)| *role == Role::Delegate && m == "prune_images"),
2696            "delegate prune_images must be invoked after the primary miss",
2697        );
2698    }
2699
2700    // ----------------------------------------------------------------------
2701    // Per-container read routing (logs / stats).
2702    //
    // These guard the macOS Docker-compat `/logs` and `/stats` 500 fix. When
    // the owning backend cannot serve a particular read (the primary
    // `SandboxRuntime` implements snapshot reads but returns `Unsupported` for
    // the *streaming* ones, or a different backend owns the container), the
    // composite must route to, or fall back across, backends and return real
    // data instead of surfacing `Unsupported` as an opaque 500. Only a genuine
    // not-found on every backend is a 404.
2710    // ----------------------------------------------------------------------
2711
2712    /// Build a `LogEntry` with the given stream + message for read tests.
2713    fn log_entry(stream: LogStream, message: &str) -> LogEntry {
2714        LogEntry {
2715            timestamp: chrono::Utc::now(),
2716            stream,
2717            source: zlayer_observability::logs::LogSource::Container("test".to_string()),
2718            message: message.to_string(),
2719            service: None,
2720            deployment: None,
2721        }
2722    }
2723
2724    /// Drain a `LogsStream` into the concatenated UTF-8 body bytes.
2725    async fn drain_logs(stream: LogsStream) -> String {
2726        use futures_util::StreamExt as _;
2727        let mut out = Vec::new();
2728        let mut s = stream;
2729        while let Some(item) = s.next().await {
2730            out.extend_from_slice(&item.expect("log chunk ok").bytes);
2731        }
2732        String::from_utf8(out).expect("utf8 log body")
2733    }
2734
2735    /// Collect a `StatsStream` into a Vec of samples.
2736    async fn drain_stats(stream: StatsStream) -> Vec<StatsSample> {
2737        use futures_util::StreamExt as _;
2738        let mut out = Vec::new();
2739        let mut s = stream;
2740        while let Some(item) = s.next().await {
2741            out.push(item.expect("stats sample ok"));
2742        }
2743        out
2744    }
2745
2746    /// Build a composite whose primary models the macOS `SandboxRuntime`
2747    /// (snapshot reads work, streaming reads return `Unsupported`) and whose
2748    /// VZ-Linux delegate owns the container with working native streams.
2749    /// Returns (composite, call-log) with a container already dispatched to the
2750    /// chosen owner.
2751    async fn make_read_composite(owner: Role) -> (CompositeRuntime, ContainerId, CallLog) {
2752        let calls = Arc::new(StdMutex::new(Vec::new()));
2753        let logs = vec![
2754            log_entry(LogStream::Stdout, "hello stdout"),
2755            log_entry(LogStream::Stderr, "hello stderr"),
2756        ];
2757        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls))
2758            .with_stream_unsupported()
2759            .with_logs(logs.clone());
2760        let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls)).with_logs(logs);
2761        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2762            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2763
2764        let id = cid("read-svc", 0);
2765        // Dispatch the container to the chosen owner without going through the
2766        // (platform-dependent) `select_for` path.
2767        let target = match owner {
2768            Role::Primary => DispatchTarget::Primary,
2769            Role::VzLinux => DispatchTarget::VzLinux,
2770            other => panic!("make_read_composite supports Primary/VzLinux, not {other:?}"),
2771        };
2772        rt.dispatch.write().await.insert(id.clone(), target);
2773        (rt, id, calls)
2774    }
2775
2776    #[tokio::test]
2777    async fn logs_stream_falls_back_to_snapshot_when_owner_has_no_stream() {
2778        // Sole backend = primary (SandboxRuntime model): `logs_stream` is
2779        // Unsupported, but `container_logs` works. With no other backend the
2780        // composite must synthesise a stream from the snapshot rather than 500.
2781        let calls = Arc::new(StdMutex::new(Vec::new()));
2782        let logs = vec![
2783            log_entry(LogStream::Stdout, "hello stdout"),
2784            log_entry(LogStream::Stderr, "hello stderr"),
2785        ];
2786        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls))
2787            .with_stream_unsupported()
2788            .with_logs(logs);
2789        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None);
2790        let id = cid("read-svc", 0);
2791        rt.dispatch
2792            .write()
2793            .await
2794            .insert(id.clone(), DispatchTarget::Primary);
2795
2796        let stream = rt
2797            .logs_stream(&id, LogsStreamOptions::default())
2798            .await
2799            .expect("logs_stream must not 500 when snapshot reads work");
2800        let body = drain_logs(stream).await;
2801        assert!(
2802            body.contains("hello stdout") && body.contains("hello stderr"),
2803            "synthesised stream must carry the captured logs, got: {body:?}",
2804        );
2805    }
2806
2807    #[tokio::test]
2808    async fn logs_stream_routes_to_delegate_owner_native_stream() {
2809        // Owner = VZ-Linux delegate with a working native stream; the primary's
2810        // streaming read is Unsupported but must not be consulted first.
2811        let (rt, id, calls) = make_read_composite(Role::VzLinux).await;
2812        let stream = rt
2813            .logs_stream(&id, LogsStreamOptions::default())
2814            .await
2815            .expect("delegate-owned logs_stream must succeed");
2816        let body = drain_logs(stream).await;
2817        assert!(body.contains("hello stdout"), "got: {body:?}");
2818
2819        let log = calls.lock().expect("call-log mutex poisoned");
2820        assert_eq!(
2821            role_for(&log, "logs_stream"),
2822            Some(Role::VzLinux),
2823            "logs_stream must hit the owning delegate first, calls: {log:?}",
2824        );
2825    }
2826
2827    #[tokio::test]
2828    async fn get_logs_falls_back_across_backends() {
2829        // Owner = primary; here snapshot `get_logs` works on primary directly,
2830        // so it should succeed on the owner without ever consulting the
2831        // delegate. (Soft-miss fallback is exercised by the stats test below.)
2832        let (rt, id, _calls) = make_read_composite(Role::Primary).await;
2833        let logs = rt.get_logs(&id).await.expect("get_logs must succeed");
2834        assert_eq!(logs.len(), 2, "owner snapshot logs should be returned");
2835    }
2836
2837    #[tokio::test]
2838    async fn stats_stream_falls_back_to_snapshot_when_owner_has_no_stream() {
2839        // Sole backend = primary (SandboxRuntime model): `stats_stream` is
2840        // Unsupported but `get_container_stats` works. With no other backend
2841        // offering a native stream, the composite must synthesise a single
2842        // non-empty sample from the snapshot rather than 500.
2843        let calls = Arc::new(StdMutex::new(Vec::new()));
2844        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_stream_unsupported();
2845        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None);
2846        let id = cid("read-svc", 0);
2847        rt.dispatch
2848            .write()
2849            .await
2850            .insert(id.clone(), DispatchTarget::Primary);
2851
2852        let stream = rt
2853            .stats_stream(&id)
2854            .await
2855            .expect("stats_stream must not 500 when get_container_stats works");
2856        let samples = drain_stats(stream).await;
2857        assert_eq!(samples.len(), 1, "snapshot fallback yields one sample");
2858        assert!(
2859            samples[0].mem_used_bytes > 0,
2860            "synthesised sample must carry non-zero memory, got {:?}",
2861            samples[0],
2862        );
2863        assert_eq!(
2864            samples[0].cpu_total_ns, 1_000_000,
2865            "cpu microseconds must be scaled to nanoseconds in the synthesised sample",
2866        );
2867    }
2868
2869    #[tokio::test]
2870    async fn get_container_stats_tolerates_owner_miss_and_uses_other_backend() {
2871        // Owner = primary whose snapshot `get_container_stats` returns
2872        // `Unsupported` (a soft miss); the delegate that follows in the fallback
2873        // chain serves it. The composite must NOT propagate the owner's
2874        // Unsupported as a 500.
2875        let calls = Arc::new(StdMutex::new(Vec::new()));
2876        let primary =
2877            MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_stats_snapshot_unsupported();
2878        let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2879        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2880            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2881        let id = cid("read-svc", 0);
2882        rt.dispatch
2883            .write()
2884            .await
2885            .insert(id.clone(), DispatchTarget::Primary);
2886
2887        let stats = rt
2888            .get_container_stats(&id)
2889            .await
2890            .expect("owner Unsupported must fall back to the delegate, not 500");
2891        assert!(stats.memory_bytes > 0, "delegate stats should be returned");
2892
2893        let log = calls.lock().expect("call-log mutex poisoned");
2894        assert!(
2895            log.iter()
2896                .any(|(role, method, _)| *role == Role::Primary && method == "get_container_stats"),
2897            "primary must have been tried first, calls: {log:?}",
2898        );
2899        assert!(
2900            log.iter()
2901                .any(|(role, method, _)| *role == Role::VzLinux && method == "get_container_stats"),
2902            "delegate must have served the fallback, calls: {log:?}",
2903        );
2904    }
2905
2906    #[tokio::test]
2907    async fn reads_propagate_not_found_when_no_backend_owns_container() {
2908        // Every backend returns NotFound for the dispatched container: the
2909        // composite must surface NotFound (→ 404), NOT mask it as Unsupported
2910        // or empty success.
2911        let calls = Arc::new(StdMutex::new(Vec::new()));
2912        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_reads_not_found();
2913        let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls)).with_reads_not_found();
2914        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2915            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2916        let id = cid("read-svc", 0);
2917        rt.dispatch
2918            .write()
2919            .await
2920            .insert(id.clone(), DispatchTarget::Primary);
2921
2922        // `LogsStream`/`StatsStream` are not `Debug`, so match instead of
2923        // `unwrap_err()`.
2924        match rt.logs_stream(&id, LogsStreamOptions::default()).await {
2925            Err(AgentError::NotFound { .. }) => {}
2926            other => panic!(
2927                "all-not-found logs_stream must be NotFound (404), got {:?}",
2928                other.err(),
2929            ),
2930        }
2931        match rt.stats_stream(&id).await {
2932            Err(AgentError::NotFound { .. }) => {}
2933            other => panic!(
2934                "all-not-found stats_stream must be NotFound (404), got {:?}",
2935                other.err(),
2936            ),
2937        }
2938        let cl_err = rt.container_logs(&id, 10).await.unwrap_err();
2939        assert!(
2940            matches!(cl_err, AgentError::NotFound { .. }),
2941            "all-not-found container_logs must be NotFound (404), got {cl_err:?}",
2942        );
2943    }
2944
2945    #[tokio::test]
2946    async fn reads_on_undispatched_container_are_not_found() {
2947        // No dispatch record at all → NotFound (the id was never created here).
2948        let (rt, _calls) = make_composite(false);
2949        let id = cid("ghost", 0);
2950        match rt.logs_stream(&id, LogsStreamOptions::default()).await {
2951            Err(AgentError::NotFound { .. }) => {}
2952            other => panic!(
2953                "undispatched logs_stream must be NotFound, got {:?}",
2954                other.err()
2955            ),
2956        }
2957    }
2958
2959    /// Regression: `pull_image` must fan out to the VZ-Linux delegate so the
2960    /// image lands in the store where Linux containers actually execute on
2961    /// macOS (and so it becomes listable/inspectable). Before the fix the
2962    /// composite only pulled into `primary` + `delegate`, leaving the
2963    /// VZ-Linux `image_rootfs` empty.
2964    #[tokio::test]
2965    async fn pull_image_fans_out_to_vz_linux() {
2966        let calls = Arc::new(StdMutex::new(Vec::new()));
2967        let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2968        let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2969
2970        let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2971            .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2972
2973        rt.pull_image("docker.io/library/alpine:latest")
2974            .await
2975            .expect("pull should succeed");
2976
2977        let log = calls.lock().expect("call-log mutex poisoned");
2978        assert!(
2979            log.iter()
2980                .any(|(role, method, _)| *role == Role::VzLinux && method == "pull_image"),
2981            "pull_image must reach the VZ-Linux delegate, recorded calls: {log:?}",
2982        );
2983    }
2984
2985    #[tokio::test]
2986    async fn dispatch_lookup_unknown_container_errors() {
2987        let (rt, _calls) = make_composite(true);
2988        let id = cid("ghost", 0);
2989
2990        let err = rt.start_container(&id).await.unwrap_err();
2991        assert!(
2992            matches!(err, AgentError::NotFound { .. }),
2993            "expected NotFound for unknown container, got {err:?}"
2994        );
2995    }
2996
2997    /// Helper: read the internal image-OS cache for test assertions.
2998    async fn cached_os(rt: &CompositeRuntime, image: &str) -> Option<OsKind> {
2999        rt.image_os.read().await.get(image).copied()
3000    }
3001
3002    #[tokio::test]
3003    async fn apply_image_os_inspection_populates_cache_on_ok_some() {
3004        // Contract: when `fetch_image_os` resolves to a recognized OS, the
3005        // cache is populated so subsequent `select_for` calls for specs
3006        // without `platform` dispatch correctly.
3007        let (rt, _calls) = make_composite(true);
3008        let image = "docker.io/library/alpine:3.19";
3009
3010        rt.apply_image_os_inspection(image, Ok(Some(OsKind::Linux)))
3011            .await;
3012
3013        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
3014    }
3015
3016    #[tokio::test]
3017    async fn apply_image_os_inspection_leaves_cache_untouched_on_ok_none() {
3018        // Contract: when the manifest carries no (or an unrecognized) `os`
3019        // field the cache is left alone. Dispatch will fall through to the
3020        // primary on `create_container`.
3021        let (rt, _calls) = make_composite(true);
3022        let image = "docker.io/library/nginx:1.25";
3023
3024        rt.apply_image_os_inspection(image, Ok(None)).await;
3025
3026        assert_eq!(cached_os(&rt, image).await, None);
3027    }
3028
3029    #[tokio::test]
3030    async fn apply_image_os_inspection_leaves_cache_untouched_on_err() {
3031        // Contract: a registry error during inspection is non-fatal and must
3032        // not poison the cache. Dispatch falls through to primary on lookup.
3033        let (rt, _calls) = make_composite(true);
3034        let image = "docker.io/library/nginx:1.25";
3035
3036        // Pre-seed the cache so we can assert the error path doesn't
3037        // overwrite or clear an existing entry.
3038        rt.record_image_os(image, OsKind::Linux).await;
3039
3040        let err = zlayer_registry::RegistryError::NotFound {
3041            registry: "docker.io".to_string(),
3042            image: image.to_string(),
3043        };
3044        rt.apply_image_os_inspection(image, Err(err)).await;
3045
3046        // Cache is still whatever it was before the failed inspection.
3047        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
3048    }
3049
3050    #[tokio::test]
3051    async fn pull_image_inspection_failure_does_not_fail_pull() {
3052        // End-to-end: even when the registry fetch fails (inevitable for the
3053        // synthetic image refs used in unit tests), `pull_image` still
3054        // returns `Ok`. The mock primary/delegate both succeed; the
3055        // inspection step logs and moves on. The cache must remain empty
3056        // because there was no successful inspection to record.
3057        let (rt, _calls) = make_composite(true);
3058        let image = "invalid.example.invalid/ghost:v1";
3059
3060        rt.pull_image(image).await.unwrap();
3061
3062        assert_eq!(
3063            cached_os(&rt, image).await,
3064            None,
3065            "failed inspection must not populate the image-OS cache"
3066        );
3067    }
3068
3069    #[tokio::test]
3070    async fn pull_image_with_policy_inspection_failure_does_not_fail_pull() {
3071        // Same contract as `pull_image_inspection_failure_does_not_fail_pull`
3072        // but exercising the policy-aware entry point.
3073        let (rt, _calls) = make_composite(true);
3074        let image = "invalid.example.invalid/ghost:v1";
3075
3076        rt.pull_image_with_policy(
3077            image,
3078            PullPolicy::IfNotPresent,
3079            None,
3080            zlayer_spec::SourcePolicy::default(),
3081        )
3082        .await
3083        .unwrap();
3084
3085        assert_eq!(cached_os(&rt, image).await, None);
3086    }
3087
3088    #[test]
3089    fn os_kind_from_oci_str_roundtrip() {
3090        // Guards the `as_oci_str` ↔ `from_oci_str` relationship used by the
3091        // inspection path. If a new variant is added to `OsKind` without
3092        // updating `from_oci_str` we want the miss here, not a silent
3093        // "dispatch to primary" regression in production.
3094        for os in [OsKind::Linux, OsKind::Windows, OsKind::Macos] {
3095            assert_eq!(OsKind::from_oci_str(os.as_oci_str()), Some(os));
3096        }
3097        assert_eq!(OsKind::from_oci_str(""), None);
3098        assert_eq!(OsKind::from_oci_str("freebsd"), None);
3099    }
3100}