// zlayer_agent/runtimes/composite.rs

1//! Composite runtime that dispatches per-container to a primary + optional delegate.
2//!
3//! The [`CompositeRuntime`] owns a *primary* runtime (the node-native runtime —
4//! e.g. `HcsRuntime` on Windows, `YoukiRuntime` on Linux, Docker elsewhere) and
5//! an optional *delegate* runtime used for foreign-OS workloads (e.g. a WSL2
6//! delegate on Windows that runs Linux containers). Each call is routed based
7//! on the container's identity:
8//!
9//! * **[`Runtime::create_container`]** consults
10//!   [`ServiceSpec::platform`](zlayer_spec::ServiceSpec) first; when the
11//!   spec's OS targets the delegate we route there, otherwise primary. When
12//!   `platform` is `None`, a secondary **image-OS cache** (populated by
13//!   [`CompositeRuntime::record_image_os`] from OCI manifest inspection at
14//!   pull time) is consulted. If both are unknown we fall through to the
15//!   primary. **Strict policy (H-7):** if either source identifies the
16//!   workload as Linux and this node has no delegate configured, dispatch
17//!   returns [`AgentError::RouteToPeer`] so the scheduler can re-place the
18//!   workload on a Linux peer — the old permissive "fall through to primary"
19//!   behavior is gone.
20//! * All subsequent per-container operations (start/stop/remove/logs/exec/…)
21//!   look up the container in an internal **dispatch cache** that records
22//!   which runtime created it. This guarantees the same runtime sees the
23//!   container for its whole lifecycle, even after daemon restarts within
24//!   the same process.
25//! * Cross-cutting image operations (`pull_image`, `pull_image_with_policy`,
26//!   `list_images`, `prune_images`) fan out to both runtimes — we cannot know
27//!   in advance which runtime will execute a pulled image, and merged image
28//!   listings give users a single coherent view. `remove_image` / `tag_image`
29//!   try primary first and fall back to delegate.
30//!
31//! The dispatch cache is populated on `create_container` and cleared on
32//! `remove_container`. Looking up an unknown id yields
33//! [`AgentError::NotFound`], which surfaces as a clean 404 at the API layer
34//! rather than silently forwarding to the wrong runtime.
35
36use std::collections::HashMap;
37use std::net::IpAddr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use tokio::sync::RwLock;
43use zlayer_observability::logs::LogEntry;
44use zlayer_spec::{OsKind, PullPolicy, RegistryAuth, ServiceSpec};
45
46use crate::cgroups_stats::ContainerStats;
47use crate::error::{AgentError, Result};
48use crate::runtime::{
49    ContainerId, ContainerInspectDetails, ContainerState, ExecEventStream, ImageInfo, PruneResult,
50    Runtime, WaitCondition, WaitOutcome,
51};
52
/// Which underlying runtime a given container was dispatched to.
///
/// Stored per-container in the `CompositeRuntime` dispatch cache so every
/// lifecycle call after `create_container` reaches the same runtime.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DispatchTarget {
    /// The node-native runtime passed as `primary` to `CompositeRuntime::new`.
    Primary,
    /// The optional foreign-OS runtime (e.g. a WSL2 delegate on Windows).
    Delegate,
}
59
/// Routes each container to either the primary runtime or an optional delegate.
///
/// See the module-level documentation for the dispatch rules.
pub struct CompositeRuntime {
    /// Node-native runtime; handles every workload not routed to the delegate.
    primary: Arc<dyn Runtime>,
    /// Optional foreign-OS runtime (e.g. WSL2 for Linux containers on Windows).
    delegate: Option<Arc<dyn Runtime>>,
    /// Per-container dispatch cache. Populated on `create_container`, removed
    /// on `remove_container`.
    dispatch: Arc<RwLock<HashMap<ContainerId, DispatchTarget>>>,
    /// Image-OS cache consulted when a spec has no explicit `platform`.
    /// Populated by [`CompositeRuntime::record_image_os`], which is driven
    /// from [`zlayer_registry::fetch_image_os`] during `pull_image*`.
    image_os: Arc<RwLock<HashMap<String, OsKind>>>,
}
74
75impl CompositeRuntime {
76    /// Construct a new composite runtime.
77    ///
78    /// `primary` handles containers whose platform matches the host node.
79    /// `delegate`, when present, handles foreign-OS containers (currently:
80    /// Linux containers on a Windows host via the WSL2 delegate runtime).
81    #[must_use]
82    pub fn new(primary: Arc<dyn Runtime>, delegate: Option<Arc<dyn Runtime>>) -> Self {
83        Self {
84            primary,
85            delegate,
86            dispatch: Arc::new(RwLock::new(HashMap::new())),
87            image_os: Arc::new(RwLock::new(HashMap::new())),
88        }
89    }
90
91    /// Access the primary runtime (for introspection / tests).
92    #[must_use]
93    pub fn primary(&self) -> &Arc<dyn Runtime> {
94        &self.primary
95    }
96
97    /// Access the delegate runtime, if one is configured.
98    #[must_use]
99    pub fn delegate(&self) -> Option<&Arc<dyn Runtime>> {
100        self.delegate.as_ref()
101    }
102
103    /// Record that `image` is known to target operating system `os`.
104    ///
105    /// Wired from [`zlayer_registry::fetch_image_os`] during `pull_image*`
106    /// (see [`CompositeRuntime::apply_image_os_inspection`]) so that specs
107    /// without an explicit `platform` still dispatch correctly.
108    pub(crate) async fn record_image_os(&self, image: &str, os: OsKind) {
109        self.image_os.write().await.insert(image.to_string(), os);
110    }
111
112    /// Apply the result of a manifest OS inspection to the image-OS cache.
113    ///
114    /// Factored out of [`Runtime::pull_image`] and
115    /// [`Runtime::pull_image_with_policy`] so the cache-update policy can be
116    /// unit-tested without depending on a live registry. The three branches
117    /// mirror the contract of [`zlayer_registry::fetch_image_os`]:
118    ///
119    /// * `Ok(Some(os))` — populate the cache so future `create_container`
120    ///   calls without an explicit `spec.platform` dispatch to the right
121    ///   runtime.
122    /// * `Ok(None)` — the config blob had no (or an unrecognized) `os`
123    ///   field. Leave the cache untouched; dispatch falls through to primary.
124    /// * `Err(_)` — transient or registry error. Log at warn and leave the
125    ///   cache untouched. We never fail the overall `pull_image*` call on
126    ///   inspection failure: the primary runtime's own pull already
127    ///   succeeded, and the safe fall-through is "primary".
128    async fn apply_image_os_inspection(
129        &self,
130        image: &str,
131        result: std::result::Result<Option<OsKind>, zlayer_registry::RegistryError>,
132    ) {
133        match result {
134            Ok(Some(os)) => {
135                self.record_image_os(image, os).await;
136                tracing::debug!(image, ?os, "cached image OS for dispatch");
137            }
138            Ok(None) => {
139                tracing::trace!(
140                    image,
141                    "image manifest has no OS field — dispatch will fall through to primary",
142                );
143            }
144            Err(e) => {
145                tracing::warn!(
146                    image,
147                    error = %e,
148                    "failed to inspect image manifest OS — dispatch will fall through to primary",
149                );
150            }
151        }
152    }
153
154    /// Decide which runtime should handle a `create_container` call for `spec`.
155    ///
156    /// The `service` argument is the originating service name, used to build an
157    /// actionable [`AgentError::RouteToPeer`] when a Linux workload lands on
158    /// this node without a local delegate so the scheduler can re-place it on
159    /// a capable peer.
160    ///
161    /// Policy (H-7): Linux workloads are never silently routed to the primary
162    /// on nodes without a delegate. The old "permissive" fall-through (primary
163    /// handles everything) returned an `Unsupported` error only when
164    /// `spec.platform` was explicitly set, but fell through to primary for
165    /// specs without a platform — producing cryptic downstream errors when the
166    /// image-OS cache said `Linux`. We now return `RouteToPeer` in both cases.
167    async fn select_for(&self, service: &str, spec: &ServiceSpec) -> Result<DispatchTarget> {
168        if let Some(platform) = &spec.platform {
169            let target = match platform.os {
170                OsKind::Windows | OsKind::Macos => DispatchTarget::Primary,
171                OsKind::Linux => DispatchTarget::Delegate,
172            };
173            if matches!(target, DispatchTarget::Delegate) && self.delegate.is_none() {
174                return Err(AgentError::RouteToPeer {
175                    service: service.to_string(),
176                    required_os: OsKind::Linux.as_oci_str().to_string(),
177                    reason: "spec.platform.os = linux but this node has no WSL2 delegate \
178                            configured; enable `--install-wsl yes` on this node or add a Linux \
179                            peer to the cluster"
180                        .to_string(),
181                });
182            }
183            return Ok(target);
184        }
185
186        if let Some(os) = self
187            .image_os
188            .read()
189            .await
190            .get(&spec.image.name.to_string())
191            .copied()
192        {
193            return match os {
194                OsKind::Linux => {
195                    if self.delegate.is_some() {
196                        Ok(DispatchTarget::Delegate)
197                    } else {
198                        // No delegate and the image manifest says Linux —
199                        // refuse at the composite layer so the scheduler can
200                        // re-place on a Linux peer instead of the primary
201                        // failing with a cryptic HCS error.
202                        Err(AgentError::RouteToPeer {
203                            service: service.to_string(),
204                            required_os: OsKind::Linux.as_oci_str().to_string(),
205                            reason: format!(
206                                "image '{}' manifest reports os=linux but this node has no WSL2 \
207                                 delegate configured; enable `--install-wsl yes` on this node or \
208                                 add a Linux peer to the cluster",
209                                spec.image.name
210                            ),
211                        })
212                    }
213                }
214                OsKind::Windows | OsKind::Macos => Ok(DispatchTarget::Primary),
215            };
216        }
217
218        Ok(DispatchTarget::Primary)
219    }
220
221    /// Look up an existing dispatch decision for `id`, or return `NotFound`.
222    async fn lookup(&self, id: &ContainerId) -> Result<Arc<dyn Runtime>> {
223        let target =
224            self.dispatch
225                .read()
226                .await
227                .get(id)
228                .copied()
229                .ok_or_else(|| AgentError::NotFound {
230                    container: id.to_string(),
231                    reason: "no dispatch record in CompositeRuntime".to_string(),
232                })?;
233        Ok(self.runtime_for(target).clone())
234    }
235
236    /// Resolve a [`DispatchTarget`] to the concrete runtime reference.
237    ///
238    /// Unwrapping the delegate is safe because [`Self::select_for`] returns
239    /// `Err` whenever a delegate would be required but is missing, so a
240    /// `DispatchTarget::Delegate` can never end up in the dispatch map
241    /// without a delegate being present.
242    fn runtime_for(&self, t: DispatchTarget) -> &Arc<dyn Runtime> {
243        match t {
244            DispatchTarget::Primary => &self.primary,
245            DispatchTarget::Delegate => self
246                .delegate
247                .as_ref()
248                .expect("delegate target requires delegate to exist"),
249        }
250    }
251}
252
253#[async_trait]
254impl Runtime for CompositeRuntime {
255    async fn pull_image(&self, image: &str) -> Result<()> {
256        self.primary.pull_image(image).await?;
257        if let Some(delegate) = &self.delegate {
258            if let Err(e) = delegate.pull_image(image).await {
259                // Foreign-OS images will reliably fail one of the two pulls
260                // (primary can't store a Linux image's config on Windows, or
261                // vice versa). That's expected — the successful side owns the
262                // layers we'll actually use — so we keep this at debug.
263                tracing::debug!(
264                    image,
265                    error = %e,
266                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
267                );
268            }
269        }
270
271        // Inspect the OCI manifest's `config.os` so `select_for(spec)` can
272        // dispatch correctly when `spec.platform` is `None`. Non-fatal: any
273        // failure here just means dispatch falls through to primary.
274        let os_result = zlayer_registry::fetch_image_os(image, None).await;
275        self.apply_image_os_inspection(image, os_result).await;
276
277        Ok(())
278    }
279
280    async fn pull_image_with_policy(
281        &self,
282        image: &str,
283        policy: PullPolicy,
284        auth: Option<&RegistryAuth>,
285    ) -> Result<()> {
286        self.primary
287            .pull_image_with_policy(image, policy, auth)
288            .await?;
289        if let Some(delegate) = &self.delegate {
290            if let Err(e) = delegate.pull_image_with_policy(image, policy, auth).await {
291                tracing::debug!(
292                    image,
293                    error = %e,
294                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
295                );
296            }
297        }
298
299        let os_result = zlayer_registry::fetch_image_os(image, auth).await;
300        self.apply_image_os_inspection(image, os_result).await;
301
302        Ok(())
303    }
304
305    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
306        let target = self.select_for(&id.service, spec).await?;
307        {
308            let mut dispatch = self.dispatch.write().await;
309            dispatch.insert(id.clone(), target);
310        }
311        let rt = self.runtime_for(target).clone();
312        match rt.create_container(id, spec).await {
313            Ok(()) => Ok(()),
314            Err(e) => {
315                // Roll back the cache insert on failure so subsequent lookups
316                // don't find a dangling entry.
317                self.dispatch.write().await.remove(id);
318                Err(e)
319            }
320        }
321    }
322
323    async fn start_container(&self, id: &ContainerId) -> Result<()> {
324        let rt = self.lookup(id).await?;
325        rt.start_container(id).await
326    }
327
328    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
329        let rt = self.lookup(id).await?;
330        rt.stop_container(id, timeout).await
331    }
332
333    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
334        let rt = self.lookup(id).await?;
335        let res = rt.remove_container(id).await;
336        self.dispatch.write().await.remove(id);
337        res
338    }
339
340    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
341        let rt = self.lookup(id).await?;
342        rt.container_state(id).await
343    }
344
345    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
346        let rt = self.lookup(id).await?;
347        rt.container_logs(id, tail).await
348    }
349
350    async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
351        let rt = self.lookup(id).await?;
352        rt.exec(id, cmd).await
353    }
354
355    async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
356        let rt = self.lookup(id).await?;
357        rt.exec_stream(id, cmd).await
358    }
359
360    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
361        let rt = self.lookup(id).await?;
362        rt.get_container_stats(id).await
363    }
364
365    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
366        let rt = self.lookup(id).await?;
367        rt.wait_container(id).await
368    }
369
370    async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
371        let rt = self.lookup(id).await?;
372        rt.wait_outcome(id).await
373    }
374
375    async fn wait_outcome_with_condition(
376        &self,
377        id: &ContainerId,
378        condition: WaitCondition,
379    ) -> Result<WaitOutcome> {
380        let rt = self.lookup(id).await?;
381        rt.wait_outcome_with_condition(id, condition).await
382    }
383
384    async fn rename_container(&self, id: &ContainerId, new_name: &str) -> Result<()> {
385        let rt = self.lookup(id).await?;
386        rt.rename_container(id, new_name).await
387    }
388
389    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
390        let rt = self.lookup(id).await?;
391        rt.get_logs(id).await
392    }
393
394    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
395        let rt = self.lookup(id).await?;
396        rt.get_container_pid(id).await
397    }
398
399    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
400        let rt = self.lookup(id).await?;
401        rt.get_container_ip(id).await
402    }
403
404    async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
405        let rt = self.lookup(id).await?;
406        rt.get_container_port_override(id).await
407    }
408
409    #[cfg(target_os = "windows")]
410    async fn get_container_namespace_id(
411        &self,
412        id: &ContainerId,
413    ) -> Result<Option<windows::core::GUID>> {
414        let rt = self.lookup(id).await?;
415        rt.get_container_namespace_id(id).await
416    }
417
418    async fn sync_container_volumes(&self, id: &ContainerId) -> Result<()> {
419        let rt = self.lookup(id).await?;
420        rt.sync_container_volumes(id).await
421    }
422
423    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
424        let mut out = self.primary.list_images().await?;
425        if let Some(delegate) = &self.delegate {
426            match delegate.list_images().await {
427                Ok(extra) => out.extend(extra),
428                Err(e) => tracing::warn!(
429                    error = %e,
430                    "delegate runtime list_images failed; returning primary results only",
431                ),
432            }
433        }
434        Ok(out)
435    }
436
437    async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
438        match self.primary.remove_image(image, force).await {
439            Ok(()) => Ok(()),
440            Err(primary_err) => {
441                if let Some(delegate) = &self.delegate {
442                    match delegate.remove_image(image, force).await {
443                        Ok(()) => Ok(()),
444                        Err(delegate_err) => {
445                            tracing::debug!(
446                                image,
447                                %delegate_err,
448                                "delegate remove_image also failed; returning primary error",
449                            );
450                            Err(primary_err)
451                        }
452                    }
453                } else {
454                    Err(primary_err)
455                }
456            }
457        }
458    }
459
460    async fn prune_images(&self) -> Result<PruneResult> {
461        let mut result = self.primary.prune_images().await?;
462        if let Some(delegate) = &self.delegate {
463            match delegate.prune_images().await {
464                Ok(extra) => {
465                    result.deleted.extend(extra.deleted);
466                    result.space_reclaimed =
467                        result.space_reclaimed.saturating_add(extra.space_reclaimed);
468                }
469                Err(e) => tracing::warn!(
470                    error = %e,
471                    "delegate runtime prune_images failed; returning primary result only",
472                ),
473            }
474        }
475        Ok(result)
476    }
477
478    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
479        let rt = self.lookup(id).await?;
480        rt.kill_container(id, signal).await
481    }
482
483    async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
484        match self.primary.tag_image(source, target).await {
485            Ok(()) => Ok(()),
486            Err(primary_err) => {
487                if let Some(delegate) = &self.delegate {
488                    match delegate.tag_image(source, target).await {
489                        Ok(()) => Ok(()),
490                        Err(delegate_err) => {
491                            tracing::debug!(
492                                source,
493                                target,
494                                %delegate_err,
495                                "delegate tag_image also failed; returning primary error",
496                            );
497                            Err(primary_err)
498                        }
499                    }
500                } else {
501                    Err(primary_err)
502                }
503            }
504        }
505    }
506
507    async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
508        let rt = self.lookup(id).await?;
509        rt.inspect_detailed(id).await
510    }
511}
512
513#[cfg(test)]
514mod tests {
515    use super::*;
516    use crate::cgroups_stats::ContainerStats;
517    use std::sync::Mutex as StdMutex;
518    use zlayer_spec::{ArchKind, DeploymentSpec, TargetPlatform};
519
    /// Which runtime a mock represents. Only used for labelling invocation
    /// records in tests.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum Role {
        /// The mock standing in for the node-native primary runtime.
        Primary,
        /// The mock standing in for the optional foreign-OS delegate.
        Delegate,
    }
