// zlayer_agent/runtimes/composite.rs
//! Composite runtime that dispatches per-container to a primary + optional delegate.
//!
//! The [`CompositeRuntime`] owns a *primary* runtime (the node-native runtime —
//! e.g. `HcsRuntime` on Windows, `YoukiRuntime` on Linux, Docker elsewhere) and
//! an optional *delegate* runtime used for foreign-OS workloads (e.g. a WSL2
//! delegate on Windows that runs Linux containers). Each call is routed based
//! on the container's identity:
//!
//! * **[`Runtime::create_container`]** consults
//!   [`ServiceSpec::platform`](zlayer_spec::ServiceSpec) first; when the
//!   spec's OS targets the delegate we route there, otherwise primary. When
//!   `platform` is `None`, a secondary **image-OS cache** (populated by
//!   [`CompositeRuntime::record_image_os`] from OCI manifest inspection at
//!   pull time) is consulted. If both are unknown we fall through to the
//!   primary. **Strict policy (H-7):** if either source identifies the
//!   workload as Linux and this node has no delegate configured, dispatch
//!   returns [`AgentError::RouteToPeer`] so the scheduler can re-place the
//!   workload on a Linux peer — the old permissive "fall through to primary"
//!   behavior is gone.
//! * All subsequent per-container operations (start/stop/remove/logs/exec/…)
//!   look up the container in an internal **dispatch cache** that records
//!   which runtime created it. This guarantees the same runtime sees the
//!   container for its whole lifecycle, even after daemon restarts within
//!   the same process.
//! * Cross-cutting image operations (`pull_image`, `pull_image_with_policy`,
//!   `list_images`, `prune_images`) fan out to both runtimes — we cannot know
//!   in advance which runtime will execute a pulled image, and merged image
//!   listings give users a single coherent view. `remove_image` / `tag_image`
//!   try primary first and fall back to delegate.
//!
//! The dispatch cache is populated on `create_container` and cleared on
//! `remove_container`. Looking up an unknown id yields
//! [`AgentError::NotFound`], which surfaces as a clean 404 at the API layer
//! rather than silently forwarding to the wrong runtime.

use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::RwLock;
use zlayer_observability::logs::LogEntry;
use zlayer_spec::{OsKind, PullPolicy, RegistryAuth, ServiceSpec};

use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use crate::runtime::{
    ContainerId, ContainerInspectDetails, ContainerState, ExecEventStream, ImageInfo, PruneResult,
    Runtime, WaitOutcome,
};

/// Which underlying runtime a given container was dispatched to.
///
/// One of these is stored per container in the composite's dispatch cache so
/// every lifecycle call after `create_container` reaches the same runtime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DispatchTarget {
    /// The node-native runtime (HCS on Windows, Youki on Linux, Docker elsewhere).
    Primary,
    /// The optional foreign-OS runtime (e.g. the WSL2 delegate on Windows).
    Delegate,
}

/// Routes each container to either the primary runtime or an optional delegate.
///
/// See the module-level documentation for the dispatch rules.
pub struct CompositeRuntime {
    /// Node-native runtime; handles everything the delegate doesn't.
    primary: Arc<dyn Runtime>,
    /// Foreign-OS runtime, when configured (e.g. WSL2 on a Windows node).
    delegate: Option<Arc<dyn Runtime>>,
    /// Per-container dispatch cache. Populated on `create_container`, removed
    /// on `remove_container`.
    dispatch: Arc<RwLock<HashMap<ContainerId, DispatchTarget>>>,
    /// Image-OS cache consulted when a spec has no explicit `platform`.
    /// Populated by [`CompositeRuntime::record_image_os`], which is driven
    /// from [`zlayer_registry::fetch_image_os`] during `pull_image*`.
    image_os: Arc<RwLock<HashMap<String, OsKind>>>,
}

impl CompositeRuntime {
76    /// Construct a new composite runtime.
77    ///
78    /// `primary` handles containers whose platform matches the host node.
79    /// `delegate`, when present, handles foreign-OS containers (currently:
80    /// Linux containers on a Windows host via the WSL2 delegate runtime).
81    #[must_use]
82    pub fn new(primary: Arc<dyn Runtime>, delegate: Option<Arc<dyn Runtime>>) -> Self {
83        Self {
84            primary,
85            delegate,
86            dispatch: Arc::new(RwLock::new(HashMap::new())),
87            image_os: Arc::new(RwLock::new(HashMap::new())),
88        }
89    }
90
    /// Access the primary runtime (for introspection / tests).
    #[must_use]
    pub fn primary(&self) -> &Arc<dyn Runtime> {
        &self.primary
    }

