Skip to main content

zlayer_agent/runtimes/
composite.rs

1//! Composite runtime that dispatches per-container to a primary + optional delegate.
2//!
3//! The [`CompositeRuntime`] owns a *primary* runtime (the node-native runtime —
4//! e.g. `HcsRuntime` on Windows, `YoukiRuntime` on Linux, Docker elsewhere) and
5//! an optional *delegate* runtime used for foreign-OS workloads (e.g. a WSL2
6//! delegate on Windows that runs Linux containers). Each call is routed based
7//! on the container's identity:
8//!
9//! * **[`Runtime::create_container`]** consults
10//!   [`ServiceSpec::platform`](zlayer_spec::ServiceSpec) first; when the
11//!   spec's OS targets the delegate we route there, otherwise primary. When
12//!   `platform` is `None`, a secondary **image-OS cache** (populated by
13//!   [`CompositeRuntime::record_image_os`] from OCI manifest inspection at
14//!   pull time) is consulted. If both are unknown we fall through to the
15//!   primary. **Strict policy (H-7):** if either source identifies the
16//!   workload as Linux and this node has no delegate configured, dispatch
17//!   returns [`AgentError::RouteToPeer`] so the scheduler can re-place the
18//!   workload on a Linux peer — the old permissive "fall through to primary"
19//!   behavior is gone.
20//! * All subsequent per-container operations (start/stop/remove/logs/exec/…)
21//!   look up the container in an internal **dispatch cache** that records
22//!   which runtime created it. This guarantees the same runtime sees the
23//!   container for its whole lifecycle, even after daemon restarts within
24//!   the same process.
25//! * Cross-cutting image operations (`pull_image`, `pull_image_with_policy`,
26//!   `list_images`, `prune_images`) fan out to both runtimes — we cannot know
27//!   in advance which runtime will execute a pulled image, and merged image
28//!   listings give users a single coherent view. `remove_image` / `tag_image`
29//!   try primary first and fall back to delegate.
30//!
31//! The dispatch cache is populated on `create_container` and cleared on
32//! `remove_container`. Looking up an unknown id yields
33//! [`AgentError::NotFound`], which surfaces as a clean 404 at the API layer
34//! rather than silently forwarding to the wrong runtime.
35
36use std::collections::HashMap;
37use std::net::IpAddr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use tokio::sync::RwLock;
43use zlayer_observability::logs::LogEntry;
44use zlayer_spec::{OsKind, PullPolicy, RegistryAuth, ServiceSpec};
45
46use crate::cgroups_stats::ContainerStats;
47use crate::error::{AgentError, Result};
48use crate::runtime::{
49    ContainerId, ContainerInspectDetails, ContainerState, ExecEventStream, ImageInfo, PruneResult,
50    Runtime, WaitOutcome,
51};
52
/// Which underlying runtime a given container was dispatched to.
///
/// Stored per-container in [`CompositeRuntime`]'s dispatch cache so every
/// lifecycle call after `create_container` reaches the same runtime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DispatchTarget {
    /// The node-native runtime (always present).
    Primary,
    /// The optional foreign-OS delegate runtime; only ever recorded when a
    /// delegate is actually configured (see `select_for`).
    Delegate,
}
59
/// Routes each container to either the primary runtime or an optional delegate.
///
/// See the module-level documentation for the dispatch rules.
pub struct CompositeRuntime {
    /// Node-native runtime; handles everything not routed to the delegate.
    primary: Arc<dyn Runtime>,
    /// Optional foreign-OS runtime (e.g. WSL2 on Windows); `None` on nodes
    /// without one, in which case Linux workloads are refused (H-7).
    delegate: Option<Arc<dyn Runtime>>,
    /// Per-container dispatch cache. Populated on `create_container`, removed
    /// on `remove_container`.
    dispatch: Arc<RwLock<HashMap<ContainerId, DispatchTarget>>>,
    /// Image-OS cache consulted when a spec has no explicit `platform`.
    /// Populated by [`CompositeRuntime::record_image_os`], which is driven
    /// from [`zlayer_registry::fetch_image_os`] during `pull_image*`.
    image_os: Arc<RwLock<HashMap<String, OsKind>>>,
}
74
75impl CompositeRuntime {
76    /// Construct a new composite runtime.
77    ///
78    /// `primary` handles containers whose platform matches the host node.
79    /// `delegate`, when present, handles foreign-OS containers (currently:
80    /// Linux containers on a Windows host via the WSL2 delegate runtime).
81    #[must_use]
82    pub fn new(primary: Arc<dyn Runtime>, delegate: Option<Arc<dyn Runtime>>) -> Self {
83        Self {
84            primary,
85            delegate,
86            dispatch: Arc::new(RwLock::new(HashMap::new())),
87            image_os: Arc::new(RwLock::new(HashMap::new())),
88        }
89    }
90
    /// Access the primary runtime (for introspection / tests).
    ///
    /// Always present; this is the node-native runtime.
    #[must_use]
    pub fn primary(&self) -> &Arc<dyn Runtime> {
        &self.primary
    }

    /// Access the delegate runtime, if one is configured.
    ///
    /// Returns `None` on nodes without a foreign-OS delegate; dispatch then
    /// refuses Linux workloads per the strict H-7 policy (see module docs).
    #[must_use]
    pub fn delegate(&self) -> Option<&Arc<dyn Runtime>> {
        self.delegate.as_ref()
    }
102
103    /// Record that `image` is known to target operating system `os`.
104    ///
105    /// Wired from [`zlayer_registry::fetch_image_os`] during `pull_image*`
106    /// (see [`CompositeRuntime::apply_image_os_inspection`]) so that specs
107    /// without an explicit `platform` still dispatch correctly.
108    pub(crate) async fn record_image_os(&self, image: &str, os: OsKind) {
109        self.image_os.write().await.insert(image.to_string(), os);
110    }
111
112    /// Apply the result of a manifest OS inspection to the image-OS cache.
113    ///
114    /// Factored out of [`Runtime::pull_image`] and
115    /// [`Runtime::pull_image_with_policy`] so the cache-update policy can be
116    /// unit-tested without depending on a live registry. The three branches
117    /// mirror the contract of [`zlayer_registry::fetch_image_os`]:
118    ///
119    /// * `Ok(Some(os))` — populate the cache so future `create_container`
120    ///   calls without an explicit `spec.platform` dispatch to the right
121    ///   runtime.
122    /// * `Ok(None)` — the config blob had no (or an unrecognized) `os`
123    ///   field. Leave the cache untouched; dispatch falls through to primary.
124    /// * `Err(_)` — transient or registry error. Log at warn and leave the
125    ///   cache untouched. We never fail the overall `pull_image*` call on
126    ///   inspection failure: the primary runtime's own pull already
127    ///   succeeded, and the safe fall-through is "primary".
128    async fn apply_image_os_inspection(
129        &self,
130        image: &str,
131        result: std::result::Result<Option<OsKind>, zlayer_registry::RegistryError>,
132    ) {
133        match result {
134            Ok(Some(os)) => {
135                self.record_image_os(image, os).await;
136                tracing::debug!(image, ?os, "cached image OS for dispatch");
137            }
138            Ok(None) => {
139                tracing::trace!(
140                    image,
141                    "image manifest has no OS field — dispatch will fall through to primary",
142                );
143            }
144            Err(e) => {
145                tracing::warn!(
146                    image,
147                    error = %e,
148                    "failed to inspect image manifest OS — dispatch will fall through to primary",
149                );
150            }
151        }
152    }
153
    /// Decide which runtime should handle a `create_container` call for `spec`.
    ///
    /// The `service` argument is the originating service name, used to build an
    /// actionable [`AgentError::RouteToPeer`] when a Linux workload lands on
    /// this node without a local delegate so the scheduler can re-place it on
    /// a capable peer.
    ///
    /// Policy (H-7): Linux workloads are never silently routed to the primary
    /// on nodes without a delegate. The old "permissive" fall-through (primary
    /// handles everything) returned an `Unsupported` error only when
    /// `spec.platform` was explicitly set, but fell through to primary for
    /// specs without a platform — producing cryptic downstream errors when the
    /// image-OS cache said `Linux`. We now return `RouteToPeer` in both cases.
    async fn select_for(&self, service: &str, spec: &ServiceSpec) -> Result<DispatchTarget> {
        // Signal 1 (authoritative): an explicit platform on the spec.
        if let Some(platform) = &spec.platform {
            let target = match platform.os {
                OsKind::Windows | OsKind::Macos => DispatchTarget::Primary,
                OsKind::Linux => DispatchTarget::Delegate,
            };
            // Strict H-7: Linux requested but no delegate available on this
            // node — refuse so the scheduler re-places the workload.
            if matches!(target, DispatchTarget::Delegate) && self.delegate.is_none() {
                return Err(AgentError::RouteToPeer {
                    service: service.to_string(),
                    required_os: OsKind::Linux.as_oci_str().to_string(),
                    reason: "spec.platform.os = linux but this node has no WSL2 delegate \
                            configured; enable `--install-wsl yes` on this node or add a Linux \
                            peer to the cluster"
                        .to_string(),
                });
            }
            return Ok(target);
        }

        // Signal 2 (fallback): the image-OS cache filled during pull_image*.
        // Keyed by the spec's image name; only consulted when `platform` is
        // absent.
        if let Some(os) = self.image_os.read().await.get(&spec.image.name).copied() {
            return match os {
                OsKind::Linux => {
                    if self.delegate.is_some() {
                        Ok(DispatchTarget::Delegate)
                    } else {
                        // No delegate and the image manifest says Linux —
                        // refuse at the composite layer so the scheduler can
                        // re-place on a Linux peer instead of the primary
                        // failing with a cryptic HCS error.
                        Err(AgentError::RouteToPeer {
                            service: service.to_string(),
                            required_os: OsKind::Linux.as_oci_str().to_string(),
                            reason: format!(
                                "image '{}' manifest reports os=linux but this node has no WSL2 \
                                 delegate configured; enable `--install-wsl yes` on this node or \
                                 add a Linux peer to the cluster",
                                spec.image.name
                            ),
                        })
                    }
                }
                OsKind::Windows | OsKind::Macos => Ok(DispatchTarget::Primary),
            };
        }

        // Neither signal knows the OS: safe fall-through to the primary.
        Ok(DispatchTarget::Primary)
    }
214
215    /// Look up an existing dispatch decision for `id`, or return `NotFound`.
216    async fn lookup(&self, id: &ContainerId) -> Result<Arc<dyn Runtime>> {
217        let target =
218            self.dispatch
219                .read()
220                .await
221                .get(id)
222                .copied()
223                .ok_or_else(|| AgentError::NotFound {
224                    container: id.to_string(),
225                    reason: "no dispatch record in CompositeRuntime".to_string(),
226                })?;
227        Ok(self.runtime_for(target).clone())
228    }
229
230    /// Resolve a [`DispatchTarget`] to the concrete runtime reference.
231    ///
232    /// Unwrapping the delegate is safe because [`Self::select_for`] returns
233    /// `Err` whenever a delegate would be required but is missing, so a
234    /// `DispatchTarget::Delegate` can never end up in the dispatch map
235    /// without a delegate being present.
236    fn runtime_for(&self, t: DispatchTarget) -> &Arc<dyn Runtime> {
237        match t {
238            DispatchTarget::Primary => &self.primary,
239            DispatchTarget::Delegate => self
240                .delegate
241                .as_ref()
242                .expect("delegate target requires delegate to exist"),
243        }
244    }
245}
246
247#[async_trait]
248impl Runtime for CompositeRuntime {
249    async fn pull_image(&self, image: &str) -> Result<()> {
250        self.primary.pull_image(image).await?;
251        if let Some(delegate) = &self.delegate {
252            if let Err(e) = delegate.pull_image(image).await {
253                // Foreign-OS images will reliably fail one of the two pulls
254                // (primary can't store a Linux image's config on Windows, or
255                // vice versa). That's expected — the successful side owns the
256                // layers we'll actually use — so we keep this at debug.
257                tracing::debug!(
258                    image,
259                    error = %e,
260                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
261                );
262            }
263        }
264
265        // Inspect the OCI manifest's `config.os` so `select_for(spec)` can
266        // dispatch correctly when `spec.platform` is `None`. Non-fatal: any
267        // failure here just means dispatch falls through to primary.
268        let os_result = zlayer_registry::fetch_image_os(image, None).await;
269        self.apply_image_os_inspection(image, os_result).await;
270
271        Ok(())
272    }
273
274    async fn pull_image_with_policy(
275        &self,
276        image: &str,
277        policy: PullPolicy,
278        auth: Option<&RegistryAuth>,
279    ) -> Result<()> {
280        self.primary
281            .pull_image_with_policy(image, policy, auth)
282            .await?;
283        if let Some(delegate) = &self.delegate {
284            if let Err(e) = delegate.pull_image_with_policy(image, policy, auth).await {
285                tracing::debug!(
286                    image,
287                    error = %e,
288                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
289                );
290            }
291        }
292
293        let os_result = zlayer_registry::fetch_image_os(image, auth).await;
294        self.apply_image_os_inspection(image, os_result).await;
295
296        Ok(())
297    }
298
299    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
300        let target = self.select_for(&id.service, spec).await?;
301        {
302            let mut dispatch = self.dispatch.write().await;
303            dispatch.insert(id.clone(), target);
304        }
305        let rt = self.runtime_for(target).clone();
306        match rt.create_container(id, spec).await {
307            Ok(()) => Ok(()),
308            Err(e) => {
309                // Roll back the cache insert on failure so subsequent lookups
310                // don't find a dangling entry.
311                self.dispatch.write().await.remove(id);
312                Err(e)
313            }
314        }
315    }
316
317    async fn start_container(&self, id: &ContainerId) -> Result<()> {
318        let rt = self.lookup(id).await?;
319        rt.start_container(id).await
320    }
321
322    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
323        let rt = self.lookup(id).await?;
324        rt.stop_container(id, timeout).await
325    }
326
327    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
328        let rt = self.lookup(id).await?;
329        let res = rt.remove_container(id).await;
330        self.dispatch.write().await.remove(id);
331        res
332    }
333
334    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
335        let rt = self.lookup(id).await?;
336        rt.container_state(id).await
337    }
338
339    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
340        let rt = self.lookup(id).await?;
341        rt.container_logs(id, tail).await
342    }
343
344    async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
345        let rt = self.lookup(id).await?;
346        rt.exec(id, cmd).await
347    }
348
349    async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
350        let rt = self.lookup(id).await?;
351        rt.exec_stream(id, cmd).await
352    }
353
354    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
355        let rt = self.lookup(id).await?;
356        rt.get_container_stats(id).await
357    }
358
359    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
360        let rt = self.lookup(id).await?;
361        rt.wait_container(id).await
362    }
363
364    async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
365        let rt = self.lookup(id).await?;
366        rt.wait_outcome(id).await
367    }
368
369    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
370        let rt = self.lookup(id).await?;
371        rt.get_logs(id).await
372    }
373
374    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
375        let rt = self.lookup(id).await?;
376        rt.get_container_pid(id).await
377    }
378
379    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
380        let rt = self.lookup(id).await?;
381        rt.get_container_ip(id).await
382    }
383
384    async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
385        let rt = self.lookup(id).await?;
386        rt.get_container_port_override(id).await
387    }
388
389    #[cfg(target_os = "windows")]
390    async fn get_container_namespace_id(
391        &self,
392        id: &ContainerId,
393    ) -> Result<Option<windows::core::GUID>> {
394        let rt = self.lookup(id).await?;
395        rt.get_container_namespace_id(id).await
396    }
397
398    async fn sync_container_volumes(&self, id: &ContainerId) -> Result<()> {
399        let rt = self.lookup(id).await?;
400        rt.sync_container_volumes(id).await
401    }
402
403    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
404        let mut out = self.primary.list_images().await?;
405        if let Some(delegate) = &self.delegate {
406            match delegate.list_images().await {
407                Ok(extra) => out.extend(extra),
408                Err(e) => tracing::warn!(
409                    error = %e,
410                    "delegate runtime list_images failed; returning primary results only",
411                ),
412            }
413        }
414        Ok(out)
415    }
416
417    async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
418        match self.primary.remove_image(image, force).await {
419            Ok(()) => Ok(()),
420            Err(primary_err) => {
421                if let Some(delegate) = &self.delegate {
422                    match delegate.remove_image(image, force).await {
423                        Ok(()) => Ok(()),
424                        Err(delegate_err) => {
425                            tracing::debug!(
426                                image,
427                                %delegate_err,
428                                "delegate remove_image also failed; returning primary error",
429                            );
430                            Err(primary_err)
431                        }
432                    }
433                } else {
434                    Err(primary_err)
435                }
436            }
437        }
438    }
439
440    async fn prune_images(&self) -> Result<PruneResult> {
441        let mut result = self.primary.prune_images().await?;
442        if let Some(delegate) = &self.delegate {
443            match delegate.prune_images().await {
444                Ok(extra) => {
445                    result.deleted.extend(extra.deleted);
446                    result.space_reclaimed =
447                        result.space_reclaimed.saturating_add(extra.space_reclaimed);
448                }
449                Err(e) => tracing::warn!(
450                    error = %e,
451                    "delegate runtime prune_images failed; returning primary result only",
452                ),
453            }
454        }
455        Ok(result)
456    }
457
458    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
459        let rt = self.lookup(id).await?;
460        rt.kill_container(id, signal).await
461    }
462
463    async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
464        match self.primary.tag_image(source, target).await {
465            Ok(()) => Ok(()),
466            Err(primary_err) => {
467                if let Some(delegate) = &self.delegate {
468                    match delegate.tag_image(source, target).await {
469                        Ok(()) => Ok(()),
470                        Err(delegate_err) => {
471                            tracing::debug!(
472                                source,
473                                target,
474                                %delegate_err,
475                                "delegate tag_image also failed; returning primary error",
476                            );
477                            Err(primary_err)
478                        }
479                    }
480                } else {
481                    Err(primary_err)
482                }
483            }
484        }
485    }
486
487    async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
488        let rt = self.lookup(id).await?;
489        rt.inspect_detailed(id).await
490    }
491}
492
493#[cfg(test)]
494mod tests {
495    use super::*;
496    use crate::cgroups_stats::ContainerStats;
497    use std::sync::Mutex as StdMutex;
498    use zlayer_spec::{ArchKind, DeploymentSpec, TargetPlatform};
499
    /// Which runtime a mock represents. Only used for labelling invocation
    /// records in tests.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum Role {
        Primary,
        Delegate,
    }

    /// One recorded invocation: (runtime role, method name, container id).
    /// The id is `None` for cross-cutting image operations.
    type CallRecord = (Role, String, Option<ContainerId>);
    /// Shared, thread-safe log of every mock call made in a single test.
    type CallLog = Arc<StdMutex<Vec<CallRecord>>>;

    /// Mock runtime that records every method call it receives.
    ///
    /// This is intentionally minimal — just enough trait surface to exercise
    /// the composite's dispatch logic. Every recorded call includes the role
    /// (primary vs delegate), the method name, and the container id (or
    /// `None` for cross-cutting image operations).
    struct MockRuntime {
        // Label stamped onto every call record.
        role: Role,
        // Log shared with the sibling mock so a test sees a global ordering.
        calls: CallLog,
        // Canned response returned by `list_images`.
        list_images_response: Vec<ImageInfo>,
        // When set, `pull_image` fails with this message (as `Internal`).
        pull_image_error: Option<String>,
    }