527
    /// One recorded invocation: (runtime role, method name, container id).
    /// The id is `None` for cross-cutting image operations not tied to a
    /// single container (e.g. `pull_image`, `list_images`).
    type CallRecord = (Role, String, Option<ContainerId>);
    /// Shared, thread-safe log of every mock call made in a single test.
    type CallLog = Arc<StdMutex<Vec<CallRecord>>>;
532
    /// Mock runtime that records every method call it receives.
    ///
    /// This is intentionally minimal — just enough trait surface to exercise
    /// the composite's dispatch logic. Every recorded call includes the role
    /// (primary vs delegate), the method name, and the container id (or
    /// `None` for cross-cutting image operations).
    struct MockRuntime {
        /// Label attached to every call record this mock emits.
        role: Role,
        /// Log shared between the test body and both mocks.
        calls: CallLog,
        /// Canned response handed back by `list_images`.
        list_images_response: Vec<ImageInfo>,
        /// When set, `pull_image` fails with `AgentError::Internal(msg)`.
        pull_image_error: Option<String>,
    }
545
546    impl MockRuntime {
547        fn new(role: Role, calls: CallLog) -> Self {
548            Self {
549                role,
550                calls,
551                list_images_response: Vec::new(),
552                pull_image_error: None,
553            }
554        }
555
556        fn record(&self, method: &str, id: Option<&ContainerId>) {
557            self.calls
558                .lock()
559                .expect("mock call-log mutex poisoned")
560                .push((self.role, method.to_string(), id.cloned()));
561        }
562    }
563
564    #[async_trait]
565    impl Runtime for MockRuntime {
566        async fn pull_image(&self, _image: &str) -> Result<()> {
567            self.record("pull_image", None);
568            if let Some(msg) = &self.pull_image_error {
569                return Err(AgentError::Internal(msg.clone()));
570            }
571            Ok(())
572        }
573
574        async fn pull_image_with_policy(
575            &self,
576            _image: &str,
577            _policy: PullPolicy,
578            _auth: Option<&RegistryAuth>,
579        ) -> Result<()> {
580            self.record("pull_image_with_policy", None);
581            Ok(())
582        }
583
584        async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
585            self.record("create_container", Some(id));
586            Ok(())
587        }
588
589        async fn start_container(&self, id: &ContainerId) -> Result<()> {
590            self.record("start_container", Some(id));
591            Ok(())
592        }
593
594        async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
595            self.record("stop_container", Some(id));
596            Ok(())
597        }
598
599        async fn remove_container(&self, id: &ContainerId) -> Result<()> {
600            self.record("remove_container", Some(id));
601            Ok(())
602        }
603
604        async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
605            self.record("container_state", Some(id));
606            Ok(ContainerState::Running)
607        }
608
609        async fn container_logs(&self, id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
610            self.record("container_logs", Some(id));
611            Ok(Vec::new())
612        }
613
614        async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
615            self.record("exec", Some(id));
616            Ok((0, String::new(), String::new()))
617        }
618
619        async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
620            self.record("get_container_stats", Some(id));
621            Ok(ContainerStats {
622                cpu_usage_usec: 0,
623                memory_bytes: 0,
624                memory_limit: 0,
625                timestamp: std::time::Instant::now(),
626            })
627        }
628
629        async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
630            self.record("wait_container", Some(id));
631            Ok(0)
632        }
633
634        async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
635            self.record("get_logs", Some(id));
636            Ok(Vec::new())
637        }
638
639        async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
640            self.record("get_container_pid", Some(id));
641            Ok(None)
642        }
643
644        async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
645            self.record("get_container_ip", Some(id));
646            Ok(None)
647        }
648
649        async fn list_images(&self) -> Result<Vec<ImageInfo>> {
650            self.record("list_images", None);
651            Ok(self.list_images_response.clone())
652        }
653    }
654
655    /// Build a [`ServiceSpec`] (with the given image name) from the minimal
656    /// inline YAML the existing runtime tests use, then optionally set a
657    /// target platform on it.
658    fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
659        let yaml = format!(
660            r"
661version: v1
662deployment: test
663services:
664  test:
665    rtype: service
666    image:
667      name: {image}
668    endpoints:
669      - name: http
670        protocol: http
671        port: 8080
672"
673        );
674        let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
675            .expect("valid deployment yaml")
676            .services
677            .remove("test")
678            .expect("service 'test' present");
679        spec.platform = platform;
680        spec
681    }
682
683    fn cid(service: &str, replica: u32) -> ContainerId {
684        ContainerId {
685            service: service.to_string(),
686            replica,
687        }
688    }
689
690    fn make_composite(with_delegate: bool) -> (CompositeRuntime, CallLog) {
691        let calls = Arc::new(StdMutex::new(Vec::new()));
692        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
693        let delegate = if with_delegate {
694            Some(Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>)
695        } else {
696            None
697        };
698        (
699            CompositeRuntime::new(primary as Arc<dyn Runtime>, delegate),
700            calls,
701        )
702    }
703
704    fn role_for(calls: &[CallRecord], method: &str) -> Option<Role> {
705        calls
706            .iter()
707            .find(|(_, m, _)| m == method)
708            .map(|(role, _, _)| *role)
709    }
710
711    #[tokio::test]
712    async fn dispatch_windows_spec_goes_to_primary() {
713        let (rt, calls) = make_composite(true);
714        let id = cid("win-svc", 0);
715        let spec = make_spec(
716            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
717            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
718        );
719
720        rt.create_container(&id, &spec).await.unwrap();
721        rt.start_container(&id).await.unwrap();
722
723        let calls = calls.lock().unwrap();
724        assert_eq!(
725            role_for(&calls, "create_container"),
726            Some(Role::Primary),
727            "create_container should hit primary for Windows spec"
728        );
729        assert_eq!(
730            role_for(&calls, "start_container"),
731            Some(Role::Primary),
732            "start_container should hit primary for Windows spec"
733        );
734    }
735
736    #[tokio::test]
737    async fn dispatch_linux_spec_goes_to_delegate() {
738        let (rt, calls) = make_composite(true);
739        let id = cid("lin-svc", 0);
740        let spec = make_spec(
741            "docker.io/library/alpine:3.19",
742            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
743        );
744
745        rt.create_container(&id, &spec).await.unwrap();
746        rt.start_container(&id).await.unwrap();
747
748        let calls = calls.lock().unwrap();
749        assert_eq!(
750            role_for(&calls, "create_container"),
751            Some(Role::Delegate),
752            "create_container should hit delegate for Linux spec"
753        );
754        assert_eq!(
755            role_for(&calls, "start_container"),
756            Some(Role::Delegate),
757            "start_container should hit delegate for Linux spec"
758        );
759    }
760
761    #[tokio::test]
762    async fn dispatch_linux_without_delegate_errors() {
763        // H-7 policy: a Linux spec on a node without a delegate must return
764        // `RouteToPeer` (not `Unsupported`, not a silent primary fall-through)
765        // so the scheduler can re-place the workload on a capable peer.
766        let (rt, _calls) = make_composite(false);
767        let id = cid("lin-svc", 0);
768        let spec = make_spec(
769            "docker.io/library/alpine:3.19",
770            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
771        );
772
773        let err = rt.create_container(&id, &spec).await.unwrap_err();
774        match err {
775            AgentError::RouteToPeer {
776                service,
777                required_os,
778                reason,
779            } => {
780                assert_eq!(service, "lin-svc");
781                assert_eq!(required_os, "linux");
782                assert!(
783                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
784                    "reason must name both remediations, got: {reason}"
785                );
786            }
787            other => panic!("expected RouteToPeer, got {other:?}"),
788        }
789    }
790
791    #[tokio::test]
792    async fn dispatch_linux_image_cache_without_delegate_routes_to_peer() {
793        // H-7 policy: even when `spec.platform` is unset, a Linux image in the
794        // OS cache must route to a peer instead of falling through to primary.
795        // This is the old permissive-fallthrough path the comment at lines
796        // 172-178 used to describe; the behavior is now strict.
797        let (rt, _calls) = make_composite(false);
798        let id = cid("svc", 0);
799        let image = "docker.io/library/nginx:1.25";
800        rt.record_image_os(image, OsKind::Linux).await;
801
802        let spec = make_spec(image, None);
803        let err = rt.create_container(&id, &spec).await.unwrap_err();
804        match err {
805            AgentError::RouteToPeer {
806                service,
807                required_os,
808                reason,
809            } => {
810                assert_eq!(service, "svc");
811                assert_eq!(required_os, "linux");
812                assert!(
813                    reason.contains(image),
814                    "reason should mention the image name, got: {reason}"
815                );
816                assert!(
817                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
818                    "reason must name both remediations, got: {reason}"
819                );
820            }
821            other => panic!("expected RouteToPeer, got {other:?}"),
822        }
823    }
824
825    #[tokio::test]
826    async fn dispatch_macos_spec_goes_to_primary() {
827        let (rt, calls) = make_composite(true);
828        let id = cid("mac-svc", 0);
829        let spec = make_spec(
830            "ghcr.io/zlayer/macos:latest",
831            Some(TargetPlatform::new(OsKind::Macos, ArchKind::Arm64)),
832        );
833
834        rt.create_container(&id, &spec).await.unwrap();
835
836        let calls = calls.lock().unwrap();
837        assert_eq!(
838            role_for(&calls, "create_container"),
839            Some(Role::Primary),
840            "create_container should hit primary for Macos spec"
841        );
842    }
843
844    #[tokio::test]
845    async fn dispatch_no_platform_no_image_os_falls_through_to_primary() {
846        let (rt, calls) = make_composite(true);
847        let id = cid("svc", 0);
848        let spec = make_spec("docker.io/library/nginx:1.25", None);
849
850        rt.create_container(&id, &spec).await.unwrap();
851
852        let calls = calls.lock().unwrap();
853        assert_eq!(
854            role_for(&calls, "create_container"),
855            Some(Role::Primary),
856            "fall-through should pick primary when both platform and image-OS cache are unknown"
857        );
858    }
859
860    #[tokio::test]
861    async fn dispatch_uses_image_os_cache_when_platform_missing() {
862        let (rt, calls) = make_composite(true);
863        let id = cid("svc", 0);
864        let image = "docker.io/library/nginx:1.25";
865        rt.record_image_os(image, OsKind::Linux).await;
866
867        let spec = make_spec(image, None);
868        rt.create_container(&id, &spec).await.unwrap();
869
870        let calls = calls.lock().unwrap();
871        assert_eq!(
872            role_for(&calls, "create_container"),
873            Some(Role::Delegate),
874            "image-OS cache should route Linux images to the delegate"
875        );
876    }
877
878    #[tokio::test]
879    async fn per_container_dispatch_cache_persists_through_start_stop() {
880        let (rt, calls) = make_composite(true);
881        let id = cid("win-svc", 0);
882        let spec = make_spec(
883            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
884            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
885        );
886
887        rt.create_container(&id, &spec).await.unwrap();
888        rt.start_container(&id).await.unwrap();
889        rt.stop_container(&id, Duration::from_secs(1))
890            .await
891            .unwrap();
892        rt.remove_container(&id).await.unwrap();
893
894        let recorded = calls.lock().unwrap().clone();
895        for method in [
896            "create_container",
897            "start_container",
898            "stop_container",
899            "remove_container",
900        ] {
901            assert_eq!(
902                role_for(&recorded, method),
903                Some(Role::Primary),
904                "{method} should have dispatched to primary"
905            );
906        }
907
908        // After remove, the dispatch cache entry should be gone.
909        let after = rt
910            .start_container(&id)
911            .await
912            .expect_err("lookup after remove should fail");
913        assert!(
914            matches!(after, AgentError::NotFound { .. }),
915            "expected NotFound after remove, got {after:?}"
916        );
917    }
918
919    #[tokio::test]
920    async fn pull_image_calls_both_runtimes() {
921        let (rt, calls) = make_composite(true);
922        rt.pull_image("docker.io/library/alpine:3.19")
923            .await
924            .unwrap();
925
926        let recorded = calls.lock().unwrap();
927        let pull_calls: Vec<Role> = recorded
928            .iter()
929            .filter(|(_, m, _)| m == "pull_image")
930            .map(|(r, _, _)| *r)
931            .collect();
932        assert!(
933            pull_calls.contains(&Role::Primary),
934            "primary should have been pulled: {pull_calls:?}",
935        );
936        assert!(
937            pull_calls.contains(&Role::Delegate),
938            "delegate should have been pulled: {pull_calls:?}",
939        );
940    }
941
942    #[tokio::test]
943    async fn pull_image_delegate_error_does_not_fail() {
944        // Build the composite by hand so we can flip the delegate's
945        // pull_image_error before wrapping it in an Arc<dyn Runtime>.
946        let calls = Arc::new(StdMutex::new(Vec::new()));
947        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
948        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
949        delegate.pull_image_error = Some("simulated delegate pull failure".to_string());
950        let rt = CompositeRuntime::new(
951            primary as Arc<dyn Runtime>,
952            Some(Arc::new(delegate) as Arc<dyn Runtime>),
953        );
954
955        // Top-level call must succeed despite the delegate error.
956        rt.pull_image("docker.io/library/alpine:3.19")
957            .await
958            .unwrap();
959
960        let recorded = calls.lock().unwrap();
961        let pull_calls: Vec<Role> = recorded
962            .iter()
963            .filter(|(_, m, _)| m == "pull_image")
964            .map(|(r, _, _)| *r)
965            .collect();
966        assert!(
967            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
968            "both runtimes should have been called: {pull_calls:?}",
969        );
970    }
971
972    #[tokio::test]
973    async fn list_images_merges_both() {
974        // Hand-build so we can seed each mock's list_images_response.
975        let calls = Arc::new(StdMutex::new(Vec::new()));
976        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
977        primary.list_images_response = vec![ImageInfo {
978            reference: "primary/image:1".to_string(),
979            digest: None,
980            size_bytes: None,
981        }];
982        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
983        delegate.list_images_response = vec![ImageInfo {
984            reference: "delegate/image:1".to_string(),
985            digest: None,
986            size_bytes: None,
987        }];
988        let rt = CompositeRuntime::new(
989            Arc::new(primary) as Arc<dyn Runtime>,
990            Some(Arc::new(delegate) as Arc<dyn Runtime>),
991        );
992
993        let merged = rt.list_images().await.unwrap();
994        let refs: Vec<&str> = merged.iter().map(|i| i.reference.as_str()).collect();
995        assert!(
996            refs.contains(&"primary/image:1") && refs.contains(&"delegate/image:1"),
997            "merged list should contain both entries, got {refs:?}",
998        );
999    }
1000
1001    #[tokio::test]
1002    async fn dispatch_lookup_unknown_container_errors() {
1003        let (rt, _calls) = make_composite(true);
1004        let id = cid("ghost", 0);
1005
1006        let err = rt.start_container(&id).await.unwrap_err();
1007        assert!(
1008            matches!(err, AgentError::NotFound { .. }),
1009            "expected NotFound for unknown container, got {err:?}"
1010        );
1011    }
1012
1013    /// Helper: read the internal image-OS cache for test assertions.
1014    async fn cached_os(rt: &CompositeRuntime, image: &str) -> Option<OsKind> {
1015        rt.image_os.read().await.get(image).copied()
1016    }
1017
1018    #[tokio::test]
1019    async fn apply_image_os_inspection_populates_cache_on_ok_some() {
1020        // Contract: when `fetch_image_os` resolves to a recognized OS, the
1021        // cache is populated so subsequent `select_for` calls for specs
1022        // without `platform` dispatch correctly.
1023        let (rt, _calls) = make_composite(true);
1024        let image = "docker.io/library/alpine:3.19";
1025
1026        rt.apply_image_os_inspection(image, Ok(Some(OsKind::Linux)))
1027            .await;
1028
1029        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
1030    }
1031
1032    #[tokio::test]
1033    async fn apply_image_os_inspection_leaves_cache_untouched_on_ok_none() {
1034        // Contract: when the manifest carries no (or an unrecognized) `os`
1035        // field the cache is left alone. Dispatch will fall through to the
1036        // primary on `create_container`.
1037        let (rt, _calls) = make_composite(true);
1038        let image = "docker.io/library/nginx:1.25";
1039
1040        rt.apply_image_os_inspection(image, Ok(None)).await;
1041
1042        assert_eq!(cached_os(&rt, image).await, None);
1043    }
1044
1045    #[tokio::test]
1046    async fn apply_image_os_inspection_leaves_cache_untouched_on_err() {
1047        // Contract: a registry error during inspection is non-fatal and must
1048        // not poison the cache. Dispatch falls through to primary on lookup.
1049        let (rt, _calls) = make_composite(true);
1050        let image = "docker.io/library/nginx:1.25";
1051
1052        // Pre-seed the cache so we can assert the error path doesn't
1053        // overwrite or clear an existing entry.
1054        rt.record_image_os(image, OsKind::Linux).await;
1055
1056        let err = zlayer_registry::RegistryError::NotFound {
1057            registry: "docker.io".to_string(),
1058            image: image.to_string(),
1059        };
1060        rt.apply_image_os_inspection(image, Err(err)).await;
1061
1062        // Cache is still whatever it was before the failed inspection.
1063        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
1064    }
1065
1066    #[tokio::test]
1067    async fn pull_image_inspection_failure_does_not_fail_pull() {
1068        // End-to-end: even when the registry fetch fails (inevitable for the
1069        // synthetic image refs used in unit tests), `pull_image` still
1070        // returns `Ok`. The mock primary/delegate both succeed; the
1071        // inspection step logs and moves on. The cache must remain empty
1072        // because there was no successful inspection to record.
1073        let (rt, _calls) = make_composite(true);
1074        let image = "invalid.example.invalid/ghost:v1";
1075
1076        rt.pull_image(image).await.unwrap();
1077
1078        assert_eq!(
1079            cached_os(&rt, image).await,
1080            None,
1081            "failed inspection must not populate the image-OS cache"
1082        );
1083    }
1084
1085    #[tokio::test]
1086    async fn pull_image_with_policy_inspection_failure_does_not_fail_pull() {
1087        // Same contract as `pull_image_inspection_failure_does_not_fail_pull`
1088        // but exercising the policy-aware entry point.
1089        let (rt, _calls) = make_composite(true);
1090        let image = "invalid.example.invalid/ghost:v1";
1091
1092        rt.pull_image_with_policy(image, PullPolicy::IfNotPresent, None)
1093            .await
1094            .unwrap();
1095
1096        assert_eq!(cached_os(&rt, image).await, None);
1097    }
1098
1099    #[test]
1100    fn os_kind_from_oci_str_roundtrip() {
1101        // Guards the `as_oci_str` ↔ `from_oci_str` relationship used by the
1102        // inspection path. If a new variant is added to `OsKind` without
1103        // updating `from_oci_str` we want the miss here, not a silent
1104        // "dispatch to primary" regression in production.
1105        for os in [OsKind::Linux, OsKind::Windows, OsKind::Macos] {
1106            assert_eq!(OsKind::from_oci_str(os.as_oci_str()), Some(os));
1107        }
1108        assert_eq!(OsKind::from_oci_str(""), None);
1109        assert_eq!(OsKind::from_oci_str("freebsd"), None);
1110    }
1111}