    /// Access the delegate runtime, if one is configured.
    #[must_use]
    pub fn delegate(&self) -> Option<&Arc<dyn Runtime>> {
        self.delegate.as_ref()
    }

103    /// Record that `image` is known to target operating system `os`.
104    ///
105    /// Wired from [`zlayer_registry::fetch_image_os`] during `pull_image*`
106    /// (see [`CompositeRuntime::apply_image_os_inspection`]) so that specs
107    /// without an explicit `platform` still dispatch correctly.
108    pub(crate) async fn record_image_os(&self, image: &str, os: OsKind) {
109        self.image_os.write().await.insert(image.to_string(), os);
110    }
111
112    /// Apply the result of a manifest OS inspection to the image-OS cache.
113    ///
114    /// Factored out of [`Runtime::pull_image`] and
115    /// [`Runtime::pull_image_with_policy`] so the cache-update policy can be
116    /// unit-tested without depending on a live registry. The three branches
117    /// mirror the contract of [`zlayer_registry::fetch_image_os`]:
118    ///
119    /// * `Ok(Some(os))` — populate the cache so future `create_container`
120    ///   calls without an explicit `spec.platform` dispatch to the right
121    ///   runtime.
122    /// * `Ok(None)` — the config blob had no (or an unrecognized) `os`
123    ///   field. Leave the cache untouched; dispatch falls through to primary.
124    /// * `Err(_)` — transient or registry error. Log at warn and leave the
125    ///   cache untouched. We never fail the overall `pull_image*` call on
126    ///   inspection failure: the primary runtime's own pull already
127    ///   succeeded, and the safe fall-through is "primary".
128    async fn apply_image_os_inspection(
129        &self,
130        image: &str,
131        result: std::result::Result<Option<OsKind>, zlayer_registry::RegistryError>,
132    ) {
133        match result {
134            Ok(Some(os)) => {
135                self.record_image_os(image, os).await;
136                tracing::debug!(image, ?os, "cached image OS for dispatch");
137            }
138            Ok(None) => {
139                tracing::trace!(
140                    image,
141                    "image manifest has no OS field — dispatch will fall through to primary",
142                );
143            }
144            Err(e) => {
145                tracing::warn!(
146                    image,
147                    error = %e,
148                    "failed to inspect image manifest OS — dispatch will fall through to primary",
149                );
150            }
151        }
152    }
153
154    /// Decide which runtime should handle a `create_container` call for `spec`.
155    ///
156    /// The `service` argument is the originating service name, used to build an
157    /// actionable [`AgentError::RouteToPeer`] when a Linux workload lands on
158    /// this node without a local delegate so the scheduler can re-place it on
159    /// a capable peer.
160    ///
161    /// Policy (H-7): Linux workloads are never silently routed to the primary
162    /// on nodes without a delegate. The old "permissive" fall-through (primary
163    /// handles everything) returned an `Unsupported` error only when
164    /// `spec.platform` was explicitly set, but fell through to primary for
165    /// specs without a platform — producing cryptic downstream errors when the
166    /// image-OS cache said `Linux`. We now return `RouteToPeer` in both cases.
167    async fn select_for(&self, service: &str, spec: &ServiceSpec) -> Result<DispatchTarget> {
168        if let Some(platform) = &spec.platform {
169            let target = match platform.os {
170                OsKind::Windows | OsKind::Macos => DispatchTarget::Primary,
171                OsKind::Linux => DispatchTarget::Delegate,
172            };
173            if matches!(target, DispatchTarget::Delegate) && self.delegate.is_none() {
174                return Err(AgentError::RouteToPeer {
175                    service: service.to_string(),
176                    required_os: OsKind::Linux.as_oci_str().to_string(),
177                    reason: "spec.platform.os = linux but this node has no WSL2 delegate \
178                            configured; enable `--install-wsl yes` on this node or add a Linux \
179                            peer to the cluster"
180                        .to_string(),
181                });
182            }
183            return Ok(target);
184        }
185
186        if let Some(os) = self
187            .image_os
188            .read()
189            .await
190            .get(&spec.image.name.to_string())
191            .copied()
192        {
193            return match os {
194                OsKind::Linux => {
195                    if self.delegate.is_some() {
196                        Ok(DispatchTarget::Delegate)
197                    } else {
198                        // No delegate and the image manifest says Linux —
199                        // refuse at the composite layer so the scheduler can
200                        // re-place on a Linux peer instead of the primary
201                        // failing with a cryptic HCS error.
202                        Err(AgentError::RouteToPeer {
203                            service: service.to_string(),
204                            required_os: OsKind::Linux.as_oci_str().to_string(),
205                            reason: format!(
206                                "image '{}' manifest reports os=linux but this node has no WSL2 \
207                                 delegate configured; enable `--install-wsl yes` on this node or \
208                                 add a Linux peer to the cluster",
209                                spec.image.name
210                            ),
211                        })
212                    }
213                }
214                OsKind::Windows | OsKind::Macos => Ok(DispatchTarget::Primary),
215            };
216        }
217
218        Ok(DispatchTarget::Primary)
219    }
220
221    /// Look up an existing dispatch decision for `id`, or return `NotFound`.
222    async fn lookup(&self, id: &ContainerId) -> Result<Arc<dyn Runtime>> {
223        let target =
224            self.dispatch
225                .read()
226                .await
227                .get(id)
228                .copied()
229                .ok_or_else(|| AgentError::NotFound {
230                    container: id.to_string(),
231                    reason: "no dispatch record in CompositeRuntime".to_string(),
232                })?;
233        Ok(self.runtime_for(target).clone())
234    }
235
    /// Resolve a [`DispatchTarget`] to the concrete runtime reference.
    ///
    /// Unwrapping the delegate is safe because [`Self::select_for`] returns
    /// `Err` whenever a delegate would be required but is missing, so a
    /// `DispatchTarget::Delegate` can never end up in the dispatch map
    /// without a delegate being present.
    fn runtime_for(&self, t: DispatchTarget) -> &Arc<dyn Runtime> {
        match t {
            DispatchTarget::Primary => &self.primary,
            DispatchTarget::Delegate => self
                .delegate
                .as_ref()
                // Invariant upheld by `select_for`; see doc comment above.
                .expect("delegate target requires delegate to exist"),
        }
    }
}

