1use std::collections::HashMap;
37use std::net::IpAddr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use tokio::sync::RwLock;
43use zlayer_observability::logs::LogEntry;
44use zlayer_spec::{OsKind, PullPolicy, RegistryAuth, ServiceSpec};
45
46use crate::cgroups_stats::ContainerStats;
47use crate::error::{AgentError, Result};
48use crate::runtime::{
49 ContainerId, ContainerInspectDetails, ContainerState, ExecEventStream, ImageInfo, PruneResult,
50 Runtime, WaitOutcome,
51};
52
/// Identifies which backend of a [`CompositeRuntime`] owns a container.
///
/// Recorded per container at create time and consulted by every later
/// lifecycle call so the same backend keeps handling that container.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DispatchTarget {
    /// Handled by the composite's primary runtime.
    Primary,
    /// Handled by the composite's delegate runtime (only chosen when one is
    /// configured).
    Delegate,
}
59
/// A [`Runtime`] that routes work to one of two backends.
///
/// Linux workloads go to the optional `delegate` (the error messages refer to
/// it as the WSL2 delegate). Everything else, including workloads whose OS
/// cannot be determined, goes to `primary`. Container operations remember
/// which backend created each container; image operations involve both.
pub struct CompositeRuntime {
    /// Runtime for Windows/macOS workloads and the fall-through target when
    /// neither the spec nor the image-OS cache names an OS.
    primary: Arc<dyn Runtime>,
    /// Optional runtime for Linux workloads. When `None`, Linux workloads are
    /// rejected with `AgentError::RouteToPeer`.
    delegate: Option<Arc<dyn Runtime>>,
    /// Backend chosen for each container at `create_container` time, so later
    /// calls (start/stop/exec/...) reach the same runtime.
    dispatch: Arc<RwLock<HashMap<ContainerId, DispatchTarget>>>,
    /// OS reported by a pulled image's manifest, keyed by the image string
    /// passed to `pull_image*`. Consulted when a spec has no explicit platform.
    image_os: Arc<RwLock<HashMap<String, OsKind>>>,
}
74
75impl CompositeRuntime {
76 #[must_use]
82 pub fn new(primary: Arc<dyn Runtime>, delegate: Option<Arc<dyn Runtime>>) -> Self {
83 Self {
84 primary,
85 delegate,
86 dispatch: Arc::new(RwLock::new(HashMap::new())),
87 image_os: Arc::new(RwLock::new(HashMap::new())),
88 }
89 }
90
91 #[must_use]
93 pub fn primary(&self) -> &Arc<dyn Runtime> {
94 &self.primary
95 }
96
97 #[must_use]
99 pub fn delegate(&self) -> Option<&Arc<dyn Runtime>> {
100 self.delegate.as_ref()
101 }
102
103 pub(crate) async fn record_image_os(&self, image: &str, os: OsKind) {
109 self.image_os.write().await.insert(image.to_string(), os);
110 }
111
112 async fn apply_image_os_inspection(
129 &self,
130 image: &str,
131 result: std::result::Result<Option<OsKind>, zlayer_registry::RegistryError>,
132 ) {
133 match result {
134 Ok(Some(os)) => {
135 self.record_image_os(image, os).await;
136 tracing::debug!(image, ?os, "cached image OS for dispatch");
137 }
138 Ok(None) => {
139 tracing::trace!(
140 image,
141 "image manifest has no OS field — dispatch will fall through to primary",
142 );
143 }
144 Err(e) => {
145 tracing::warn!(
146 image,
147 error = %e,
148 "failed to inspect image manifest OS — dispatch will fall through to primary",
149 );
150 }
151 }
152 }
153
154 async fn select_for(&self, service: &str, spec: &ServiceSpec) -> Result<DispatchTarget> {
168 if let Some(platform) = &spec.platform {
169 let target = match platform.os {
170 OsKind::Windows | OsKind::Macos => DispatchTarget::Primary,
171 OsKind::Linux => DispatchTarget::Delegate,
172 };
173 if matches!(target, DispatchTarget::Delegate) && self.delegate.is_none() {
174 return Err(AgentError::RouteToPeer {
175 service: service.to_string(),
176 required_os: OsKind::Linux.as_oci_str().to_string(),
177 reason: "spec.platform.os = linux but this node has no WSL2 delegate \
178 configured; enable `--install-wsl yes` on this node or add a Linux \
179 peer to the cluster"
180 .to_string(),
181 });
182 }
183 return Ok(target);
184 }
185
186 if let Some(os) = self
187 .image_os
188 .read()
189 .await
190 .get(&spec.image.name.to_string())
191 .copied()
192 {
193 return match os {
194 OsKind::Linux => {
195 if self.delegate.is_some() {
196 Ok(DispatchTarget::Delegate)
197 } else {
198 Err(AgentError::RouteToPeer {
203 service: service.to_string(),
204 required_os: OsKind::Linux.as_oci_str().to_string(),
205 reason: format!(
206 "image '{}' manifest reports os=linux but this node has no WSL2 \
207 delegate configured; enable `--install-wsl yes` on this node or \
208 add a Linux peer to the cluster",
209 spec.image.name
210 ),
211 })
212 }
213 }
214 OsKind::Windows | OsKind::Macos => Ok(DispatchTarget::Primary),
215 };
216 }
217
218 Ok(DispatchTarget::Primary)
219 }
220
221 async fn lookup(&self, id: &ContainerId) -> Result<Arc<dyn Runtime>> {
223 let target =
224 self.dispatch
225 .read()
226 .await
227 .get(id)
228 .copied()
229 .ok_or_else(|| AgentError::NotFound {
230 container: id.to_string(),
231 reason: "no dispatch record in CompositeRuntime".to_string(),
232 })?;
233 Ok(self.runtime_for(target).clone())
234 }
235
236 fn runtime_for(&self, t: DispatchTarget) -> &Arc<dyn Runtime> {
243 match t {
244 DispatchTarget::Primary => &self.primary,
245 DispatchTarget::Delegate => self
246 .delegate
247 .as_ref()
248 .expect("delegate target requires delegate to exist"),
249 }
250 }
251}
252
253#[async_trait]
254impl Runtime for CompositeRuntime {
255 async fn pull_image(&self, image: &str) -> Result<()> {
256 self.primary.pull_image(image).await?;
257 if let Some(delegate) = &self.delegate {
258 if let Err(e) = delegate.pull_image(image).await {
259 tracing::debug!(
264 image,
265 error = %e,
266 "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
267 );
268 }
269 }
270
271 let os_result = zlayer_registry::fetch_image_os(image, None).await;
275 self.apply_image_os_inspection(image, os_result).await;
276
277 Ok(())
278 }
279
280 async fn pull_image_with_policy(
281 &self,
282 image: &str,
283 policy: PullPolicy,
284 auth: Option<&RegistryAuth>,
285 ) -> Result<()> {
286 self.primary
287 .pull_image_with_policy(image, policy, auth)
288 .await?;
289 if let Some(delegate) = &self.delegate {
290 if let Err(e) = delegate.pull_image_with_policy(image, policy, auth).await {
291 tracing::debug!(
292 image,
293 error = %e,
294 "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
295 );
296 }
297 }
298
299 let os_result = zlayer_registry::fetch_image_os(image, auth).await;
300 self.apply_image_os_inspection(image, os_result).await;
301
302 Ok(())
303 }
304
305 async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
306 let target = self.select_for(&id.service, spec).await?;
307 {
308 let mut dispatch = self.dispatch.write().await;
309 dispatch.insert(id.clone(), target);
310 }
311 let rt = self.runtime_for(target).clone();
312 match rt.create_container(id, spec).await {
313 Ok(()) => Ok(()),
314 Err(e) => {
315 self.dispatch.write().await.remove(id);
318 Err(e)
319 }
320 }
321 }
322
323 async fn start_container(&self, id: &ContainerId) -> Result<()> {
324 let rt = self.lookup(id).await?;
325 rt.start_container(id).await
326 }
327
328 async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
329 let rt = self.lookup(id).await?;
330 rt.stop_container(id, timeout).await
331 }
332
333 async fn remove_container(&self, id: &ContainerId) -> Result<()> {
334 let rt = self.lookup(id).await?;
335 let res = rt.remove_container(id).await;
336 self.dispatch.write().await.remove(id);
337 res
338 }
339
340 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
341 let rt = self.lookup(id).await?;
342 rt.container_state(id).await
343 }
344
345 async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
346 let rt = self.lookup(id).await?;
347 rt.container_logs(id, tail).await
348 }
349
350 async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
351 let rt = self.lookup(id).await?;
352 rt.exec(id, cmd).await
353 }
354
355 async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
356 let rt = self.lookup(id).await?;
357 rt.exec_stream(id, cmd).await
358 }
359
360 async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
361 let rt = self.lookup(id).await?;
362 rt.get_container_stats(id).await
363 }
364
365 async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
366 let rt = self.lookup(id).await?;
367 rt.wait_container(id).await
368 }
369
370 async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
371 let rt = self.lookup(id).await?;
372 rt.wait_outcome(id).await
373 }
374
375 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
376 let rt = self.lookup(id).await?;
377 rt.get_logs(id).await
378 }
379
380 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
381 let rt = self.lookup(id).await?;
382 rt.get_container_pid(id).await
383 }
384
385 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
386 let rt = self.lookup(id).await?;
387 rt.get_container_ip(id).await
388 }
389
390 async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
391 let rt = self.lookup(id).await?;
392 rt.get_container_port_override(id).await
393 }
394
395 #[cfg(target_os = "windows")]
396 async fn get_container_namespace_id(
397 &self,
398 id: &ContainerId,
399 ) -> Result<Option<windows::core::GUID>> {
400 let rt = self.lookup(id).await?;
401 rt.get_container_namespace_id(id).await
402 }
403
404 async fn sync_container_volumes(&self, id: &ContainerId) -> Result<()> {
405 let rt = self.lookup(id).await?;
406 rt.sync_container_volumes(id).await
407 }
408
409 async fn list_images(&self) -> Result<Vec<ImageInfo>> {
410 let mut out = self.primary.list_images().await?;
411 if let Some(delegate) = &self.delegate {
412 match delegate.list_images().await {
413 Ok(extra) => out.extend(extra),
414 Err(e) => tracing::warn!(
415 error = %e,
416 "delegate runtime list_images failed; returning primary results only",
417 ),
418 }
419 }
420 Ok(out)
421 }
422
423 async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
424 match self.primary.remove_image(image, force).await {
425 Ok(()) => Ok(()),
426 Err(primary_err) => {
427 if let Some(delegate) = &self.delegate {
428 match delegate.remove_image(image, force).await {
429 Ok(()) => Ok(()),
430 Err(delegate_err) => {
431 tracing::debug!(
432 image,
433 %delegate_err,
434 "delegate remove_image also failed; returning primary error",
435 );
436 Err(primary_err)
437 }
438 }
439 } else {
440 Err(primary_err)
441 }
442 }
443 }
444 }
445
446 async fn prune_images(&self) -> Result<PruneResult> {
447 let mut result = self.primary.prune_images().await?;
448 if let Some(delegate) = &self.delegate {
449 match delegate.prune_images().await {
450 Ok(extra) => {
451 result.deleted.extend(extra.deleted);
452 result.space_reclaimed =
453 result.space_reclaimed.saturating_add(extra.space_reclaimed);
454 }
455 Err(e) => tracing::warn!(
456 error = %e,
457 "delegate runtime prune_images failed; returning primary result only",
458 ),
459 }
460 }
461 Ok(result)
462 }
463
464 async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
465 let rt = self.lookup(id).await?;
466 rt.kill_container(id, signal).await
467 }
468
469 async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
470 match self.primary.tag_image(source, target).await {
471 Ok(()) => Ok(()),
472 Err(primary_err) => {
473 if let Some(delegate) = &self.delegate {
474 match delegate.tag_image(source, target).await {
475 Ok(()) => Ok(()),
476 Err(delegate_err) => {
477 tracing::debug!(
478 source,
479 target,
480 %delegate_err,
481 "delegate tag_image also failed; returning primary error",
482 );
483 Err(primary_err)
484 }
485 }
486 } else {
487 Err(primary_err)
488 }
489 }
490 }
491 }
492
493 async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
494 let rt = self.lookup(id).await?;
495 rt.inspect_detailed(id).await
496 }
497}
498
/// Unit tests for `CompositeRuntime` routing, driven by recording mock
/// runtimes. Tests that call `pull_image*` also run the real
/// `zlayer_registry::fetch_image_os` lookup; its failures are tolerated by
/// design, so those tests do not depend on the lookup succeeding.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cgroups_stats::ContainerStats;
    use std::sync::Mutex as StdMutex;
    use zlayer_spec::{ArchKind, DeploymentSpec, TargetPlatform};

    /// Which side of the composite a recorded call landed on.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum Role {
        Primary,
        Delegate,
    }

    /// One recorded call: (runtime role, method name, container id if any).
    type CallRecord = (Role, String, Option<ContainerId>);
    /// Call log shared by the primary and delegate mocks, so the order of
    /// calls across both runtimes is preserved in a single vector.
    type CallLog = Arc<StdMutex<Vec<CallRecord>>>;

    /// Minimal `Runtime` that records every call into a shared log and returns
    /// canned successes. Methods not overridden here fall back to the trait's
    /// default implementations.
    struct MockRuntime {
        role: Role,
        calls: CallLog,
        /// Returned verbatim by `list_images`.
        list_images_response: Vec<ImageInfo>,
        /// When set, `pull_image` (but not `pull_image_with_policy`) fails with
        /// `AgentError::Internal` carrying this message.
        pull_image_error: Option<String>,
    }

    impl MockRuntime {
        /// A mock with an empty image list and no injected failures.
        fn new(role: Role, calls: CallLog) -> Self {
            Self {
                role,
                calls,
                list_images_response: Vec::new(),
                pull_image_error: None,
            }
        }

        /// Append `(role, method, id)` to the shared call log.
        fn record(&self, method: &str, id: Option<&ContainerId>) {
            self.calls
                .lock()
                .expect("mock call-log mutex poisoned")
                .push((self.role, method.to_string(), id.cloned()));
        }
    }

    #[async_trait]
    impl Runtime for MockRuntime {
        async fn pull_image(&self, _image: &str) -> Result<()> {
            self.record("pull_image", None);
            if let Some(msg) = &self.pull_image_error {
                return Err(AgentError::Internal(msg.clone()));
            }
            Ok(())
        }

        async fn pull_image_with_policy(
            &self,
            _image: &str,
            _policy: PullPolicy,
            _auth: Option<&RegistryAuth>,
        ) -> Result<()> {
            self.record("pull_image_with_policy", None);
            Ok(())
        }

        async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
            self.record("create_container", Some(id));
            Ok(())
        }

        async fn start_container(&self, id: &ContainerId) -> Result<()> {
            self.record("start_container", Some(id));
            Ok(())
        }

        async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
            self.record("stop_container", Some(id));
            Ok(())
        }

        async fn remove_container(&self, id: &ContainerId) -> Result<()> {
            self.record("remove_container", Some(id));
            Ok(())
        }

        async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
            self.record("container_state", Some(id));
            Ok(ContainerState::Running)
        }

        async fn container_logs(&self, id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
            self.record("container_logs", Some(id));
            Ok(Vec::new())
        }

        async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
            self.record("exec", Some(id));
            Ok((0, String::new(), String::new()))
        }

        async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
            self.record("get_container_stats", Some(id));
            Ok(ContainerStats {
                cpu_usage_usec: 0,
                memory_bytes: 0,
                memory_limit: 0,
                timestamp: std::time::Instant::now(),
            })
        }

        async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
            self.record("wait_container", Some(id));
            Ok(0)
        }

        async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
            self.record("get_logs", Some(id));
            Ok(Vec::new())
        }

        async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
            self.record("get_container_pid", Some(id));
            Ok(None)
        }

        async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
            self.record("get_container_ip", Some(id));
            Ok(None)
        }

        async fn list_images(&self) -> Result<Vec<ImageInfo>> {
            self.record("list_images", None);
            Ok(self.list_images_response.clone())
        }
    }

    /// Build a `ServiceSpec` named `test` for `image` by parsing a minimal
    /// deployment YAML, then overwrite its `platform` with `platform`.
    fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
        let yaml = format!(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: {image}
    endpoints:
      - name: http
        protocol: http
        port: 8080
"
        );
        let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
            .expect("valid deployment yaml")
            .services
            .remove("test")
            .expect("service 'test' present");
        spec.platform = platform;
        spec
    }

    /// Shorthand for a `ContainerId`.
    fn cid(service: &str, replica: u32) -> ContainerId {
        ContainerId {
            service: service.to_string(),
            replica,
        }
    }

    /// Build a composite over recording mocks (the delegate exists only when
    /// `with_delegate` is true) and return it with the shared call log.
    fn make_composite(with_delegate: bool) -> (CompositeRuntime, CallLog) {
        let calls = Arc::new(StdMutex::new(Vec::new()));
        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
        let delegate = if with_delegate {
            Some(Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>)
        } else {
            None
        };
        (
            CompositeRuntime::new(primary as Arc<dyn Runtime>, delegate),
            calls,
        )
    }

    /// Role of the first recorded call to `method`, if any.
    fn role_for(calls: &[CallRecord], method: &str) -> Option<Role> {
        calls
            .iter()
            .find(|(_, m, _)| m == method)
            .map(|(role, _, _)| *role)
    }

    #[tokio::test]
    async fn dispatch_windows_spec_goes_to_primary() {
        let (rt, calls) = make_composite(true);
        let id = cid("win-svc", 0);
        let spec = make_spec(
            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
        );

        rt.create_container(&id, &spec).await.unwrap();
        rt.start_container(&id).await.unwrap();

        let calls = calls.lock().unwrap();
        assert_eq!(
            role_for(&calls, "create_container"),
            Some(Role::Primary),
            "create_container should hit primary for Windows spec"
        );
        assert_eq!(
            role_for(&calls, "start_container"),
            Some(Role::Primary),
            "start_container should hit primary for Windows spec"
        );
    }

    #[tokio::test]
    async fn dispatch_linux_spec_goes_to_delegate() {
        let (rt, calls) = make_composite(true);
        let id = cid("lin-svc", 0);
        let spec = make_spec(
            "docker.io/library/alpine:3.19",
            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
        );

        rt.create_container(&id, &spec).await.unwrap();
        rt.start_container(&id).await.unwrap();

        let calls = calls.lock().unwrap();
        assert_eq!(
            role_for(&calls, "create_container"),
            Some(Role::Delegate),
            "create_container should hit delegate for Linux spec"
        );
        assert_eq!(
            role_for(&calls, "start_container"),
            Some(Role::Delegate),
            "start_container should hit delegate for Linux spec"
        );
    }

    #[tokio::test]
    async fn dispatch_linux_without_delegate_errors() {
        // Explicit Linux platform with no delegate must be rejected with a
        // RouteToPeer error that names both remediations.
        let (rt, _calls) = make_composite(false);
        let id = cid("lin-svc", 0);
        let spec = make_spec(
            "docker.io/library/alpine:3.19",
            Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
        );

        let err = rt.create_container(&id, &spec).await.unwrap_err();
        match err {
            AgentError::RouteToPeer {
                service,
                required_os,
                reason,
            } => {
                assert_eq!(service, "lin-svc");
                assert_eq!(required_os, "linux");
                assert!(
                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
                    "reason must name both remediations, got: {reason}"
                );
            }
            other => panic!("expected RouteToPeer, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn dispatch_linux_image_cache_without_delegate_routes_to_peer() {
        // Same as above, but the Linux requirement comes from the image-OS
        // cache instead of spec.platform; the reason should name the image.
        let (rt, _calls) = make_composite(false);
        let id = cid("svc", 0);
        let image = "docker.io/library/nginx:1.25";
        rt.record_image_os(image, OsKind::Linux).await;

        let spec = make_spec(image, None);
        let err = rt.create_container(&id, &spec).await.unwrap_err();
        match err {
            AgentError::RouteToPeer {
                service,
                required_os,
                reason,
            } => {
                assert_eq!(service, "svc");
                assert_eq!(required_os, "linux");
                assert!(
                    reason.contains(image),
                    "reason should mention the image name, got: {reason}"
                );
                assert!(
                    reason.contains("--install-wsl") && reason.contains("Linux peer"),
                    "reason must name both remediations, got: {reason}"
                );
            }
            other => panic!("expected RouteToPeer, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn dispatch_macos_spec_goes_to_primary() {
        let (rt, calls) = make_composite(true);
        let id = cid("mac-svc", 0);
        let spec = make_spec(
            "ghcr.io/zlayer/macos:latest",
            Some(TargetPlatform::new(OsKind::Macos, ArchKind::Arm64)),
        );

        rt.create_container(&id, &spec).await.unwrap();

        let calls = calls.lock().unwrap();
        assert_eq!(
            role_for(&calls, "create_container"),
            Some(Role::Primary),
            "create_container should hit primary for Macos spec"
        );
    }

    #[tokio::test]
    async fn dispatch_no_platform_no_image_os_falls_through_to_primary() {
        let (rt, calls) = make_composite(true);
        let id = cid("svc", 0);
        let spec = make_spec("docker.io/library/nginx:1.25", None);

        rt.create_container(&id, &spec).await.unwrap();

        let calls = calls.lock().unwrap();
        assert_eq!(
            role_for(&calls, "create_container"),
            Some(Role::Primary),
            "fall-through should pick primary when both platform and image-OS cache are unknown"
        );
    }

    #[tokio::test]
    async fn dispatch_uses_image_os_cache_when_platform_missing() {
        let (rt, calls) = make_composite(true);
        let id = cid("svc", 0);
        let image = "docker.io/library/nginx:1.25";
        rt.record_image_os(image, OsKind::Linux).await;

        let spec = make_spec(image, None);
        rt.create_container(&id, &spec).await.unwrap();

        let calls = calls.lock().unwrap();
        assert_eq!(
            role_for(&calls, "create_container"),
            Some(Role::Delegate),
            "image-OS cache should route Linux images to the delegate"
        );
    }

    #[tokio::test]
    async fn per_container_dispatch_cache_persists_through_start_stop() {
        let (rt, calls) = make_composite(true);
        let id = cid("win-svc", 0);
        let spec = make_spec(
            "mcr.microsoft.com/windows/nanoserver:ltsc2022",
            Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
        );

        rt.create_container(&id, &spec).await.unwrap();
        rt.start_container(&id).await.unwrap();
        rt.stop_container(&id, Duration::from_secs(1))
            .await
            .unwrap();
        rt.remove_container(&id).await.unwrap();

        let recorded = calls.lock().unwrap().clone();
        for method in [
            "create_container",
            "start_container",
            "stop_container",
            "remove_container",
        ] {
            assert_eq!(
                role_for(&recorded, method),
                Some(Role::Primary),
                "{method} should have dispatched to primary"
            );
        }

        // A successful remove drops the dispatch record, so the id is unknown again.
        let after = rt
            .start_container(&id)
            .await
            .expect_err("lookup after remove should fail");
        assert!(
            matches!(after, AgentError::NotFound { .. }),
            "expected NotFound after remove, got {after:?}"
        );
    }

    #[tokio::test]
    async fn pull_image_calls_both_runtimes() {
        let (rt, calls) = make_composite(true);
        rt.pull_image("docker.io/library/alpine:3.19")
            .await
            .unwrap();

        let recorded = calls.lock().unwrap();
        let pull_calls: Vec<Role> = recorded
            .iter()
            .filter(|(_, m, _)| m == "pull_image")
            .map(|(r, _, _)| *r)
            .collect();
        assert!(
            pull_calls.contains(&Role::Primary),
            "primary should have been pulled: {pull_calls:?}",
        );
        assert!(
            pull_calls.contains(&Role::Delegate),
            "delegate should have been pulled: {pull_calls:?}",
        );
    }

    #[tokio::test]
    async fn pull_image_delegate_error_does_not_fail() {
        // The delegate mock is built by hand so its pull can be made to fail.
        let calls = Arc::new(StdMutex::new(Vec::new()));
        let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
        delegate.pull_image_error = Some("simulated delegate pull failure".to_string());
        let rt = CompositeRuntime::new(
            primary as Arc<dyn Runtime>,
            Some(Arc::new(delegate) as Arc<dyn Runtime>),
        );

        // Must still succeed: delegate pull errors are only logged.
        rt.pull_image("docker.io/library/alpine:3.19")
            .await
            .unwrap();

        let recorded = calls.lock().unwrap();
        let pull_calls: Vec<Role> = recorded
            .iter()
            .filter(|(_, m, _)| m == "pull_image")
            .map(|(r, _, _)| *r)
            .collect();
        assert!(
            pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
            "both runtimes should have been called: {pull_calls:?}",
        );
    }

    #[tokio::test]
    async fn list_images_merges_both() {
        // Each mock reports a distinct image so the merge is observable.
        let calls = Arc::new(StdMutex::new(Vec::new()));
        let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
        primary.list_images_response = vec![ImageInfo {
            reference: "primary/image:1".to_string(),
            digest: None,
            size_bytes: None,
        }];
        let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
        delegate.list_images_response = vec![ImageInfo {
            reference: "delegate/image:1".to_string(),
            digest: None,
            size_bytes: None,
        }];
        let rt = CompositeRuntime::new(
            Arc::new(primary) as Arc<dyn Runtime>,
            Some(Arc::new(delegate) as Arc<dyn Runtime>),
        );

        let merged = rt.list_images().await.unwrap();
        let refs: Vec<&str> = merged.iter().map(|i| i.reference.as_str()).collect();
        assert!(
            refs.contains(&"primary/image:1") && refs.contains(&"delegate/image:1"),
            "merged list should contain both entries, got {refs:?}",
        );
    }

    #[tokio::test]
    async fn dispatch_lookup_unknown_container_errors() {
        let (rt, _calls) = make_composite(true);
        let id = cid("ghost", 0);

        let err = rt.start_container(&id).await.unwrap_err();
        assert!(
            matches!(err, AgentError::NotFound { .. }),
            "expected NotFound for unknown container, got {err:?}"
        );
    }

    /// Read the image-OS cache entry for `image` directly (test-only peek).
    async fn cached_os(rt: &CompositeRuntime, image: &str) -> Option<OsKind> {
        rt.image_os.read().await.get(image).copied()
    }

    #[tokio::test]
    async fn apply_image_os_inspection_populates_cache_on_ok_some() {
        let (rt, _calls) = make_composite(true);
        let image = "docker.io/library/alpine:3.19";

        rt.apply_image_os_inspection(image, Ok(Some(OsKind::Linux)))
            .await;

        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
    }

    #[tokio::test]
    async fn apply_image_os_inspection_leaves_cache_untouched_on_ok_none() {
        let (rt, _calls) = make_composite(true);
        let image = "docker.io/library/nginx:1.25";

        rt.apply_image_os_inspection(image, Ok(None)).await;

        assert_eq!(cached_os(&rt, image).await, None);
    }

    #[tokio::test]
    async fn apply_image_os_inspection_leaves_cache_untouched_on_err() {
        let (rt, _calls) = make_composite(true);
        let image = "docker.io/library/nginx:1.25";

        // Seed a value so we can tell that an Err does not clear it.
        rt.record_image_os(image, OsKind::Linux).await;

        let err = zlayer_registry::RegistryError::NotFound {
            registry: "docker.io".to_string(),
            image: image.to_string(),
        };
        rt.apply_image_os_inspection(image, Err(err)).await;

        assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
    }

    #[tokio::test]
    async fn pull_image_inspection_failure_does_not_fail_pull() {
        // The host is under the reserved `.invalid` TLD, so the real manifest
        // lookup performed by `pull_image` is expected to fail; the mocks
        // still report a successful pull.
        let (rt, _calls) = make_composite(true);
        let image = "invalid.example.invalid/ghost:v1";

        rt.pull_image(image).await.unwrap();

        assert_eq!(
            cached_os(&rt, image).await,
            None,
            "failed inspection must not populate the image-OS cache"
        );
    }

    #[tokio::test]
    async fn pull_image_with_policy_inspection_failure_does_not_fail_pull() {
        // Same as above, via the policy-aware entry point.
        let (rt, _calls) = make_composite(true);
        let image = "invalid.example.invalid/ghost:v1";

        rt.pull_image_with_policy(image, PullPolicy::IfNotPresent, None)
            .await
            .unwrap();

        assert_eq!(cached_os(&rt, image).await, None);
    }

    #[test]
    fn os_kind_from_oci_str_roundtrip() {
        // Every variant must survive as_oci_str -> from_oci_str; unknown or
        // empty strings map to None.
        for os in [OsKind::Linux, OsKind::Windows, OsKind::Macos] {
            assert_eq!(OsKind::from_oci_str(os.as_oci_str()), Some(os));
        }
        assert_eq!(OsKind::from_oci_str(""), None);
        assert_eq!(OsKind::from_oci_str("freebsd"), None);
    }
}