1use std::collections::HashMap;
37use std::net::IpAddr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use tokio::sync::RwLock;
43use zlayer_observability::logs::LogEntry;
44use zlayer_spec::{OsKind, PullPolicy, RegistryAuth, ServiceSpec};
45
46use crate::cgroups_stats::ContainerStats;
47use crate::error::{AgentError, Result};
48use crate::runtime::{
49 ContainerId, ContainerInspectDetails, ContainerState, ExecEventStream, ImageInfo, PruneResult,
50 Runtime, WaitOutcome,
51};
52
/// Identifies which of the two wrapped runtimes a container is routed to.
///
/// The value is `Copy` so it can be read out of the dispatch table without
/// keeping the table's lock guard alive.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DispatchTarget {
    /// Route to the runtime held in `CompositeRuntime::primary`.
    Primary,
    /// Route to the runtime held in `CompositeRuntime::delegate`.
    Delegate,
}
59
60pub struct CompositeRuntime {
64 primary: Arc<dyn Runtime>,
65 delegate: Option<Arc<dyn Runtime>>,
66 dispatch: Arc<RwLock<HashMap<ContainerId, DispatchTarget>>>,
69 image_os: Arc<RwLock<HashMap<String, OsKind>>>,
73}
74
75impl CompositeRuntime {
76 #[must_use]
82 pub fn new(primary: Arc<dyn Runtime>, delegate: Option<Arc<dyn Runtime>>) -> Self {
83 Self {
84 primary,
85 delegate,
86 dispatch: Arc::new(RwLock::new(HashMap::new())),
87 image_os: Arc::new(RwLock::new(HashMap::new())),
88 }
89 }
90
91 #[must_use]
93 pub fn primary(&self) -> &Arc<dyn Runtime> {
94 &self.primary
95 }
96
97 #[must_use]
99 pub fn delegate(&self) -> Option<&Arc<dyn Runtime>> {
100 self.delegate.as_ref()
101 }
102
103 pub(crate) async fn record_image_os(&self, image: &str, os: OsKind) {
109 self.image_os.write().await.insert(image.to_string(), os);
110 }
111
112 async fn apply_image_os_inspection(
129 &self,
130 image: &str,
131 result: std::result::Result<Option<OsKind>, zlayer_registry::RegistryError>,
132 ) {
133 match result {
134 Ok(Some(os)) => {
135 self.record_image_os(image, os).await;
136 tracing::debug!(image, ?os, "cached image OS for dispatch");
137 }
138 Ok(None) => {
139 tracing::trace!(
140 image,
141 "image manifest has no OS field — dispatch will fall through to primary",
142 );
143 }
144 Err(e) => {
145 tracing::warn!(
146 image,
147 error = %e,
148 "failed to inspect image manifest OS — dispatch will fall through to primary",
149 );
150 }
151 }
152 }
153
154 async fn select_for(&self, service: &str, spec: &ServiceSpec) -> Result<DispatchTarget> {
168 if let Some(platform) = &spec.platform {
169 let target = match platform.os {
170 OsKind::Windows | OsKind::Macos => DispatchTarget::Primary,
171 OsKind::Linux => DispatchTarget::Delegate,
172 };
173 if matches!(target, DispatchTarget::Delegate) && self.delegate.is_none() {
174 return Err(AgentError::RouteToPeer {
175 service: service.to_string(),
176 required_os: OsKind::Linux.as_oci_str().to_string(),
177 reason: "spec.platform.os = linux but this node has no WSL2 delegate \
178 configured; enable `--install-wsl yes` on this node or add a Linux \
179 peer to the cluster"
180 .to_string(),
181 });
182 }
183 return Ok(target);
184 }
185
186 if let Some(os) = self.image_os.read().await.get(&spec.image.name).copied() {
187 return match os {
188 OsKind::Linux => {
189 if self.delegate.is_some() {
190 Ok(DispatchTarget::Delegate)
191 } else {
192 Err(AgentError::RouteToPeer {
197 service: service.to_string(),
198 required_os: OsKind::Linux.as_oci_str().to_string(),
199 reason: format!(
200 "image '{}' manifest reports os=linux but this node has no WSL2 \
201 delegate configured; enable `--install-wsl yes` on this node or \
202 add a Linux peer to the cluster",
203 spec.image.name
204 ),
205 })
206 }
207 }
208 OsKind::Windows | OsKind::Macos => Ok(DispatchTarget::Primary),
209 };
210 }
211
212 Ok(DispatchTarget::Primary)
213 }
214
215 async fn lookup(&self, id: &ContainerId) -> Result<Arc<dyn Runtime>> {
217 let target =
218 self.dispatch
219 .read()
220 .await
221 .get(id)
222 .copied()
223 .ok_or_else(|| AgentError::NotFound {
224 container: id.to_string(),
225 reason: "no dispatch record in CompositeRuntime".to_string(),
226 })?;
227 Ok(self.runtime_for(target).clone())
228 }
229
230 fn runtime_for(&self, t: DispatchTarget) -> &Arc<dyn Runtime> {
237 match t {
238 DispatchTarget::Primary => &self.primary,
239 DispatchTarget::Delegate => self
240 .delegate
241 .as_ref()
242 .expect("delegate target requires delegate to exist"),
243 }
244 }
245}
246
247#[async_trait]
248impl Runtime for CompositeRuntime {
249 async fn pull_image(&self, image: &str) -> Result<()> {
250 self.primary.pull_image(image).await?;
251 if let Some(delegate) = &self.delegate {
252 if let Err(e) = delegate.pull_image(image).await {
253 tracing::debug!(
258 image,
259 error = %e,
260 "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
261 );
262 }
263 }
264
265 let os_result = zlayer_registry::fetch_image_os(image, None).await;
269 self.apply_image_os_inspection(image, os_result).await;
270
271 Ok(())
272 }
273
274 async fn pull_image_with_policy(
275 &self,
276 image: &str,
277 policy: PullPolicy,
278 auth: Option<&RegistryAuth>,
279 ) -> Result<()> {
280 self.primary
281 .pull_image_with_policy(image, policy, auth)
282 .await?;
283 if let Some(delegate) = &self.delegate {
284 if let Err(e) = delegate.pull_image_with_policy(image, policy, auth).await {
285 tracing::debug!(
286 image,
287 error = %e,
288 "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
289 );
290 }
291 }
292
293 let os_result = zlayer_registry::fetch_image_os(image, auth).await;
294 self.apply_image_os_inspection(image, os_result).await;
295
296 Ok(())
297 }
298
299 async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
300 let target = self.select_for(&id.service, spec).await?;
301 {
302 let mut dispatch = self.dispatch.write().await;
303 dispatch.insert(id.clone(), target);
304 }
305 let rt = self.runtime_for(target).clone();
306 match rt.create_container(id, spec).await {
307 Ok(()) => Ok(()),
308 Err(e) => {
309 self.dispatch.write().await.remove(id);
312 Err(e)
313 }
314 }
315 }
316
317 async fn start_container(&self, id: &ContainerId) -> Result<()> {
318 let rt = self.lookup(id).await?;
319 rt.start_container(id).await
320 }
321
322 async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
323 let rt = self.lookup(id).await?;
324 rt.stop_container(id, timeout).await
325 }
326
327 async fn remove_container(&self, id: &ContainerId) -> Result<()> {
328 let rt = self.lookup(id).await?;
329 let res = rt.remove_container(id).await;
330 self.dispatch.write().await.remove(id);
331 res
332 }
333
334 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
335 let rt = self.lookup(id).await?;
336 rt.container_state(id).await
337 }
338
339 async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
340 let rt = self.lookup(id).await?;
341 rt.container_logs(id, tail).await
342 }
343
344 async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
345 let rt = self.lookup(id).await?;
346 rt.exec(id, cmd).await
347 }
348
349 async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
350 let rt = self.lookup(id).await?;
351 rt.exec_stream(id, cmd).await
352 }
353
354 async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
355 let rt = self.lookup(id).await?;
356 rt.get_container_stats(id).await
357 }
358
359 async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
360 let rt = self.lookup(id).await?;
361 rt.wait_container(id).await
362 }
363
364 async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
365 let rt = self.lookup(id).await?;
366 rt.wait_outcome(id).await
367 }
368
369 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
370 let rt = self.lookup(id).await?;
371 rt.get_logs(id).await
372 }
373
374 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
375 let rt = self.lookup(id).await?;
376 rt.get_container_pid(id).await
377 }
378
379 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
380 let rt = self.lookup(id).await?;
381 rt.get_container_ip(id).await
382 }
383
384 async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
385 let rt = self.lookup(id).await?;
386 rt.get_container_port_override(id).await
387 }
388
389 #[cfg(target_os = "windows")]
390 async fn get_container_namespace_id(
391 &self,
392 id: &ContainerId,
393 ) -> Result<Option<windows::core::GUID>> {
394 let rt = self.lookup(id).await?;
395 rt.get_container_namespace_id(id).await
396 }
397
398 async fn sync_container_volumes(&self, id: &ContainerId) -> Result<()> {
399 let rt = self.lookup(id).await?;
400 rt.sync_container_volumes(id).await
401 }
402
403 async fn list_images(&self) -> Result<Vec<ImageInfo>> {
404 let mut out = self.primary.list_images().await?;
405 if let Some(delegate) = &self.delegate {
406 match delegate.list_images().await {
407 Ok(extra) => out.extend(extra),
408 Err(e) => tracing::warn!(
409 error = %e,
410 "delegate runtime list_images failed; returning primary results only",
411 ),
412 }
413 }
414 Ok(out)
415 }
416
417 async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
418 match self.primary.remove_image(image, force).await {
419 Ok(()) => Ok(()),
420 Err(primary_err) => {
421 if let Some(delegate) = &self.delegate {
422 match delegate.remove_image(image, force).await {
423 Ok(()) => Ok(()),
424 Err(delegate_err) => {
425 tracing::debug!(
426 image,
427 %delegate_err,
428 "delegate remove_image also failed; returning primary error",
429 );
430 Err(primary_err)
431 }
432 }
433 } else {
434 Err(primary_err)
435 }
436 }
437 }
438 }
439
440 async fn prune_images(&self) -> Result<PruneResult> {
441 let mut result = self.primary.prune_images().await?;
442 if let Some(delegate) = &self.delegate {
443 match delegate.prune_images().await {
444 Ok(extra) => {
445 result.deleted.extend(extra.deleted);
446 result.space_reclaimed =
447 result.space_reclaimed.saturating_add(extra.space_reclaimed);
448 }
449 Err(e) => tracing::warn!(
450 error = %e,
451 "delegate runtime prune_images failed; returning primary result only",
452 ),
453 }
454 }
455 Ok(result)
456 }
457
458 async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
459 let rt = self.lookup(id).await?;
460 rt.kill_container(id, signal).await
461 }
462
463 async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
464 match self.primary.tag_image(source, target).await {
465 Ok(()) => Ok(()),
466 Err(primary_err) => {
467 if let Some(delegate) = &self.delegate {
468 match delegate.tag_image(source, target).await {
469 Ok(()) => Ok(()),
470 Err(delegate_err) => {
471 tracing::debug!(
472 source,
473 target,
474 %delegate_err,
475 "delegate tag_image also failed; returning primary error",
476 );
477 Err(primary_err)
478 }
479 }
480 } else {
481 Err(primary_err)
482 }
483 }
484 }
485 }
486
487 async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
488 let rt = self.lookup(id).await?;
489 rt.inspect_detailed(id).await
490 }
491}
492
493#[cfg(test)]
494mod tests {
495 use super::*;
496 use crate::cgroups_stats::ContainerStats;
497 use std::sync::Mutex as StdMutex;
498 use zlayer_spec::{ArchKind, DeploymentSpec, TargetPlatform};
499
500 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
503 enum Role {
504 Primary,
505 Delegate,
506 }
507
508 type CallRecord = (Role, String, Option<ContainerId>);
510 type CallLog = Arc<StdMutex<Vec<CallRecord>>>;
512
513 struct MockRuntime {
520 role: Role,
521 calls: CallLog,
522 list_images_response: Vec<ImageInfo>,
523 pull_image_error: Option<String>,
524 }
525
526 impl MockRuntime {
527 fn new(role: Role, calls: CallLog) -> Self {
528 Self {
529 role,
530 calls,
531 list_images_response: Vec::new(),
532 pull_image_error: None,
533 }
534 }
535
536 fn record(&self, method: &str, id: Option<&ContainerId>) {
537 self.calls
538 .lock()
539 .expect("mock call-log mutex poisoned")
540 .push((self.role, method.to_string(), id.cloned()));
541 }
542 }
543
544 #[async_trait]
545 impl Runtime for MockRuntime {
546 async fn pull_image(&self, _image: &str) -> Result<()> {
547 self.record("pull_image", None);
548 if let Some(msg) = &self.pull_image_error {
549 return Err(AgentError::Internal(msg.clone()));
550 }
551 Ok(())
552 }
553
554 async fn pull_image_with_policy(
555 &self,
556 _image: &str,
557 _policy: PullPolicy,
558 _auth: Option<&RegistryAuth>,
559 ) -> Result<()> {
560 self.record("pull_image_with_policy", None);
561 Ok(())
562 }
563
564 async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
565 self.record("create_container", Some(id));
566 Ok(())
567 }
568
569 async fn start_container(&self, id: &ContainerId) -> Result<()> {
570 self.record("start_container", Some(id));
571 Ok(())
572 }
573
574 async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
575 self.record("stop_container", Some(id));
576 Ok(())
577 }
578
579 async fn remove_container(&self, id: &ContainerId) -> Result<()> {
580 self.record("remove_container", Some(id));
581 Ok(())
582 }
583
584 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
585 self.record("container_state", Some(id));
586 Ok(ContainerState::Running)
587 }
588
589 async fn container_logs(&self, id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
590 self.record("container_logs", Some(id));
591 Ok(Vec::new())
592 }
593
594 async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
595 self.record("exec", Some(id));
596 Ok((0, String::new(), String::new()))
597 }
598
599 async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
600 self.record("get_container_stats", Some(id));
601 Ok(ContainerStats {
602 cpu_usage_usec: 0,
603 memory_bytes: 0,
604 memory_limit: 0,
605 timestamp: std::time::Instant::now(),
606 })
607 }
608
609 async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
610 self.record("wait_container", Some(id));
611 Ok(0)
612 }
613
614 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
615 self.record("get_logs", Some(id));
616 Ok(Vec::new())
617 }
618
619 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
620 self.record("get_container_pid", Some(id));
621 Ok(None)
622 }
623
624 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
625 self.record("get_container_ip", Some(id));
626 Ok(None)
627 }
628
629 async fn list_images(&self) -> Result<Vec<ImageInfo>> {
630 self.record("list_images", None);
631 Ok(self.list_images_response.clone())
632 }
633 }
634
    /// Builds a [`ServiceSpec`] for tests by parsing a one-service deployment
    /// YAML whose image name is `image`, then overriding `platform`.
    ///
    /// Panics if the YAML does not parse or lacks the `test` service.
    fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
        // NOTE(review): the YAML nesting below is the conventional two-space
        // layout; confirm it against the deployment schema.
        let yaml = format!(
            r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: {image}
    endpoints:
      - name: http
        protocol: http
        port: 8080
"
        );
        let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
            .expect("valid deployment yaml")
            .services
            .remove("test")
            .expect("service 'test' present");
        // The parsed spec's platform is replaced with the caller's choice.
        spec.platform = platform;
        spec
    }