#[async_trait]
impl Runtime for CompositeRuntime {
255    async fn pull_image(&self, image: &str) -> Result<()> {
256        self.primary.pull_image(image).await?;
257        if let Some(delegate) = &self.delegate {
258            if let Err(e) = delegate.pull_image(image).await {
259                // Foreign-OS images will reliably fail one of the two pulls
260                // (primary can't store a Linux image's config on Windows, or
261                // vice versa). That's expected — the successful side owns the
262                // layers we'll actually use — so we keep this at debug.
263                tracing::debug!(
264                    image,
265                    error = %e,
266                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
267                );
268            }
269        }
270
271        // Inspect the OCI manifest's `config.os` so `select_for(spec)` can
272        // dispatch correctly when `spec.platform` is `None`. Non-fatal: any
273        // failure here just means dispatch falls through to primary.
274        let os_result = zlayer_registry::fetch_image_os(image, None).await;
275        self.apply_image_os_inspection(image, os_result).await;
276
277        Ok(())
278    }
279
280    async fn pull_image_with_policy(
281        &self,
282        image: &str,
283        policy: PullPolicy,
284        auth: Option<&RegistryAuth>,
285    ) -> Result<()> {
286        self.primary
287            .pull_image_with_policy(image, policy, auth)
288            .await?;
289        if let Some(delegate) = &self.delegate {
290            if let Err(e) = delegate.pull_image_with_policy(image, policy, auth).await {
291                tracing::debug!(
292                    image,
293                    error = %e,
294                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
295                );
296            }
297        }
298
299        let os_result = zlayer_registry::fetch_image_os(image, auth).await;
300        self.apply_image_os_inspection(image, os_result).await;
301
302        Ok(())
303    }
304
305    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
306        let target = self.select_for(&id.service, spec).await?;
307        {
308            let mut dispatch = self.dispatch.write().await;
309            dispatch.insert(id.clone(), target);
310        }
311        let rt = self.runtime_for(target).clone();
312        match rt.create_container(id, spec).await {
313            Ok(()) => Ok(()),
314            Err(e) => {
315                // Roll back the cache insert on failure so subsequent lookups
316                // don't find a dangling entry.
317                self.dispatch.write().await.remove(id);
318                Err(e)
319            }
320        }
321    }
322
323    async fn start_container(&self, id: &ContainerId) -> Result<()> {
324        let rt = self.lookup(id).await?;
325        rt.start_container(id).await
326    }
327
328    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
329        let rt = self.lookup(id).await?;
330        rt.stop_container(id, timeout).await
331    }
332
333    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
334        let rt = self.lookup(id).await?;
335        let res = rt.remove_container(id).await;
336        self.dispatch.write().await.remove(id);
337        res
338    }
339
340    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
341        let rt = self.lookup(id).await?;
342        rt.container_state(id).await
343    }
344
345    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
346        let rt = self.lookup(id).await?;
347        rt.container_logs(id, tail).await
348    }
349
350    async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
351        let rt = self.lookup(id).await?;
352        rt.exec(id, cmd).await
353    }
354
355    async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
356        let rt = self.lookup(id).await?;
357        rt.exec_stream(id, cmd).await
358    }
359
360    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
361        let rt = self.lookup(id).await?;
362        rt.get_container_stats(id).await
363    }
364
365    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
366        let rt = self.lookup(id).await?;
367        rt.wait_container(id).await
368    }
369
370    async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
371        let rt = self.lookup(id).await?;
372        rt.wait_outcome(id).await
373    }
374
375    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
376        let rt = self.lookup(id).await?;
377        rt.get_logs(id).await
378    }
379
380    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
381        let rt = self.lookup(id).await?;
382        rt.get_container_pid(id).await
383    }
384
385    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
386        let rt = self.lookup(id).await?;
387        rt.get_container_ip(id).await
388    }
389
390    async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
391        let rt = self.lookup(id).await?;
392        rt.get_container_port_override(id).await
393    }
394
395    #[cfg(target_os = "windows")]
396    async fn get_container_namespace_id(
397        &self,
398        id: &ContainerId,
399    ) -> Result<Option<windows::core::GUID>> {
400        let rt = self.lookup(id).await?;
401        rt.get_container_namespace_id(id).await
402    }
403
404    async fn sync_container_volumes(&self, id: &ContainerId) -> Result<()> {
405        let rt = self.lookup(id).await?;
406        rt.sync_container_volumes(id).await
407    }
408
409    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
410        let mut out = self.primary.list_images().await?;
411        if let Some(delegate) = &self.delegate {
412            match delegate.list_images().await {
413                Ok(extra) => out.extend(extra),
414                Err(e) => tracing::warn!(
415                    error = %e,
416                    "delegate runtime list_images failed; returning primary results only",
417                ),
418            }
419        }
420        Ok(out)
421    }
422
423    async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
424        match self.primary.remove_image(image, force).await {
425            Ok(()) => Ok(()),
426            Err(primary_err) => {
427                if let Some(delegate) = &self.delegate {
428                    match delegate.remove_image(image, force).await {
429                        Ok(()) => Ok(()),
430                        Err(delegate_err) => {
431                            tracing::debug!(
432                                image,
433                                %delegate_err,
434                                "delegate remove_image also failed; returning primary error",
435                            );
436                            Err(primary_err)
437                        }
438                    }
439                } else {
440                    Err(primary_err)
441                }
442            }
443        }
444    }
445
446    async fn prune_images(&self) -> Result<PruneResult> {
447        let mut result = self.primary.prune_images().await?;
448        if let Some(delegate) = &self.delegate {
449            match delegate.prune_images().await {
450                Ok(extra) => {
451                    result.deleted.extend(extra.deleted);
452                    result.space_reclaimed =
453                        result.space_reclaimed.saturating_add(extra.space_reclaimed);
454                }
455                Err(e) => tracing::warn!(
456                    error = %e,
457                    "delegate runtime prune_images failed; returning primary result only",
458                ),
459            }
460        }
461        Ok(result)
462    }
463
464    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
465        let rt = self.lookup(id).await?;
466        rt.kill_container(id, signal).await
467    }
468
469    async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
470        match self.primary.tag_image(source, target).await {
471            Ok(()) => Ok(()),
472            Err(primary_err) => {
473                if let Some(delegate) = &self.delegate {
474                    match delegate.tag_image(source, target).await {
475                        Ok(()) => Ok(()),
476                        Err(delegate_err) => {
477                            tracing::debug!(
478                                source,
479                                target,
480                                %delegate_err,
481                                "delegate tag_image also failed; returning primary error",
482                            );
483                            Err(primary_err)
484                        }
485                    }
486                } else {
487                    Err(primary_err)
488                }
489            }
490        }
491    }
492
493    async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
494        let rt = self.lookup(id).await?;
495        rt.inspect_detailed(id).await
496    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cgroups_stats::ContainerStats;
    use std::sync::Mutex as StdMutex;
    use zlayer_spec::{ArchKind, DeploymentSpec, TargetPlatform};

    /// Which runtime a mock represents. Only used for labelling invocation
    /// records in tests.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum Role {
        /// Stands in for the node-native runtime.
        Primary,
        /// Stands in for the foreign-OS delegate runtime.
        Delegate,
    }

    /// One recorded invocation: (runtime role, method name, container id).
    /// The id is `None` for cross-cutting image operations.
    type CallRecord = (Role, String, Option<ContainerId>);
    /// Shared, thread-safe log of every mock call made in a single test.
    type CallLog = Arc<StdMutex<Vec<CallRecord>>>;

    /// Mock runtime that records every method call it receives.
    ///
    /// This is intentionally minimal — just enough trait surface to exercise
    /// the composite's dispatch logic. Every recorded call includes the role
    /// (primary vs delegate), the method name, and the container id (or
    /// `None` for cross-cutting image operations).
    struct MockRuntime {
        // Label attached to every call this mock records.
        role: Role,
        // Shared log: the primary and delegate mocks in a test push into the
        // same Vec so call ordering across runtimes is observable.
        calls: CallLog,
        // Canned response returned from `list_images`.
        list_images_response: Vec<ImageInfo>,
        // When set, `pull_image` fails with this message (as `AgentError::Internal`).
        pull_image_error: Option<String>,
    }

