1use std::collections::HashMap;
37use std::net::IpAddr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use tokio::sync::RwLock;
43use zlayer_observability::logs::{LogEntry, LogStream};
44use zlayer_spec::{OsKind, PullPolicy, RegistryAuth, RuntimeIsolation, ServiceSpec};
45
46use crate::cgroups_stats::ContainerStats;
47use crate::error::{AgentError, Result};
48use crate::runtime::{
49 ContainerId, ContainerInspectDetails, ContainerResourceUpdate, ContainerState,
50 ContainerUpdateOutcome, ExecEventStream, ImageInfo, ImageInspectInfo, ImageStoreHandles,
51 LogChannel, LogChunk, LogsStream, LogsStreamOptions, OverlayAttachKind, PruneResult, Runtime,
52 StatsSample, StatsStream, WaitCondition, WaitOutcome,
53};
54
/// Identifies which backend of a [`CompositeRuntime`] a container is routed to.
///
/// The value is stored per container in `CompositeRuntime::dispatch` and is
/// turned back into a runtime handle by `CompositeRuntime::runtime_for`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DispatchTarget {
    /// The always-present `primary` runtime.
    Primary,
    /// The optional `delegate` runtime.
    Delegate,
    /// The optional `vz` runtime.
    Vz,
    /// The optional `vz_linux` runtime.
    VzLinux,
}
70
/// A [`Runtime`] that multiplexes over several backend runtimes and remembers,
/// per container, which backend it was created on.
pub struct CompositeRuntime {
    /// Backend that is always present; the fall-through dispatch target.
    primary: Arc<dyn Runtime>,
    /// Optional backend selected through `DispatchTarget::Delegate`.
    delegate: Option<Arc<dyn Runtime>>,
    /// Optional backend selected through `DispatchTarget::Vz`.
    vz: Option<Arc<dyn Runtime>>,
    /// Optional backend selected through `DispatchTarget::VzLinux`.
    vz_linux: Option<Arc<dyn Runtime>>,
    /// Container id -> backend chosen when the container was created.
    dispatch: Arc<RwLock<HashMap<ContainerId, DispatchTarget>>>,
    /// Image reference -> OS, filled in by `record_image_os`.
    image_os: Arc<RwLock<HashMap<String, OsKind>>>,
    /// Image reference -> runtime marker, filled in by `record_image_runtime`.
    image_runtime: Arc<RwLock<HashMap<String, String>>>,
    /// Image reference -> isolation marker, filled in by `record_image_isolation`.
    image_isolation: Arc<RwLock<HashMap<String, String>>>,
    /// Blob-cache locations consulted, in order, by the image inspection helpers.
    os_inspect_cache_paths: Vec<std::path::PathBuf>,
}
125
126impl CompositeRuntime {
127 #[must_use]
133 pub fn new(primary: Arc<dyn Runtime>, delegate: Option<Arc<dyn Runtime>>) -> Self {
134 Self {
135 primary,
136 delegate,
137 vz: None,
138 vz_linux: None,
139 dispatch: Arc::new(RwLock::new(HashMap::new())),
140 image_os: Arc::new(RwLock::new(HashMap::new())),
141 image_runtime: Arc::new(RwLock::new(HashMap::new())),
142 image_isolation: Arc::new(RwLock::new(HashMap::new())),
143 os_inspect_cache_paths: Vec::new(),
144 }
145 }
146
147 #[must_use]
155 pub fn with_os_inspect_cache_path(self, path: Option<std::path::PathBuf>) -> Self {
156 self.with_os_inspect_cache_paths(path.into_iter().collect())
157 }
158
159 #[must_use]
171 pub fn with_os_inspect_cache_paths(mut self, paths: Vec<std::path::PathBuf>) -> Self {
172 self.os_inspect_cache_paths = paths;
173 self
174 }
175
176 async fn image_present_locally(&self, image: &str) -> bool {
186 match self.list_images().await {
187 Ok(images) => images.iter().any(|info| info.reference == image),
188 Err(e) => {
189 tracing::debug!(
190 image,
191 error = %e,
192 "composite: list_images unavailable for the IfNotPresent presence check; \
193 proceeding with a normal pull",
194 );
195 false
196 }
197 }
198 }
199
200 async fn inspect_image_os(
220 &self,
221 image: &str,
222 ) -> std::result::Result<Option<OsKind>, zlayer_registry::RegistryError> {
223 if let Some(os) = self.image_os_from_local_sidecar(image).await {
232 return Ok(Some(os));
233 }
234
235 for path in &self.os_inspect_cache_paths {
236 match zlayer_registry::CacheType::persistent_at(path)
237 .build()
238 .await
239 {
240 Ok(cache) => {
241 match zlayer_registry::fetch_image_os_in_cache_only(image, cache, None).await {
242 Ok(Some(os)) => return Ok(Some(os)),
243 Ok(None) => {
244 tracing::trace!(
245 image,
246 cache = %path.display(),
247 "image OS not resolvable from this local cache; trying next",
248 );
249 }
250 Err(e) => return Err(e),
251 }
252 }
253 Err(e) => {
254 tracing::debug!(
255 image,
256 cache = %path.display(),
257 error = %e,
258 "failed to open OS-inspect blob cache; trying next",
259 );
260 }
261 }
262 }
263 Ok(None)
268 }
269
270 async fn image_os_from_local_sidecar(&self, image: &str) -> Option<OsKind> {
281 let sanitized = sanitize_image_name(image);
282 for path in &self.os_inspect_cache_paths {
283 let Some(images_dir) = path.parent() else {
284 continue;
285 };
286 let sidecar = images_dir
287 .join(&sanitized)
288 .join(zlayer_types::local_image::LOCAL_IMAGE_METADATA_FILE);
289 let Ok(bytes) = tokio::fs::read(&sidecar).await else {
290 continue;
291 };
292 let meta: zlayer_types::local_image::LocalImageMetadata =
293 match serde_json::from_slice(&bytes) {
294 Ok(m) => m,
295 Err(e) => {
296 tracing::debug!(
297 image,
298 sidecar = %sidecar.display(),
299 error = %e,
300 "failed to parse local image metadata sidecar; trying next",
301 );
302 continue;
303 }
304 };
305 if let Some(os) = OsKind::from_oci_str(&meta.os) {
306 return Some(os);
307 }
308 }
309 None
310 }
311
312 async fn inspect_image_runtime_marker(
315 &self,
316 image: &str,
317 auth: Option<&RegistryAuth>,
318 ) -> std::result::Result<Option<String>, zlayer_registry::RegistryError> {
319 for path in &self.os_inspect_cache_paths {
320 match zlayer_registry::CacheType::persistent_at(path)
321 .build()
322 .await
323 {
324 Ok(cache) => {
325 match zlayer_registry::fetch_image_runtime_marker_in_cache_only(
326 image, cache, None,
327 )
328 .await
329 {
330 Ok(Some(marker)) => return Ok(Some(marker)),
331 Ok(None) => {
332 tracing::trace!(
333 image,
334 cache = %path.display(),
335 "runtime marker not resolvable from this local cache; trying next",
336 );
337 }
338 Err(e) => return Err(e),
339 }
340 }
341 Err(e) => {
342 tracing::debug!(
343 image,
344 cache = %path.display(),
345 error = %e,
346 "failed to open marker-inspect blob cache; trying next",
347 );
348 }
349 }
350 }
351 zlayer_registry::fetch_image_runtime_marker(image, auth).await
352 }
353
354 async fn inspect_image_isolation_marker(
360 &self,
361 image: &str,
362 auth: Option<&RegistryAuth>,
363 ) -> std::result::Result<Option<String>, zlayer_registry::RegistryError> {
364 for path in &self.os_inspect_cache_paths {
365 match zlayer_registry::CacheType::persistent_at(path)
366 .build()
367 .await
368 {
369 Ok(cache) => {
370 match zlayer_registry::fetch_image_isolation_marker_in_cache_only(
371 image, cache, None,
372 )
373 .await
374 {
375 Ok(Some(marker)) => return Ok(Some(marker)),
376 Ok(None) => {
377 tracing::trace!(
378 image,
379 cache = %path.display(),
380 "isolation default not resolvable from this local cache; trying next",
381 );
382 }
383 Err(e) => return Err(e),
384 }
385 }
386 Err(e) => {
387 tracing::debug!(
388 image,
389 cache = %path.display(),
390 error = %e,
391 "failed to open isolation-inspect blob cache; trying next",
392 );
393 }
394 }
395 }
396 zlayer_registry::fetch_image_isolation_marker(image, auth).await
397 }
398
399 #[must_use]
402 pub fn with_vz_delegate(mut self, vz: Option<Arc<dyn Runtime>>) -> Self {
403 self.vz = vz;
404 self
405 }
406
407 #[must_use]
413 pub fn with_vz_linux_delegate(mut self, vz_linux: Option<Arc<dyn Runtime>>) -> Self {
414 self.vz_linux = vz_linux;
415 self
416 }
417
418 #[must_use]
420 pub fn primary(&self) -> &Arc<dyn Runtime> {
421 &self.primary
422 }
423
424 #[must_use]
426 pub fn delegate(&self) -> Option<&Arc<dyn Runtime>> {
427 self.delegate.as_ref()
428 }
429
430 pub(crate) async fn record_image_os(&self, image: &str, os: OsKind) {
436 self.image_os.write().await.insert(image.to_string(), os);
437 }
438
439 pub(crate) async fn record_image_runtime(&self, image: &str, marker: String) {
442 self.image_runtime
443 .write()
444 .await
445 .insert(image.to_string(), marker);
446 }
447
448 pub(crate) async fn record_image_isolation(&self, image: &str, marker: String) {
452 self.image_isolation
453 .write()
454 .await
455 .insert(image.to_string(), marker);
456 }
457
458 async fn apply_image_runtime_inspection(
463 &self,
464 image: &str,
465 result: std::result::Result<Option<String>, zlayer_registry::RegistryError>,
466 ) {
467 match result {
468 Ok(Some(marker)) => {
469 tracing::debug!(image, marker, "cached image runtime marker for dispatch");
470 self.record_image_runtime(image, marker).await;
471 }
472 Ok(None) => {}
473 Err(e) => {
474 tracing::trace!(
475 image,
476 error = %e,
477 "failed to inspect image runtime marker — dispatch unaffected",
478 );
479 }
480 }
481 }
482
483 async fn apply_image_isolation_inspection(
489 &self,
490 image: &str,
491 result: std::result::Result<Option<String>, zlayer_registry::RegistryError>,
492 ) {
493 match result {
494 Ok(Some(marker)) => {
495 tracing::debug!(image, marker, "cached image isolation default for dispatch");
496 self.record_image_isolation(image, marker).await;
497 }
498 Ok(None) => {}
499 Err(e) => {
500 tracing::trace!(
501 image,
502 error = %e,
503 "failed to inspect image isolation default — dispatch unaffected",
504 );
505 }
506 }
507 }
508
509 async fn apply_image_os_inspection(
526 &self,
527 image: &str,
528 result: std::result::Result<Option<OsKind>, zlayer_registry::RegistryError>,
529 ) {
530 match result {
531 Ok(Some(os)) => {
532 self.record_image_os(image, os).await;
533 tracing::debug!(image, ?os, "cached image OS for dispatch");
534 }
535 Ok(None) => {
536 tracing::trace!(
537 image,
538 "image manifest has no OS field — dispatch will fall through to primary",
539 );
540 }
541 Err(e) => {
542 tracing::warn!(
543 image,
544 error = %e,
545 "failed to inspect image manifest OS — dispatch will fall through to primary",
546 );
547 }
548 }
549 }
550
551 #[allow(clippy::too_many_lines)]
582 async fn select_for(&self, service: &str, spec: &ServiceSpec) -> Result<DispatchTarget> {
583 let image_known_linux = matches!(
589 self.image_os
590 .read()
591 .await
592 .get(&spec.image.name.to_string())
593 .copied(),
594 Some(OsKind::Linux)
595 );
596
597 if let Some(isolation) = spec.runtime {
602 match isolation {
603 RuntimeIsolation::Auto => {}
604 RuntimeIsolation::Sandbox => {
605 if self.vz_linux.is_some() && image_known_linux {
609 tracing::warn!(
610 service,
611 image = %spec.image.name,
612 "sandbox requested but image is Linux; routing to VZ-Linux"
613 );
614 return Ok(DispatchTarget::VzLinux);
615 }
616 return Ok(DispatchTarget::Primary);
617 }
618 RuntimeIsolation::Vz => {
619 if self.vz.is_some() {
620 return Ok(DispatchTarget::Vz);
621 }
622 return Err(AgentError::Configuration(
623 "runtime `vz` requested but the native-VZ delegate is not \
624 available on this node"
625 .to_string(),
626 ));
627 }
628 RuntimeIsolation::VzLinux => {
629 if self.vz_linux.is_some() {
630 return Ok(DispatchTarget::VzLinux);
631 }
632 return Err(AgentError::Configuration(
633 "runtime `vz-linux` requested but the VZ-Linux delegate is not \
634 available on this node"
635 .to_string(),
636 ));
637 }
638 RuntimeIsolation::Vm => {
639 if self.delegate.is_some() {
640 return Ok(DispatchTarget::Delegate);
641 }
642 return Err(AgentError::Configuration(
643 "runtime `vm` requested but the libkrun delegate is not \
644 available on this node"
645 .to_string(),
646 ));
647 }
648 }
649 }
650
651 if let Some(label) = spec.labels.get("com.zlayer.isolation") {
658 if self.vz.is_some() && label.eq_ignore_ascii_case("vz") {
659 return Ok(DispatchTarget::Vz);
660 }
661 if self.vz_linux.is_some() && label.eq_ignore_ascii_case("vz-linux") {
662 return Ok(DispatchTarget::VzLinux);
663 }
664 if label.eq_ignore_ascii_case("vm") || label.eq_ignore_ascii_case("libkrun") {
665 if self.delegate.is_some() {
669 return Ok(DispatchTarget::Delegate);
670 }
671 }
672 if label.eq_ignore_ascii_case("sandbox") || label.eq_ignore_ascii_case("seatbelt") {
673 if self.vz_linux.is_some() && image_known_linux {
677 tracing::warn!(
678 service,
679 image = %spec.image.name,
680 "sandbox requested but image is Linux; routing to VZ-Linux"
681 );
682 return Ok(DispatchTarget::VzLinux);
683 }
684 return Ok(DispatchTarget::Primary);
685 }
686 }
687
688 if let Some(label) = self
697 .image_isolation
698 .read()
699 .await
700 .get(&spec.image.name.to_string())
701 .cloned()
702 {
703 if self.vz.is_some() && label.eq_ignore_ascii_case("vz") {
704 return Ok(DispatchTarget::Vz);
705 }
706 if self.vz_linux.is_some() && label.eq_ignore_ascii_case("vz-linux") {
707 return Ok(DispatchTarget::VzLinux);
708 }
709 if label.eq_ignore_ascii_case("vm") || label.eq_ignore_ascii_case("libkrun") {
710 if self.delegate.is_some() {
713 return Ok(DispatchTarget::Delegate);
714 }
715 }
716 if label.eq_ignore_ascii_case("sandbox") || label.eq_ignore_ascii_case("seatbelt") {
717 if self.vz_linux.is_some() && image_known_linux {
721 tracing::warn!(
722 service,
723 image = %spec.image.name,
724 "image declares isolation=sandbox but image is Linux; routing to VZ-Linux"
725 );
726 return Ok(DispatchTarget::VzLinux);
727 }
728 return Ok(DispatchTarget::Primary);
729 }
730 }
731
732 if self.vz.is_some()
738 && self
739 .image_runtime
740 .read()
741 .await
742 .get(&spec.image.name.to_string())
743 .is_some_and(|m| m.eq_ignore_ascii_case(zlayer_registry::ZLAYER_RUNTIME_VZ))
744 {
745 return Ok(DispatchTarget::Vz);
746 }
747
748 if self.vz_linux.is_some()
751 && self
752 .image_runtime
753 .read()
754 .await
755 .get(&spec.image.name.to_string())
756 .is_some_and(|m| m.eq_ignore_ascii_case(zlayer_registry::ZLAYER_RUNTIME_LINUX_VZ))
757 {
758 return Ok(DispatchTarget::VzLinux);
759 }
760
761 if let Some(platform) = &spec.platform {
762 let target = match platform.os {
763 OsKind::Windows | OsKind::Macos => DispatchTarget::Primary,
764 OsKind::Linux if self.vz_linux.is_some() => DispatchTarget::VzLinux,
767 OsKind::Linux => DispatchTarget::Delegate,
768 };
769 if matches!(target, DispatchTarget::Delegate) && self.delegate.is_none() {
770 return Err(AgentError::RouteToPeer {
771 service: service.to_string(),
772 required_os: OsKind::Linux.as_oci_str().to_string(),
773 reason: "spec.platform.os = linux but this node has no WSL2 delegate \
774 configured; enable `--install-wsl yes` on this node or add a Linux \
775 peer to the cluster"
776 .to_string(),
777 });
778 }
779 return Ok(target);
780 }
781
782 if let Some(os) = self
783 .image_os
784 .read()
785 .await
786 .get(&spec.image.name.to_string())
787 .copied()
788 {
789 return match os {
790 OsKind::Linux => {
791 if self.vz_linux.is_some() {
792 Ok(DispatchTarget::VzLinux)
794 } else if self.delegate.is_some() {
795 Ok(DispatchTarget::Delegate)
796 } else {
797 Err(AgentError::RouteToPeer {
802 service: service.to_string(),
803 required_os: OsKind::Linux.as_oci_str().to_string(),
804 reason: format!(
805 "image '{}' manifest reports os=linux but this node has no WSL2 \
806 delegate configured; enable `--install-wsl yes` on this node or \
807 add a Linux peer to the cluster",
808 spec.image.name
809 ),
810 })
811 }
812 }
813 OsKind::Windows | OsKind::Macos => Ok(DispatchTarget::Primary),
814 };
815 }
816
817 if self.vz_linux.is_some() {
832 return Ok(DispatchTarget::VzLinux);
833 }
834
835 Ok(DispatchTarget::Primary)
836 }
837
838 async fn lookup(&self, id: &ContainerId) -> Result<Arc<dyn Runtime>> {
840 let target =
841 self.dispatch
842 .read()
843 .await
844 .get(id)
845 .copied()
846 .ok_or_else(|| AgentError::NotFound {
847 container: id.to_string(),
848 reason: "no dispatch record in CompositeRuntime".to_string(),
849 })?;
850 Ok(self.runtime_for(target).clone())
851 }
852
853 fn runtime_for(&self, t: DispatchTarget) -> &Arc<dyn Runtime> {
860 match t {
861 DispatchTarget::Primary => &self.primary,
862 DispatchTarget::Delegate => self
863 .delegate
864 .as_ref()
865 .expect("delegate target requires delegate to exist"),
866 DispatchTarget::Vz => self.vz.as_ref().unwrap_or(&self.primary),
869 DispatchTarget::VzLinux => self.vz_linux.as_ref().unwrap_or(&self.primary),
872 }
873 }
874
875 async fn read_backends(
887 &self,
888 id: &ContainerId,
889 ) -> Result<Vec<(&'static str, Arc<dyn Runtime>)>> {
890 let owner =
891 self.dispatch
892 .read()
893 .await
894 .get(id)
895 .copied()
896 .ok_or_else(|| AgentError::NotFound {
897 container: id.to_string(),
898 reason: "no dispatch record in CompositeRuntime".to_string(),
899 })?;
900
901 let all: [(DispatchTarget, Option<&Arc<dyn Runtime>>); 4] = [
905 (DispatchTarget::Primary, Some(&self.primary)),
906 (DispatchTarget::Delegate, self.delegate.as_ref()),
907 (DispatchTarget::Vz, self.vz.as_ref()),
908 (DispatchTarget::VzLinux, self.vz_linux.as_ref()),
909 ];
910
911 let label_for = |t: DispatchTarget| match t {
912 DispatchTarget::Primary => "primary",
913 DispatchTarget::Delegate => "delegate",
914 DispatchTarget::Vz => "vz",
915 DispatchTarget::VzLinux => "vz_linux",
916 };
917
918 let mut out: Vec<(&'static str, Arc<dyn Runtime>)> =
919 vec![(label_for(owner), self.runtime_for(owner).clone())];
920 for (target, rt) in all {
921 if target != owner {
922 if let Some(rt) = rt {
923 out.push((label_for(target), rt.clone()));
924 }
925 }
926 }
927 Ok(out)
928 }
929}
930
/// Collects the errors seen while trying several backends for one read, so a
/// single representative error can be returned if every backend fails.
#[derive(Default)]
struct ReadMissAccumulator {
    /// Most recently recorded error that is not `AgentError::NotFound`.
    soft_err: Option<AgentError>,
    /// Most recently recorded `AgentError::NotFound`.
    not_found: Option<AgentError>,
}
951
952impl ReadMissAccumulator {
953 fn record(&mut self, e: AgentError) {
954 if matches!(e, AgentError::NotFound { .. }) {
955 self.not_found = Some(e);
956 } else {
957 self.soft_err = Some(e);
958 }
959 }
960
961 fn into_error(self, what: &str) -> AgentError {
967 self.soft_err
968 .or(self.not_found)
969 .unwrap_or_else(|| AgentError::Unsupported(format!("no backend could serve {what}")))
970 }
971}
972
973fn one_shot_logs_stream(entries: Vec<LogEntry>, opts: &LogsStreamOptions) -> LogsStream {
982 use futures_util::stream;
983
984 let want_stdout = opts.stdout || !opts.stderr;
988 let want_stderr = opts.stderr || !opts.stdout;
989 let timestamps = opts.timestamps;
990
991 let chunks: Vec<Result<LogChunk>> = entries
992 .into_iter()
993 .filter_map(|e| {
994 let channel = match e.stream {
995 LogStream::Stdout => LogChannel::Stdout,
996 LogStream::Stderr => LogChannel::Stderr,
997 };
998 let keep = match channel {
999 LogChannel::Stdout => want_stdout,
1000 LogChannel::Stderr => want_stderr,
1001 LogChannel::Stdin => false,
1002 };
1003 if !keep {
1004 return None;
1005 }
1006 let mut bytes = e.message.into_bytes();
1007 bytes.push(b'\n');
1008 Some(Ok(LogChunk {
1009 stream: channel,
1010 bytes: bytes::Bytes::from(bytes),
1011 timestamp: timestamps.then_some(e.timestamp),
1012 }))
1013 })
1014 .collect();
1015
1016 Box::pin(stream::iter(chunks))
1017}
1018
1019fn one_shot_stats_stream(stats: &ContainerStats) -> StatsStream {
1029 use futures_util::stream;
1030
1031 let sample = StatsSample {
1032 cpu_total_ns: stats.cpu_usage_usec.saturating_mul(1_000),
1033 cpu_system_ns: 0,
1034 online_cpus: 1,
1035 mem_used_bytes: stats.memory_bytes,
1036 mem_limit_bytes: stats.memory_limit,
1037 net_rx_bytes: 0,
1038 net_tx_bytes: 0,
1039 blkio_read_bytes: 0,
1040 blkio_write_bytes: 0,
1041 pids_current: 0,
1042 pids_limit: None,
1043 timestamp: chrono::Utc::now(),
1044 };
1045 Box::pin(stream::iter(vec![Ok(sample)]))
1046}
1047
1048#[async_trait]
1049impl Runtime for CompositeRuntime {
1050 fn image_store_handles(&self) -> Option<ImageStoreHandles> {
1056 self.primary
1057 .image_store_handles()
1058 .or_else(|| self.delegate.as_ref().and_then(|d| d.image_store_handles()))
1059 }
1060
1061 async fn pull_image(&self, image: &str) -> Result<()> {
1062 if let Err(e) = self.primary.pull_image(image).await {
1069 if matches!(e, AgentError::WrongPlatform { .. }) {
1070 tracing::debug!(
1071 image,
1072 error = %e,
1073 "primary runtime cannot service image (wrong platform); delegating",
1074 );
1075 } else {
1076 return Err(e);
1077 }
1078 }
1079 if let Some(delegate) = &self.delegate {
1080 if let Err(e) = delegate.pull_image(image).await {
1081 tracing::debug!(
1086 image,
1087 error = %e,
1088 "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
1089 );
1090 }
1091 }
1092 for (label, rt) in [
1100 self.vz.as_ref().map(|r| ("vz", r)),
1101 self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
1102 ]
1103 .into_iter()
1104 .flatten()
1105 {
1106 if let Err(e) = rt.pull_image(image).await {
1107 tracing::debug!(
1108 image,
1109 runtime = label,
1110 error = %e,
1111 "vz delegate failed to pull image (likely wrong OS); continuing",
1112 );
1113 }
1114 }
1115
1116 let os_result = self.inspect_image_os(image).await;
1120 self.apply_image_os_inspection(image, os_result).await;
1121 let marker_result = self.inspect_image_runtime_marker(image, None).await;
1122 self.apply_image_runtime_inspection(image, marker_result)
1123 .await;
1124 let isolation_result = self.inspect_image_isolation_marker(image, None).await;
1125 self.apply_image_isolation_inspection(image, isolation_result)
1126 .await;
1127
1128 Ok(())
1129 }
1130
1131 async fn pull_image_with_policy(
1132 &self,
1133 image: &str,
1134 policy: PullPolicy,
1135 auth: Option<&RegistryAuth>,
1136 source: zlayer_spec::SourcePolicy,
1137 ) -> Result<()> {
1138 if matches!(policy, PullPolicy::IfNotPresent) && self.image_present_locally(image).await {
1151 tracing::debug!(
1152 image,
1153 "composite: image already present in a local store (resolvable by ref); \
1154 skipping the remote presence check on the primary",
1155 );
1156 let os_result = self.inspect_image_os(image).await;
1160 self.apply_image_os_inspection(image, os_result).await;
1161 for (label, rt) in [
1173 self.delegate.as_ref().map(|r| ("delegate", r)),
1174 self.vz.as_ref().map(|r| ("vz", r)),
1175 self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
1176 ]
1177 .into_iter()
1178 .flatten()
1179 {
1180 if let Err(e) = rt.pull_image_with_policy(image, policy, auth, source).await {
1181 tracing::debug!(
1182 image,
1183 runtime = label,
1184 error = %e,
1185 "composite: local materialization in delegate runtime failed; continuing",
1186 );
1187 }
1188 }
1189 return Ok(());
1190 }
1191
1192 if let Err(e) = self
1194 .primary
1195 .pull_image_with_policy(image, policy, auth, source)
1196 .await
1197 {
1198 if matches!(e, AgentError::WrongPlatform { .. }) {
1199 tracing::debug!(
1200 image,
1201 error = %e,
1202 "primary runtime cannot service image (wrong platform); delegating",
1203 );
1204 } else {
1205 return Err(e);
1206 }
1207 }
1208 if let Some(delegate) = &self.delegate {
1209 if let Err(e) = delegate
1210 .pull_image_with_policy(image, policy, auth, source)
1211 .await
1212 {
1213 tracing::debug!(
1214 image,
1215 error = %e,
1216 "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
1217 );
1218 }
1219 }
1220 for (label, rt) in [
1224 self.vz.as_ref().map(|r| ("vz", r)),
1225 self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
1226 ]
1227 .into_iter()
1228 .flatten()
1229 {
1230 if let Err(e) = rt.pull_image_with_policy(image, policy, auth, source).await {
1231 tracing::debug!(
1232 image,
1233 runtime = label,
1234 error = %e,
1235 "vz delegate failed to pull image (likely wrong OS); continuing",
1236 );
1237 }
1238 }
1239
1240 let os_result = self.inspect_image_os(image).await;
1241 self.apply_image_os_inspection(image, os_result).await;
1242 let marker_result = self.inspect_image_runtime_marker(image, auth).await;
1243 self.apply_image_runtime_inspection(image, marker_result)
1244 .await;
1245 let isolation_result = self.inspect_image_isolation_marker(image, auth).await;
1246 self.apply_image_isolation_inspection(image, isolation_result)
1247 .await;
1248
1249 Ok(())
1250 }
1251
1252 async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
1264 let target = self.select_for(&id.service, spec).await?;
1265 let is_macos_image = matches!(
1269 self.image_os
1270 .read()
1271 .await
1272 .get(&spec.image.name.to_string())
1273 .copied(),
1274 Some(OsKind::Macos)
1275 );
1276 {
1277 let mut dispatch = self.dispatch.write().await;
1278 dispatch.insert(id.clone(), target);
1279 }
1280 let rt = self.runtime_for(target).clone();
1281 match rt.create_container(id, spec).await {
1282 Ok(()) => Ok(()),
1283 Err(e) => {
1284 if target == DispatchTarget::Primary && is_macos_image && self.vz.is_some() {
1286 tracing::warn!(
1287 service = %id.service,
1288 error = %e,
1289 "seatbelt create failed for macOS image; falling back to native-VZ"
1290 );
1291 {
1292 let mut dispatch = self.dispatch.write().await;
1293 dispatch.insert(id.clone(), DispatchTarget::Vz);
1294 }
1295 return match self
1296 .runtime_for(DispatchTarget::Vz)
1297 .create_container(id, spec)
1298 .await
1299 {
1300 Ok(()) => Ok(()),
1301 Err(e2) => {
1302 self.dispatch.write().await.remove(id);
1303 Err(e2)
1304 }
1305 };
1306 }
1307 self.dispatch.write().await.remove(id);
1310 Err(e)
1311 }
1312 }
1313 }
1314
1315 async fn start_container(&self, id: &ContainerId) -> Result<()> {
1316 let rt = self.lookup(id).await?;
1317 rt.start_container(id).await
1318 }
1319
1320 async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
1321 let rt = self.lookup(id).await?;
1322 rt.stop_container(id, timeout).await
1323 }
1324
1325 async fn remove_container(&self, id: &ContainerId) -> Result<()> {
1326 let rt = self.lookup(id).await?;
1327 let res = rt.remove_container(id).await;
1328 self.dispatch.write().await.remove(id);
1329 res
1330 }
1331
1332 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
1333 let rt = self.lookup(id).await?;
1334 rt.container_state(id).await
1335 }
1336
1337 async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
1338 let backends = self.read_backends(id).await?;
1339 let mut misses = ReadMissAccumulator::default();
1340 for (label, rt) in backends {
1341 match rt.container_logs(id, tail).await {
1342 Ok(logs) => return Ok(logs),
1343 Err(e) => {
1344 tracing::warn!(
1345 container = %id,
1346 runtime = label,
1347 error = %e,
1348 "composite container_logs: backend could not serve logs; trying next backend",
1349 );
1350 misses.record(e);
1351 }
1352 }
1353 }
1354 Err(misses.into_error("container_logs"))
1355 }
1356
1357 async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
1358 let rt = self.lookup(id).await?;
1359 rt.exec(id, cmd).await
1360 }
1361
1362 async fn exec_with_opts(
1363 &self,
1364 id: &ContainerId,
1365 opts: &crate::runtime::ExecOptions,
1366 ) -> Result<(i32, String, String)> {
1367 let rt = self.lookup(id).await?;
1372 rt.exec_with_opts(id, opts).await
1373 }
1374
1375 async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
1376 let rt = self.lookup(id).await?;
1377 rt.exec_stream(id, cmd).await
1378 }
1379
1380 async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
1381 let backends = self.read_backends(id).await?;
1382 let mut misses = ReadMissAccumulator::default();
1383 for (label, rt) in backends {
1384 match rt.get_container_stats(id).await {
1385 Ok(stats) => return Ok(stats),
1386 Err(e) => {
1387 tracing::warn!(
1388 container = %id,
1389 runtime = label,
1390 error = %e,
1391 "composite get_container_stats: backend could not serve stats; \
1392 trying next backend",
1393 );
1394 misses.record(e);
1395 }
1396 }
1397 }
1398 Err(misses.into_error("get_container_stats"))
1399 }
1400
1401 async fn update_container_resources(
1402 &self,
1403 id: &ContainerId,
1404 update: &ContainerResourceUpdate,
1405 ) -> Result<ContainerUpdateOutcome> {
1406 let backends = self.read_backends(id).await?;
1415 let mut misses = ReadMissAccumulator::default();
1416 for (label, rt) in backends {
1417 match rt.update_container_resources(id, update).await {
1418 Ok(outcome) => return Ok(outcome),
1419 Err(e) => {
1420 tracing::warn!(
1421 container = %id,
1422 runtime = label,
1423 error = %e,
1424 "composite update_container_resources: backend could not apply update; \
1425 trying next backend",
1426 );
1427 misses.record(e);
1428 }
1429 }
1430 }
1431 Err(misses.into_error("update_container_resources"))
1432 }
1433
1434 async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
1435 let rt = self.lookup(id).await?;
1436 rt.wait_container(id).await
1437 }
1438
1439 async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
1440 let rt = self.lookup(id).await?;
1441 rt.wait_outcome(id).await
1442 }
1443
1444 async fn wait_outcome_with_condition(
1445 &self,
1446 id: &ContainerId,
1447 condition: WaitCondition,
1448 ) -> Result<WaitOutcome> {
1449 let rt = self.lookup(id).await?;
1450 rt.wait_outcome_with_condition(id, condition).await
1451 }
1452
1453 async fn rename_container(&self, id: &ContainerId, new_name: &str) -> Result<()> {
1454 let rt = self.lookup(id).await?;
1455 rt.rename_container(id, new_name).await
1456 }
1457
1458 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
1459 let backends = self.read_backends(id).await?;
1460 let mut misses = ReadMissAccumulator::default();
1461 for (label, rt) in backends {
1462 match rt.get_logs(id).await {
1463 Ok(logs) => return Ok(logs),
1464 Err(e) => {
1465 tracing::warn!(
1466 container = %id,
1467 runtime = label,
1468 error = %e,
1469 "composite get_logs: backend could not serve logs; trying next backend",
1470 );
1471 misses.record(e);
1472 }
1473 }
1474 }
1475 Err(misses.into_error("get_logs"))
1476 }
1477
1478 async fn logs_stream(&self, id: &ContainerId, opts: LogsStreamOptions) -> Result<LogsStream> {
1479 let backends = self.read_backends(id).await?;
1490 let mut misses = ReadMissAccumulator::default();
1491 for (label, rt) in &backends {
1492 match rt.logs_stream(id, opts.clone()).await {
1493 Ok(stream) => return Ok(stream),
1494 Err(e) => {
1495 tracing::warn!(
1496 container = %id,
1497 runtime = label,
1498 error = %e,
1499 "composite logs_stream: backend has no native log stream; \
1500 falling back to a one-shot snapshot",
1501 );
1502 misses.record(e);
1503 }
1504 }
1505 }
1506
1507 let tail = opts
1510 .tail
1511 .map_or(1000, |n| usize::try_from(n).unwrap_or(1000));
1512 for (label, rt) in &backends {
1513 match rt.container_logs(id, tail).await {
1514 Ok(entries) => {
1515 return Ok(one_shot_logs_stream(entries, &opts));
1516 }
1517 Err(e) => {
1518 tracing::warn!(
1519 container = %id,
1520 runtime = label,
1521 error = %e,
1522 "composite logs_stream: backend snapshot fallback failed; trying next",
1523 );
1524 misses.record(e);
1525 }
1526 }
1527 }
1528 Err(misses.into_error("container logs"))
1529 }
1530
1531 async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
1532 let backends = self.read_backends(id).await?;
1538 let mut misses = ReadMissAccumulator::default();
1539 for (label, rt) in &backends {
1540 match rt.stats_stream(id).await {
1541 Ok(stream) => return Ok(stream),
1542 Err(e) => {
1543 tracing::warn!(
1544 container = %id,
1545 runtime = label,
1546 error = %e,
1547 "composite stats_stream: backend has no native stats stream; \
1548 falling back to a one-shot sample",
1549 );
1550 misses.record(e);
1551 }
1552 }
1553 }
1554
1555 for (label, rt) in &backends {
1556 match rt.get_container_stats(id).await {
1557 Ok(stats) => return Ok(one_shot_stats_stream(&stats)),
1558 Err(e) => {
1559 tracing::warn!(
1560 container = %id,
1561 runtime = label,
1562 error = %e,
1563 "composite stats_stream: backend sample fallback failed; trying next",
1564 );
1565 misses.record(e);
1566 }
1567 }
1568 }
1569 Err(misses.into_error("container stats"))
1570 }
1571
1572 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
1573 let rt = self.lookup(id).await?;
1574 rt.get_container_pid(id).await
1575 }
1576
1577 fn overlay_attach_kind(&self) -> OverlayAttachKind {
1578 self.vz_linux.as_ref().map_or_else(
1585 || self.primary.overlay_attach_kind(),
1586 |vz| vz.overlay_attach_kind(),
1587 )
1588 }
1589
1590 async fn overlay_attach_kind_for(&self, id: &ContainerId) -> OverlayAttachKind {
1591 match self.lookup(id).await {
1597 Ok(rt) => rt.overlay_attach_kind(),
1598 Err(_) => self.overlay_attach_kind(),
1599 }
1600 }
1601
1602 async fn push_overlay_config(
1603 &self,
1604 id: &ContainerId,
1605 config: &zlayer_types::overlayd::GuestOverlayConfig,
1606 ) -> Result<()> {
1607 let rt = self.lookup(id).await?;
1608 rt.push_overlay_config(id, config).await
1609 }
1610
1611 async fn attach_overlay_ip(&self, id: &ContainerId, overlay_ip: IpAddr) -> Result<()> {
1612 let rt = self.lookup(id).await?;
1613 rt.attach_overlay_ip(id, overlay_ip).await
1614 }
1615
1616 async fn detach_overlay_ip(&self, id: &ContainerId) -> Result<()> {
1617 let rt = self.lookup(id).await?;
1618 rt.detach_overlay_ip(id).await
1619 }
1620
1621 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
1622 let rt = self.lookup(id).await?;
1623 rt.get_container_ip(id).await
1624 }
1625
1626 async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
1627 let rt = self.lookup(id).await?;
1628 rt.get_container_port_override(id).await
1629 }
1630
1631 #[cfg(target_os = "windows")]
1632 async fn get_container_namespace_id(
1633 &self,
1634 id: &ContainerId,
1635 ) -> Result<Option<windows::core::GUID>> {
1636 let rt = self.lookup(id).await?;
1637 rt.get_container_namespace_id(id).await
1638 }
1639
1640 async fn sync_container_volumes(&self, id: &ContainerId) -> Result<()> {
1641 let rt = self.lookup(id).await?;
1642 rt.sync_container_volumes(id).await
1643 }
1644
1645 async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1646 let mut out: Vec<ImageInfo> = Vec::new();
1657 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1658 let mut any_ok = false;
1659 let mut last_err: Option<AgentError> = None;
1660
1661 for (label, rt) in [
1662 Some(("primary", &self.primary)),
1663 self.delegate.as_ref().map(|d| ("delegate", d)),
1664 self.vz.as_ref().map(|d| ("vz", d)),
1665 self.vz_linux.as_ref().map(|d| ("vz_linux", d)),
1666 ]
1667 .into_iter()
1668 .flatten()
1669 {
1670 match rt.list_images().await {
1671 Ok(images) => {
1672 any_ok = true;
1673 for img in images {
1674 if seen.insert(img.reference.clone()) {
1677 out.push(img);
1678 }
1679 }
1680 }
1681 Err(e) => {
1682 tracing::debug!(
1683 runtime = label,
1684 error = %e,
1685 "composite list_images: backend returned an error; skipping it",
1686 );
1687 last_err = Some(e);
1688 }
1689 }
1690 }
1691
1692 if any_ok {
1696 Ok(out)
1697 } else {
1698 Err(last_err.unwrap_or_else(|| {
1699 AgentError::Unsupported("no runtime implements list_images".into())
1700 }))
1701 }
1702 }
1703
1704 async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
1705 match self.primary.remove_image(image, force).await {
1706 Ok(()) => Ok(()),
1707 Err(primary_err) => {
1708 if let Some(delegate) = &self.delegate {
1709 match delegate.remove_image(image, force).await {
1710 Ok(()) => Ok(()),
1711 Err(delegate_err) => {
1712 tracing::debug!(
1713 image,
1714 %delegate_err,
1715 "delegate remove_image also failed; returning primary error",
1716 );
1717 Err(primary_err)
1718 }
1719 }
1720 } else {
1721 Err(primary_err)
1722 }
1723 }
1724 }
1725 }
1726
1727 async fn prune_images(&self) -> Result<PruneResult> {
1728 let mut result = match self.primary.prune_images().await {
1734 Ok(r) => r,
1735 Err(AgentError::Unsupported(reason)) if self.delegate.is_some() => {
1736 tracing::debug!(
1737 %reason,
1738 "primary runtime does not support prune_images; relying on delegate",
1739 );
1740 PruneResult::default()
1741 }
1742 Err(e) => return Err(e),
1743 };
1744 if let Some(delegate) = &self.delegate {
1745 match delegate.prune_images().await {
1746 Ok(extra) => {
1747 result.deleted.extend(extra.deleted);
1748 result.space_reclaimed =
1749 result.space_reclaimed.saturating_add(extra.space_reclaimed);
1750 }
1751 Err(e) => tracing::warn!(
1752 error = %e,
1753 "delegate runtime prune_images failed; returning primary result only",
1754 ),
1755 }
1756 }
1757 Ok(result)
1758 }
1759
1760 async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
1761 let rt = self.lookup(id).await?;
1762 rt.kill_container(id, signal).await
1763 }
1764
1765 async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
1766 match self.primary.tag_image(source, target).await {
1767 Ok(()) => Ok(()),
1768 Err(primary_err) => {
1769 if let Some(delegate) = &self.delegate {
1770 match delegate.tag_image(source, target).await {
1771 Ok(()) => Ok(()),
1772 Err(delegate_err) => {
1773 tracing::debug!(
1774 source,
1775 target,
1776 %delegate_err,
1777 "delegate tag_image also failed; returning primary error",
1778 );
1779 Err(primary_err)
1780 }
1781 }
1782 } else {
1783 Err(primary_err)
1784 }
1785 }
1786 }
1787 }
1788
1789 async fn inspect_image_native(&self, image: &str) -> Result<ImageInspectInfo> {
1790 match self.primary.inspect_image_native(image).await {
1797 Ok(info) => Ok(info),
1798 Err(primary_err) => {
1799 if let Some(delegate) = &self.delegate {
1800 match delegate.inspect_image_native(image).await {
1801 Ok(info) => Ok(info),
1802 Err(delegate_err) => {
1803 tracing::debug!(
1804 image,
1805 %delegate_err,
1806 "delegate inspect_image_native also failed; returning primary error",
1807 );
1808 Err(primary_err)
1809 }
1810 }
1811 } else {
1812 Err(primary_err)
1813 }
1814 }
1815 }
1816 }
1817
1818 async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
1819 let rt = self.lookup(id).await?;
1820 rt.inspect_detailed(id).await
1821 }
1822
1823 fn set_secrets_provider(&self, provider: std::sync::Arc<dyn zlayer_secrets::SecretsProvider>) {
1836 self.primary.set_secrets_provider(provider.clone());
1837 if let Some(delegate) = &self.delegate {
1838 delegate.set_secrets_provider(provider.clone());
1839 }
1840 if let Some(vz) = &self.vz {
1841 vz.set_secrets_provider(provider.clone());
1842 }
1843 if let Some(vz_linux) = &self.vz_linux {
1844 vz_linux.set_secrets_provider(provider);
1845 }
1846 }
1847}
1848
/// Returns a copy of `image` in which every `/`, `:` and `@` is replaced by
/// `_`; all other characters are kept as-is.
fn sanitize_image_name(image: &str) -> String {
    image
        .chars()
        .map(|ch| if matches!(ch, '/' | ':' | '@') { '_' } else { ch })
        .collect()
}
1857
1858#[cfg(test)]
1859mod tests {
1860 use super::*;
1861 use crate::cgroups_stats::ContainerStats;
1862 use std::sync::Mutex as StdMutex;
1863 use zlayer_spec::{ArchKind, DeploymentSpec, TargetPlatform};
1864
1865 fn db_tempdir() -> tempfile::TempDir {
1886 let base = std::env::var_os("CARGO_TARGET_DIR").map_or_else(
1887 || {
1888 std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
1889 .join("../../target")
1890 .join("test-tmp")
1891 },
1892 std::path::PathBuf::from,
1893 );
1894 if std::fs::create_dir_all(&base).is_ok() {
1895 if let Ok(td) = tempfile::Builder::new()
1896 .prefix("zql-test-")
1897 .tempdir_in(&base)
1898 {
1899 return td;
1900 }
1901 }
1902 tempfile::tempdir().unwrap()
1905 }
1906
    /// Identifies which backend slot of the composite a [`MockRuntime`] stands
    /// in for; it is stored with every call the mock records.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum Role {
        /// Mock installed as the composite's primary runtime.
        Primary,
        /// Mock installed as the composite's delegate runtime.
        Delegate,
        /// Mock installed through `with_vz_delegate`.
        Vz,
        /// Mock installed through `with_vz_linux_delegate`.
        VzLinux,
    }