525
526    impl MockRuntime {
527        fn new(role: Role, calls: CallLog) -> Self {
528            Self {
529                role,
530                calls,
531                list_images_response: Vec::new(),
532                pull_image_error: None,
533            }
534        }
535
536        fn record(&self, method: &str, id: Option<&ContainerId>) {
537            self.calls
538                .lock()
539                .expect("mock call-log mutex poisoned")
540                .push((self.role, method.to_string(), id.cloned()));
541        }
542    }
543
    // Mechanical trait impl: every method records itself and returns a benign
    // stub value. NOTE(review): several `Runtime` methods (`exec_stream`,
    // `wait_outcome`, `remove_image`, `tag_image`, …) are not implemented
    // here, so the trait presumably supplies default implementations —
    // confirm against the `Runtime` definition in crate::runtime.
    #[async_trait]
    impl Runtime for MockRuntime {
        async fn pull_image(&self, _image: &str) -> Result<()> {
            self.record("pull_image", None);
            // Simulate a pull failure (e.g. wrong-OS image) when configured.
            if let Some(msg) = &self.pull_image_error {
                return Err(AgentError::Internal(msg.clone()));
            }
            Ok(())
        }

        async fn pull_image_with_policy(
            &self,
            _image: &str,
            _policy: PullPolicy,
            _auth: Option<&RegistryAuth>,
        ) -> Result<()> {
            self.record("pull_image_with_policy", None);
            Ok(())
        }

        async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
            self.record("create_container", Some(id));
            Ok(())
        }

        async fn start_container(&self, id: &ContainerId) -> Result<()> {
            self.record("start_container", Some(id));
            Ok(())
        }

        async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
            self.record("stop_container", Some(id));
            Ok(())
        }

        async fn remove_container(&self, id: &ContainerId) -> Result<()> {
            self.record("remove_container", Some(id));
            Ok(())
        }

        async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
            self.record("container_state", Some(id));
            Ok(ContainerState::Running)
        }

        async fn container_logs(&self, id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
            self.record("container_logs", Some(id));
            Ok(Vec::new())
        }

        async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
            self.record("exec", Some(id));
            Ok((0, String::new(), String::new()))
        }

        async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
            self.record("get_container_stats", Some(id));
            // Zeroed stats are sufficient; dispatch tests only check routing.
            Ok(ContainerStats {
                cpu_usage_usec: 0,
                memory_bytes: 0,
                memory_limit: 0,
                timestamp: std::time::Instant::now(),
            })
        }

        async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
            self.record("wait_container", Some(id));
            Ok(0)
        }

        async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
            self.record("get_logs", Some(id));
            Ok(Vec::new())
        }

        async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
            self.record("get_container_pid", Some(id));
            Ok(None)
        }

        async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
            self.record("get_container_ip", Some(id));
            Ok(None)
        }

        async fn list_images(&self) -> Result<Vec<ImageInfo>> {
            self.record("list_images", None);
            Ok(self.list_images_response.clone())
        }
    }