532    impl MockRuntime {
533        fn new(role: Role, calls: CallLog) -> Self {
534            Self {
535                role,
536                calls,
537                list_images_response: Vec::new(),
538                pull_image_error: None,
539            }
540        }
541
542        fn record(&self, method: &str, id: Option<&ContainerId>) {
543            self.calls
544                .lock()
545                .expect("mock call-log mutex poisoned")
546                .push((self.role, method.to_string(), id.cloned()));
547        }
548    }
549
    // NOTE(review): only a subset of the `Runtime` trait is implemented here;
    // the remaining methods (exec_stream, wait_outcome, …) presumably have
    // default impls on the trait — confirm if these tests start exercising them.
    #[async_trait]
    impl Runtime for MockRuntime {
        async fn pull_image(&self, _image: &str) -> Result<()> {
            self.record("pull_image", None);
            // Optional injected failure lets tests exercise the composite's
            // error-tolerance paths.
            if let Some(msg) = &self.pull_image_error {
                return Err(AgentError::Internal(msg.clone()));
            }
            Ok(())
        }

        async fn pull_image_with_policy(
            &self,
            _image: &str,
            _policy: PullPolicy,
            _auth: Option<&RegistryAuth>,
        ) -> Result<()> {
            self.record("pull_image_with_policy", None);
            Ok(())
        }

        async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
            self.record("create_container", Some(id));
            Ok(())
        }

        async fn start_container(&self, id: &ContainerId) -> Result<()> {
            self.record("start_container", Some(id));
            Ok(())
        }

        async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
            self.record("stop_container", Some(id));
            Ok(())
        }

        async fn remove_container(&self, id: &ContainerId) -> Result<()> {
            self.record("remove_container", Some(id));
            Ok(())
        }

        async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
            self.record("container_state", Some(id));
            Ok(ContainerState::Running)
        }

        async fn container_logs(&self, id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
            self.record("container_logs", Some(id));
            Ok(Vec::new())
        }

        async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
            self.record("exec", Some(id));
            Ok((0, String::new(), String::new()))
        }

        async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
            self.record("get_container_stats", Some(id));
            // Zeroed stats; tests only assert on routing, not values.
            Ok(ContainerStats {
                cpu_usage_usec: 0,
                memory_bytes: 0,
                memory_limit: 0,
                timestamp: std::time::Instant::now(),
            })
        }

        async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
            self.record("wait_container", Some(id));
            Ok(0)
        }

        async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
            self.record("get_logs", Some(id));
            Ok(Vec::new())
        }

        async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
            self.record("get_container_pid", Some(id));
            Ok(None)
        }

        async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
            self.record("get_container_ip", Some(id));
            Ok(None)
        }

        async fn list_images(&self) -> Result<Vec<ImageInfo>> {
            self.record("list_images", None);
            Ok(self.list_images_response.clone())
        }
    }

