1use std::collections::HashMap;
37use std::net::IpAddr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use tokio::sync::RwLock;
43use zlayer_observability::logs::LogEntry;
44use zlayer_spec::{OsKind, PullPolicy, RegistryAuth, ServiceSpec};
45
46use crate::cgroups_stats::ContainerStats;
47use crate::error::{AgentError, Result};
48use crate::runtime::{
49 ContainerId, ContainerInspectDetails, ContainerState, ExecEventStream, ImageInfo, PruneResult,
50 Runtime, WaitCondition, WaitOutcome,
51};
52
/// Names which of the two wrapped runtimes a container has been routed to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DispatchTarget {
    /// The runtime held in `CompositeRuntime::primary`.
    Primary,
    /// The optional runtime held in `CompositeRuntime::delegate`.
    Delegate,
}
59
60pub struct CompositeRuntime {
64 primary: Arc<dyn Runtime>,
65 delegate: Option<Arc<dyn Runtime>>,
66 dispatch: Arc<RwLock<HashMap<ContainerId, DispatchTarget>>>,
69 image_os: Arc<RwLock<HashMap<String, OsKind>>>,
73}
74
75impl CompositeRuntime {
76 #[must_use]
82 pub fn new(primary: Arc<dyn Runtime>, delegate: Option<Arc<dyn Runtime>>) -> Self {
83 Self {
84 primary,
85 delegate,
86 dispatch: Arc::new(RwLock::new(HashMap::new())),
87 image_os: Arc::new(RwLock::new(HashMap::new())),
88 }
89 }
90
91 #[must_use]
93 pub fn primary(&self) -> &Arc<dyn Runtime> {
94 &self.primary
95 }
96
97 #[must_use]
99 pub fn delegate(&self) -> Option<&Arc<dyn Runtime>> {
100 self.delegate.as_ref()
101 }
102
103 pub(crate) async fn record_image_os(&self, image: &str, os: OsKind) {
109 self.image_os.write().await.insert(image.to_string(), os);
110 }
111
112 async fn apply_image_os_inspection(
129 &self,
130 image: &str,
131 result: std::result::Result<Option<OsKind>, zlayer_registry::RegistryError>,
132 ) {
133 match result {
134 Ok(Some(os)) => {
135 self.record_image_os(image, os).await;
136 tracing::debug!(image, ?os, "cached image OS for dispatch");
137 }
138 Ok(None) => {
139 tracing::trace!(
140 image,
141 "image manifest has no OS field — dispatch will fall through to primary",
142 );
143 }
144 Err(e) => {
145 tracing::warn!(
146 image,
147 error = %e,
148 "failed to inspect image manifest OS — dispatch will fall through to primary",
149 );
150 }
151 }
152 }
153
154 async fn select_for(&self, service: &str, spec: &ServiceSpec) -> Result<DispatchTarget> {
168 if let Some(platform) = &spec.platform {
169 let target = match platform.os {
170 OsKind::Windows | OsKind::Macos => DispatchTarget::Primary,
171 OsKind::Linux => DispatchTarget::Delegate,
172 };
173 if matches!(target, DispatchTarget::Delegate) && self.delegate.is_none() {
174 return Err(AgentError::RouteToPeer {
175 service: service.to_string(),
176 required_os: OsKind::Linux.as_oci_str().to_string(),
177 reason: "spec.platform.os = linux but this node has no WSL2 delegate \
178 configured; enable `--install-wsl yes` on this node or add a Linux \
179 peer to the cluster"
180 .to_string(),
181 });
182 }
183 return Ok(target);
184 }
185
186 if let Some(os) = self
187 .image_os
188 .read()
189 .await
190 .get(&spec.image.name.to_string())
191 .copied()
192 {
193 return match os {
194 OsKind::Linux => {
195 if self.delegate.is_some() {
196 Ok(DispatchTarget::Delegate)
197 } else {
198 Err(AgentError::RouteToPeer {
203 service: service.to_string(),
204 required_os: OsKind::Linux.as_oci_str().to_string(),
205 reason: format!(
206 "image '{}' manifest reports os=linux but this node has no WSL2 \
207 delegate configured; enable `--install-wsl yes` on this node or \
208 add a Linux peer to the cluster",
209 spec.image.name
210 ),
211 })
212 }
213 }
214 OsKind::Windows | OsKind::Macos => Ok(DispatchTarget::Primary),
215 };
216 }
217
218 Ok(DispatchTarget::Primary)
219 }
220
221 async fn lookup(&self, id: &ContainerId) -> Result<Arc<dyn Runtime>> {
223 let target =
224 self.dispatch
225 .read()
226 .await
227 .get(id)
228 .copied()
229 .ok_or_else(|| AgentError::NotFound {
230 container: id.to_string(),
231 reason: "no dispatch record in CompositeRuntime".to_string(),
232 })?;
233 Ok(self.runtime_for(target).clone())
234 }
235
236 fn runtime_for(&self, t: DispatchTarget) -> &Arc<dyn Runtime> {
243 match t {
244 DispatchTarget::Primary => &self.primary,
245 DispatchTarget::Delegate => self
246 .delegate
247 .as_ref()
248 .expect("delegate target requires delegate to exist"),
249 }
250 }
251}
252
253#[async_trait]
254impl Runtime for CompositeRuntime {
255 async fn pull_image(&self, image: &str) -> Result<()> {
256 self.primary.pull_image(image).await?;
257 if let Some(delegate) = &self.delegate {
258 if let Err(e) = delegate.pull_image(image).await {
259 tracing::debug!(
264 image,
265 error = %e,
266 "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
267 );
268 }
269 }
270
271 let os_result = zlayer_registry::fetch_image_os(image, None).await;
275 self.apply_image_os_inspection(image, os_result).await;
276
277 Ok(())
278 }
279
280 async fn pull_image_with_policy(
281 &self,
282 image: &str,
283 policy: PullPolicy,
284 auth: Option<&RegistryAuth>,
285 ) -> Result<()> {
286 self.primary
287 .pull_image_with_policy(image, policy, auth)
288 .await?;
289 if let Some(delegate) = &self.delegate {
290 if let Err(e) = delegate.pull_image_with_policy(image, policy, auth).await {
291 tracing::debug!(
292 image,
293 error = %e,
294 "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
295 );
296 }
297 }
298
299 let os_result = zlayer_registry::fetch_image_os(image, auth).await;
300 self.apply_image_os_inspection(image, os_result).await;
301
302 Ok(())
303 }
304
305 async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
306 let target = self.select_for(&id.service, spec).await?;
307 {
308 let mut dispatch = self.dispatch.write().await;
309 dispatch.insert(id.clone(), target);
310 }
311 let rt = self.runtime_for(target).clone();
312 match rt.create_container(id, spec).await {
313 Ok(()) => Ok(()),
314 Err(e) => {
315 self.dispatch.write().await.remove(id);
318 Err(e)
319 }
320 }
321 }
322
323 async fn start_container(&self, id: &ContainerId) -> Result<()> {
324 let rt = self.lookup(id).await?;
325 rt.start_container(id).await
326 }
327
328 async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
329 let rt = self.lookup(id).await?;
330 rt.stop_container(id, timeout).await
331 }
332
333 async fn remove_container(&self, id: &ContainerId) -> Result<()> {
334 let rt = self.lookup(id).await?;
335 let res = rt.remove_container(id).await;
336 self.dispatch.write().await.remove(id);
337 res
338 }
339
340 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
341 let rt = self.lookup(id).await?;
342 rt.container_state(id).await
343 }
344
345 async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
346 let rt = self.lookup(id).await?;
347 rt.container_logs(id, tail).await
348 }
349
350 async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
351 let rt = self.lookup(id).await?;
352 rt.exec(id, cmd).await
353 }
354
355 async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
356 let rt = self.lookup(id).await?;
357 rt.exec_stream(id, cmd).await
358 }
359
360 async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
361 let rt = self.lookup(id).await?;
362 rt.get_container_stats(id).await
363 }
364
365 async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
366 let rt = self.lookup(id).await?;
367 rt.wait_container(id).await
368 }
369
370 async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
371 let rt = self.lookup(id).await?;
372 rt.wait_outcome(id).await
373 }
374
375 async fn wait_outcome_with_condition(
376 &self,
377 id: &ContainerId,
378 condition: WaitCondition,
379 ) -> Result<WaitOutcome> {
380 let rt = self.lookup(id).await?;
381 rt.wait_outcome_with_condition(id, condition).await
382 }
383
384 async fn rename_container(&self, id: &ContainerId, new_name: &str) -> Result<()> {
385 let rt = self.lookup(id).await?;
386 rt.rename_container(id, new_name).await
387 }
388
389 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
390 let rt = self.lookup(id).await?;
391 rt.get_logs(id).await
392 }
393
394 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
395 let rt = self.lookup(id).await?;
396 rt.get_container_pid(id).await
397 }
398
399 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
400 let rt = self.lookup(id).await?;
401 rt.get_container_ip(id).await
402 }
403
404 async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
405 let rt = self.lookup(id).await?;
406 rt.get_container_port_override(id).await
407 }
408
/// Windows-only: asks the owning runtime for the namespace GUID of `id`,
/// if it reports one.
#[cfg(target_os = "windows")]
async fn get_container_namespace_id(
    &self,
    id: &ContainerId,
) -> Result<Option<windows::core::GUID>> {
    self.lookup(id).await?.get_container_namespace_id(id).await
}
417
418 async fn sync_container_volumes(&self, id: &ContainerId) -> Result<()> {
419 let rt = self.lookup(id).await?;
420 rt.sync_container_volumes(id).await
421 }
422
423 async fn list_images(&self) -> Result<Vec<ImageInfo>> {
424 let mut out = self.primary.list_images().await?;
425 if let Some(delegate) = &self.delegate {
426 match delegate.list_images().await {
427 Ok(extra) => out.extend(extra),
428 Err(e) => tracing::warn!(
429 error = %e,
430 "delegate runtime list_images failed; returning primary results only",
431 ),
432 }
433 }
434 Ok(out)
435 }
436
437 async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
438 match self.primary.remove_image(image, force).await {
439 Ok(()) => Ok(()),
440 Err(primary_err) => {
441 if let Some(delegate) = &self.delegate {
442 match delegate.remove_image(image, force).await {
443 Ok(()) => Ok(()),
444 Err(delegate_err) => {
445 tracing::debug!(
446 image,
447 %delegate_err,
448 "delegate remove_image also failed; returning primary error",
449 );
450 Err(primary_err)
451 }
452 }
453 } else {
454 Err(primary_err)
455 }
456 }
457 }
458 }
459
460 async fn prune_images(&self) -> Result<PruneResult> {
461 let mut result = self.primary.prune_images().await?;
462 if let Some(delegate) = &self.delegate {
463 match delegate.prune_images().await {
464 Ok(extra) => {
465 result.deleted.extend(extra.deleted);
466 result.space_reclaimed =
467 result.space_reclaimed.saturating_add(extra.space_reclaimed);
468 }
469 Err(e) => tracing::warn!(
470 error = %e,
471 "delegate runtime prune_images failed; returning primary result only",
472 ),
473 }
474 }
475 Ok(result)
476 }
477
478 async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
479 let rt = self.lookup(id).await?;
480 rt.kill_container(id, signal).await
481 }
482
483 async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
484 match self.primary.tag_image(source, target).await {
485 Ok(()) => Ok(()),
486 Err(primary_err) => {
487 if let Some(delegate) = &self.delegate {
488 match delegate.tag_image(source, target).await {
489 Ok(()) => Ok(()),
490 Err(delegate_err) => {
491 tracing::debug!(
492 source,
493 target,
494 %delegate_err,
495 "delegate tag_image also failed; returning primary error",
496 );
497 Err(primary_err)
498 }
499 }
500 } else {
501 Err(primary_err)
502 }
503 }
504 }
505 }
506
507 async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
508 let rt = self.lookup(id).await?;
509 rt.inspect_detailed(id).await
510 }
511}
512
513#[cfg(test)]
514mod tests {
515 use super::*;
516 use crate::cgroups_stats::ContainerStats;
517 use std::sync::Mutex as StdMutex;
518 use zlayer_spec::{ArchKind, DeploymentSpec, TargetPlatform};
519
/// Tags a mock runtime so recorded calls can be attributed to the slot
/// (primary or delegate) it was installed in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Role {
    /// Mock installed as the composite's primary runtime.
    Primary,
    /// Mock installed as the composite's delegate runtime.
    Delegate,
}
527
528 type CallRecord = (Role, String, Option<ContainerId>);
530 type CallLog = Arc<StdMutex<Vec<CallRecord>>>;
532
533 struct MockRuntime {
540 role: Role,
541 calls: CallLog,
542 list_images_response: Vec<ImageInfo>,
543 pull_image_error: Option<String>,
544 }
545
546 impl MockRuntime {
547 fn new(role: Role, calls: CallLog) -> Self {
548 Self {
549 role,
550 calls,
551 list_images_response: Vec::new(),
552 pull_image_error: None,
553 }
554 }
555
556 fn record(&self, method: &str, id: Option<&ContainerId>) {
557 self.calls
558 .lock()
559 .expect("mock call-log mutex poisoned")
560 .push((self.role, method.to_string(), id.cloned()));
561 }
562 }
563
564 #[async_trait]
565 impl Runtime for MockRuntime {
566 async fn pull_image(&self, _image: &str) -> Result<()> {
567 self.record("pull_image", None);
568 if let Some(msg) = &self.pull_image_error {
569 return Err(AgentError::Internal(msg.clone()));
570 }
571 Ok(())
572 }
573
574 async fn pull_image_with_policy(
575 &self,
576 _image: &str,
577 _policy: PullPolicy,
578 _auth: Option<&RegistryAuth>,
579 ) -> Result<()> {
580 self.record("pull_image_with_policy", None);
581 Ok(())
582 }
583
584 async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
585 self.record("create_container", Some(id));
586 Ok(())
587 }
588
589 async fn start_container(&self, id: &ContainerId) -> Result<()> {
590 self.record("start_container", Some(id));
591 Ok(())
592 }
593
594 async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
595 self.record("stop_container", Some(id));
596 Ok(())
597 }
598
599 async fn remove_container(&self, id: &ContainerId) -> Result<()> {
600 self.record("remove_container", Some(id));
601 Ok(())
602 }
603
604 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
605 self.record("container_state", Some(id));
606 Ok(ContainerState::Running)
607 }
608
609 async fn container_logs(&self, id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
610 self.record("container_logs", Some(id));
611 Ok(Vec::new())
612 }
613
614 async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
615 self.record("exec", Some(id));
616 Ok((0, String::new(), String::new()))
617 }
618
619 async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
620 self.record("get_container_stats", Some(id));
621 Ok(ContainerStats {
622 cpu_usage_usec: 0,
623 memory_bytes: 0,
624 memory_limit: 0,
625 timestamp: std::time::Instant::now(),
626 })
627 }
628
629 async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
630 self.record("wait_container", Some(id));
631 Ok(0)
632 }
633
634 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
635 self.record("get_logs", Some(id));
636 Ok(Vec::new())
637 }
638
639 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
640 self.record("get_container_pid", Some(id));
641 Ok(None)
642 }
643
644 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
645 self.record("get_container_ip", Some(id));
646 Ok(None)
647 }
648
649 async fn list_images(&self) -> Result<Vec<ImageInfo>> {
650 self.record("list_images", None);
651 Ok(self.list_images_response.clone())
652 }
653 }
654
655 fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
659 let yaml = format!(
660 r"
661version: v1
662deployment: test
663services:
664 test:
665 rtype: service
666 image:
667 name: {image}
668 endpoints:
669 - name: http
670 protocol: http
671 port: 8080
672"
673 );
674 let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
675 .expect("valid deployment yaml")
676 .services
677 .remove("test")
678 .expect("service 'test' present");
679 spec.platform = platform;
680 spec
681 }
682
683 fn cid(service: &str, replica: u32) -> ContainerId {
684 ContainerId {
685 service: service.to_string(),
686 replica,
687 }
688 }
689
690 fn make_composite(with_delegate: bool) -> (CompositeRuntime, CallLog) {
691 let calls = Arc::new(StdMutex::new(Vec::new()));
692 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
693 let delegate = if with_delegate {
694 Some(Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>)
695 } else {
696 None
697 };
698 (
699 CompositeRuntime::new(primary as Arc<dyn Runtime>, delegate),
700 calls,
701 )
702 }
703
704 fn role_for(calls: &[CallRecord], method: &str) -> Option<Role> {
705 calls
706 .iter()
707 .find(|(_, m, _)| m == method)
708 .map(|(role, _, _)| *role)
709 }
710
711 #[tokio::test]
712 async fn dispatch_windows_spec_goes_to_primary() {
713 let (rt, calls) = make_composite(true);
714 let id = cid("win-svc", 0);
715 let spec = make_spec(
716 "mcr.microsoft.com/windows/nanoserver:ltsc2022",
717 Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
718 );
719
720 rt.create_container(&id, &spec).await.unwrap();
721 rt.start_container(&id).await.unwrap();
722
723 let calls = calls.lock().unwrap();
724 assert_eq!(
725 role_for(&calls, "create_container"),
726 Some(Role::Primary),
727 "create_container should hit primary for Windows spec"
728 );
729 assert_eq!(
730 role_for(&calls, "start_container"),
731 Some(Role::Primary),
732 "start_container should hit primary for Windows spec"
733 );
734 }
735
736 #[tokio::test]
737 async fn dispatch_linux_spec_goes_to_delegate() {
738 let (rt, calls) = make_composite(true);
739 let id = cid("lin-svc", 0);
740 let spec = make_spec(
741 "docker.io/library/alpine:3.19",
742 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
743 );
744
745 rt.create_container(&id, &spec).await.unwrap();
746 rt.start_container(&id).await.unwrap();
747
748 let calls = calls.lock().unwrap();
749 assert_eq!(
750 role_for(&calls, "create_container"),
751 Some(Role::Delegate),
752 "create_container should hit delegate for Linux spec"
753 );
754 assert_eq!(
755 role_for(&calls, "start_container"),
756 Some(Role::Delegate),
757 "start_container should hit delegate for Linux spec"
758 );
759 }
760
761 #[tokio::test]
762 async fn dispatch_linux_without_delegate_errors() {
763 let (rt, _calls) = make_composite(false);
767 let id = cid("lin-svc", 0);
768 let spec = make_spec(
769 "docker.io/library/alpine:3.19",
770 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
771 );
772
773 let err = rt.create_container(&id, &spec).await.unwrap_err();
774 match err {
775 AgentError::RouteToPeer {
776 service,
777 required_os,
778 reason,
779 } => {
780 assert_eq!(service, "lin-svc");
781 assert_eq!(required_os, "linux");
782 assert!(
783 reason.contains("--install-wsl") && reason.contains("Linux peer"),
784 "reason must name both remediations, got: {reason}"
785 );
786 }
787 other => panic!("expected RouteToPeer, got {other:?}"),
788 }
789 }
790
791 #[tokio::test]
792 async fn dispatch_linux_image_cache_without_delegate_routes_to_peer() {
793 let (rt, _calls) = make_composite(false);
798 let id = cid("svc", 0);
799 let image = "docker.io/library/nginx:1.25";
800 rt.record_image_os(image, OsKind::Linux).await;
801
802 let spec = make_spec(image, None);
803 let err = rt.create_container(&id, &spec).await.unwrap_err();
804 match err {
805 AgentError::RouteToPeer {
806 service,
807 required_os,
808 reason,
809 } => {
810 assert_eq!(service, "svc");
811 assert_eq!(required_os, "linux");
812 assert!(
813 reason.contains(image),
814 "reason should mention the image name, got: {reason}"
815 );
816 assert!(
817 reason.contains("--install-wsl") && reason.contains("Linux peer"),
818 "reason must name both remediations, got: {reason}"
819 );
820 }
821 other => panic!("expected RouteToPeer, got {other:?}"),
822 }
823 }
824
825 #[tokio::test]
826 async fn dispatch_macos_spec_goes_to_primary() {
827 let (rt, calls) = make_composite(true);
828 let id = cid("mac-svc", 0);
829 let spec = make_spec(
830 "ghcr.io/zlayer/macos:latest",
831 Some(TargetPlatform::new(OsKind::Macos, ArchKind::Arm64)),
832 );
833
834 rt.create_container(&id, &spec).await.unwrap();
835
836 let calls = calls.lock().unwrap();
837 assert_eq!(
838 role_for(&calls, "create_container"),
839 Some(Role::Primary),
840 "create_container should hit primary for Macos spec"
841 );
842 }
843
844 #[tokio::test]
845 async fn dispatch_no_platform_no_image_os_falls_through_to_primary() {
846 let (rt, calls) = make_composite(true);
847 let id = cid("svc", 0);
848 let spec = make_spec("docker.io/library/nginx:1.25", None);
849
850 rt.create_container(&id, &spec).await.unwrap();
851
852 let calls = calls.lock().unwrap();
853 assert_eq!(
854 role_for(&calls, "create_container"),
855 Some(Role::Primary),
856 "fall-through should pick primary when both platform and image-OS cache are unknown"
857 );
858 }
859
860 #[tokio::test]
861 async fn dispatch_uses_image_os_cache_when_platform_missing() {
862 let (rt, calls) = make_composite(true);
863 let id = cid("svc", 0);
864 let image = "docker.io/library/nginx:1.25";
865 rt.record_image_os(image, OsKind::Linux).await;
866
867 let spec = make_spec(image, None);
868 rt.create_container(&id, &spec).await.unwrap();
869
870 let calls = calls.lock().unwrap();
871 assert_eq!(
872 role_for(&calls, "create_container"),
873 Some(Role::Delegate),
874 "image-OS cache should route Linux images to the delegate"
875 );
876 }
877
878 #[tokio::test]
879 async fn per_container_dispatch_cache_persists_through_start_stop() {
880 let (rt, calls) = make_composite(true);
881 let id = cid("win-svc", 0);
882 let spec = make_spec(
883 "mcr.microsoft.com/windows/nanoserver:ltsc2022",
884 Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
885 );
886
887 rt.create_container(&id, &spec).await.unwrap();
888 rt.start_container(&id).await.unwrap();
889 rt.stop_container(&id, Duration::from_secs(1))
890 .await
891 .unwrap();
892 rt.remove_container(&id).await.unwrap();
893
894 let recorded = calls.lock().unwrap().clone();
895 for method in [
896 "create_container",
897 "start_container",
898 "stop_container",
899 "remove_container",
900 ] {
901 assert_eq!(
902 role_for(&recorded, method),
903 Some(Role::Primary),
904 "{method} should have dispatched to primary"
905 );
906 }
907
908 let after = rt
910 .start_container(&id)
911 .await
912 .expect_err("lookup after remove should fail");
913 assert!(
914 matches!(after, AgentError::NotFound { .. }),
915 "expected NotFound after remove, got {after:?}"
916 );
917 }
918
919 #[tokio::test]
920 async fn pull_image_calls_both_runtimes() {
921 let (rt, calls) = make_composite(true);
922 rt.pull_image("docker.io/library/alpine:3.19")
923 .await
924 .unwrap();
925
926 let recorded = calls.lock().unwrap();
927 let pull_calls: Vec<Role> = recorded
928 .iter()
929 .filter(|(_, m, _)| m == "pull_image")
930 .map(|(r, _, _)| *r)
931 .collect();
932 assert!(
933 pull_calls.contains(&Role::Primary),
934 "primary should have been pulled: {pull_calls:?}",
935 );
936 assert!(
937 pull_calls.contains(&Role::Delegate),
938 "delegate should have been pulled: {pull_calls:?}",
939 );
940 }
941
942 #[tokio::test]
943 async fn pull_image_delegate_error_does_not_fail() {
944 let calls = Arc::new(StdMutex::new(Vec::new()));
947 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
948 let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
949 delegate.pull_image_error = Some("simulated delegate pull failure".to_string());
950 let rt = CompositeRuntime::new(
951 primary as Arc<dyn Runtime>,
952 Some(Arc::new(delegate) as Arc<dyn Runtime>),
953 );
954
955 rt.pull_image("docker.io/library/alpine:3.19")
957 .await
958 .unwrap();
959
960 let recorded = calls.lock().unwrap();
961 let pull_calls: Vec<Role> = recorded
962 .iter()
963 .filter(|(_, m, _)| m == "pull_image")
964 .map(|(r, _, _)| *r)
965 .collect();
966 assert!(
967 pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
968 "both runtimes should have been called: {pull_calls:?}",
969 );
970 }
971
972 #[tokio::test]
973 async fn list_images_merges_both() {
974 let calls = Arc::new(StdMutex::new(Vec::new()));
976 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
977 primary.list_images_response = vec![ImageInfo {
978 reference: "primary/image:1".to_string(),
979 digest: None,
980 size_bytes: None,
981 }];
982 let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
983 delegate.list_images_response = vec![ImageInfo {
984 reference: "delegate/image:1".to_string(),
985 digest: None,
986 size_bytes: None,
987 }];
988 let rt = CompositeRuntime::new(
989 Arc::new(primary) as Arc<dyn Runtime>,
990 Some(Arc::new(delegate) as Arc<dyn Runtime>),
991 );
992
993 let merged = rt.list_images().await.unwrap();
994 let refs: Vec<&str> = merged.iter().map(|i| i.reference.as_str()).collect();
995 assert!(
996 refs.contains(&"primary/image:1") && refs.contains(&"delegate/image:1"),
997 "merged list should contain both entries, got {refs:?}",
998 );
999 }
1000
1001 #[tokio::test]
1002 async fn dispatch_lookup_unknown_container_errors() {
1003 let (rt, _calls) = make_composite(true);
1004 let id = cid("ghost", 0);
1005
1006 let err = rt.start_container(&id).await.unwrap_err();
1007 assert!(
1008 matches!(err, AgentError::NotFound { .. }),
1009 "expected NotFound for unknown container, got {err:?}"
1010 );
1011 }
1012
1013 async fn cached_os(rt: &CompositeRuntime, image: &str) -> Option<OsKind> {
1015 rt.image_os.read().await.get(image).copied()
1016 }
1017
1018 #[tokio::test]
1019 async fn apply_image_os_inspection_populates_cache_on_ok_some() {
1020 let (rt, _calls) = make_composite(true);
1024 let image = "docker.io/library/alpine:3.19";
1025
1026 rt.apply_image_os_inspection(image, Ok(Some(OsKind::Linux)))
1027 .await;
1028
1029 assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
1030 }
1031
1032 #[tokio::test]
1033 async fn apply_image_os_inspection_leaves_cache_untouched_on_ok_none() {
1034 let (rt, _calls) = make_composite(true);
1038 let image = "docker.io/library/nginx:1.25";
1039
1040 rt.apply_image_os_inspection(image, Ok(None)).await;
1041
1042 assert_eq!(cached_os(&rt, image).await, None);
1043 }
1044
1045 #[tokio::test]
1046 async fn apply_image_os_inspection_leaves_cache_untouched_on_err() {
1047 let (rt, _calls) = make_composite(true);
1050 let image = "docker.io/library/nginx:1.25";
1051
1052 rt.record_image_os(image, OsKind::Linux).await;
1055
1056 let err = zlayer_registry::RegistryError::NotFound {
1057 registry: "docker.io".to_string(),
1058 image: image.to_string(),
1059 };
1060 rt.apply_image_os_inspection(image, Err(err)).await;
1061
1062 assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
1064 }
1065
1066 #[tokio::test]
1067 async fn pull_image_inspection_failure_does_not_fail_pull() {
1068 let (rt, _calls) = make_composite(true);
1074 let image = "invalid.example.invalid/ghost:v1";
1075
1076 rt.pull_image(image).await.unwrap();
1077
1078 assert_eq!(
1079 cached_os(&rt, image).await,
1080 None,
1081 "failed inspection must not populate the image-OS cache"
1082 );
1083 }
1084
1085 #[tokio::test]
1086 async fn pull_image_with_policy_inspection_failure_does_not_fail_pull() {
1087 let (rt, _calls) = make_composite(true);
1090 let image = "invalid.example.invalid/ghost:v1";
1091
1092 rt.pull_image_with_policy(image, PullPolicy::IfNotPresent, None)
1093 .await
1094 .unwrap();
1095
1096 assert_eq!(cached_os(&rt, image).await, None);
1097 }
1098
/// `OsKind::from_oci_str` inverts `as_oci_str` for every known OS and
/// rejects the empty string and an unrecognised name.
#[test]
fn os_kind_from_oci_str_roundtrip() {
    let known = [OsKind::Linux, OsKind::Windows, OsKind::Macos];
    for os in known {
        let parsed = OsKind::from_oci_str(os.as_oci_str());
        assert_eq!(parsed, Some(os));
    }
    for unknown in ["", "freebsd"] {
        assert_eq!(OsKind::from_oci_str(unknown), None);
    }
}
1111}