662
663 fn cid(service: &str, replica: u32) -> ContainerId {
664 ContainerId {
665 service: service.to_string(),
666 replica,
667 }
668 }
669
670 fn make_composite(with_delegate: bool) -> (CompositeRuntime, CallLog) {
671 let calls = Arc::new(StdMutex::new(Vec::new()));
672 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
673 let delegate = if with_delegate {
674 Some(Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>)
675 } else {
676 None
677 };
678 (
679 CompositeRuntime::new(primary as Arc<dyn Runtime>, delegate),
680 calls,
681 )
682 }
683
684 fn role_for(calls: &[CallRecord], method: &str) -> Option<Role> {
685 calls
686 .iter()
687 .find(|(_, m, _)| m == method)
688 .map(|(role, _, _)| *role)
689 }
690
691 #[tokio::test]
692 async fn dispatch_windows_spec_goes_to_primary() {
693 let (rt, calls) = make_composite(true);
694 let id = cid("win-svc", 0);
695 let spec = make_spec(
696 "mcr.microsoft.com/windows/nanoserver:ltsc2022",
697 Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
698 );
699
700 rt.create_container(&id, &spec).await.unwrap();
701 rt.start_container(&id).await.unwrap();
702
703 let calls = calls.lock().unwrap();
704 assert_eq!(
705 role_for(&calls, "create_container"),
706 Some(Role::Primary),
707 "create_container should hit primary for Windows spec"
708 );
709 assert_eq!(
710 role_for(&calls, "start_container"),
711 Some(Role::Primary),
712 "start_container should hit primary for Windows spec"
713 );
714 }
715
716 #[tokio::test]
717 async fn dispatch_linux_spec_goes_to_delegate() {
718 let (rt, calls) = make_composite(true);
719 let id = cid("lin-svc", 0);
720 let spec = make_spec(
721 "docker.io/library/alpine:3.19",
722 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
723 );
724
725 rt.create_container(&id, &spec).await.unwrap();
726 rt.start_container(&id).await.unwrap();
727
728 let calls = calls.lock().unwrap();
729 assert_eq!(
730 role_for(&calls, "create_container"),
731 Some(Role::Delegate),
732 "create_container should hit delegate for Linux spec"
733 );
734 assert_eq!(
735 role_for(&calls, "start_container"),
736 Some(Role::Delegate),
737 "start_container should hit delegate for Linux spec"
738 );
739 }
740
741 #[tokio::test]
742 async fn dispatch_linux_without_delegate_errors() {
743 let (rt, _calls) = make_composite(false);
747 let id = cid("lin-svc", 0);
748 let spec = make_spec(
749 "docker.io/library/alpine:3.19",
750 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
751 );
752
753 let err = rt.create_container(&id, &spec).await.unwrap_err();
754 match err {
755 AgentError::RouteToPeer {
756 service,
757 required_os,
758 reason,
759 } => {
760 assert_eq!(service, "lin-svc");
761 assert_eq!(required_os, "linux");
762 assert!(
763 reason.contains("--install-wsl") && reason.contains("Linux peer"),
764 "reason must name both remediations, got: {reason}"
765 );
766 }
767 other => panic!("expected RouteToPeer, got {other:?}"),
768 }
769 }
770
771 #[tokio::test]
772 async fn dispatch_linux_image_cache_without_delegate_routes_to_peer() {
773 let (rt, _calls) = make_composite(false);
778 let id = cid("svc", 0);
779 let image = "docker.io/library/nginx:1.25";
780 rt.record_image_os(image, OsKind::Linux).await;
781
782 let spec = make_spec(image, None);
783 let err = rt.create_container(&id, &spec).await.unwrap_err();
784 match err {
785 AgentError::RouteToPeer {
786 service,
787 required_os,
788 reason,
789 } => {
790 assert_eq!(service, "svc");
791 assert_eq!(required_os, "linux");
792 assert!(
793 reason.contains(image),
794 "reason should mention the image name, got: {reason}"
795 );
796 assert!(
797 reason.contains("--install-wsl") && reason.contains("Linux peer"),
798 "reason must name both remediations, got: {reason}"
799 );
800 }
801 other => panic!("expected RouteToPeer, got {other:?}"),
802 }
803 }
804
805 #[tokio::test]
806 async fn dispatch_macos_spec_goes_to_primary() {
807 let (rt, calls) = make_composite(true);
808 let id = cid("mac-svc", 0);
809 let spec = make_spec(
810 "ghcr.io/zlayer/macos:latest",
811 Some(TargetPlatform::new(OsKind::Macos, ArchKind::Arm64)),
812 );
813
814 rt.create_container(&id, &spec).await.unwrap();
815
816 let calls = calls.lock().unwrap();
817 assert_eq!(
818 role_for(&calls, "create_container"),
819 Some(Role::Primary),
820 "create_container should hit primary for Macos spec"
821 );
822 }
823
824 #[tokio::test]
825 async fn dispatch_no_platform_no_image_os_falls_through_to_primary() {
826 let (rt, calls) = make_composite(true);
827 let id = cid("svc", 0);
828 let spec = make_spec("docker.io/library/nginx:1.25", None);
829
830 rt.create_container(&id, &spec).await.unwrap();
831
832 let calls = calls.lock().unwrap();
833 assert_eq!(
834 role_for(&calls, "create_container"),
835 Some(Role::Primary),
836 "fall-through should pick primary when both platform and image-OS cache are unknown"
837 );
838 }
839
840 #[tokio::test]
841 async fn dispatch_uses_image_os_cache_when_platform_missing() {
842 let (rt, calls) = make_composite(true);
843 let id = cid("svc", 0);
844 let image = "docker.io/library/nginx:1.25";
845 rt.record_image_os(image, OsKind::Linux).await;
846
847 let spec = make_spec(image, None);
848 rt.create_container(&id, &spec).await.unwrap();
849
850 let calls = calls.lock().unwrap();
851 assert_eq!(
852 role_for(&calls, "create_container"),
853 Some(Role::Delegate),
854 "image-OS cache should route Linux images to the delegate"
855 );
856 }
857
858 #[tokio::test]
859 async fn per_container_dispatch_cache_persists_through_start_stop() {
860 let (rt, calls) = make_composite(true);
861 let id = cid("win-svc", 0);
862 let spec = make_spec(
863 "mcr.microsoft.com/windows/nanoserver:ltsc2022",
864 Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
865 );
866
867 rt.create_container(&id, &spec).await.unwrap();
868 rt.start_container(&id).await.unwrap();
869 rt.stop_container(&id, Duration::from_secs(1))
870 .await
871 .unwrap();
872 rt.remove_container(&id).await.unwrap();
873
874 let recorded = calls.lock().unwrap().clone();
875 for method in [
876 "create_container",
877 "start_container",
878 "stop_container",
879 "remove_container",
880 ] {
881 assert_eq!(
882 role_for(&recorded, method),
883 Some(Role::Primary),
884 "{method} should have dispatched to primary"
885 );
886 }
887
888 let after = rt
890 .start_container(&id)
891 .await
892 .expect_err("lookup after remove should fail");
893 assert!(
894 matches!(after, AgentError::NotFound { .. }),
895 "expected NotFound after remove, got {after:?}"
896 );
897 }
898
899 #[tokio::test]
900 async fn pull_image_calls_both_runtimes() {
901 let (rt, calls) = make_composite(true);
902 rt.pull_image("docker.io/library/alpine:3.19")
903 .await
904 .unwrap();
905
906 let recorded = calls.lock().unwrap();
907 let pull_calls: Vec<Role> = recorded
908 .iter()
909 .filter(|(_, m, _)| m == "pull_image")
910 .map(|(r, _, _)| *r)
911 .collect();
912 assert!(
913 pull_calls.contains(&Role::Primary),
914 "primary should have been pulled: {pull_calls:?}",
915 );
916 assert!(
917 pull_calls.contains(&Role::Delegate),
918 "delegate should have been pulled: {pull_calls:?}",
919 );
920 }
921
922 #[tokio::test]
923 async fn pull_image_delegate_error_does_not_fail() {
924 let calls = Arc::new(StdMutex::new(Vec::new()));
927 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
928 let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
929 delegate.pull_image_error = Some("simulated delegate pull failure".to_string());
930 let rt = CompositeRuntime::new(
931 primary as Arc<dyn Runtime>,
932 Some(Arc::new(delegate) as Arc<dyn Runtime>),
933 );
934
935 rt.pull_image("docker.io/library/alpine:3.19")
937 .await
938 .unwrap();
939
940 let recorded = calls.lock().unwrap();
941 let pull_calls: Vec<Role> = recorded
942 .iter()
943 .filter(|(_, m, _)| m == "pull_image")
944 .map(|(r, _, _)| *r)
945 .collect();
946 assert!(
947 pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
948 "both runtimes should have been called: {pull_calls:?}",
949 );
950 }
951
952 #[tokio::test]
953 async fn list_images_merges_both() {
954 let calls = Arc::new(StdMutex::new(Vec::new()));
956 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
957 primary.list_images_response = vec![ImageInfo {
958 reference: "primary/image:1".to_string(),
959 digest: None,
960 size_bytes: None,
961 }];
962 let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
963 delegate.list_images_response = vec![ImageInfo {
964 reference: "delegate/image:1".to_string(),
965 digest: None,
966 size_bytes: None,
967 }];
968 let rt = CompositeRuntime::new(
969 Arc::new(primary) as Arc<dyn Runtime>,
970 Some(Arc::new(delegate) as Arc<dyn Runtime>),
971 );
972
973 let merged = rt.list_images().await.unwrap();
974 let refs: Vec<&str> = merged.iter().map(|i| i.reference.as_str()).collect();
975 assert!(
976 refs.contains(&"primary/image:1") && refs.contains(&"delegate/image:1"),
977 "merged list should contain both entries, got {refs:?}",
978 );
979 }
980
981 #[tokio::test]
982 async fn dispatch_lookup_unknown_container_errors() {
983 let (rt, _calls) = make_composite(true);
984 let id = cid("ghost", 0);
985
986 let err = rt.start_container(&id).await.unwrap_err();
987 assert!(
988 matches!(err, AgentError::NotFound { .. }),
989 "expected NotFound for unknown container, got {err:?}"
990 );
991 }
992
993 async fn cached_os(rt: &CompositeRuntime, image: &str) -> Option<OsKind> {
995 rt.image_os.read().await.get(image).copied()
996 }
997
998 #[tokio::test]
999 async fn apply_image_os_inspection_populates_cache_on_ok_some() {
1000 let (rt, _calls) = make_composite(true);
1004 let image = "docker.io/library/alpine:3.19";
1005
1006 rt.apply_image_os_inspection(image, Ok(Some(OsKind::Linux)))
1007 .await;
1008
1009 assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
1010 }
1011
1012 #[tokio::test]
1013 async fn apply_image_os_inspection_leaves_cache_untouched_on_ok_none() {
1014 let (rt, _calls) = make_composite(true);
1018 let image = "docker.io/library/nginx:1.25";
1019
1020 rt.apply_image_os_inspection(image, Ok(None)).await;
1021
1022 assert_eq!(cached_os(&rt, image).await, None);
1023 }
1024
1025 #[tokio::test]
1026 async fn apply_image_os_inspection_leaves_cache_untouched_on_err() {
1027 let (rt, _calls) = make_composite(true);
1030 let image = "docker.io/library/nginx:1.25";
1031
1032 rt.record_image_os(image, OsKind::Linux).await;
1035
1036 let err = zlayer_registry::RegistryError::NotFound {
1037 registry: "docker.io".to_string(),
1038 image: image.to_string(),
1039 };
1040 rt.apply_image_os_inspection(image, Err(err)).await;
1041
1042 assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
1044 }
1045
1046 #[tokio::test]
1047 async fn pull_image_inspection_failure_does_not_fail_pull() {
1048 let (rt, _calls) = make_composite(true);
1054 let image = "invalid.example.invalid/ghost:v1";
1055
1056 rt.pull_image(image).await.unwrap();
1057
1058 assert_eq!(
1059 cached_os(&rt, image).await,
1060 None,
1061 "failed inspection must not populate the image-OS cache"
1062 );
1063 }
1064
1065 #[tokio::test]
1066 async fn pull_image_with_policy_inspection_failure_does_not_fail_pull() {
1067 let (rt, _calls) = make_composite(true);
1070 let image = "invalid.example.invalid/ghost:v1";
1071
1072 rt.pull_image_with_policy(image, PullPolicy::IfNotPresent, None)
1073 .await
1074 .unwrap();
1075
1076 assert_eq!(cached_os(&rt, image).await, None);
1077 }
1078
1079 #[test]
1080 fn os_kind_from_oci_str_roundtrip() {
1081 for os in [OsKind::Linux, OsKind::Windows, OsKind::Macos] {
1086 assert_eq!(OsKind::from_oci_str(os.as_oci_str()), Some(os));
1087 }
1088 assert_eq!(OsKind::from_oci_str(""), None);
1089 assert_eq!(OsKind::from_oci_str("freebsd"), None);
1090 }
1091}