641    /// Build a [`ServiceSpec`] (with the given image name) from the minimal
642    /// inline YAML the existing runtime tests use, then optionally set a
643    /// target platform on it.
644    fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
645        let yaml = format!(
646            r"
647version: v1
648deployment: test
649services:
650  test:
651    rtype: service
652    image:
653      name: {image}
654    endpoints:
655      - name: http
656        protocol: http
657        port: 8080
658"
659        );
660        let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
661            .expect("valid deployment yaml")
662            .services
663            .remove("test")
664            .expect("service 'test' present");
665        spec.platform = platform;
666        spec
667    }
668
669    fn cid(service: &str, replica: u32) -> ContainerId {
670        ContainerId {
671            service: service.to_string(),
672            replica,
673        }
674    }
675
676    fn make_composite(with_delegate: bool) -> (CompositeRuntime, CallLog) {
677        let calls = Arc::new(StdMutex::new(Vec::new()));
678        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
679        let delegate = if with_delegate {
680            Some(Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>)
681        } else {
682            None
683        };
684        (
685            CompositeRuntime::new(primary as Arc<dyn Runtime>, delegate),
686            calls,
687        )
688    }
689
690    fn role_for(calls: &[CallRecord], method: &str) -> Option<Role> {
691        calls
692            .iter()
693            .find(|(_, m, _)| m == method)
694            .map(|(role, _, _)| *role)
695    }
696
    #[tokio::test]
    async fn dispatch_windows_spec_goes_to_primary() {
        // Explicit `platform.os = windows` must dispatch to the primary, and
        // the dispatch cache must route the follow-up start there too.
        let (rt, calls) = make_composite(true);
        let id = cid("win-svc", 0);
        let spec = make_spec(
            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
        );

        rt.create_container(&id, &spec).await.unwrap();
        rt.start_container(&id).await.unwrap();

        let calls = calls.lock().unwrap();
        assert_eq!(
            role_for(&calls, "create_container"),
            Some(Role::Primary),
            "create_container should hit primary for Windows spec"
        );
        assert_eq!(
            role_for(&calls, "start_container"),
            Some(Role::Primary),
            "start_container should hit primary for Windows spec"
        );
    }

    #[tokio::test]
    async fn dispatch_linux_spec_goes_to_delegate() {
        // Explicit `platform.os = linux` on a node WITH a delegate must
        // dispatch to the delegate for create and all later lifecycle calls.
        let (rt, calls) = make_composite(true);
        let id = cid("lin-svc", 0);
        let spec = make_spec(
            "docker.io/library/alpine:3.19",
            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
        );

        rt.create_container(&id, &spec).await.unwrap();
        rt.start_container(&id).await.unwrap();

        let calls = calls.lock().unwrap();
        assert_eq!(
            role_for(&calls, "create_container"),
            Some(Role::Delegate),
            "create_container should hit delegate for Linux spec"
        );
        assert_eq!(
            role_for(&calls, "start_container"),
            Some(Role::Delegate),
            "start_container should hit delegate for Linux spec"
        );
    }

    #[tokio::test]
    async fn dispatch_linux_without_delegate_errors() {
        // H-7 policy: a Linux spec on a node without a delegate must return
        // `RouteToPeer` (not `Unsupported`, not a silent primary fall-through)
        // so the scheduler can re-place the workload on a capable peer.
        let (rt, _calls) = make_composite(false);
        let id = cid("lin-svc", 0);
        let spec = make_spec(
            "docker.io/library/alpine:3.19",
            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
        );

        let err = rt.create_container(&id, &spec).await.unwrap_err();
        match err {
            AgentError::RouteToPeer {
                service,
                required_os,
                reason,
            } => {
                assert_eq!(service, "lin-svc");
                assert_eq!(required_os, "linux");
                // The reason string is operator-facing: it must point at both
                // remediations (enable WSL locally, or add a Linux peer).
                assert!(
                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
                    "reason must name both remediations, got: {reason}"
                );
            }
            other => panic!("expected RouteToPeer, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn dispatch_linux_image_cache_without_delegate_routes_to_peer() {
        // H-7 policy: even when `spec.platform` is unset, a Linux image in the
        // OS cache must route to a peer instead of falling through to primary.
        // This replaces the old permissive fall-through, where a cached Linux
        // OS with no delegate silently ran the workload on the primary.
        let (rt, _calls) = make_composite(false);
        let id = cid("svc", 0);
        let image = "docker.io/library/nginx:1.25";
        rt.record_image_os(image, OsKind::Linux).await;

        let spec = make_spec(image, None);
        let err = rt.create_container(&id, &spec).await.unwrap_err();
        match err {
            AgentError::RouteToPeer {
                service,
                required_os,
                reason,
            } => {
                assert_eq!(service, "svc");
                assert_eq!(required_os, "linux");
                // The image-cache branch builds its reason with `format!`, so
                // the offending image name must appear in it.
                assert!(
                    reason.contains(image),
                    "reason should mention the image name, got: {reason}"
                );
                assert!(
                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
                    "reason must name both remediations, got: {reason}"
                );
            }
            other => panic!("expected RouteToPeer, got {other:?}"),
        }
    }

811    #[tokio::test]
812    async fn dispatch_macos_spec_goes_to_primary() {
813        let (rt, calls) = make_composite(true);
814        let id = cid("mac-svc", 0);
815        let spec = make_spec(
816            "ghcr.io/zlayer/macos:latest",
817            Some(TargetPlatform::new(OsKind::Macos, ArchKind::Arm64)),
818        );
819
820        rt.create_container(&id, &spec).await.unwrap();
821
822        let calls = calls.lock().unwrap();
823        assert_eq!(
824            role_for(&calls, "create_container"),
825            Some(Role::Primary),
826            "create_container should hit primary for Macos spec"
827        );
828    }
829
830    #[tokio::test]
831    async fn dispatch_no_platform_no_image_os_falls_through_to_primary() {
832        let (rt, calls) = make_composite(true);
833        let id = cid("svc", 0);
834        let spec = make_spec("docker.io/library/nginx:1.25", None);
835
836        rt.create_container(&id, &spec).await.unwrap();
837
838        let calls = calls.lock().unwrap();
839        assert_eq!(
840            role_for(&calls, "create_container"),
841            Some(Role::Primary),
842            "fall-through should pick primary when both platform and image-OS cache are unknown"
843        );
844    }
845
846    #[tokio::test]
847    async fn dispatch_uses_image_os_cache_when_platform_missing() {
848        let (rt, calls) = make_composite(true);
849        let id = cid("svc", 0);
850        let image = "docker.io/library/nginx:1.25";
851        rt.record_image_os(image, OsKind::Linux).await;
852
853        let spec = make_spec(image, None);
854        rt.create_container(&id, &spec).await.unwrap();
855
856        let calls = calls.lock().unwrap();
857        assert_eq!(
858            role_for(&calls, "create_container"),
859            Some(Role::Delegate),
860            "image-OS cache should route Linux images to the delegate"
861        );
862    }
863
864    #[tokio::test]
865    async fn per_container_dispatch_cache_persists_through_start_stop() {
866        let (rt, calls) = make_composite(true);
867        let id = cid("win-svc", 0);
868        let spec = make_spec(
869            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
870            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
871        );
872
873        rt.create_container(&id, &spec).await.unwrap();
874        rt.start_container(&id).await.unwrap();
875        rt.stop_container(&id, Duration::from_secs(1))
876            .await
877            .unwrap();
878        rt.remove_container(&id).await.unwrap();
879
880        let recorded = calls.lock().unwrap().clone();
881        for method in [
882            "create_container",
883            "start_container",
884            "stop_container",
885            "remove_container",
886        ] {
887            assert_eq!(
888                role_for(&recorded, method),
889                Some(Role::Primary),
890                "{method} should have dispatched to primary"
891            );
892        }
893
894        // After remove, the dispatch cache entry should be gone.
895        let after = rt
896            .start_container(&id)
897            .await
898            .expect_err("lookup after remove should fail");
899        assert!(
900            matches!(after, AgentError::NotFound { .. }),
901            "expected NotFound after remove, got {after:?}"
902        );
903    }
904
905    #[tokio::test]
906    async fn pull_image_calls_both_runtimes() {
907        let (rt, calls) = make_composite(true);
908        rt.pull_image("docker.io/library/alpine:3.19")
909            .await
910            .unwrap();
911
912        let recorded = calls.lock().unwrap();
913        let pull_calls: Vec<Role> = recorded
914            .iter()
915            .filter(|(_, m, _)| m == "pull_image")
916            .map(|(r, _, _)| *r)
917            .collect();
918        assert!(
919            pull_calls.contains(&Role::Primary),
920            "primary should have been pulled: {pull_calls:?}",
921        );
922        assert!(
923            pull_calls.contains(&Role::Delegate),
924            "delegate should have been pulled: {pull_calls:?}",
925        );
926    }
927
928    #[tokio::test]
929    async fn pull_image_delegate_error_does_not_fail() {
930        // Build the composite by hand so we can flip the delegate's
931        // pull_image_error before wrapping it in an Arc<dyn Runtime>.
932        let calls = Arc::new(StdMutex::new(Vec::new()));
933        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
934        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
935        delegate.pull_image_error = Some("simulated delegate pull failure".to_string());
936        let rt = CompositeRuntime::new(
937            primary as Arc<dyn Runtime>,
938            Some(Arc::new(delegate) as Arc<dyn Runtime>),
939        );
940
941        // Top-level call must succeed despite the delegate error.
942        rt.pull_image("docker.io/library/alpine:3.19")
943            .await
944            .unwrap();
945
946        let recorded = calls.lock().unwrap();
947        let pull_calls: Vec<Role> = recorded
948            .iter()
949            .filter(|(_, m, _)| m == "pull_image")
950            .map(|(r, _, _)| *r)
951            .collect();
952        assert!(
953            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
954            "both runtimes should have been called: {pull_calls:?}",
955        );
956    }
957
958    #[tokio::test]
959    async fn list_images_merges_both() {
960        // Hand-build so we can seed each mock's list_images_response.
961        let calls = Arc::new(StdMutex::new(Vec::new()));
962        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
963        primary.list_images_response = vec![ImageInfo {
964            reference: "primary/image:1".to_string(),
965            digest: None,
966            size_bytes: None,
967        }];
968        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
969        delegate.list_images_response = vec![ImageInfo {
970            reference: "delegate/image:1".to_string(),
971            digest: None,
972            size_bytes: None,
973        }];
974        let rt = CompositeRuntime::new(
975            Arc::new(primary) as Arc<dyn Runtime>,
976            Some(Arc::new(delegate) as Arc<dyn Runtime>),
977        );
978
979        let merged = rt.list_images().await.unwrap();
980        let refs: Vec<&str> = merged.iter().map(|i| i.reference.as_str()).collect();
981        assert!(
982            refs.contains(&"primary/image:1") && refs.contains(&"delegate/image:1"),
983            "merged list should contain both entries, got {refs:?}",
984        );
985    }
986
987    #[tokio::test]
988    async fn dispatch_lookup_unknown_container_errors() {
989        let (rt, _calls) = make_composite(true);
990        let id = cid("ghost", 0);
991
992        let err = rt.start_container(&id).await.unwrap_err();
993        assert!(
994            matches!(err, AgentError::NotFound { .. }),
995            "expected NotFound for unknown container, got {err:?}"
996        );
997    }
998
999    /// Helper: read the internal image-OS cache for test assertions.
1000    async fn cached_os(rt: &CompositeRuntime, image: &str) -> Option<OsKind> {
1001        rt.image_os.read().await.get(image).copied()
1002    }
1003
1004    #[tokio::test]
1005    async fn apply_image_os_inspection_populates_cache_on_ok_some() {
1006        // Contract: when `fetch_image_os` resolves to a recognized OS, the
1007        // cache is populated so subsequent `select_for` calls for specs
1008        // without `platform` dispatch correctly.
1009        let (rt, _calls) = make_composite(true);
1010        let image = "docker.io/library/alpine:3.19";
1011
1012        rt.apply_image_os_inspection(image, Ok(Some(OsKind::Linux)))
1013            .await;
1014
1015        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
1016    }
1017
1018    #[tokio::test]
1019    async fn apply_image_os_inspection_leaves_cache_untouched_on_ok_none() {
1020        // Contract: when the manifest carries no (or an unrecognized) `os`
1021        // field the cache is left alone. Dispatch will fall through to the
1022        // primary on `create_container`.
1023        let (rt, _calls) = make_composite(true);
1024        let image = "docker.io/library/nginx:1.25";
1025
1026        rt.apply_image_os_inspection(image, Ok(None)).await;
1027
1028        assert_eq!(cached_os(&rt, image).await, None);
1029    }
1030
1031    #[tokio::test]
1032    async fn apply_image_os_inspection_leaves_cache_untouched_on_err() {
1033        // Contract: a registry error during inspection is non-fatal and must
1034        // not poison the cache. Dispatch falls through to primary on lookup.
1035        let (rt, _calls) = make_composite(true);
1036        let image = "docker.io/library/nginx:1.25";
1037
1038        // Pre-seed the cache so we can assert the error path doesn't
1039        // overwrite or clear an existing entry.
1040        rt.record_image_os(image, OsKind::Linux).await;
1041
1042        let err = zlayer_registry::RegistryError::NotFound {
1043            registry: "docker.io".to_string(),
1044            image: image.to_string(),
1045        };
1046        rt.apply_image_os_inspection(image, Err(err)).await;
1047
1048        // Cache is still whatever it was before the failed inspection.
1049        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
1050    }
1051
1052    #[tokio::test]
1053    async fn pull_image_inspection_failure_does_not_fail_pull() {
1054        // End-to-end: even when the registry fetch fails (inevitable for the
1055        // synthetic image refs used in unit tests), `pull_image` still
1056        // returns `Ok`. The mock primary/delegate both succeed; the
1057        // inspection step logs and moves on. The cache must remain empty
1058        // because there was no successful inspection to record.
1059        let (rt, _calls) = make_composite(true);
1060        let image = "invalid.example.invalid/ghost:v1";
1061
1062        rt.pull_image(image).await.unwrap();
1063
1064        assert_eq!(
1065            cached_os(&rt, image).await,
1066            None,
1067            "failed inspection must not populate the image-OS cache"
1068        );
1069    }
1070
1071    #[tokio::test]
1072    async fn pull_image_with_policy_inspection_failure_does_not_fail_pull() {
1073        // Same contract as `pull_image_inspection_failure_does_not_fail_pull`
1074        // but exercising the policy-aware entry point.
1075        let (rt, _calls) = make_composite(true);
1076        let image = "invalid.example.invalid/ghost:v1";
1077
1078        rt.pull_image_with_policy(image, PullPolicy::IfNotPresent, None)
1079            .await
1080            .unwrap();
1081
1082        assert_eq!(cached_os(&rt, image).await, None);
1083    }
1084
1085    #[test]
1086    fn os_kind_from_oci_str_roundtrip() {
1087        // Guards the `as_oci_str` ↔ `from_oci_str` relationship used by the
1088        // inspection path. If a new variant is added to `OsKind` without
1089        // updating `from_oci_str` we want the miss here, not a silent
1090        // "dispatch to primary" regression in production.
1091        for os in [OsKind::Linux, OsKind::Windows, OsKind::Macos] {
1092            assert_eq!(OsKind::from_oci_str(os.as_oci_str()), Some(os));
1093        }
1094        assert_eq!(OsKind::from_oci_str(""), None);
1095        assert_eq!(OsKind::from_oci_str("freebsd"), None);
1096    }
1097}