1916
    /// One recorded mock call: the mock's role, the method name, and the
    /// container id the call was made for (if any).
    type CallRecord = (Role, String, Option<ContainerId>);
    /// Call log shared between the mocks of one composite and the test body.
    type CallLog = Arc<StdMutex<Vec<CallRecord>>>;
1921
    /// Test double for [`Runtime`] that records every call it receives and can
    /// be configured to fail selected operations.
    struct MockRuntime {
        /// Role stored alongside every recorded call.
        role: Role,
        /// Log that `record` appends to.
        calls: CallLog,
        /// Images returned by `list_images` when no error is configured.
        list_images_response: Vec<ImageInfo>,
        /// When set, `list_images` fails with `Unsupported` carrying this message.
        list_images_error: Option<String>,
        /// When set, the pull methods fail with `Internal` carrying this message
        /// (unless a wrong-platform failure is configured, which is checked first).
        pull_image_error: Option<String>,
        /// When set to `(expected, actual)`, the pull methods fail with
        /// `WrongPlatform` built from those two values.
        pull_image_wrong_platform: Option<(&'static str, &'static str)>,
        /// When true, `logs_stream` and `stats_stream` fail with `Unsupported`.
        stream_unsupported: bool,
        /// When true, the log and stats read methods fail with the
        /// `mock_not_found` error.
        reads_not_found: bool,
        /// Entries returned by `container_logs`, `get_logs` and `logs_stream`.
        logs_response: Vec<LogEntry>,
        /// When true, `get_container_stats` fails with `Unsupported`.
        stats_snapshot_unsupported: bool,
        /// Result returned by `prune_images`; `None` makes it fail with `Unsupported`.
        prune_images_response: Option<PruneResult>,
        /// When set, `create_container` fails with `CreateFailed` using this reason.
        create_err: Option<String>,
    }