634
635    /// Build a [`ServiceSpec`] (with the given image name) from the minimal
636    /// inline YAML the existing runtime tests use, then optionally set a
637    /// target platform on it.
638    fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
639        let yaml = format!(
640            r"
641version: v1
642deployment: test
643services:
644  test:
645    rtype: service
646    image:
647      name: {image}
648    endpoints:
649      - name: http
650        protocol: http
651        port: 8080
652"
653        );
654        let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
655            .expect("valid deployment yaml")
656            .services
657            .remove("test")
658            .expect("service 'test' present");
659        spec.platform = platform;
660        spec
661    }
662
663    fn cid(service: &str, replica: u32) -> ContainerId {
664        ContainerId {
665            service: service.to_string(),
666            replica,
667        }
668    }
669
670    fn make_composite(with_delegate: bool) -> (CompositeRuntime, CallLog) {
671        let calls = Arc::new(StdMutex::new(Vec::new()));
672        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
673        let delegate = if with_delegate {
674            Some(Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>)
675        } else {
676            None
677        };
678        (
679            CompositeRuntime::new(primary as Arc<dyn Runtime>, delegate),
680            calls,
681        )
682    }
683
684    fn role_for(calls: &[CallRecord], method: &str) -> Option<Role> {
685        calls
686            .iter()
687            .find(|(_, m, _)| m == method)
688            .map(|(role, _, _)| *role)
689    }
690
691    #[tokio::test]
692    async fn dispatch_windows_spec_goes_to_primary() {
693        let (rt, calls) = make_composite(true);
694        let id = cid("win-svc", 0);
695        let spec = make_spec(
696            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
697            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
698        );
699
700        rt.create_container(&id, &spec).await.unwrap();
701        rt.start_container(&id).await.unwrap();
702
703        let calls = calls.lock().unwrap();
704        assert_eq!(
705            role_for(&calls, "create_container"),
706            Some(Role::Primary),
707            "create_container should hit primary for Windows spec"
708        );
709        assert_eq!(
710            role_for(&calls, "start_container"),
711            Some(Role::Primary),
712            "start_container should hit primary for Windows spec"
713        );
714    }
715
716    #[tokio::test]
717    async fn dispatch_linux_spec_goes_to_delegate() {
718        let (rt, calls) = make_composite(true);
719        let id = cid("lin-svc", 0);
720        let spec = make_spec(
721            "docker.io/library/alpine:3.19",
722            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
723        );
724
725        rt.create_container(&id, &spec).await.unwrap();
726        rt.start_container(&id).await.unwrap();
727
728        let calls = calls.lock().unwrap();
729        assert_eq!(
730            role_for(&calls, "create_container"),
731            Some(Role::Delegate),
732            "create_container should hit delegate for Linux spec"
733        );
734        assert_eq!(
735            role_for(&calls, "start_container"),
736            Some(Role::Delegate),
737            "start_container should hit delegate for Linux spec"
738        );
739    }
740
741    #[tokio::test]
742    async fn dispatch_linux_without_delegate_errors() {
743        // H-7 policy: a Linux spec on a node without a delegate must return
744        // `RouteToPeer` (not `Unsupported`, not a silent primary fall-through)
745        // so the scheduler can re-place the workload on a capable peer.
746        let (rt, _calls) = make_composite(false);
747        let id = cid("lin-svc", 0);
748        let spec = make_spec(
749            "docker.io/library/alpine:3.19",
750            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
751        );
752
753        let err = rt.create_container(&id, &spec).await.unwrap_err();
754        match err {
755            AgentError::RouteToPeer {
756                service,
757                required_os,
758                reason,
759            } => {
760                assert_eq!(service, "lin-svc");
761                assert_eq!(required_os, "linux");
762                assert!(
763                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
764                    "reason must name both remediations, got: {reason}"
765                );
766            }
767            other => panic!("expected RouteToPeer, got {other:?}"),
768        }
769    }
770
771    #[tokio::test]
772    async fn dispatch_linux_image_cache_without_delegate_routes_to_peer() {
773        // H-7 policy: even when `spec.platform` is unset, a Linux image in the
774        // OS cache must route to a peer instead of falling through to primary.
775        // This is the old permissive-fallthrough path the comment at lines
776        // 172-178 used to describe; the behavior is now strict.
777        let (rt, _calls) = make_composite(false);
778        let id = cid("svc", 0);
779        let image = "docker.io/library/nginx:1.25";
780        rt.record_image_os(image, OsKind::Linux).await;
781
782        let spec = make_spec(image, None);
783        let err = rt.create_container(&id, &spec).await.unwrap_err();
784        match err {
785            AgentError::RouteToPeer {
786                service,
787                required_os,
788                reason,
789            } => {
790                assert_eq!(service, "svc");
791                assert_eq!(required_os, "linux");
792                assert!(
793                    reason.contains(image),
794                    "reason should mention the image name, got: {reason}"
795                );
796                assert!(
797                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
798                    "reason must name both remediations, got: {reason}"
799                );
800            }
801            other => panic!("expected RouteToPeer, got {other:?}"),
802        }
803    }
804
805    #[tokio::test]
806    async fn dispatch_macos_spec_goes_to_primary() {
807        let (rt, calls) = make_composite(true);
808        let id = cid("mac-svc", 0);
809        let spec = make_spec(
810            "ghcr.io/zlayer/macos:latest",
811            Some(TargetPlatform::new(OsKind::Macos, ArchKind::Arm64)),
812        );
813
814        rt.create_container(&id, &spec).await.unwrap();
815
816        let calls = calls.lock().unwrap();
817        assert_eq!(
818            role_for(&calls, "create_container"),
819            Some(Role::Primary),
820            "create_container should hit primary for Macos spec"
821        );
822    }
823
824    #[tokio::test]
825    async fn dispatch_no_platform_no_image_os_falls_through_to_primary() {
826        let (rt, calls) = make_composite(true);
827        let id = cid("svc", 0);
828        let spec = make_spec("docker.io/library/nginx:1.25", None);
829
830        rt.create_container(&id, &spec).await.unwrap();
831
832        let calls = calls.lock().unwrap();
833        assert_eq!(
834            role_for(&calls, "create_container"),
835            Some(Role::Primary),
836            "fall-through should pick primary when both platform and image-OS cache are unknown"
837        );
838    }
839
840    #[tokio::test]
841    async fn dispatch_uses_image_os_cache_when_platform_missing() {
842        let (rt, calls) = make_composite(true);
843        let id = cid("svc", 0);
844        let image = "docker.io/library/nginx:1.25";
845        rt.record_image_os(image, OsKind::Linux).await;
846
847        let spec = make_spec(image, None);
848        rt.create_container(&id, &spec).await.unwrap();
849
850        let calls = calls.lock().unwrap();
851        assert_eq!(
852            role_for(&calls, "create_container"),
853            Some(Role::Delegate),
854            "image-OS cache should route Linux images to the delegate"
855        );
856    }
857
858    #[tokio::test]
859    async fn per_container_dispatch_cache_persists_through_start_stop() {
860        let (rt, calls) = make_composite(true);
861        let id = cid("win-svc", 0);
862        let spec = make_spec(
863            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
864            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
865        );
866
867        rt.create_container(&id, &spec).await.unwrap();
868        rt.start_container(&id).await.unwrap();
869        rt.stop_container(&id, Duration::from_secs(1))
870            .await
871            .unwrap();
872        rt.remove_container(&id).await.unwrap();
873
874        let recorded = calls.lock().unwrap().clone();
875        for method in [
876            "create_container",
877            "start_container",
878            "stop_container",
879            "remove_container",
880        ] {
881            assert_eq!(
882                role_for(&recorded, method),
883                Some(Role::Primary),
884                "{method} should have dispatched to primary"
885            );
886        }
887
888        // After remove, the dispatch cache entry should be gone.
889        let after = rt
890            .start_container(&id)
891            .await
892            .expect_err("lookup after remove should fail");
893        assert!(
894            matches!(after, AgentError::NotFound { .. }),
895            "expected NotFound after remove, got {after:?}"
896        );
897    }
898
899    #[tokio::test]
900    async fn pull_image_calls_both_runtimes() {
901        let (rt, calls) = make_composite(true);
902        rt.pull_image("docker.io/library/alpine:3.19")
903            .await
904            .unwrap();
905
906        let recorded = calls.lock().unwrap();
907        let pull_calls: Vec<Role> = recorded
908            .iter()
909            .filter(|(_, m, _)| m == "pull_image")
910            .map(|(r, _, _)| *r)
911            .collect();
912        assert!(
913            pull_calls.contains(&Role::Primary),
914            "primary should have been pulled: {pull_calls:?}",
915        );
916        assert!(
917            pull_calls.contains(&Role::Delegate),
918            "delegate should have been pulled: {pull_calls:?}",
919        );
920    }
921
922    #[tokio::test]
923    async fn pull_image_delegate_error_does_not_fail() {
924        // Build the composite by hand so we can flip the delegate's
925        // pull_image_error before wrapping it in an Arc<dyn Runtime>.
926        let calls = Arc::new(StdMutex::new(Vec::new()));
927        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
928        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
929        delegate.pull_image_error = Some("simulated delegate pull failure".to_string());
930        let rt = CompositeRuntime::new(
931            primary as Arc<dyn Runtime>,
932            Some(Arc::new(delegate) as Arc<dyn Runtime>),
933        );
934
935        // Top-level call must succeed despite the delegate error.
936        rt.pull_image("docker.io/library/alpine:3.19")
937            .await
938            .unwrap();
939
940        let recorded = calls.lock().unwrap();
941        let pull_calls: Vec<Role> = recorded
942            .iter()
943            .filter(|(_, m, _)| m == "pull_image")
944            .map(|(r, _, _)| *r)
945            .collect();
946        assert!(
947            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
948            "both runtimes should have been called: {pull_calls:?}",
949        );
950    }
951
952    #[tokio::test]
953    async fn list_images_merges_both() {
954        // Hand-build so we can seed each mock's list_images_response.
955        let calls = Arc::new(StdMutex::new(Vec::new()));
956        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
957        primary.list_images_response = vec![ImageInfo {
958            reference: "primary/image:1".to_string(),
959            digest: None,
960            size_bytes: None,
961        }];
962        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
963        delegate.list_images_response = vec![ImageInfo {
964            reference: "delegate/image:1".to_string(),
965            digest: None,
966            size_bytes: None,
967        }];
968        let rt = CompositeRuntime::new(
969            Arc::new(primary) as Arc<dyn Runtime>,
970            Some(Arc::new(delegate) as Arc<dyn Runtime>),
971        );
972
973        let merged = rt.list_images().await.unwrap();
974        let refs: Vec<&str> = merged.iter().map(|i| i.reference.as_str()).collect();
975        assert!(
976            refs.contains(&"primary/image:1") && refs.contains(&"delegate/image:1"),
977            "merged list should contain both entries, got {refs:?}",
978        );
979    }
980
981    #[tokio::test]
982    async fn dispatch_lookup_unknown_container_errors() {
983        let (rt, _calls) = make_composite(true);
984        let id = cid("ghost", 0);
985
986        let err = rt.start_container(&id).await.unwrap_err();
987        assert!(
988            matches!(err, AgentError::NotFound { .. }),
989            "expected NotFound for unknown container, got {err:?}"
990        );
991    }
992
993    /// Helper: read the internal image-OS cache for test assertions.
994    async fn cached_os(rt: &CompositeRuntime, image: &str) -> Option<OsKind> {
995        rt.image_os.read().await.get(image).copied()
996    }
997
998    #[tokio::test]
999    async fn apply_image_os_inspection_populates_cache_on_ok_some() {
1000        // Contract: when `fetch_image_os` resolves to a recognized OS, the
1001        // cache is populated so subsequent `select_for` calls for specs
1002        // without `platform` dispatch correctly.
1003        let (rt, _calls) = make_composite(true);
1004        let image = "docker.io/library/alpine:3.19";
1005
1006        rt.apply_image_os_inspection(image, Ok(Some(OsKind::Linux)))
1007            .await;
1008
1009        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
1010    }
1011
1012    #[tokio::test]
1013    async fn apply_image_os_inspection_leaves_cache_untouched_on_ok_none() {
1014        // Contract: when the manifest carries no (or an unrecognized) `os`
1015        // field the cache is left alone. Dispatch will fall through to the
1016        // primary on `create_container`.
1017        let (rt, _calls) = make_composite(true);
1018        let image = "docker.io/library/nginx:1.25";
1019
1020        rt.apply_image_os_inspection(image, Ok(None)).await;
1021
1022        assert_eq!(cached_os(&rt, image).await, None);
1023    }
1024
1025    #[tokio::test]
1026    async fn apply_image_os_inspection_leaves_cache_untouched_on_err() {
1027        // Contract: a registry error during inspection is non-fatal and must
1028        // not poison the cache. Dispatch falls through to primary on lookup.
1029        let (rt, _calls) = make_composite(true);
1030        let image = "docker.io/library/nginx:1.25";
1031
1032        // Pre-seed the cache so we can assert the error path doesn't
1033        // overwrite or clear an existing entry.
1034        rt.record_image_os(image, OsKind::Linux).await;
1035
1036        let err = zlayer_registry::RegistryError::NotFound {
1037            registry: "docker.io".to_string(),
1038            image: image.to_string(),
1039        };
1040        rt.apply_image_os_inspection(image, Err(err)).await;
1041
1042        // Cache is still whatever it was before the failed inspection.
1043        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
1044    }
1045
1046    #[tokio::test]
1047    async fn pull_image_inspection_failure_does_not_fail_pull() {
1048        // End-to-end: even when the registry fetch fails (inevitable for the
1049        // synthetic image refs used in unit tests), `pull_image` still
1050        // returns `Ok`. The mock primary/delegate both succeed; the
1051        // inspection step logs and moves on. The cache must remain empty
1052        // because there was no successful inspection to record.
1053        let (rt, _calls) = make_composite(true);
1054        let image = "invalid.example.invalid/ghost:v1";
1055
1056        rt.pull_image(image).await.unwrap();
1057
1058        assert_eq!(
1059            cached_os(&rt, image).await,
1060            None,
1061            "failed inspection must not populate the image-OS cache"
1062        );
1063    }
1064
1065    #[tokio::test]
1066    async fn pull_image_with_policy_inspection_failure_does_not_fail_pull() {
1067        // Same contract as `pull_image_inspection_failure_does_not_fail_pull`
1068        // but exercising the policy-aware entry point.
1069        let (rt, _calls) = make_composite(true);
1070        let image = "invalid.example.invalid/ghost:v1";
1071
1072        rt.pull_image_with_policy(image, PullPolicy::IfNotPresent, None)
1073            .await
1074            .unwrap();
1075
1076        assert_eq!(cached_os(&rt, image).await, None);
1077    }
1078
1079    #[test]
1080    fn os_kind_from_oci_str_roundtrip() {
1081        // Guards the `as_oci_str` ↔ `from_oci_str` relationship used by the
1082        // inspection path. If a new variant is added to `OsKind` without
1083        // updating `from_oci_str` we want the miss here, not a silent
1084        // "dispatch to primary" regression in production.
1085        for os in [OsKind::Linux, OsKind::Windows, OsKind::Macos] {
1086            assert_eq!(OsKind::from_oci_str(os.as_oci_str()), Some(os));
1087        }
1088        assert_eq!(OsKind::from_oci_str(""), None);
1089        assert_eq!(OsKind::from_oci_str("freebsd"), None);
1090    }
1091}