1974
1975 impl MockRuntime {
1976 fn new(role: Role, calls: CallLog) -> Self {
1977 Self {
1978 role,
1979 calls,
1980 list_images_response: Vec::new(),
1981 list_images_error: None,
1982 pull_image_error: None,
1983 pull_image_wrong_platform: None,
1984 stream_unsupported: false,
1985 reads_not_found: false,
1986 logs_response: Vec::new(),
1987 stats_snapshot_unsupported: false,
1988 prune_images_response: None,
1989 create_err: None,
1990 }
1991 }
1992
1993 fn with_create_error(mut self, msg: &str) -> Self {
1995 self.create_err = Some(msg.to_string());
1996 self
1997 }
1998
1999 fn with_stream_unsupported(mut self) -> Self {
2001 self.stream_unsupported = true;
2002 self
2003 }
2004
2005 fn with_reads_not_found(mut self) -> Self {
2007 self.reads_not_found = true;
2008 self
2009 }
2010
2011 fn with_logs(mut self, logs: Vec<LogEntry>) -> Self {
2013 self.logs_response = logs;
2014 self
2015 }
2016
2017 fn with_stats_snapshot_unsupported(mut self) -> Self {
2019 self.stats_snapshot_unsupported = true;
2020 self
2021 }
2022
2023 fn with_prune_result(mut self, result: PruneResult) -> Self {
2025 self.prune_images_response = Some(result);
2026 self
2027 }
2028
2029 fn build_wrong_platform_error(&self, image: &str) -> Option<AgentError> {
2030 self.pull_image_wrong_platform
2031 .map(|(expected, actual)| AgentError::WrongPlatform {
2032 runtime: match self.role {
2033 Role::Primary => "primary-mock".to_string(),
2034 Role::Delegate => "delegate-mock".to_string(),
2035 Role::Vz => "vz-mock".to_string(),
2036 Role::VzLinux => "vz-linux-mock".to_string(),
2037 },
2038 expected: expected.to_string(),
2039 actual: actual.to_string(),
2040 image: image.to_string(),
2041 })
2042 }
2043
2044 fn record(&self, method: &str, id: Option<&ContainerId>) {
2045 self.calls
2046 .lock()
2047 .expect("mock call-log mutex poisoned")
2048 .push((self.role, method.to_string(), id.cloned()));
2049 }
2050 }
2051
2052 #[async_trait]
2053 impl Runtime for MockRuntime {
2054 async fn pull_image(&self, image: &str) -> Result<()> {
2055 self.record("pull_image", None);
2056 if let Some(err) = self.build_wrong_platform_error(image) {
2057 return Err(err);
2058 }
2059 if let Some(msg) = &self.pull_image_error {
2060 return Err(AgentError::Internal(msg.clone()));
2061 }
2062 Ok(())
2063 }
2064
2065 async fn pull_image_with_policy(
2066 &self,
2067 image: &str,
2068 _policy: PullPolicy,
2069 _auth: Option<&RegistryAuth>,
2070 _source: zlayer_spec::SourcePolicy,
2071 ) -> Result<()> {
2072 self.record("pull_image_with_policy", None);
2073 if let Some(err) = self.build_wrong_platform_error(image) {
2074 return Err(err);
2075 }
2076 if let Some(msg) = &self.pull_image_error {
2077 return Err(AgentError::Internal(msg.clone()));
2078 }
2079 Ok(())
2080 }
2081
2082 async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
2083 self.record("create_container", Some(id));
2084 if let Some(reason) = &self.create_err {
2085 return Err(AgentError::CreateFailed {
2086 id: id.to_string(),
2087 reason: reason.clone(),
2088 });
2089 }
2090 Ok(())
2091 }
2092
2093 async fn start_container(&self, id: &ContainerId) -> Result<()> {
2094 self.record("start_container", Some(id));
2095 Ok(())
2096 }
2097
2098 async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
2099 self.record("stop_container", Some(id));
2100 Ok(())
2101 }
2102
2103 async fn remove_container(&self, id: &ContainerId) -> Result<()> {
2104 self.record("remove_container", Some(id));
2105 Ok(())
2106 }
2107
2108 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
2109 self.record("container_state", Some(id));
2110 Ok(ContainerState::Running)
2111 }
2112
2113 async fn container_logs(&self, id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
2114 self.record("container_logs", Some(id));
2115 if self.reads_not_found {
2116 return Err(mock_not_found());
2117 }
2118 Ok(self.logs_response.clone())
2119 }
2120
2121 async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
2122 self.record("exec", Some(id));
2123 Ok((0, String::new(), String::new()))
2124 }
2125
2126 async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
2127 self.record("get_container_stats", Some(id));
2128 if self.reads_not_found {
2129 return Err(mock_not_found());
2130 }
2131 if self.stats_snapshot_unsupported {
2132 return Err(AgentError::Unsupported("mock has no snapshot stats".into()));
2133 }
2134 Ok(ContainerStats {
2135 cpu_usage_usec: 1_000,
2136 memory_bytes: 4096,
2137 memory_limit: 8192,
2138 timestamp: std::time::Instant::now(),
2139 })
2140 }
2141
2142 async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
2143 self.record("wait_container", Some(id));
2144 Ok(0)
2145 }
2146
2147 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
2148 self.record("get_logs", Some(id));
2149 if self.reads_not_found {
2150 return Err(mock_not_found());
2151 }
2152 Ok(self.logs_response.clone())
2153 }
2154
2155 async fn logs_stream(
2156 &self,
2157 id: &ContainerId,
2158 _opts: LogsStreamOptions,
2159 ) -> Result<LogsStream> {
2160 self.record("logs_stream", Some(id));
2161 if self.reads_not_found {
2162 return Err(mock_not_found());
2163 }
2164 if self.stream_unsupported {
2165 return Err(AgentError::Unsupported("mock has no log stream".into()));
2166 }
2167 Ok(one_shot_logs_stream(
2169 self.logs_response.clone(),
2170 &LogsStreamOptions::default(),
2171 ))
2172 }
2173
2174 async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
2175 use futures_util::stream;
2176 self.record("stats_stream", Some(id));
2177 if self.reads_not_found {
2178 return Err(mock_not_found());
2179 }
2180 if self.stream_unsupported {
2181 return Err(AgentError::Unsupported("mock has no stats stream".into()));
2182 }
2183 Ok(Box::pin(stream::iter(vec![Ok(StatsSample {
2184 cpu_total_ns: 0,
2185 cpu_system_ns: 0,
2186 online_cpus: 1,
2187 mem_used_bytes: 4096,
2188 mem_limit_bytes: 8192,
2189 net_rx_bytes: 0,
2190 net_tx_bytes: 0,
2191 blkio_read_bytes: 0,
2192 blkio_write_bytes: 0,
2193 pids_current: 0,
2194 pids_limit: None,
2195 timestamp: chrono::Utc::now(),
2196 })])))
2197 }
2198
2199 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
2200 self.record("get_container_pid", Some(id));
2201 Ok(None)
2202 }
2203
2204 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
2205 self.record("get_container_ip", Some(id));
2206 Ok(None)
2207 }
2208
2209 async fn list_images(&self) -> Result<Vec<ImageInfo>> {
2210 self.record("list_images", None);
2211 if let Some(msg) = &self.list_images_error {
2212 return Err(AgentError::Unsupported(msg.clone()));
2213 }
2214 Ok(self.list_images_response.clone())
2215 }
2216
2217 async fn prune_images(&self) -> Result<PruneResult> {
2218 self.record("prune_images", None);
2219 match &self.prune_images_response {
2220 Some(result) => Ok(result.clone()),
2221 None => Err(AgentError::Unsupported(
2222 "mock runtime does not support prune_images".into(),
2223 )),
2224 }
2225 }
2226 }
2227
2228 fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
2232 let yaml = format!(
2233 r"
2234version: v1
2235deployment: test
2236services:
2237 test:
2238 rtype: service
2239 image:
2240 name: {image}
2241 endpoints:
2242 - name: http
2243 protocol: http
2244 port: 8080
2245"
2246 );
2247 let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
2248 .expect("valid deployment yaml")
2249 .services
2250 .remove("test")
2251 .expect("service 'test' present");
2252 spec.platform = platform;
2253 spec
2254 }
2255
2256 fn cid(service: &str, replica: u32) -> ContainerId {
2257 ContainerId::new(service.to_string(), replica)
2258 }
2259
2260 fn make_composite(with_delegate: bool) -> (CompositeRuntime, CallLog) {
2261 let calls = Arc::new(StdMutex::new(Vec::new()));
2262 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
2263 let delegate = if with_delegate {
2264 Some(Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>)
2265 } else {
2266 None
2267 };
2268 (
2269 CompositeRuntime::new(primary as Arc<dyn Runtime>, delegate),
2270 calls,
2271 )
2272 }
2273
2274 fn role_for(calls: &[CallRecord], method: &str) -> Option<Role> {
2275 calls
2276 .iter()
2277 .find(|(_, m, _)| m == method)
2278 .map(|(role, _, _)| *role)
2279 }
2280
2281 fn mock_not_found() -> AgentError {
2283 AgentError::NotFound {
2284 container: "mock".to_string(),
2285 reason: "mock backend does not own this container".to_string(),
2286 }
2287 }
2288
2289 #[tokio::test]
2290 async fn dispatch_windows_spec_goes_to_primary() {
2291 let (rt, calls) = make_composite(true);
2292 let id = cid("win-svc", 0);
2293 let spec = make_spec(
2294 "mcr.microsoft.com/windows/nanoserver:ltsc2022",
2295 Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
2296 );
2297
2298 rt.create_container(&id, &spec).await.unwrap();
2299 rt.start_container(&id).await.unwrap();
2300
2301 let calls = calls.lock().unwrap();
2302 assert_eq!(
2303 role_for(&calls, "create_container"),
2304 Some(Role::Primary),
2305 "create_container should hit primary for Windows spec"
2306 );
2307 assert_eq!(
2308 role_for(&calls, "start_container"),
2309 Some(Role::Primary),
2310 "start_container should hit primary for Windows spec"
2311 );
2312 }
2313
2314 #[tokio::test]
2315 async fn dispatch_linux_spec_goes_to_delegate() {
2316 let (rt, calls) = make_composite(true);
2317 let id = cid("lin-svc", 0);
2318 let spec = make_spec(
2319 "docker.io/library/alpine:3.19",
2320 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
2321 );
2322
2323 rt.create_container(&id, &spec).await.unwrap();
2324 rt.start_container(&id).await.unwrap();
2325
2326 let calls = calls.lock().unwrap();
2327 assert_eq!(
2328 role_for(&calls, "create_container"),
2329 Some(Role::Delegate),
2330 "create_container should hit delegate for Linux spec"
2331 );
2332 assert_eq!(
2333 role_for(&calls, "start_container"),
2334 Some(Role::Delegate),
2335 "start_container should hit delegate for Linux spec"
2336 );
2337 }
2338
2339 #[tokio::test]
2340 async fn dispatch_linux_without_delegate_errors() {
2341 let (rt, _calls) = make_composite(false);
2345 let id = cid("lin-svc", 0);
2346 let spec = make_spec(
2347 "docker.io/library/alpine:3.19",
2348 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
2349 );
2350
2351 let err = rt.create_container(&id, &spec).await.unwrap_err();
2352 match err {
2353 AgentError::RouteToPeer {
2354 service,
2355 required_os,
2356 reason,
2357 } => {
2358 assert_eq!(service, "lin-svc");
2359 assert_eq!(required_os, "linux");
2360 assert!(
2361 reason.contains("--install-wsl") && reason.contains("Linux peer"),
2362 "reason must name both remediations, got: {reason}"
2363 );
2364 }
2365 other => panic!("expected RouteToPeer, got {other:?}"),
2366 }
2367 }
2368
2369 #[tokio::test]
2370 async fn dispatch_linux_image_cache_without_delegate_routes_to_peer() {
2371 let (rt, _calls) = make_composite(false);
2376 let id = cid("svc", 0);
2377 let image = "docker.io/library/nginx:1.25";
2378 rt.record_image_os(image, OsKind::Linux).await;
2379
2380 let spec = make_spec(image, None);
2381 let err = rt.create_container(&id, &spec).await.unwrap_err();
2382 match err {
2383 AgentError::RouteToPeer {
2384 service,
2385 required_os,
2386 reason,
2387 } => {
2388 assert_eq!(service, "svc");
2389 assert_eq!(required_os, "linux");
2390 assert!(
2391 reason.contains(image),
2392 "reason should mention the image name, got: {reason}"
2393 );
2394 assert!(
2395 reason.contains("--install-wsl") && reason.contains("Linux peer"),
2396 "reason must name both remediations, got: {reason}"
2397 );
2398 }
2399 other => panic!("expected RouteToPeer, got {other:?}"),
2400 }
2401 }
2402
2403 #[tokio::test]
2404 async fn dispatch_macos_spec_goes_to_primary() {
2405 let (rt, calls) = make_composite(true);
2406 let id = cid("mac-svc", 0);
2407 let spec = make_spec(
2408 "ghcr.io/zlayer/macos:latest",
2409 Some(TargetPlatform::new(OsKind::Macos, ArchKind::Arm64)),
2410 );
2411
2412 rt.create_container(&id, &spec).await.unwrap();
2413
2414 let calls = calls.lock().unwrap();
2415 assert_eq!(
2416 role_for(&calls, "create_container"),
2417 Some(Role::Primary),
2418 "create_container should hit primary for Macos spec"
2419 );
2420 }
2421
2422 #[tokio::test]
2423 async fn dispatch_no_platform_no_image_os_falls_through_to_primary() {
2424 let (rt, calls) = make_composite(true);
2425 let id = cid("svc", 0);
2426 let spec = make_spec("docker.io/library/nginx:1.25", None);
2427
2428 rt.create_container(&id, &spec).await.unwrap();
2429
2430 let calls = calls.lock().unwrap();
2431 assert_eq!(
2432 role_for(&calls, "create_container"),
2433 Some(Role::Primary),
2434 "fall-through should pick primary when both platform and image-OS cache are unknown"
2435 );
2436 }
2437
2438 #[tokio::test]
2439 async fn dispatch_uses_image_os_cache_when_platform_missing() {
2440 let (rt, calls) = make_composite(true);
2441 let id = cid("svc", 0);
2442 let image = "docker.io/library/nginx:1.25";
2443 rt.record_image_os(image, OsKind::Linux).await;
2444
2445 let spec = make_spec(image, None);
2446 rt.create_container(&id, &spec).await.unwrap();
2447
2448 let calls = calls.lock().unwrap();
2449 assert_eq!(
2450 role_for(&calls, "create_container"),
2451 Some(Role::Delegate),
2452 "image-OS cache should route Linux images to the delegate"
2453 );
2454 }
2455
2456 fn make_composite_with_vz() -> (CompositeRuntime, CallLog) {
2459 let calls = Arc::new(StdMutex::new(Vec::new()));
2460 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
2461 let delegate =
2462 Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
2463 let vz = Arc::new(MockRuntime::new(Role::Vz, Arc::clone(&calls))) as Arc<dyn Runtime>;
2464 let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
2465 .with_vz_delegate(Some(vz));
2466 (rt, calls)
2467 }
2468
2469 #[tokio::test]
2470 async fn dispatch_vz_bundle_annotation_auto_routes_to_vz() {
2471 let (rt, calls) = make_composite_with_vz();
2472 let id = cid("mac-svc", 0);
2473 let image = "ghcr.io/org/macos-vz:sequoia";
2474 rt.record_image_runtime(image, "vz".to_string()).await;
2476
2477 let spec = make_spec(image, None);
2478 rt.create_container(&id, &spec).await.unwrap();
2479
2480 let calls = calls.lock().unwrap();
2481 assert_eq!(
2482 role_for(&calls, "create_container"),
2483 Some(Role::Vz),
2484 "a com.zlayer.runtime=vz bundle should auto-route to the VZ runtime"
2485 );
2486 }
2487
2488 #[tokio::test]
2489 async fn dispatch_vz_label_forces_vz() {
2490 let (rt, calls) = make_composite_with_vz();
2491 let id = cid("mac-svc", 0);
2492 let mut spec = make_spec("ghcr.io/org/whatever:1", None);
2493 spec.labels
2494 .insert("com.zlayer.isolation".to_string(), "vz".to_string());
2495
2496 rt.create_container(&id, &spec).await.unwrap();
2497
2498 let calls = calls.lock().unwrap();
2499 assert_eq!(
2500 role_for(&calls, "create_container"),
2501 Some(Role::Vz),
2502 "an explicit com.zlayer.isolation=vz label should force the VZ runtime"
2503 );
2504 }
2505
2506 #[tokio::test]
2507 async fn dispatch_sandbox_label_overrides_vz_bundle() {
2508 let (rt, calls) = make_composite_with_vz();
2509 let id = cid("mac-svc", 0);
2510 let image = "ghcr.io/org/macos-vz:sequoia";
2511 rt.record_image_runtime(image, "vz".to_string()).await;
2512
2513 let mut spec = make_spec(image, None);
2514 spec.labels
2515 .insert("com.zlayer.isolation".to_string(), "sandbox".to_string());
2516 rt.create_container(&id, &spec).await.unwrap();
2517
2518 let calls = calls.lock().unwrap();
2519 assert_eq!(
2520 role_for(&calls, "create_container"),
2521 Some(Role::Primary),
2522 "com.zlayer.isolation=sandbox should opt out of VZ auto-detect (force the sandbox)"
2523 );
2524 }
2525
2526 fn make_composite_with_vz_primary(primary: MockRuntime) -> (CompositeRuntime, CallLog) {
2530 let calls = Arc::clone(&primary.calls);
2531 let primary = Arc::new(primary);
2532 let delegate =
2533 Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
2534 let vz = Arc::new(MockRuntime::new(Role::Vz, Arc::clone(&calls))) as Arc<dyn Runtime>;
2535 let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
2536 .with_vz_delegate(Some(vz));
2537 (rt, calls)
2538 }
2539
2540 #[tokio::test]
2541 async fn macos_image_falls_back_to_vz_when_seatbelt_create_fails() {
2542 let primary = MockRuntime::new(Role::Primary, Arc::new(StdMutex::new(Vec::new())))
2543 .with_create_error("seatbelt boom");
2544 let (rt, calls) = make_composite_with_vz_primary(primary);
2545 let id = cid("mac-svc", 0);
2546 let image = "ghcr.io/org/macos-native:sequoia";
2547 rt.record_image_os(image, OsKind::Macos).await;
2548
2549 let spec = make_spec(image, None);
2550 rt.create_container(&id, &spec).await.unwrap();
2552
2553 rt.start_container(&id).await.unwrap();
2555
2556 let calls = calls.lock().unwrap();
2557 let creates: Vec<Role> = calls
2558 .iter()
2559 .filter(|(_, m, _)| m == "create_container")
2560 .map(|(role, _, _)| *role)
2561 .collect();
2562 assert_eq!(
2563 creates,
2564 vec![Role::Primary, Role::Vz],
2565 "seatbelt create should be attempted first, then native-VZ as fallback"
2566 );
2567 assert_eq!(
2568 role_for(&calls, "start_container"),
2569 Some(Role::Vz),
2570 "after the fallback the dispatch cache should route the container to VZ"
2571 );
2572 }
2573
2574 #[tokio::test]
2575 async fn macos_image_stays_on_primary_when_seatbelt_create_succeeds() {
2576 let (rt, calls) = make_composite_with_vz();
2577 let id = cid("mac-svc", 0);
2578 let image = "ghcr.io/org/macos-native:sequoia";
2579 rt.record_image_os(image, OsKind::Macos).await;
2580
2581 let spec = make_spec(image, None);
2582 rt.create_container(&id, &spec).await.unwrap();
2583
2584 let calls = calls.lock().unwrap();
2585 let creates: Vec<Role> = calls
2586 .iter()
2587 .filter(|(_, m, _)| m == "create_container")
2588 .map(|(role, _, _)| *role)
2589 .collect();
2590 assert_eq!(
2591 creates,
2592 vec![Role::Primary],
2593 "a successful Seatbelt create must stay on Primary with no VZ fallback"
2594 );
2595 }
2596
2597 #[tokio::test]
2598 async fn linux_image_create_failure_does_not_fall_back_to_vz() {
2599 let primary = MockRuntime::new(Role::Primary, Arc::new(StdMutex::new(Vec::new())))
2603 .with_create_error("seatbelt boom");
2604 let (rt, calls) = make_composite_with_vz_primary(primary);
2605 let id = cid("lin-svc", 0);
2606 let image = "docker.io/library/alpine:3.19";
2607 rt.record_image_os(image, OsKind::Linux).await;
2608
2609 let spec = make_spec(image, None);
2610 rt.create_container(&id, &spec).await.unwrap();
2614
2615 let calls = calls.lock().unwrap();
2616 let creates: Vec<Role> = calls
2617 .iter()
2618 .filter(|(_, m, _)| m == "create_container")
2619 .map(|(role, _, _)| *role)
2620 .collect();
2621 assert_eq!(
2622 creates,
2623 vec![Role::Delegate],
2624 "a Linux image must route to the delegate and never trigger the macOS VZ fallback"
2625 );
2626 }
2627
2628 fn make_composite_with_vz_linux() -> (CompositeRuntime, CallLog) {
2631 let calls = Arc::new(StdMutex::new(Vec::new()));
2632 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
2633 let delegate =
2634 Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
2635 let vz_linux =
2636 Arc::new(MockRuntime::new(Role::VzLinux, Arc::clone(&calls))) as Arc<dyn Runtime>;
2637 let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
2638 .with_vz_linux_delegate(Some(vz_linux));
2639 (rt, calls)
2640 }
2641
2642 #[tokio::test]
2643 async fn dispatch_vz_linux_label_forces_vz_linux() {
2644 let (rt, calls) = make_composite_with_vz_linux();
2645 let id = cid("lin-svc", 0);
2646 let mut spec = make_spec("docker.io/library/alpine:3.19", None);
2647 spec.labels
2648 .insert("com.zlayer.isolation".to_string(), "vz-linux".to_string());
2649
2650 rt.create_container(&id, &spec).await.unwrap();
2651
2652 let calls = calls.lock().unwrap();
2653 assert_eq!(
2654 role_for(&calls, "create_container"),
2655 Some(Role::VzLinux),
2656 "com.zlayer.isolation=vz-linux must force the VZ Linux runtime"
2657 );
2658 }
2659
2660 #[tokio::test]
2661 async fn dispatch_vz_linux_marker_auto_routes_to_vz_linux() {
2662 let (rt, calls) = make_composite_with_vz_linux();
2663 let id = cid("lin-svc", 0);
2664 let image = "ghcr.io/org/linux-vz:bookworm";
2665 rt.record_image_runtime(image, "vz-linux".to_string()).await;
2666
2667 let spec = make_spec(image, None);
2668 rt.create_container(&id, &spec).await.unwrap();
2669
2670 let calls = calls.lock().unwrap();
2671 assert_eq!(
2672 role_for(&calls, "create_container"),
2673 Some(Role::VzLinux),
2674 "a com.zlayer.runtime=vz-linux marker should auto-route to the VZ Linux runtime"
2675 );
2676 }
2677
2678 #[tokio::test]
2679 async fn dispatch_image_isolation_default_sandbox_routes_to_primary() {
2680 let (rt, calls) = make_composite_with_vz_linux();
2684 let id = cid("mac-svc", 0);
2685 let image = "ghcr.io/org/seatbelt-app:latest";
2686 rt.record_image_isolation(image, "sandbox".to_string())
2687 .await;
2688
2689 let spec = make_spec(image, None);
2690 rt.create_container(&id, &spec).await.unwrap();
2691
2692 let calls = calls.lock().unwrap();
2693 assert_eq!(
2694 role_for(&calls, "create_container"),
2695 Some(Role::Primary),
2696 "an image-declared isolation=sandbox default must route to the Seatbelt sandbox"
2697 );
2698 }
2699
2700 #[tokio::test]
2701 async fn dispatch_image_isolation_default_vz_linux_routes_to_vz_linux() {
2702 let (rt, calls) = make_composite_with_vz_linux();
2705 let id = cid("lin-svc", 0);
2706 let image = "ghcr.io/org/linux-app:latest";
2707 rt.record_image_isolation(image, "vz-linux".to_string())
2708 .await;
2709
2710 let spec = make_spec(image, None);
2711 rt.create_container(&id, &spec).await.unwrap();
2712
2713 let calls = calls.lock().unwrap();
2714 assert_eq!(
2715 role_for(&calls, "create_container"),
2716 Some(Role::VzLinux),
2717 "an image-declared isolation=vz-linux default must route to the VZ Linux runtime"
2718 );
2719 }
2720
2721 #[tokio::test]
2722 async fn dispatch_image_isolation_sandbox_guarded_to_vz_linux_for_linux_image() {
2723 let (rt, calls) = make_composite_with_vz_linux();
2726 let id = cid("lin-svc", 0);
2727 let image = "docker.io/library/nginx:1.25";
2728 rt.record_image_os(image, OsKind::Linux).await;
2729 rt.record_image_isolation(image, "sandbox".to_string())
2730 .await;
2731
2732 let spec = make_spec(image, None);
2733 rt.create_container(&id, &spec).await.unwrap();
2734
2735 let calls = calls.lock().unwrap();
2736 assert_eq!(
2737 role_for(&calls, "create_container"),
2738 Some(Role::VzLinux),
2739 "the Linux-image guard must override an image-declared sandbox default to VZ-Linux"
2740 );
2741 }
2742
2743 #[tokio::test]
2744 async fn dispatch_spec_runtime_overrides_image_isolation_default() {
2745 let (rt, calls) = make_composite_with_vz();
2748 let id = cid("mac-svc", 0);
2749 let image = "ghcr.io/org/whatever:1";
2750 rt.record_image_isolation(image, "sandbox".to_string())
2751 .await;
2752
2753 let mut spec = make_spec(image, None);
2754 spec.runtime = Some(RuntimeIsolation::Vz);
2755 rt.create_container(&id, &spec).await.unwrap();
2756
2757 let calls = calls.lock().unwrap();
2758 assert_eq!(
2759 role_for(&calls, "create_container"),
2760 Some(Role::Vz),
2761 "spec.runtime=Vz must override an image-declared isolation=sandbox default"
2762 );
2763 }
2764
2765 #[tokio::test]
2766 async fn dispatch_isolation_label_overrides_image_isolation_default() {
2767 let (rt, calls) = make_composite_with_vz();
2770 let id = cid("mac-svc", 0);
2771 let image = "ghcr.io/org/whatever:1";
2772 rt.record_image_isolation(image, "sandbox".to_string())
2773 .await;
2774
2775 let mut spec = make_spec(image, None);
2776 spec.labels
2777 .insert("com.zlayer.isolation".to_string(), "vz".to_string());
2778 rt.create_container(&id, &spec).await.unwrap();
2779
2780 let calls = calls.lock().unwrap();
2781 assert_eq!(
2782 role_for(&calls, "create_container"),
2783 Some(Role::Vz),
2784 "an explicit com.zlayer.isolation=vz label must override an image-declared default"
2785 );
2786 }
2787
2788 #[tokio::test]
2789 async fn dispatch_linux_platform_with_vz_linux_routes_to_vz_linux() {
2790 let (rt, calls) = make_composite_with_vz_linux();
2791 let id = cid("lin-svc", 0);
2792 let spec = make_spec(
2795 "docker.io/library/alpine:3.19",
2796 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Arm64)),
2797 );
2798
2799 rt.create_container(&id, &spec).await.unwrap();
2800
2801 let calls = calls.lock().unwrap();
2802 assert_eq!(
2803 role_for(&calls, "create_container"),
2804 Some(Role::VzLinux),
2805 "a Linux platform spec must default to the VZ Linux runtime when present"
2806 );
2807 }
2808
2809 #[tokio::test]
2810 async fn dispatch_linux_image_os_with_vz_linux_routes_to_vz_linux() {
2811 let (rt, calls) = make_composite_with_vz_linux();
2812 let id = cid("lin-svc", 0);
2813 let image = "docker.io/library/nginx:1.25";
2814 rt.record_image_os(image, OsKind::Linux).await;
2815
2816 let spec = make_spec(image, None);
2817 rt.create_container(&id, &spec).await.unwrap();
2818
2819 let calls = calls.lock().unwrap();
2820 assert_eq!(
2821 role_for(&calls, "create_container"),
2822 Some(Role::VzLinux),
2823 "a Linux image-OS cache hit must default to the VZ Linux runtime when present"
2824 );
2825 }
2826
2827 #[tokio::test]
2828 async fn dispatch_macos_image_os_with_vz_linux_routes_to_primary() {
2829 let (rt, calls) = make_composite_with_vz_linux();
2833 let id = cid("mac-svc", 0);
2834 let image = "ghcr.io/zlayer/macos-native:latest";
2835 rt.record_image_os(image, OsKind::Macos).await;
2836
2837 let spec = make_spec(image, None);
2838 rt.create_container(&id, &spec).await.unwrap();
2839
2840 let calls = calls.lock().unwrap();
2841 assert_eq!(
2842 role_for(&calls, "create_container"),
2843 Some(Role::Primary),
2844 "image_os == Macos must route to primary even when VZ-Linux is the default",
2845 );
2846 }
2847
2848 #[tokio::test]
2849 async fn dispatch_unknown_os_with_vz_linux_defaults_to_vz_linux() {
2850 let (rt, calls) = make_composite_with_vz_linux();
2856 let id = cid("svc", 0);
2857 let spec = make_spec("docker.io/library/whatever:latest", None);
2858
2859 rt.create_container(&id, &spec).await.unwrap();
2860
2861 let calls = calls.lock().unwrap();
2862 assert_eq!(
2863 role_for(&calls, "create_container"),
2864 Some(Role::VzLinux),
2865 "an unknown-OS image must default to VZ-Linux when the delegate is present",
2866 );
2867 }
2868
2869 #[tokio::test]
2870 async fn dispatch_unknown_os_without_vz_linux_falls_through_to_primary() {
2871 let (rt, calls) = make_composite(true);
2875 let id = cid("svc", 0);
2876 let spec = make_spec("docker.io/library/whatever:latest", None);
2877
2878 rt.create_container(&id, &spec).await.unwrap();
2879
2880 let calls = calls.lock().unwrap();
2881 assert_eq!(
2882 role_for(&calls, "create_container"),
2883 Some(Role::Primary),
2884 "without a VZ-Linux delegate an unknown-OS image keeps the primary fallthrough",
2885 );
2886 }
2887
2888 async fn seed_persistent_linux_cache(path: &std::path::Path, image: &str) {
2892 seed_persistent_cache_with_os(path, image, "linux").await;
2893 }
2894
2895 async fn seed_persistent_cache_with_os(path: &std::path::Path, image: &str, os: &str) {
2898 let cache = zlayer_registry::CacheType::persistent_at(path)
2899 .build()
2900 .await
2901 .expect("open persistent blob cache");
2902
2903 let config_json = serde_json::json!({
2904 "architecture": "arm64",
2905 "os": os,
2906 "config": {},
2907 });
2908 let config_bytes = serde_json::to_vec(&config_json).unwrap();
2909 let config_digest = zlayer_registry::compute_digest(&config_bytes);
2910 cache.put(&config_digest, &config_bytes).await.unwrap();
2911
2912 let manifest = zlayer_registry::OciImageManifest {
2913 schema_version: 2,
2914 media_type: Some("application/vnd.oci.image.manifest.v1+json".to_string()),
2915 artifact_type: None,
2916 config: oci_client::manifest::OciDescriptor {
2917 media_type: "application/vnd.oci.image.config.v1+json".to_string(),
2918 digest: config_digest.clone(),
2919 size: i64::try_from(config_bytes.len()).unwrap(),
2920 urls: None,
2921 annotations: None,
2922 },
2923 layers: vec![],
2924 annotations: None,
2925 subject: None,
2926 };
2927 let manifest_bytes = serde_json::to_vec(&manifest).unwrap();
2928 let manifest_digest = zlayer_registry::compute_digest(&manifest_bytes);
2929 cache
2930 .put(&zlayer_registry::manifest_cache_key(image), &manifest_bytes)
2931 .await
2932 .unwrap();
2933 cache
2934 .put(
2935 &zlayer_registry::manifest_digest_cache_key(image),
2936 manifest_digest.as_bytes(),
2937 )
2938 .await
2939 .unwrap();
2940 }
2941
2942 #[tokio::test]
2948 async fn pull_then_dispatch_resolves_linux_os_from_local_cache_routes_to_vz_linux() {
2949 let tmp = db_tempdir();
2950 let cache_path = tmp.path().join("blobs.redb");
2951 let image = "docker.io/library/alpine:latest";
2952 seed_persistent_linux_cache(&cache_path, image).await;
2953
2954 let (rt, calls) = make_composite_with_vz_linux();
2955 let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2956
2957 rt.pull_image(image).await.unwrap();
2959
2960 assert_eq!(
2962 rt.image_os.read().await.get(image).copied(),
2963 Some(OsKind::Linux),
2964 "pull_image must resolve Linux OS from the local persistent cache",
2965 );
2966
2967 let id = cid("lin-svc", 0);
2969 let spec = make_spec(image, None);
2970 rt.create_container(&id, &spec).await.unwrap();
2971
2972 let calls = calls.lock().unwrap();
2973 assert_eq!(
2974 role_for(&calls, "create_container"),
2975 Some(Role::VzLinux),
2976 "a Linux image whose OS came from the local cache must route to VZ-Linux",
2977 );
2978 }
2979
2980 #[tokio::test]
2987 async fn bare_ref_spec_resolves_os_from_qualified_seeded_cache_routes_to_vz_linux() {
2988 let tmp = db_tempdir();
2989 let cache_path = tmp.path().join("blobs.redb");
2990 seed_persistent_linux_cache(&cache_path, "docker.io/library/alpine:latest").await;
2992
2993 let (rt, calls) = make_composite_with_vz_linux();
2994 let rt = rt.with_os_inspect_cache_paths(vec![cache_path]);
2995
2996 let bare = "alpine:latest";
2999 rt.pull_image(bare).await.unwrap();
3000
3001 assert_eq!(
3002 rt.image_os.read().await.get(bare).copied(),
3003 Some(OsKind::Linux),
3004 "bare-ref inspect must resolve Linux from the qualified-seeded cache",
3005 );
3006
3007 let id = cid("lin-svc", 0);
3008 let spec = make_spec(bare, None);
3009 rt.create_container(&id, &spec).await.unwrap();
3010
3011 let calls = calls.lock().unwrap();
3012 assert_eq!(
3013 role_for(&calls, "create_container"),
3014 Some(Role::VzLinux),
3015 "bare-ref Linux image routes to VZ-Linux via the canonical-key cache hit",
3016 );
3017 }
3018
3019 #[tokio::test]
3025 async fn os_resolves_from_second_cache_when_first_is_empty() {
3026 let tmp = db_tempdir();
3027 let empty_cache = tmp.path().join("vz-linux-blobs.redb");
3028 let primary_cache = tmp.path().join("primary-blobs.redb");
3029 zlayer_registry::CacheType::persistent_at(&empty_cache)
3031 .build()
3032 .await
3033 .unwrap();
3034 seed_persistent_linux_cache(&primary_cache, "docker.io/library/alpine:latest").await;
3036
3037 let (rt, calls) = make_composite_with_vz_linux();
3038 let rt = rt.with_os_inspect_cache_paths(vec![empty_cache, primary_cache]);
3039
3040 let bare = "alpine:latest";
3041 rt.pull_image(bare).await.unwrap();
3042
3043 assert_eq!(
3044 rt.image_os.read().await.get(bare).copied(),
3045 Some(OsKind::Linux),
3046 "OS must resolve from the second cache after the first misses (no network)",
3047 );
3048
3049 let id = cid("lin-svc", 0);
3050 let spec = make_spec(bare, None);
3051 rt.create_container(&id, &spec).await.unwrap();
3052
3053 let calls = calls.lock().unwrap();
3054 assert_eq!(role_for(&calls, "create_container"), Some(Role::VzLinux),);
3055 }
3056
3057 #[tokio::test]
3062 async fn local_darwin_sidecar_resolves_macos_and_routes_to_primary() {
3063 let tmp = db_tempdir();
3064 let cache_path = tmp.path().join("blobs.redb");
3067 let image = "myapp:latest";
3068 let sanitized = sanitize_image_name(image);
3069 let image_dir = tmp.path().join(&sanitized);
3070 std::fs::create_dir_all(&image_dir).unwrap();
3071 let meta = zlayer_types::local_image::LocalImageMetadata::new(image, "darwin", "arm64");
3072 std::fs::write(
3073 image_dir.join(zlayer_types::local_image::LOCAL_IMAGE_METADATA_FILE),
3074 serde_json::to_vec(&meta).unwrap(),
3075 )
3076 .unwrap();
3077
3078 let (rt, calls) = make_composite_with_vz_linux();
3079 let rt = rt.with_os_inspect_cache_path(Some(cache_path));
3080
3081 rt.pull_image(image).await.unwrap();
3083
3084 assert_eq!(
3085 rt.image_os.read().await.get(image).copied(),
3086 Some(OsKind::Macos),
3087 "the darwin sidecar must resolve os=Macos from the local image store",
3088 );
3089
3090 let id = cid("mac-svc", 0);
3091 let spec = make_spec(image, None);
3092 rt.create_container(&id, &spec).await.unwrap();
3093
3094 let calls = calls.lock().unwrap();
3095 assert_eq!(
3096 role_for(&calls, "create_container"),
3097 Some(Role::Primary),
3098 "a darwin sidecar image must route to the Seatbelt primary, not VZ-Linux",
3099 );
3100 }
3101
3102 #[tokio::test]
3114 async fn pull_with_network_429_still_dispatches_via_local_cache() {
3115 let tmp = db_tempdir();
3116 let cache_path = tmp.path().join("blobs.redb");
3117 let image = "registry.invalid.example/library/alpine:latest";
3120 seed_persistent_linux_cache(&cache_path, image).await;
3121
3122 let (rt, calls) = make_composite_with_vz_linux();
3123 let rt = rt.with_os_inspect_cache_path(Some(cache_path));
3124
3125 rt.pull_image(image).await.unwrap();
3129 assert_eq!(
3130 rt.image_os.read().await.get(image).copied(),
3131 Some(OsKind::Linux),
3132 "OS must be resolved from the local cache with no network call",
3133 );
3134
3135 let id = cid("lin-svc", 0);
3137 let spec = make_spec(image, None);
3138 rt.create_container(&id, &spec).await.unwrap();
3139
3140 let calls = calls.lock().unwrap();
3141 assert_eq!(
3142 role_for(&calls, "create_container"),
3143 Some(Role::VzLinux),
3144 "a would-be-429 pull must still route the cached Linux image to VZ-Linux",
3145 );
3146 }
3147
3148 #[tokio::test]
3153 async fn pull_then_dispatch_resolves_macos_os_from_local_cache_routes_to_primary() {
3154 let tmp = db_tempdir();
3155 let cache_path = tmp.path().join("blobs.redb");
3156 let image = "ghcr.io/zlayer/macos-native:latest";
3157 seed_persistent_cache_with_os(&cache_path, image, "darwin").await;
3158
3159 let (rt, calls) = make_composite_with_vz_linux();
3160 let rt = rt.with_os_inspect_cache_path(Some(cache_path));
3161
3162 rt.pull_image(image).await.unwrap();
3163 assert_eq!(
3164 rt.image_os.read().await.get(image).copied(),
3165 Some(OsKind::Macos),
3166 "pull_image must resolve macOS OS from the local persistent cache",
3167 );
3168
3169 let id = cid("mac-svc", 0);
3170 let spec = make_spec(image, None);
3171 rt.create_container(&id, &spec).await.unwrap();
3172
3173 let calls = calls.lock().unwrap();
3174 assert_eq!(
3175 role_for(&calls, "create_container"),
3176 Some(Role::Primary),
3177 "a macOS-native rootfs must route to primary even with VZ-Linux as default",
3178 );
3179 }
3180
3181 #[tokio::test]
3182 async fn dispatch_vm_label_forces_libkrun_delegate() {
3183 let (rt, calls) = make_composite_with_vz_linux();
3184 let id = cid("lin-svc", 0);
3185 let mut spec = make_spec(
3188 "docker.io/library/alpine:3.19",
3189 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Arm64)),
3190 );
3191 spec.labels
3192 .insert("com.zlayer.isolation".to_string(), "vm".to_string());
3193
3194 rt.create_container(&id, &spec).await.unwrap();
3195
3196 let calls = calls.lock().unwrap();
3197 assert_eq!(
3198 role_for(&calls, "create_container"),
3199 Some(Role::Delegate),
3200 "com.zlayer.isolation=vm must force the libkrun delegate even when VZ Linux is default"
3201 );
3202 }
3203
3204 #[tokio::test]
3205 async fn dispatch_unmarked_image_with_vz_delegate_falls_through_to_primary() {
3206 let (rt, calls) = make_composite_with_vz();
3207 let id = cid("mac-svc", 0);
3208 let spec = make_spec("ghcr.io/org/plain:1", None);
3211 rt.create_container(&id, &spec).await.unwrap();
3212
3213 let calls = calls.lock().unwrap();
3214 assert_eq!(
3215 role_for(&calls, "create_container"),
3216 Some(Role::Primary),
3217 "an unmarked image must fall through to primary even when a VZ delegate is attached"
3218 );
3219 }
3220
3221 #[tokio::test]
3222 async fn per_container_dispatch_cache_persists_through_start_stop() {
3223 let (rt, calls) = make_composite(true);
3224 let id = cid("win-svc", 0);
3225 let spec = make_spec(
3226 "mcr.microsoft.com/windows/nanoserver:ltsc2022",
3227 Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
3228 );
3229
3230 rt.create_container(&id, &spec).await.unwrap();
3231 rt.start_container(&id).await.unwrap();
3232 rt.stop_container(&id, Duration::from_secs(1))
3233 .await
3234 .unwrap();
3235 rt.remove_container(&id).await.unwrap();
3236
3237 let recorded = calls.lock().unwrap().clone();
3238 for method in [
3239 "create_container",
3240 "start_container",
3241 "stop_container",
3242 "remove_container",
3243 ] {
3244 assert_eq!(
3245 role_for(&recorded, method),
3246 Some(Role::Primary),
3247 "{method} should have dispatched to primary"
3248 );
3249 }
3250
3251 let after = rt
3253 .start_container(&id)
3254 .await
3255 .expect_err("lookup after remove should fail");
3256 assert!(
3257 matches!(after, AgentError::NotFound { .. }),
3258 "expected NotFound after remove, got {after:?}"
3259 );
3260 }
3261
3262 #[tokio::test]
3263 async fn pull_image_calls_both_runtimes() {
3264 let (rt, calls) = make_composite(true);
3265 rt.pull_image("docker.io/library/alpine:3.19")
3266 .await
3267 .unwrap();
3268
3269 let recorded = calls.lock().unwrap();
3270 let pull_calls: Vec<Role> = recorded
3271 .iter()
3272 .filter(|(_, m, _)| m == "pull_image")
3273 .map(|(r, _, _)| *r)
3274 .collect();
3275 assert!(
3276 pull_calls.contains(&Role::Primary),
3277 "primary should have been pulled: {pull_calls:?}",
3278 );
3279 assert!(
3280 pull_calls.contains(&Role::Delegate),
3281 "delegate should have been pulled: {pull_calls:?}",
3282 );
3283 }
3284
3285 #[tokio::test]
3286 async fn pull_image_delegate_error_does_not_fail() {
3287 let calls = Arc::new(StdMutex::new(Vec::new()));
3290 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
3291 let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
3292 delegate.pull_image_error = Some("simulated delegate pull failure".to_string());
3293 let rt = CompositeRuntime::new(
3294 primary as Arc<dyn Runtime>,
3295 Some(Arc::new(delegate) as Arc<dyn Runtime>),
3296 );
3297
3298 rt.pull_image("docker.io/library/alpine:3.19")
3300 .await
3301 .unwrap();
3302
3303 let recorded = calls.lock().unwrap();
3304 let pull_calls: Vec<Role> = recorded
3305 .iter()
3306 .filter(|(_, m, _)| m == "pull_image")
3307 .map(|(r, _, _)| *r)
3308 .collect();
3309 assert!(
3310 pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
3311 "both runtimes should have been called: {pull_calls:?}",
3312 );
3313 }
3314
3315 #[tokio::test]
3316 async fn pull_image_primary_wrong_platform_does_not_fail() {
3317 let calls = Arc::new(StdMutex::new(Vec::new()));
3323 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3324 primary.pull_image_wrong_platform = Some(("windows", "linux"));
3325 let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
3326 let rt = CompositeRuntime::new(
3327 Arc::new(primary) as Arc<dyn Runtime>,
3328 Some(Arc::new(delegate) as Arc<dyn Runtime>),
3329 );
3330
3331 rt.pull_image("docker.io/library/alpine:3.19")
3333 .await
3334 .expect("composite pull must tolerate WrongPlatform from primary");
3335
3336 let recorded = calls.lock().unwrap();
3337 let pull_calls: Vec<Role> = recorded
3338 .iter()
3339 .filter(|(_, m, _)| m == "pull_image")
3340 .map(|(r, _, _)| *r)
3341 .collect();
3342 assert!(
3343 pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
3344 "delegate must still be called when primary soft-skips: {pull_calls:?}",
3345 );
3346 }
3347
3348 #[tokio::test]
3349 async fn pull_image_with_policy_primary_wrong_platform_does_not_fail() {
3350 let calls = Arc::new(StdMutex::new(Vec::new()));
3355 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3356 primary.pull_image_wrong_platform = Some(("windows", "linux"));
3357 let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
3358 let rt = CompositeRuntime::new(
3359 Arc::new(primary) as Arc<dyn Runtime>,
3360 Some(Arc::new(delegate) as Arc<dyn Runtime>),
3361 );
3362
3363 rt.pull_image_with_policy(
3364 "docker.io/library/alpine:3.19",
3365 PullPolicy::IfNotPresent,
3366 None,
3367 zlayer_spec::SourcePolicy::default(),
3368 )
3369 .await
3370 .expect("composite pull_image_with_policy must tolerate WrongPlatform from primary");
3371
3372 let recorded = calls.lock().unwrap();
3373 let pull_calls: Vec<Role> = recorded
3374 .iter()
3375 .filter(|(_, m, _)| m == "pull_image_with_policy")
3376 .map(|(r, _, _)| *r)
3377 .collect();
3378 assert!(
3379 pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
3380 "delegate must still be called when primary soft-skips: {pull_calls:?}",
3381 );
3382 }
3383
3384 #[tokio::test]
3385 async fn pull_image_primary_non_wrong_platform_error_still_fails() {
3386 let calls = Arc::new(StdMutex::new(Vec::new()));
3390 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3391 primary.pull_image_error = Some("simulated real failure".to_string());
3392 let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
3393 let rt = CompositeRuntime::new(
3394 Arc::new(primary) as Arc<dyn Runtime>,
3395 Some(Arc::new(delegate) as Arc<dyn Runtime>),
3396 );
3397
3398 let err = rt
3399 .pull_image("docker.io/library/alpine:3.19")
3400 .await
3401 .expect_err("real primary error must propagate");
3402 assert!(
3403 matches!(err, AgentError::Internal(_)),
3404 "expected Internal, got {err:?}",
3405 );
3406 }
3407
3408 #[tokio::test]
3409 async fn list_images_merges_both() {
3410 let calls = Arc::new(StdMutex::new(Vec::new()));
3412 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3413 primary.list_images_response = vec![ImageInfo {
3414 reference: "primary/image:1".to_string(),
3415 digest: None,
3416 size_bytes: None,
3417 }];
3418 let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
3419 delegate.list_images_response = vec![ImageInfo {
3420 reference: "delegate/image:1".to_string(),
3421 digest: None,
3422 size_bytes: None,
3423 }];
3424 let rt = CompositeRuntime::new(
3425 Arc::new(primary) as Arc<dyn Runtime>,
3426 Some(Arc::new(delegate) as Arc<dyn Runtime>),
3427 );
3428
3429 let merged = rt.list_images().await.unwrap();
3430 let refs: Vec<&str> = merged.iter().map(|i| i.reference.as_str()).collect();
3431 assert!(
3432 refs.contains(&"primary/image:1") && refs.contains(&"delegate/image:1"),
3433 "merged list should contain both entries, got {refs:?}",
3434 );
3435 }
3436
3437 #[tokio::test]
3445 async fn list_images_tolerates_primary_unsupported_and_uses_vz_linux() {
3446 let calls = Arc::new(StdMutex::new(Vec::new()));
3447 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3448 primary.list_images_error = Some("list_images is not supported".to_string());
3449 let mut vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
3450 vz_linux.list_images_response = vec![ImageInfo {
3451 reference: "docker.io/library/alpine:latest".to_string(),
3452 digest: None,
3453 size_bytes: None,
3454 }];
3455
3456 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
3457 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
3458
3459 let images = rt
3460 .list_images()
3461 .await
3462 .expect("primary Unsupported must not fail the composite list_images");
3463 let refs: Vec<&str> = images.iter().map(|i| i.reference.as_str()).collect();
3464 assert_eq!(
3465 refs,
3466 vec!["docker.io/library/alpine:latest"],
3467 "should return the VZ-Linux delegate's images, got {refs:?}",
3468 );
3469 }
3470
3471 #[tokio::test]
3475 async fn list_images_errors_only_when_all_backends_fail() {
3476 let calls = Arc::new(StdMutex::new(Vec::new()));
3477 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3478 primary.list_images_error = Some("unsupported".to_string());
3479 let mut vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
3480 vz_linux.list_images_error = Some("also unsupported".to_string());
3481
3482 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
3483 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
3484
3485 let err = rt.list_images().await.unwrap_err();
3486 assert!(
3487 matches!(err, AgentError::Unsupported(_)),
3488 "all-backends-fail should surface Unsupported, got {err:?}",
3489 );
3490 }
3491
3492 #[tokio::test]
3498 async fn prune_images_tolerates_primary_unsupported_and_uses_delegate() {
3499 let calls = Arc::new(StdMutex::new(Vec::new()));
3500 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3502 let delegate =
3503 MockRuntime::new(Role::Delegate, Arc::clone(&calls)).with_prune_result(PruneResult {
3504 deleted: vec![
3505 "docker.io/library/alpine:3.19".to_string(),
3506 "docker.io/library/nginx:1.25".to_string(),
3507 ],
3508 space_reclaimed: 4096,
3509 });
3510
3511 let rt = CompositeRuntime::new(
3512 Arc::new(primary) as Arc<dyn Runtime>,
3513 Some(Arc::new(delegate) as Arc<dyn Runtime>),
3514 );
3515
3516 let result = rt
3517 .prune_images()
3518 .await
3519 .expect("primary Unsupported must not fail the composite prune_images");
3520 assert_eq!(
3521 result.deleted,
3522 vec![
3523 "docker.io/library/alpine:3.19".to_string(),
3524 "docker.io/library/nginx:1.25".to_string(),
3525 ],
3526 "should return the delegate's deleted images, got {:?}",
3527 result.deleted,
3528 );
3529 assert_eq!(
3530 result.space_reclaimed, 4096,
3531 "should return the delegate's reclaimed bytes",
3532 );
3533
3534 let calls = calls.lock().unwrap();
3535 assert_eq!(
3536 role_for(&calls, "prune_images"),
3537 Some(Role::Primary),
3538 "primary prune_images must still be attempted first",
3539 );
3540 assert!(
3541 calls
3542 .iter()
3543 .any(|(role, m, _)| *role == Role::Delegate && m == "prune_images"),
3544 "delegate prune_images must be invoked after the primary miss",
3545 );
3546 }
3547
3548 fn log_entry(stream: LogStream, message: &str) -> LogEntry {
3562 LogEntry {
3563 timestamp: chrono::Utc::now(),
3564 stream,
3565 source: zlayer_observability::logs::LogSource::Container("test".to_string()),
3566 message: message.to_string(),
3567 service: None,
3568 deployment: None,
3569 }
3570 }
3571
3572 async fn drain_logs(stream: LogsStream) -> String {
3574 use futures_util::StreamExt as _;
3575 let mut out = Vec::new();
3576 let mut s = stream;
3577 while let Some(item) = s.next().await {
3578 out.extend_from_slice(&item.expect("log chunk ok").bytes);
3579 }
3580 String::from_utf8(out).expect("utf8 log body")
3581 }
3582
3583 async fn drain_stats(stream: StatsStream) -> Vec<StatsSample> {
3585 use futures_util::StreamExt as _;
3586 let mut out = Vec::new();
3587 let mut s = stream;
3588 while let Some(item) = s.next().await {
3589 out.push(item.expect("stats sample ok"));
3590 }
3591 out
3592 }
3593
3594 async fn make_read_composite(owner: Role) -> (CompositeRuntime, ContainerId, CallLog) {
3600 let calls = Arc::new(StdMutex::new(Vec::new()));
3601 let logs = vec![
3602 log_entry(LogStream::Stdout, "hello stdout"),
3603 log_entry(LogStream::Stderr, "hello stderr"),
3604 ];
3605 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls))
3606 .with_stream_unsupported()
3607 .with_logs(logs.clone());
3608 let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls)).with_logs(logs);
3609 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
3610 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
3611
3612 let id = cid("read-svc", 0);
3613 let target = match owner {
3616 Role::Primary => DispatchTarget::Primary,
3617 Role::VzLinux => DispatchTarget::VzLinux,
3618 other => panic!("make_read_composite supports Primary/VzLinux, not {other:?}"),
3619 };
3620 rt.dispatch.write().await.insert(id.clone(), target);
3621 (rt, id, calls)
3622 }
3623
3624 #[tokio::test]
3625 async fn logs_stream_falls_back_to_snapshot_when_owner_has_no_stream() {
3626 let calls = Arc::new(StdMutex::new(Vec::new()));
3630 let logs = vec![
3631 log_entry(LogStream::Stdout, "hello stdout"),
3632 log_entry(LogStream::Stderr, "hello stderr"),
3633 ];
3634 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls))
3635 .with_stream_unsupported()
3636 .with_logs(logs);
3637 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None);
3638 let id = cid("read-svc", 0);
3639 rt.dispatch
3640 .write()
3641 .await
3642 .insert(id.clone(), DispatchTarget::Primary);
3643
3644 let stream = rt
3645 .logs_stream(&id, LogsStreamOptions::default())
3646 .await
3647 .expect("logs_stream must not 500 when snapshot reads work");
3648 let body = drain_logs(stream).await;
3649 assert!(
3650 body.contains("hello stdout") && body.contains("hello stderr"),
3651 "synthesised stream must carry the captured logs, got: {body:?}",
3652 );
3653 }
3654
3655 #[tokio::test]
3656 async fn logs_stream_routes_to_delegate_owner_native_stream() {
3657 let (rt, id, calls) = make_read_composite(Role::VzLinux).await;
3660 let stream = rt
3661 .logs_stream(&id, LogsStreamOptions::default())
3662 .await
3663 .expect("delegate-owned logs_stream must succeed");
3664 let body = drain_logs(stream).await;
3665 assert!(body.contains("hello stdout"), "got: {body:?}");
3666
3667 let log = calls.lock().expect("call-log mutex poisoned");
3668 assert_eq!(
3669 role_for(&log, "logs_stream"),
3670 Some(Role::VzLinux),
3671 "logs_stream must hit the owning delegate first, calls: {log:?}",
3672 );
3673 }
3674
3675 #[tokio::test]
3676 async fn get_logs_falls_back_across_backends() {
3677 let (rt, id, _calls) = make_read_composite(Role::Primary).await;
3681 let logs = rt.get_logs(&id).await.expect("get_logs must succeed");
3682 assert_eq!(logs.len(), 2, "owner snapshot logs should be returned");
3683 }
3684
3685 #[tokio::test]
3686 async fn stats_stream_falls_back_to_snapshot_when_owner_has_no_stream() {
3687 let calls = Arc::new(StdMutex::new(Vec::new()));
3692 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_stream_unsupported();
3693 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None);
3694 let id = cid("read-svc", 0);
3695 rt.dispatch
3696 .write()
3697 .await
3698 .insert(id.clone(), DispatchTarget::Primary);
3699
3700 let stream = rt
3701 .stats_stream(&id)
3702 .await
3703 .expect("stats_stream must not 500 when get_container_stats works");
3704 let samples = drain_stats(stream).await;
3705 assert_eq!(samples.len(), 1, "snapshot fallback yields one sample");
3706 assert!(
3707 samples[0].mem_used_bytes > 0,
3708 "synthesised sample must carry non-zero memory, got {:?}",
3709 samples[0],
3710 );
3711 assert_eq!(
3712 samples[0].cpu_total_ns, 1_000_000,
3713 "cpu microseconds must be scaled to nanoseconds in the synthesised sample",
3714 );
3715 }
3716
3717 #[tokio::test]
3718 async fn get_container_stats_tolerates_owner_miss_and_uses_other_backend() {
3719 let calls = Arc::new(StdMutex::new(Vec::new()));
3724 let primary =
3725 MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_stats_snapshot_unsupported();
3726 let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
3727 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
3728 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
3729 let id = cid("read-svc", 0);
3730 rt.dispatch
3731 .write()
3732 .await
3733 .insert(id.clone(), DispatchTarget::Primary);
3734
3735 let stats = rt
3736 .get_container_stats(&id)
3737 .await
3738 .expect("owner Unsupported must fall back to the delegate, not 500");
3739 assert!(stats.memory_bytes > 0, "delegate stats should be returned");
3740
3741 let log = calls.lock().expect("call-log mutex poisoned");
3742 assert!(
3743 log.iter()
3744 .any(|(role, method, _)| *role == Role::Primary && method == "get_container_stats"),
3745 "primary must have been tried first, calls: {log:?}",
3746 );
3747 assert!(
3748 log.iter()
3749 .any(|(role, method, _)| *role == Role::VzLinux && method == "get_container_stats"),
3750 "delegate must have served the fallback, calls: {log:?}",
3751 );
3752 }
3753
3754 #[tokio::test]
3755 async fn reads_propagate_not_found_when_no_backend_owns_container() {
3756 let calls = Arc::new(StdMutex::new(Vec::new()));
3760 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_reads_not_found();
3761 let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls)).with_reads_not_found();
3762 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
3763 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
3764 let id = cid("read-svc", 0);
3765 rt.dispatch
3766 .write()
3767 .await
3768 .insert(id.clone(), DispatchTarget::Primary);
3769
3770 match rt.logs_stream(&id, LogsStreamOptions::default()).await {
3773 Err(AgentError::NotFound { .. }) => {}
3774 other => panic!(
3775 "all-not-found logs_stream must be NotFound (404), got {:?}",
3776 other.err(),
3777 ),
3778 }
3779 match rt.stats_stream(&id).await {
3780 Err(AgentError::NotFound { .. }) => {}
3781 other => panic!(
3782 "all-not-found stats_stream must be NotFound (404), got {:?}",
3783 other.err(),
3784 ),
3785 }
3786 let cl_err = rt.container_logs(&id, 10).await.unwrap_err();
3787 assert!(
3788 matches!(cl_err, AgentError::NotFound { .. }),
3789 "all-not-found container_logs must be NotFound (404), got {cl_err:?}",
3790 );
3791 }
3792
3793 #[tokio::test]
3794 async fn reads_on_undispatched_container_are_not_found() {
3795 let (rt, _calls) = make_composite(false);
3797 let id = cid("ghost", 0);
3798 match rt.logs_stream(&id, LogsStreamOptions::default()).await {
3799 Err(AgentError::NotFound { .. }) => {}
3800 other => panic!(
3801 "undispatched logs_stream must be NotFound, got {:?}",
3802 other.err()
3803 ),
3804 }
3805 }
3806
3807 #[tokio::test]
3813 async fn pull_image_fans_out_to_vz_linux() {
3814 let calls = Arc::new(StdMutex::new(Vec::new()));
3815 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
3816 let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
3817
3818 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
3819 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
3820
3821 rt.pull_image("docker.io/library/alpine:latest")
3822 .await
3823 .expect("pull should succeed");
3824
3825 let log = calls.lock().expect("call-log mutex poisoned");
3826 assert!(
3827 log.iter()
3828 .any(|(role, method, _)| *role == Role::VzLinux && method == "pull_image"),
3829 "pull_image must reach the VZ-Linux delegate, recorded calls: {log:?}",
3830 );
3831 }
3832
3833 #[tokio::test]
3834 async fn dispatch_lookup_unknown_container_errors() {
3835 let (rt, _calls) = make_composite(true);
3836 let id = cid("ghost", 0);
3837
3838 let err = rt.start_container(&id).await.unwrap_err();
3839 assert!(
3840 matches!(err, AgentError::NotFound { .. }),
3841 "expected NotFound for unknown container, got {err:?}"
3842 );
3843 }
3844
3845 async fn cached_os(rt: &CompositeRuntime, image: &str) -> Option<OsKind> {
3847 rt.image_os.read().await.get(image).copied()
3848 }
3849
3850 #[tokio::test]
3851 async fn apply_image_os_inspection_populates_cache_on_ok_some() {
3852 let (rt, _calls) = make_composite(true);
3856 let image = "docker.io/library/alpine:3.19";
3857
3858 rt.apply_image_os_inspection(image, Ok(Some(OsKind::Linux)))
3859 .await;
3860
3861 assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
3862 }
3863
3864 #[tokio::test]
3865 async fn apply_image_os_inspection_leaves_cache_untouched_on_ok_none() {
3866 let (rt, _calls) = make_composite(true);
3870 let image = "docker.io/library/nginx:1.25";
3871
3872 rt.apply_image_os_inspection(image, Ok(None)).await;
3873
3874 assert_eq!(cached_os(&rt, image).await, None);
3875 }
3876
3877 #[tokio::test]
3878 async fn apply_image_os_inspection_leaves_cache_untouched_on_err() {
3879 let (rt, _calls) = make_composite(true);
3882 let image = "docker.io/library/nginx:1.25";
3883
3884 rt.record_image_os(image, OsKind::Linux).await;
3887
3888 let err = zlayer_registry::RegistryError::NotFound {
3889 registry: "docker.io".to_string(),
3890 image: image.to_string(),
3891 };
3892 rt.apply_image_os_inspection(image, Err(err)).await;
3893
3894 assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
3896 }
3897
3898 #[tokio::test]
3899 async fn pull_image_inspection_failure_does_not_fail_pull() {
3900 let (rt, _calls) = make_composite(true);
3906 let image = "invalid.example.invalid/ghost:v1";
3907
3908 rt.pull_image(image).await.unwrap();
3909
3910 assert_eq!(
3911 cached_os(&rt, image).await,
3912 None,
3913 "failed inspection must not populate the image-OS cache"
3914 );
3915 }
3916
3917 #[tokio::test]
3918 async fn pull_image_with_policy_inspection_failure_does_not_fail_pull() {
3919 let (rt, _calls) = make_composite(true);
3922 let image = "invalid.example.invalid/ghost:v1";
3923
3924 rt.pull_image_with_policy(
3925 image,
3926 PullPolicy::IfNotPresent,
3927 None,
3928 zlayer_spec::SourcePolicy::default(),
3929 )
3930 .await
3931 .unwrap();
3932
3933 assert_eq!(cached_os(&rt, image).await, None);
3934 }
3935
/// Every `OsKind` variant listed here must survive `as_oci_str` ->
/// `from_oci_str`; the empty string and `"freebsd"` must parse to `None`.
#[test]
fn os_kind_from_oci_str_roundtrip() {
    let variants = [OsKind::Linux, OsKind::Windows, OsKind::Macos];
    for os in variants {
        let parsed = OsKind::from_oci_str(os.as_oci_str());
        assert_eq!(parsed, Some(os));
    }

    for unrecognised in ["", "freebsd"] {
        assert_eq!(OsKind::from_oci_str(unrecognised), None);
    }
}
3948}