1use std::collections::HashMap;
37use std::net::IpAddr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use tokio::sync::RwLock;
43use zlayer_observability::logs::{LogEntry, LogStream};
44use zlayer_spec::{OsKind, PullPolicy, RegistryAuth, ServiceSpec};
45
46use crate::cgroups_stats::ContainerStats;
47use crate::error::{AgentError, Result};
48use crate::runtime::{
49 ContainerId, ContainerInspectDetails, ContainerState, ExecEventStream, ImageInfo, LogChannel,
50 LogChunk, LogsStream, LogsStreamOptions, OverlayAttachKind, PruneResult, Runtime, StatsSample,
51 StatsStream, WaitCondition, WaitOutcome,
52};
53
/// Identifies which backend owns a container.
///
/// One value is stored per container in `CompositeRuntime::dispatch` when the
/// container is created, and every later per-container call is forwarded to
/// the backend it names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DispatchTarget {
    /// The always-present native runtime (`CompositeRuntime::primary`).
    Primary,
    /// The optional `CompositeRuntime::delegate` runtime.
    Delegate,
    /// The optional `CompositeRuntime::vz` runtime.
    Vz,
    /// The optional `CompositeRuntime::vz_linux` runtime.
    VzLinux,
}
69
70pub struct CompositeRuntime {
74 primary: Arc<dyn Runtime>,
75 delegate: Option<Arc<dyn Runtime>>,
76 vz: Option<Arc<dyn Runtime>>,
79 vz_linux: Option<Arc<dyn Runtime>>,
83 dispatch: Arc<RwLock<HashMap<ContainerId, DispatchTarget>>>,
86 image_os: Arc<RwLock<HashMap<String, OsKind>>>,
90 image_runtime: Arc<RwLock<HashMap<String, String>>>,
96 os_inspect_cache_paths: Vec<std::path::PathBuf>,
115}
116
117impl CompositeRuntime {
118 #[must_use]
124 pub fn new(primary: Arc<dyn Runtime>, delegate: Option<Arc<dyn Runtime>>) -> Self {
125 Self {
126 primary,
127 delegate,
128 vz: None,
129 vz_linux: None,
130 dispatch: Arc::new(RwLock::new(HashMap::new())),
131 image_os: Arc::new(RwLock::new(HashMap::new())),
132 image_runtime: Arc::new(RwLock::new(HashMap::new())),
133 os_inspect_cache_paths: Vec::new(),
134 }
135 }
136
137 #[must_use]
145 pub fn with_os_inspect_cache_path(self, path: Option<std::path::PathBuf>) -> Self {
146 self.with_os_inspect_cache_paths(path.into_iter().collect())
147 }
148
149 #[must_use]
161 pub fn with_os_inspect_cache_paths(mut self, paths: Vec<std::path::PathBuf>) -> Self {
162 self.os_inspect_cache_paths = paths;
163 self
164 }
165
166 async fn inspect_image_os(
186 &self,
187 image: &str,
188 ) -> std::result::Result<Option<OsKind>, zlayer_registry::RegistryError> {
189 for path in &self.os_inspect_cache_paths {
190 match zlayer_registry::CacheType::persistent_at(path)
191 .build()
192 .await
193 {
194 Ok(cache) => {
195 match zlayer_registry::fetch_image_os_in_cache_only(image, cache, None).await {
196 Ok(Some(os)) => return Ok(Some(os)),
197 Ok(None) => {
198 tracing::trace!(
199 image,
200 cache = %path.display(),
201 "image OS not resolvable from this local cache; trying next",
202 );
203 }
204 Err(e) => return Err(e),
205 }
206 }
207 Err(e) => {
208 tracing::debug!(
209 image,
210 cache = %path.display(),
211 error = %e,
212 "failed to open OS-inspect blob cache; trying next",
213 );
214 }
215 }
216 }
217 Ok(None)
222 }
223
224 async fn inspect_image_runtime_marker(
227 &self,
228 image: &str,
229 auth: Option<&RegistryAuth>,
230 ) -> std::result::Result<Option<String>, zlayer_registry::RegistryError> {
231 for path in &self.os_inspect_cache_paths {
232 match zlayer_registry::CacheType::persistent_at(path)
233 .build()
234 .await
235 {
236 Ok(cache) => {
237 match zlayer_registry::fetch_image_runtime_marker_in_cache_only(
238 image, cache, None,
239 )
240 .await
241 {
242 Ok(Some(marker)) => return Ok(Some(marker)),
243 Ok(None) => {
244 tracing::trace!(
245 image,
246 cache = %path.display(),
247 "runtime marker not resolvable from this local cache; trying next",
248 );
249 }
250 Err(e) => return Err(e),
251 }
252 }
253 Err(e) => {
254 tracing::debug!(
255 image,
256 cache = %path.display(),
257 error = %e,
258 "failed to open marker-inspect blob cache; trying next",
259 );
260 }
261 }
262 }
263 zlayer_registry::fetch_image_runtime_marker(image, auth).await
264 }
265
266 #[must_use]
269 pub fn with_vz_delegate(mut self, vz: Option<Arc<dyn Runtime>>) -> Self {
270 self.vz = vz;
271 self
272 }
273
274 #[must_use]
280 pub fn with_vz_linux_delegate(mut self, vz_linux: Option<Arc<dyn Runtime>>) -> Self {
281 self.vz_linux = vz_linux;
282 self
283 }
284
285 #[must_use]
287 pub fn primary(&self) -> &Arc<dyn Runtime> {
288 &self.primary
289 }
290
291 #[must_use]
293 pub fn delegate(&self) -> Option<&Arc<dyn Runtime>> {
294 self.delegate.as_ref()
295 }
296
297 pub(crate) async fn record_image_os(&self, image: &str, os: OsKind) {
303 self.image_os.write().await.insert(image.to_string(), os);
304 }
305
306 pub(crate) async fn record_image_runtime(&self, image: &str, marker: String) {
309 self.image_runtime
310 .write()
311 .await
312 .insert(image.to_string(), marker);
313 }
314
315 async fn apply_image_runtime_inspection(
320 &self,
321 image: &str,
322 result: std::result::Result<Option<String>, zlayer_registry::RegistryError>,
323 ) {
324 match result {
325 Ok(Some(marker)) => {
326 tracing::debug!(image, marker, "cached image runtime marker for dispatch");
327 self.record_image_runtime(image, marker).await;
328 }
329 Ok(None) => {}
330 Err(e) => {
331 tracing::trace!(
332 image,
333 error = %e,
334 "failed to inspect image runtime marker — dispatch unaffected",
335 );
336 }
337 }
338 }
339
340 async fn apply_image_os_inspection(
357 &self,
358 image: &str,
359 result: std::result::Result<Option<OsKind>, zlayer_registry::RegistryError>,
360 ) {
361 match result {
362 Ok(Some(os)) => {
363 self.record_image_os(image, os).await;
364 tracing::debug!(image, ?os, "cached image OS for dispatch");
365 }
366 Ok(None) => {
367 tracing::trace!(
368 image,
369 "image manifest has no OS field — dispatch will fall through to primary",
370 );
371 }
372 Err(e) => {
373 tracing::warn!(
374 image,
375 error = %e,
376 "failed to inspect image manifest OS — dispatch will fall through to primary",
377 );
378 }
379 }
380 }
381
382 async fn select_for(&self, service: &str, spec: &ServiceSpec) -> Result<DispatchTarget> {
409 if let Some(label) = spec.labels.get("com.zlayer.isolation") {
416 if self.vz.is_some() && label.eq_ignore_ascii_case("vz") {
417 return Ok(DispatchTarget::Vz);
418 }
419 if self.vz_linux.is_some() && label.eq_ignore_ascii_case("vz-linux") {
420 return Ok(DispatchTarget::VzLinux);
421 }
422 if label.eq_ignore_ascii_case("vm") || label.eq_ignore_ascii_case("libkrun") {
423 if self.delegate.is_some() {
427 return Ok(DispatchTarget::Delegate);
428 }
429 }
430 if label.eq_ignore_ascii_case("sandbox") || label.eq_ignore_ascii_case("seatbelt") {
431 return Ok(DispatchTarget::Primary);
432 }
433 }
434
435 if self.vz.is_some()
441 && self
442 .image_runtime
443 .read()
444 .await
445 .get(&spec.image.name.to_string())
446 .is_some_and(|m| m.eq_ignore_ascii_case(zlayer_registry::ZLAYER_RUNTIME_VZ))
447 {
448 return Ok(DispatchTarget::Vz);
449 }
450
451 if self.vz_linux.is_some()
454 && self
455 .image_runtime
456 .read()
457 .await
458 .get(&spec.image.name.to_string())
459 .is_some_and(|m| m.eq_ignore_ascii_case(zlayer_registry::ZLAYER_RUNTIME_LINUX_VZ))
460 {
461 return Ok(DispatchTarget::VzLinux);
462 }
463
464 if let Some(platform) = &spec.platform {
465 let target = match platform.os {
466 OsKind::Windows | OsKind::Macos => DispatchTarget::Primary,
467 OsKind::Linux if self.vz_linux.is_some() => DispatchTarget::VzLinux,
470 OsKind::Linux => DispatchTarget::Delegate,
471 };
472 if matches!(target, DispatchTarget::Delegate) && self.delegate.is_none() {
473 return Err(AgentError::RouteToPeer {
474 service: service.to_string(),
475 required_os: OsKind::Linux.as_oci_str().to_string(),
476 reason: "spec.platform.os = linux but this node has no WSL2 delegate \
477 configured; enable `--install-wsl yes` on this node or add a Linux \
478 peer to the cluster"
479 .to_string(),
480 });
481 }
482 return Ok(target);
483 }
484
485 if let Some(os) = self
486 .image_os
487 .read()
488 .await
489 .get(&spec.image.name.to_string())
490 .copied()
491 {
492 return match os {
493 OsKind::Linux => {
494 if self.vz_linux.is_some() {
495 Ok(DispatchTarget::VzLinux)
497 } else if self.delegate.is_some() {
498 Ok(DispatchTarget::Delegate)
499 } else {
500 Err(AgentError::RouteToPeer {
505 service: service.to_string(),
506 required_os: OsKind::Linux.as_oci_str().to_string(),
507 reason: format!(
508 "image '{}' manifest reports os=linux but this node has no WSL2 \
509 delegate configured; enable `--install-wsl yes` on this node or \
510 add a Linux peer to the cluster",
511 spec.image.name
512 ),
513 })
514 }
515 }
516 OsKind::Windows | OsKind::Macos => Ok(DispatchTarget::Primary),
517 };
518 }
519
520 if self.vz_linux.is_some() {
535 return Ok(DispatchTarget::VzLinux);
536 }
537
538 Ok(DispatchTarget::Primary)
539 }
540
541 async fn lookup(&self, id: &ContainerId) -> Result<Arc<dyn Runtime>> {
543 let target =
544 self.dispatch
545 .read()
546 .await
547 .get(id)
548 .copied()
549 .ok_or_else(|| AgentError::NotFound {
550 container: id.to_string(),
551 reason: "no dispatch record in CompositeRuntime".to_string(),
552 })?;
553 Ok(self.runtime_for(target).clone())
554 }
555
556 fn runtime_for(&self, t: DispatchTarget) -> &Arc<dyn Runtime> {
563 match t {
564 DispatchTarget::Primary => &self.primary,
565 DispatchTarget::Delegate => self
566 .delegate
567 .as_ref()
568 .expect("delegate target requires delegate to exist"),
569 DispatchTarget::Vz => self.vz.as_ref().unwrap_or(&self.primary),
572 DispatchTarget::VzLinux => self.vz_linux.as_ref().unwrap_or(&self.primary),
575 }
576 }
577
578 async fn read_backends(
590 &self,
591 id: &ContainerId,
592 ) -> Result<Vec<(&'static str, Arc<dyn Runtime>)>> {
593 let owner =
594 self.dispatch
595 .read()
596 .await
597 .get(id)
598 .copied()
599 .ok_or_else(|| AgentError::NotFound {
600 container: id.to_string(),
601 reason: "no dispatch record in CompositeRuntime".to_string(),
602 })?;
603
604 let all: [(DispatchTarget, Option<&Arc<dyn Runtime>>); 4] = [
608 (DispatchTarget::Primary, Some(&self.primary)),
609 (DispatchTarget::Delegate, self.delegate.as_ref()),
610 (DispatchTarget::Vz, self.vz.as_ref()),
611 (DispatchTarget::VzLinux, self.vz_linux.as_ref()),
612 ];
613
614 let label_for = |t: DispatchTarget| match t {
615 DispatchTarget::Primary => "primary",
616 DispatchTarget::Delegate => "delegate",
617 DispatchTarget::Vz => "vz",
618 DispatchTarget::VzLinux => "vz_linux",
619 };
620
621 let mut out: Vec<(&'static str, Arc<dyn Runtime>)> =
622 vec![(label_for(owner), self.runtime_for(owner).clone())];
623 for (target, rt) in all {
624 if target != owner {
625 if let Some(rt) = rt {
626 out.push((label_for(target), rt.clone()));
627 }
628 }
629 }
630 Ok(out)
631 }
632}
633
634#[derive(Default)]
648struct ReadMissAccumulator {
649 soft_err: Option<AgentError>,
651 not_found: Option<AgentError>,
653}
654
655impl ReadMissAccumulator {
656 fn record(&mut self, e: AgentError) {
657 if matches!(e, AgentError::NotFound { .. }) {
658 self.not_found = Some(e);
659 } else {
660 self.soft_err = Some(e);
661 }
662 }
663
664 fn into_error(self, what: &str) -> AgentError {
670 self.soft_err
671 .or(self.not_found)
672 .unwrap_or_else(|| AgentError::Unsupported(format!("no backend could serve {what}")))
673 }
674}
675
676fn one_shot_logs_stream(entries: Vec<LogEntry>, opts: &LogsStreamOptions) -> LogsStream {
685 use futures_util::stream;
686
687 let want_stdout = opts.stdout || !opts.stderr;
691 let want_stderr = opts.stderr || !opts.stdout;
692 let timestamps = opts.timestamps;
693
694 let chunks: Vec<Result<LogChunk>> = entries
695 .into_iter()
696 .filter_map(|e| {
697 let channel = match e.stream {
698 LogStream::Stdout => LogChannel::Stdout,
699 LogStream::Stderr => LogChannel::Stderr,
700 };
701 let keep = match channel {
702 LogChannel::Stdout => want_stdout,
703 LogChannel::Stderr => want_stderr,
704 LogChannel::Stdin => false,
705 };
706 if !keep {
707 return None;
708 }
709 let mut bytes = e.message.into_bytes();
710 bytes.push(b'\n');
711 Some(Ok(LogChunk {
712 stream: channel,
713 bytes: bytes::Bytes::from(bytes),
714 timestamp: timestamps.then_some(e.timestamp),
715 }))
716 })
717 .collect();
718
719 Box::pin(stream::iter(chunks))
720}
721
722fn one_shot_stats_stream(stats: &ContainerStats) -> StatsStream {
732 use futures_util::stream;
733
734 let sample = StatsSample {
735 cpu_total_ns: stats.cpu_usage_usec.saturating_mul(1_000),
736 cpu_system_ns: 0,
737 online_cpus: 1,
738 mem_used_bytes: stats.memory_bytes,
739 mem_limit_bytes: stats.memory_limit,
740 net_rx_bytes: 0,
741 net_tx_bytes: 0,
742 blkio_read_bytes: 0,
743 blkio_write_bytes: 0,
744 pids_current: 0,
745 pids_limit: None,
746 timestamp: chrono::Utc::now(),
747 };
748 Box::pin(stream::iter(vec![Ok(sample)]))
749}
750
751#[async_trait]
752impl Runtime for CompositeRuntime {
753 async fn pull_image(&self, image: &str) -> Result<()> {
754 if let Err(e) = self.primary.pull_image(image).await {
761 if matches!(e, AgentError::WrongPlatform { .. }) {
762 tracing::debug!(
763 image,
764 error = %e,
765 "primary runtime cannot service image (wrong platform); delegating",
766 );
767 } else {
768 return Err(e);
769 }
770 }
771 if let Some(delegate) = &self.delegate {
772 if let Err(e) = delegate.pull_image(image).await {
773 tracing::debug!(
778 image,
779 error = %e,
780 "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
781 );
782 }
783 }
784 for (label, rt) in [
792 self.vz.as_ref().map(|r| ("vz", r)),
793 self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
794 ]
795 .into_iter()
796 .flatten()
797 {
798 if let Err(e) = rt.pull_image(image).await {
799 tracing::debug!(
800 image,
801 runtime = label,
802 error = %e,
803 "vz delegate failed to pull image (likely wrong OS); continuing",
804 );
805 }
806 }
807
808 let os_result = self.inspect_image_os(image).await;
812 self.apply_image_os_inspection(image, os_result).await;
813 let marker_result = self.inspect_image_runtime_marker(image, None).await;
814 self.apply_image_runtime_inspection(image, marker_result)
815 .await;
816
817 Ok(())
818 }
819
820 async fn pull_image_with_policy(
821 &self,
822 image: &str,
823 policy: PullPolicy,
824 auth: Option<&RegistryAuth>,
825 source: zlayer_spec::SourcePolicy,
826 ) -> Result<()> {
827 if let Err(e) = self
829 .primary
830 .pull_image_with_policy(image, policy, auth, source)
831 .await
832 {
833 if matches!(e, AgentError::WrongPlatform { .. }) {
834 tracing::debug!(
835 image,
836 error = %e,
837 "primary runtime cannot service image (wrong platform); delegating",
838 );
839 } else {
840 return Err(e);
841 }
842 }
843 if let Some(delegate) = &self.delegate {
844 if let Err(e) = delegate
845 .pull_image_with_policy(image, policy, auth, source)
846 .await
847 {
848 tracing::debug!(
849 image,
850 error = %e,
851 "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
852 );
853 }
854 }
855 for (label, rt) in [
859 self.vz.as_ref().map(|r| ("vz", r)),
860 self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
861 ]
862 .into_iter()
863 .flatten()
864 {
865 if let Err(e) = rt.pull_image_with_policy(image, policy, auth, source).await {
866 tracing::debug!(
867 image,
868 runtime = label,
869 error = %e,
870 "vz delegate failed to pull image (likely wrong OS); continuing",
871 );
872 }
873 }
874
875 let os_result = self.inspect_image_os(image).await;
876 self.apply_image_os_inspection(image, os_result).await;
877 let marker_result = self.inspect_image_runtime_marker(image, auth).await;
878 self.apply_image_runtime_inspection(image, marker_result)
879 .await;
880
881 Ok(())
882 }
883
884 async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
885 let target = self.select_for(&id.service, spec).await?;
886 {
887 let mut dispatch = self.dispatch.write().await;
888 dispatch.insert(id.clone(), target);
889 }
890 let rt = self.runtime_for(target).clone();
891 match rt.create_container(id, spec).await {
892 Ok(()) => Ok(()),
893 Err(e) => {
894 self.dispatch.write().await.remove(id);
897 Err(e)
898 }
899 }
900 }
901
902 async fn start_container(&self, id: &ContainerId) -> Result<()> {
903 let rt = self.lookup(id).await?;
904 rt.start_container(id).await
905 }
906
907 async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
908 let rt = self.lookup(id).await?;
909 rt.stop_container(id, timeout).await
910 }
911
912 async fn remove_container(&self, id: &ContainerId) -> Result<()> {
913 let rt = self.lookup(id).await?;
914 let res = rt.remove_container(id).await;
915 self.dispatch.write().await.remove(id);
916 res
917 }
918
919 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
920 let rt = self.lookup(id).await?;
921 rt.container_state(id).await
922 }
923
924 async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
925 let backends = self.read_backends(id).await?;
926 let mut misses = ReadMissAccumulator::default();
927 for (label, rt) in backends {
928 match rt.container_logs(id, tail).await {
929 Ok(logs) => return Ok(logs),
930 Err(e) => {
931 tracing::warn!(
932 container = %id,
933 runtime = label,
934 error = %e,
935 "composite container_logs: backend could not serve logs; trying next backend",
936 );
937 misses.record(e);
938 }
939 }
940 }
941 Err(misses.into_error("container_logs"))
942 }
943
944 async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
945 let rt = self.lookup(id).await?;
946 rt.exec(id, cmd).await
947 }
948
949 async fn exec_with_opts(
950 &self,
951 id: &ContainerId,
952 opts: &crate::runtime::ExecOptions,
953 ) -> Result<(i32, String, String)> {
954 let rt = self.lookup(id).await?;
959 rt.exec_with_opts(id, opts).await
960 }
961
962 async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
963 let rt = self.lookup(id).await?;
964 rt.exec_stream(id, cmd).await
965 }
966
967 async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
968 let backends = self.read_backends(id).await?;
969 let mut misses = ReadMissAccumulator::default();
970 for (label, rt) in backends {
971 match rt.get_container_stats(id).await {
972 Ok(stats) => return Ok(stats),
973 Err(e) => {
974 tracing::warn!(
975 container = %id,
976 runtime = label,
977 error = %e,
978 "composite get_container_stats: backend could not serve stats; \
979 trying next backend",
980 );
981 misses.record(e);
982 }
983 }
984 }
985 Err(misses.into_error("get_container_stats"))
986 }
987
988 async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
989 let rt = self.lookup(id).await?;
990 rt.wait_container(id).await
991 }
992
993 async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
994 let rt = self.lookup(id).await?;
995 rt.wait_outcome(id).await
996 }
997
998 async fn wait_outcome_with_condition(
999 &self,
1000 id: &ContainerId,
1001 condition: WaitCondition,
1002 ) -> Result<WaitOutcome> {
1003 let rt = self.lookup(id).await?;
1004 rt.wait_outcome_with_condition(id, condition).await
1005 }
1006
1007 async fn rename_container(&self, id: &ContainerId, new_name: &str) -> Result<()> {
1008 let rt = self.lookup(id).await?;
1009 rt.rename_container(id, new_name).await
1010 }
1011
1012 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
1013 let backends = self.read_backends(id).await?;
1014 let mut misses = ReadMissAccumulator::default();
1015 for (label, rt) in backends {
1016 match rt.get_logs(id).await {
1017 Ok(logs) => return Ok(logs),
1018 Err(e) => {
1019 tracing::warn!(
1020 container = %id,
1021 runtime = label,
1022 error = %e,
1023 "composite get_logs: backend could not serve logs; trying next backend",
1024 );
1025 misses.record(e);
1026 }
1027 }
1028 }
1029 Err(misses.into_error("get_logs"))
1030 }
1031
1032 async fn logs_stream(&self, id: &ContainerId, opts: LogsStreamOptions) -> Result<LogsStream> {
1033 let backends = self.read_backends(id).await?;
1044 let mut misses = ReadMissAccumulator::default();
1045 for (label, rt) in &backends {
1046 match rt.logs_stream(id, opts.clone()).await {
1047 Ok(stream) => return Ok(stream),
1048 Err(e) => {
1049 tracing::warn!(
1050 container = %id,
1051 runtime = label,
1052 error = %e,
1053 "composite logs_stream: backend has no native log stream; \
1054 falling back to a one-shot snapshot",
1055 );
1056 misses.record(e);
1057 }
1058 }
1059 }
1060
1061 let tail = opts
1064 .tail
1065 .map_or(1000, |n| usize::try_from(n).unwrap_or(1000));
1066 for (label, rt) in &backends {
1067 match rt.container_logs(id, tail).await {
1068 Ok(entries) => {
1069 return Ok(one_shot_logs_stream(entries, &opts));
1070 }
1071 Err(e) => {
1072 tracing::warn!(
1073 container = %id,
1074 runtime = label,
1075 error = %e,
1076 "composite logs_stream: backend snapshot fallback failed; trying next",
1077 );
1078 misses.record(e);
1079 }
1080 }
1081 }
1082 Err(misses.into_error("container logs"))
1083 }
1084
1085 async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
1086 let backends = self.read_backends(id).await?;
1092 let mut misses = ReadMissAccumulator::default();
1093 for (label, rt) in &backends {
1094 match rt.stats_stream(id).await {
1095 Ok(stream) => return Ok(stream),
1096 Err(e) => {
1097 tracing::warn!(
1098 container = %id,
1099 runtime = label,
1100 error = %e,
1101 "composite stats_stream: backend has no native stats stream; \
1102 falling back to a one-shot sample",
1103 );
1104 misses.record(e);
1105 }
1106 }
1107 }
1108
1109 for (label, rt) in &backends {
1110 match rt.get_container_stats(id).await {
1111 Ok(stats) => return Ok(one_shot_stats_stream(&stats)),
1112 Err(e) => {
1113 tracing::warn!(
1114 container = %id,
1115 runtime = label,
1116 error = %e,
1117 "composite stats_stream: backend sample fallback failed; trying next",
1118 );
1119 misses.record(e);
1120 }
1121 }
1122 }
1123 Err(misses.into_error("container stats"))
1124 }
1125
1126 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
1127 let rt = self.lookup(id).await?;
1128 rt.get_container_pid(id).await
1129 }
1130
1131 fn overlay_attach_kind(&self) -> OverlayAttachKind {
1132 self.vz_linux.as_ref().map_or_else(
1139 || self.primary.overlay_attach_kind(),
1140 |vz| vz.overlay_attach_kind(),
1141 )
1142 }
1143
1144 async fn push_overlay_config(
1145 &self,
1146 id: &ContainerId,
1147 config: &zlayer_types::overlayd::GuestOverlayConfig,
1148 ) -> Result<()> {
1149 let rt = self.lookup(id).await?;
1150 rt.push_overlay_config(id, config).await
1151 }
1152
1153 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
1154 let rt = self.lookup(id).await?;
1155 rt.get_container_ip(id).await
1156 }
1157
1158 async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
1159 let rt = self.lookup(id).await?;
1160 rt.get_container_port_override(id).await
1161 }
1162
1163 #[cfg(target_os = "windows")]
1164 async fn get_container_namespace_id(
1165 &self,
1166 id: &ContainerId,
1167 ) -> Result<Option<windows::core::GUID>> {
1168 let rt = self.lookup(id).await?;
1169 rt.get_container_namespace_id(id).await
1170 }
1171
1172 async fn sync_container_volumes(&self, id: &ContainerId) -> Result<()> {
1173 let rt = self.lookup(id).await?;
1174 rt.sync_container_volumes(id).await
1175 }
1176
1177 async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1178 let mut out: Vec<ImageInfo> = Vec::new();
1189 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
1190 let mut any_ok = false;
1191 let mut last_err: Option<AgentError> = None;
1192
1193 for (label, rt) in [
1194 Some(("primary", &self.primary)),
1195 self.delegate.as_ref().map(|d| ("delegate", d)),
1196 self.vz.as_ref().map(|d| ("vz", d)),
1197 self.vz_linux.as_ref().map(|d| ("vz_linux", d)),
1198 ]
1199 .into_iter()
1200 .flatten()
1201 {
1202 match rt.list_images().await {
1203 Ok(images) => {
1204 any_ok = true;
1205 for img in images {
1206 if seen.insert(img.reference.clone()) {
1209 out.push(img);
1210 }
1211 }
1212 }
1213 Err(e) => {
1214 tracing::debug!(
1215 runtime = label,
1216 error = %e,
1217 "composite list_images: backend returned an error; skipping it",
1218 );
1219 last_err = Some(e);
1220 }
1221 }
1222 }
1223
1224 if any_ok {
1228 Ok(out)
1229 } else {
1230 Err(last_err.unwrap_or_else(|| {
1231 AgentError::Unsupported("no runtime implements list_images".into())
1232 }))
1233 }
1234 }
1235
1236 async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
1237 match self.primary.remove_image(image, force).await {
1238 Ok(()) => Ok(()),
1239 Err(primary_err) => {
1240 if let Some(delegate) = &self.delegate {
1241 match delegate.remove_image(image, force).await {
1242 Ok(()) => Ok(()),
1243 Err(delegate_err) => {
1244 tracing::debug!(
1245 image,
1246 %delegate_err,
1247 "delegate remove_image also failed; returning primary error",
1248 );
1249 Err(primary_err)
1250 }
1251 }
1252 } else {
1253 Err(primary_err)
1254 }
1255 }
1256 }
1257 }
1258
1259 async fn prune_images(&self) -> Result<PruneResult> {
1260 let mut result = self.primary.prune_images().await?;
1261 if let Some(delegate) = &self.delegate {
1262 match delegate.prune_images().await {
1263 Ok(extra) => {
1264 result.deleted.extend(extra.deleted);
1265 result.space_reclaimed =
1266 result.space_reclaimed.saturating_add(extra.space_reclaimed);
1267 }
1268 Err(e) => tracing::warn!(
1269 error = %e,
1270 "delegate runtime prune_images failed; returning primary result only",
1271 ),
1272 }
1273 }
1274 Ok(result)
1275 }
1276
1277 async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
1278 let rt = self.lookup(id).await?;
1279 rt.kill_container(id, signal).await
1280 }
1281
1282 async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
1283 match self.primary.tag_image(source, target).await {
1284 Ok(()) => Ok(()),
1285 Err(primary_err) => {
1286 if let Some(delegate) = &self.delegate {
1287 match delegate.tag_image(source, target).await {
1288 Ok(()) => Ok(()),
1289 Err(delegate_err) => {
1290 tracing::debug!(
1291 source,
1292 target,
1293 %delegate_err,
1294 "delegate tag_image also failed; returning primary error",
1295 );
1296 Err(primary_err)
1297 }
1298 }
1299 } else {
1300 Err(primary_err)
1301 }
1302 }
1303 }
1304 }
1305
1306 async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
1307 let rt = self.lookup(id).await?;
1308 rt.inspect_detailed(id).await
1309 }
1310}
1311
1312#[cfg(test)]
1313mod tests {
1314 use super::*;
1315 use crate::cgroups_stats::ContainerStats;
1316 use std::sync::Mutex as StdMutex;
1317 use zlayer_spec::{ArchKind, DeploymentSpec, TargetPlatform};
1318
1319 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1322 enum Role {
1323 Primary,
1324 Delegate,
1325 Vz,
1326 VzLinux,
1327 }
1328
1329 type CallRecord = (Role, String, Option<ContainerId>);
1331 type CallLog = Arc<StdMutex<Vec<CallRecord>>>;
1333
1334 struct MockRuntime {
1341 role: Role,
1342 calls: CallLog,
1343 list_images_response: Vec<ImageInfo>,
1344 list_images_error: Option<String>,
1348 pull_image_error: Option<String>,
1349 pull_image_wrong_platform: Option<(&'static str, &'static str)>,
1354 stream_unsupported: bool,
1360 reads_not_found: bool,
1366 logs_response: Vec<LogEntry>,
1370 stats_snapshot_unsupported: bool,
1375 }
1376
1377 impl MockRuntime {
1378 fn new(role: Role, calls: CallLog) -> Self {
1379 Self {
1380 role,
1381 calls,
1382 list_images_response: Vec::new(),
1383 list_images_error: None,
1384 pull_image_error: None,
1385 pull_image_wrong_platform: None,
1386 stream_unsupported: false,
1387 reads_not_found: false,
1388 logs_response: Vec::new(),
1389 stats_snapshot_unsupported: false,
1390 }
1391 }
1392
1393 fn with_stream_unsupported(mut self) -> Self {
1395 self.stream_unsupported = true;
1396 self
1397 }
1398
1399 fn with_reads_not_found(mut self) -> Self {
1401 self.reads_not_found = true;
1402 self
1403 }
1404
1405 fn with_logs(mut self, logs: Vec<LogEntry>) -> Self {
1407 self.logs_response = logs;
1408 self
1409 }
1410
1411 fn with_stats_snapshot_unsupported(mut self) -> Self {
1413 self.stats_snapshot_unsupported = true;
1414 self
1415 }
1416
1417 fn build_wrong_platform_error(&self, image: &str) -> Option<AgentError> {
1418 self.pull_image_wrong_platform
1419 .map(|(expected, actual)| AgentError::WrongPlatform {
1420 runtime: match self.role {
1421 Role::Primary => "primary-mock".to_string(),
1422 Role::Delegate => "delegate-mock".to_string(),
1423 Role::Vz => "vz-mock".to_string(),
1424 Role::VzLinux => "vz-linux-mock".to_string(),
1425 },
1426 expected: expected.to_string(),
1427 actual: actual.to_string(),
1428 image: image.to_string(),
1429 })
1430 }
1431
1432 fn record(&self, method: &str, id: Option<&ContainerId>) {
1433 self.calls
1434 .lock()
1435 .expect("mock call-log mutex poisoned")
1436 .push((self.role, method.to_string(), id.cloned()));
1437 }
1438 }
1439
1440 #[async_trait]
1441 impl Runtime for MockRuntime {
1442 async fn pull_image(&self, image: &str) -> Result<()> {
1443 self.record("pull_image", None);
1444 if let Some(err) = self.build_wrong_platform_error(image) {
1445 return Err(err);
1446 }
1447 if let Some(msg) = &self.pull_image_error {
1448 return Err(AgentError::Internal(msg.clone()));
1449 }
1450 Ok(())
1451 }
1452
1453 async fn pull_image_with_policy(
1454 &self,
1455 image: &str,
1456 _policy: PullPolicy,
1457 _auth: Option<&RegistryAuth>,
1458 _source: zlayer_spec::SourcePolicy,
1459 ) -> Result<()> {
1460 self.record("pull_image_with_policy", None);
1461 if let Some(err) = self.build_wrong_platform_error(image) {
1462 return Err(err);
1463 }
1464 if let Some(msg) = &self.pull_image_error {
1465 return Err(AgentError::Internal(msg.clone()));
1466 }
1467 Ok(())
1468 }
1469
1470 async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
1471 self.record("create_container", Some(id));
1472 Ok(())
1473 }
1474
1475 async fn start_container(&self, id: &ContainerId) -> Result<()> {
1476 self.record("start_container", Some(id));
1477 Ok(())
1478 }
1479
1480 async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
1481 self.record("stop_container", Some(id));
1482 Ok(())
1483 }
1484
1485 async fn remove_container(&self, id: &ContainerId) -> Result<()> {
1486 self.record("remove_container", Some(id));
1487 Ok(())
1488 }
1489
1490 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
1491 self.record("container_state", Some(id));
1492 Ok(ContainerState::Running)
1493 }
1494
1495 async fn container_logs(&self, id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
1496 self.record("container_logs", Some(id));
1497 if self.reads_not_found {
1498 return Err(mock_not_found());
1499 }
1500 Ok(self.logs_response.clone())
1501 }
1502
1503 async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
1504 self.record("exec", Some(id));
1505 Ok((0, String::new(), String::new()))
1506 }
1507
1508 async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
1509 self.record("get_container_stats", Some(id));
1510 if self.reads_not_found {
1511 return Err(mock_not_found());
1512 }
1513 if self.stats_snapshot_unsupported {
1514 return Err(AgentError::Unsupported("mock has no snapshot stats".into()));
1515 }
1516 Ok(ContainerStats {
1517 cpu_usage_usec: 1_000,
1518 memory_bytes: 4096,
1519 memory_limit: 8192,
1520 timestamp: std::time::Instant::now(),
1521 })
1522 }
1523
1524 async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
1525 self.record("wait_container", Some(id));
1526 Ok(0)
1527 }
1528
1529 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
1530 self.record("get_logs", Some(id));
1531 if self.reads_not_found {
1532 return Err(mock_not_found());
1533 }
1534 Ok(self.logs_response.clone())
1535 }
1536
1537 async fn logs_stream(
1538 &self,
1539 id: &ContainerId,
1540 _opts: LogsStreamOptions,
1541 ) -> Result<LogsStream> {
1542 self.record("logs_stream", Some(id));
1543 if self.reads_not_found {
1544 return Err(mock_not_found());
1545 }
1546 if self.stream_unsupported {
1547 return Err(AgentError::Unsupported("mock has no log stream".into()));
1548 }
1549 Ok(one_shot_logs_stream(
1551 self.logs_response.clone(),
1552 &LogsStreamOptions::default(),
1553 ))
1554 }
1555
1556 async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
1557 use futures_util::stream;
1558 self.record("stats_stream", Some(id));
1559 if self.reads_not_found {
1560 return Err(mock_not_found());
1561 }
1562 if self.stream_unsupported {
1563 return Err(AgentError::Unsupported("mock has no stats stream".into()));
1564 }
1565 Ok(Box::pin(stream::iter(vec![Ok(StatsSample {
1566 cpu_total_ns: 0,
1567 cpu_system_ns: 0,
1568 online_cpus: 1,
1569 mem_used_bytes: 4096,
1570 mem_limit_bytes: 8192,
1571 net_rx_bytes: 0,
1572 net_tx_bytes: 0,
1573 blkio_read_bytes: 0,
1574 blkio_write_bytes: 0,
1575 pids_current: 0,
1576 pids_limit: None,
1577 timestamp: chrono::Utc::now(),
1578 })])))
1579 }
1580
1581 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
1582 self.record("get_container_pid", Some(id));
1583 Ok(None)
1584 }
1585
1586 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
1587 self.record("get_container_ip", Some(id));
1588 Ok(None)
1589 }
1590
1591 async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1592 self.record("list_images", None);
1593 if let Some(msg) = &self.list_images_error {
1594 return Err(AgentError::Unsupported(msg.clone()));
1595 }
1596 Ok(self.list_images_response.clone())
1597 }
1598 }
1599
1600 fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
1604 let yaml = format!(
1605 r"
1606version: v1
1607deployment: test
1608services:
1609 test:
1610 rtype: service
1611 image:
1612 name: {image}
1613 endpoints:
1614 - name: http
1615 protocol: http
1616 port: 8080
1617"
1618 );
1619 let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
1620 .expect("valid deployment yaml")
1621 .services
1622 .remove("test")
1623 .expect("service 'test' present");
1624 spec.platform = platform;
1625 spec
1626 }
1627
1628 fn cid(service: &str, replica: u32) -> ContainerId {
1629 ContainerId::new(service.to_string(), replica)
1630 }
1631
1632 fn make_composite(with_delegate: bool) -> (CompositeRuntime, CallLog) {
1633 let calls = Arc::new(StdMutex::new(Vec::new()));
1634 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
1635 let delegate = if with_delegate {
1636 Some(Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>)
1637 } else {
1638 None
1639 };
1640 (
1641 CompositeRuntime::new(primary as Arc<dyn Runtime>, delegate),
1642 calls,
1643 )
1644 }
1645
1646 fn role_for(calls: &[CallRecord], method: &str) -> Option<Role> {
1647 calls
1648 .iter()
1649 .find(|(_, m, _)| m == method)
1650 .map(|(role, _, _)| *role)
1651 }
1652
1653 fn mock_not_found() -> AgentError {
1655 AgentError::NotFound {
1656 container: "mock".to_string(),
1657 reason: "mock backend does not own this container".to_string(),
1658 }
1659 }
1660
1661 #[tokio::test]
1662 async fn dispatch_windows_spec_goes_to_primary() {
1663 let (rt, calls) = make_composite(true);
1664 let id = cid("win-svc", 0);
1665 let spec = make_spec(
1666 "mcr.microsoft.com/windows/nanoserver:ltsc2022",
1667 Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
1668 );
1669
1670 rt.create_container(&id, &spec).await.unwrap();
1671 rt.start_container(&id).await.unwrap();
1672
1673 let calls = calls.lock().unwrap();
1674 assert_eq!(
1675 role_for(&calls, "create_container"),
1676 Some(Role::Primary),
1677 "create_container should hit primary for Windows spec"
1678 );
1679 assert_eq!(
1680 role_for(&calls, "start_container"),
1681 Some(Role::Primary),
1682 "start_container should hit primary for Windows spec"
1683 );
1684 }
1685
1686 #[tokio::test]
1687 async fn dispatch_linux_spec_goes_to_delegate() {
1688 let (rt, calls) = make_composite(true);
1689 let id = cid("lin-svc", 0);
1690 let spec = make_spec(
1691 "docker.io/library/alpine:3.19",
1692 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
1693 );
1694
1695 rt.create_container(&id, &spec).await.unwrap();
1696 rt.start_container(&id).await.unwrap();
1697
1698 let calls = calls.lock().unwrap();
1699 assert_eq!(
1700 role_for(&calls, "create_container"),
1701 Some(Role::Delegate),
1702 "create_container should hit delegate for Linux spec"
1703 );
1704 assert_eq!(
1705 role_for(&calls, "start_container"),
1706 Some(Role::Delegate),
1707 "start_container should hit delegate for Linux spec"
1708 );
1709 }
1710
1711 #[tokio::test]
1712 async fn dispatch_linux_without_delegate_errors() {
1713 let (rt, _calls) = make_composite(false);
1717 let id = cid("lin-svc", 0);
1718 let spec = make_spec(
1719 "docker.io/library/alpine:3.19",
1720 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
1721 );
1722
1723 let err = rt.create_container(&id, &spec).await.unwrap_err();
1724 match err {
1725 AgentError::RouteToPeer {
1726 service,
1727 required_os,
1728 reason,
1729 } => {
1730 assert_eq!(service, "lin-svc");
1731 assert_eq!(required_os, "linux");
1732 assert!(
1733 reason.contains("--install-wsl") && reason.contains("Linux peer"),
1734 "reason must name both remediations, got: {reason}"
1735 );
1736 }
1737 other => panic!("expected RouteToPeer, got {other:?}"),
1738 }
1739 }
1740
1741 #[tokio::test]
1742 async fn dispatch_linux_image_cache_without_delegate_routes_to_peer() {
1743 let (rt, _calls) = make_composite(false);
1748 let id = cid("svc", 0);
1749 let image = "docker.io/library/nginx:1.25";
1750 rt.record_image_os(image, OsKind::Linux).await;
1751
1752 let spec = make_spec(image, None);
1753 let err = rt.create_container(&id, &spec).await.unwrap_err();
1754 match err {
1755 AgentError::RouteToPeer {
1756 service,
1757 required_os,
1758 reason,
1759 } => {
1760 assert_eq!(service, "svc");
1761 assert_eq!(required_os, "linux");
1762 assert!(
1763 reason.contains(image),
1764 "reason should mention the image name, got: {reason}"
1765 );
1766 assert!(
1767 reason.contains("--install-wsl") && reason.contains("Linux peer"),
1768 "reason must name both remediations, got: {reason}"
1769 );
1770 }
1771 other => panic!("expected RouteToPeer, got {other:?}"),
1772 }
1773 }
1774
1775 #[tokio::test]
1776 async fn dispatch_macos_spec_goes_to_primary() {
1777 let (rt, calls) = make_composite(true);
1778 let id = cid("mac-svc", 0);
1779 let spec = make_spec(
1780 "ghcr.io/zlayer/macos:latest",
1781 Some(TargetPlatform::new(OsKind::Macos, ArchKind::Arm64)),
1782 );
1783
1784 rt.create_container(&id, &spec).await.unwrap();
1785
1786 let calls = calls.lock().unwrap();
1787 assert_eq!(
1788 role_for(&calls, "create_container"),
1789 Some(Role::Primary),
1790 "create_container should hit primary for Macos spec"
1791 );
1792 }
1793
1794 #[tokio::test]
1795 async fn dispatch_no_platform_no_image_os_falls_through_to_primary() {
1796 let (rt, calls) = make_composite(true);
1797 let id = cid("svc", 0);
1798 let spec = make_spec("docker.io/library/nginx:1.25", None);
1799
1800 rt.create_container(&id, &spec).await.unwrap();
1801
1802 let calls = calls.lock().unwrap();
1803 assert_eq!(
1804 role_for(&calls, "create_container"),
1805 Some(Role::Primary),
1806 "fall-through should pick primary when both platform and image-OS cache are unknown"
1807 );
1808 }
1809
1810 #[tokio::test]
1811 async fn dispatch_uses_image_os_cache_when_platform_missing() {
1812 let (rt, calls) = make_composite(true);
1813 let id = cid("svc", 0);
1814 let image = "docker.io/library/nginx:1.25";
1815 rt.record_image_os(image, OsKind::Linux).await;
1816
1817 let spec = make_spec(image, None);
1818 rt.create_container(&id, &spec).await.unwrap();
1819
1820 let calls = calls.lock().unwrap();
1821 assert_eq!(
1822 role_for(&calls, "create_container"),
1823 Some(Role::Delegate),
1824 "image-OS cache should route Linux images to the delegate"
1825 );
1826 }
1827
1828 fn make_composite_with_vz() -> (CompositeRuntime, CallLog) {
1831 let calls = Arc::new(StdMutex::new(Vec::new()));
1832 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
1833 let delegate =
1834 Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
1835 let vz = Arc::new(MockRuntime::new(Role::Vz, Arc::clone(&calls))) as Arc<dyn Runtime>;
1836 let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
1837 .with_vz_delegate(Some(vz));
1838 (rt, calls)
1839 }
1840
1841 #[tokio::test]
1842 async fn dispatch_vz_bundle_annotation_auto_routes_to_vz() {
1843 let (rt, calls) = make_composite_with_vz();
1844 let id = cid("mac-svc", 0);
1845 let image = "ghcr.io/org/macos-vz:sequoia";
1846 rt.record_image_runtime(image, "vz".to_string()).await;
1848
1849 let spec = make_spec(image, None);
1850 rt.create_container(&id, &spec).await.unwrap();
1851
1852 let calls = calls.lock().unwrap();
1853 assert_eq!(
1854 role_for(&calls, "create_container"),
1855 Some(Role::Vz),
1856 "a com.zlayer.runtime=vz bundle should auto-route to the VZ runtime"
1857 );
1858 }
1859
1860 #[tokio::test]
1861 async fn dispatch_vz_label_forces_vz() {
1862 let (rt, calls) = make_composite_with_vz();
1863 let id = cid("mac-svc", 0);
1864 let mut spec = make_spec("ghcr.io/org/whatever:1", None);
1865 spec.labels
1866 .insert("com.zlayer.isolation".to_string(), "vz".to_string());
1867
1868 rt.create_container(&id, &spec).await.unwrap();
1869
1870 let calls = calls.lock().unwrap();
1871 assert_eq!(
1872 role_for(&calls, "create_container"),
1873 Some(Role::Vz),
1874 "an explicit com.zlayer.isolation=vz label should force the VZ runtime"
1875 );
1876 }
1877
1878 #[tokio::test]
1879 async fn dispatch_sandbox_label_overrides_vz_bundle() {
1880 let (rt, calls) = make_composite_with_vz();
1881 let id = cid("mac-svc", 0);
1882 let image = "ghcr.io/org/macos-vz:sequoia";
1883 rt.record_image_runtime(image, "vz".to_string()).await;
1884
1885 let mut spec = make_spec(image, None);
1886 spec.labels
1887 .insert("com.zlayer.isolation".to_string(), "sandbox".to_string());
1888 rt.create_container(&id, &spec).await.unwrap();
1889
1890 let calls = calls.lock().unwrap();
1891 assert_eq!(
1892 role_for(&calls, "create_container"),
1893 Some(Role::Primary),
1894 "com.zlayer.isolation=sandbox should opt out of VZ auto-detect (force the sandbox)"
1895 );
1896 }
1897
1898 fn make_composite_with_vz_linux() -> (CompositeRuntime, CallLog) {
1901 let calls = Arc::new(StdMutex::new(Vec::new()));
1902 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
1903 let delegate =
1904 Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
1905 let vz_linux =
1906 Arc::new(MockRuntime::new(Role::VzLinux, Arc::clone(&calls))) as Arc<dyn Runtime>;
1907 let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
1908 .with_vz_linux_delegate(Some(vz_linux));
1909 (rt, calls)
1910 }
1911
1912 #[tokio::test]
1913 async fn dispatch_vz_linux_label_forces_vz_linux() {
1914 let (rt, calls) = make_composite_with_vz_linux();
1915 let id = cid("lin-svc", 0);
1916 let mut spec = make_spec("docker.io/library/alpine:3.19", None);
1917 spec.labels
1918 .insert("com.zlayer.isolation".to_string(), "vz-linux".to_string());
1919
1920 rt.create_container(&id, &spec).await.unwrap();
1921
1922 let calls = calls.lock().unwrap();
1923 assert_eq!(
1924 role_for(&calls, "create_container"),
1925 Some(Role::VzLinux),
1926 "com.zlayer.isolation=vz-linux must force the VZ Linux runtime"
1927 );
1928 }
1929
1930 #[tokio::test]
1931 async fn dispatch_vz_linux_marker_auto_routes_to_vz_linux() {
1932 let (rt, calls) = make_composite_with_vz_linux();
1933 let id = cid("lin-svc", 0);
1934 let image = "ghcr.io/org/linux-vz:bookworm";
1935 rt.record_image_runtime(image, "vz-linux".to_string()).await;
1936
1937 let spec = make_spec(image, None);
1938 rt.create_container(&id, &spec).await.unwrap();
1939
1940 let calls = calls.lock().unwrap();
1941 assert_eq!(
1942 role_for(&calls, "create_container"),
1943 Some(Role::VzLinux),
1944 "a com.zlayer.runtime=vz-linux marker should auto-route to the VZ Linux runtime"
1945 );
1946 }
1947
1948 #[tokio::test]
1949 async fn dispatch_linux_platform_with_vz_linux_routes_to_vz_linux() {
1950 let (rt, calls) = make_composite_with_vz_linux();
1951 let id = cid("lin-svc", 0);
1952 let spec = make_spec(
1955 "docker.io/library/alpine:3.19",
1956 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Arm64)),
1957 );
1958
1959 rt.create_container(&id, &spec).await.unwrap();
1960
1961 let calls = calls.lock().unwrap();
1962 assert_eq!(
1963 role_for(&calls, "create_container"),
1964 Some(Role::VzLinux),
1965 "a Linux platform spec must default to the VZ Linux runtime when present"
1966 );
1967 }
1968
1969 #[tokio::test]
1970 async fn dispatch_linux_image_os_with_vz_linux_routes_to_vz_linux() {
1971 let (rt, calls) = make_composite_with_vz_linux();
1972 let id = cid("lin-svc", 0);
1973 let image = "docker.io/library/nginx:1.25";
1974 rt.record_image_os(image, OsKind::Linux).await;
1975
1976 let spec = make_spec(image, None);
1977 rt.create_container(&id, &spec).await.unwrap();
1978
1979 let calls = calls.lock().unwrap();
1980 assert_eq!(
1981 role_for(&calls, "create_container"),
1982 Some(Role::VzLinux),
1983 "a Linux image-OS cache hit must default to the VZ Linux runtime when present"
1984 );
1985 }
1986
1987 #[tokio::test]
1988 async fn dispatch_macos_image_os_with_vz_linux_routes_to_primary() {
1989 let (rt, calls) = make_composite_with_vz_linux();
1993 let id = cid("mac-svc", 0);
1994 let image = "ghcr.io/zlayer/macos-native:latest";
1995 rt.record_image_os(image, OsKind::Macos).await;
1996
1997 let spec = make_spec(image, None);
1998 rt.create_container(&id, &spec).await.unwrap();
1999
2000 let calls = calls.lock().unwrap();
2001 assert_eq!(
2002 role_for(&calls, "create_container"),
2003 Some(Role::Primary),
2004 "image_os == Macos must route to primary even when VZ-Linux is the default",
2005 );
2006 }
2007
2008 #[tokio::test]
2009 async fn dispatch_unknown_os_with_vz_linux_defaults_to_vz_linux() {
2010 let (rt, calls) = make_composite_with_vz_linux();
2016 let id = cid("svc", 0);
2017 let spec = make_spec("docker.io/library/whatever:latest", None);
2018
2019 rt.create_container(&id, &spec).await.unwrap();
2020
2021 let calls = calls.lock().unwrap();
2022 assert_eq!(
2023 role_for(&calls, "create_container"),
2024 Some(Role::VzLinux),
2025 "an unknown-OS image must default to VZ-Linux when the delegate is present",
2026 );
2027 }
2028
2029 #[tokio::test]
2030 async fn dispatch_unknown_os_without_vz_linux_falls_through_to_primary() {
2031 let (rt, calls) = make_composite(true);
2035 let id = cid("svc", 0);
2036 let spec = make_spec("docker.io/library/whatever:latest", None);
2037
2038 rt.create_container(&id, &spec).await.unwrap();
2039
2040 let calls = calls.lock().unwrap();
2041 assert_eq!(
2042 role_for(&calls, "create_container"),
2043 Some(Role::Primary),
2044 "without a VZ-Linux delegate an unknown-OS image keeps the primary fallthrough",
2045 );
2046 }
2047
2048 async fn seed_persistent_linux_cache(path: &std::path::Path, image: &str) {
2052 seed_persistent_cache_with_os(path, image, "linux").await;
2053 }
2054
2055 async fn seed_persistent_cache_with_os(path: &std::path::Path, image: &str, os: &str) {
2058 let cache = zlayer_registry::CacheType::persistent_at(path)
2059 .build()
2060 .await
2061 .expect("open persistent blob cache");
2062
2063 let config_json = serde_json::json!({
2064 "architecture": "arm64",
2065 "os": os,
2066 "config": {},
2067 });
2068 let config_bytes = serde_json::to_vec(&config_json).unwrap();
2069 let config_digest = zlayer_registry::compute_digest(&config_bytes);
2070 cache.put(&config_digest, &config_bytes).await.unwrap();
2071
2072 let manifest = zlayer_registry::OciImageManifest {
2073 schema_version: 2,
2074 media_type: Some("application/vnd.oci.image.manifest.v1+json".to_string()),
2075 artifact_type: None,
2076 config: oci_client::manifest::OciDescriptor {
2077 media_type: "application/vnd.oci.image.config.v1+json".to_string(),
2078 digest: config_digest.clone(),
2079 size: i64::try_from(config_bytes.len()).unwrap(),
2080 urls: None,
2081 annotations: None,
2082 },
2083 layers: vec![],
2084 annotations: None,
2085 subject: None,
2086 };
2087 let manifest_bytes = serde_json::to_vec(&manifest).unwrap();
2088 let manifest_digest = zlayer_registry::compute_digest(&manifest_bytes);
2089 cache
2090 .put(&zlayer_registry::manifest_cache_key(image), &manifest_bytes)
2091 .await
2092 .unwrap();
2093 cache
2094 .put(
2095 &zlayer_registry::manifest_digest_cache_key(image),
2096 manifest_digest.as_bytes(),
2097 )
2098 .await
2099 .unwrap();
2100 }
2101
2102 #[tokio::test]
2108 async fn pull_then_dispatch_resolves_linux_os_from_local_cache_routes_to_vz_linux() {
2109 let tmp = tempfile::tempdir().unwrap();
2110 let cache_path = tmp.path().join("blobs.redb");
2111 let image = "docker.io/library/alpine:latest";
2112 seed_persistent_linux_cache(&cache_path, image).await;
2113
2114 let (rt, calls) = make_composite_with_vz_linux();
2115 let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2116
2117 rt.pull_image(image).await.unwrap();
2119
2120 assert_eq!(
2122 rt.image_os.read().await.get(image).copied(),
2123 Some(OsKind::Linux),
2124 "pull_image must resolve Linux OS from the local persistent cache",
2125 );
2126
2127 let id = cid("lin-svc", 0);
2129 let spec = make_spec(image, None);
2130 rt.create_container(&id, &spec).await.unwrap();
2131
2132 let calls = calls.lock().unwrap();
2133 assert_eq!(
2134 role_for(&calls, "create_container"),
2135 Some(Role::VzLinux),
2136 "a Linux image whose OS came from the local cache must route to VZ-Linux",
2137 );
2138 }
2139
2140 #[tokio::test]
2147 async fn bare_ref_spec_resolves_os_from_qualified_seeded_cache_routes_to_vz_linux() {
2148 let tmp = tempfile::tempdir().unwrap();
2149 let cache_path = tmp.path().join("blobs.redb");
2150 seed_persistent_linux_cache(&cache_path, "docker.io/library/alpine:latest").await;
2152
2153 let (rt, calls) = make_composite_with_vz_linux();
2154 let rt = rt.with_os_inspect_cache_paths(vec![cache_path]);
2155
2156 let bare = "alpine:latest";
2159 rt.pull_image(bare).await.unwrap();
2160
2161 assert_eq!(
2162 rt.image_os.read().await.get(bare).copied(),
2163 Some(OsKind::Linux),
2164 "bare-ref inspect must resolve Linux from the qualified-seeded cache",
2165 );
2166
2167 let id = cid("lin-svc", 0);
2168 let spec = make_spec(bare, None);
2169 rt.create_container(&id, &spec).await.unwrap();
2170
2171 let calls = calls.lock().unwrap();
2172 assert_eq!(
2173 role_for(&calls, "create_container"),
2174 Some(Role::VzLinux),
2175 "bare-ref Linux image routes to VZ-Linux via the canonical-key cache hit",
2176 );
2177 }
2178
2179 #[tokio::test]
2185 async fn os_resolves_from_second_cache_when_first_is_empty() {
2186 let tmp = tempfile::tempdir().unwrap();
2187 let empty_cache = tmp.path().join("vz-linux-blobs.redb");
2188 let primary_cache = tmp.path().join("primary-blobs.redb");
2189 zlayer_registry::CacheType::persistent_at(&empty_cache)
2191 .build()
2192 .await
2193 .unwrap();
2194 seed_persistent_linux_cache(&primary_cache, "docker.io/library/alpine:latest").await;
2196
2197 let (rt, calls) = make_composite_with_vz_linux();
2198 let rt = rt.with_os_inspect_cache_paths(vec![empty_cache, primary_cache]);
2199
2200 let bare = "alpine:latest";
2201 rt.pull_image(bare).await.unwrap();
2202
2203 assert_eq!(
2204 rt.image_os.read().await.get(bare).copied(),
2205 Some(OsKind::Linux),
2206 "OS must resolve from the second cache after the first misses (no network)",
2207 );
2208
2209 let id = cid("lin-svc", 0);
2210 let spec = make_spec(bare, None);
2211 rt.create_container(&id, &spec).await.unwrap();
2212
2213 let calls = calls.lock().unwrap();
2214 assert_eq!(role_for(&calls, "create_container"), Some(Role::VzLinux),);
2215 }
2216
2217 #[tokio::test]
2229 async fn pull_with_network_429_still_dispatches_via_local_cache() {
2230 let tmp = tempfile::tempdir().unwrap();
2231 let cache_path = tmp.path().join("blobs.redb");
2232 let image = "registry.invalid.example/library/alpine:latest";
2235 seed_persistent_linux_cache(&cache_path, image).await;
2236
2237 let (rt, calls) = make_composite_with_vz_linux();
2238 let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2239
2240 rt.pull_image(image).await.unwrap();
2244 assert_eq!(
2245 rt.image_os.read().await.get(image).copied(),
2246 Some(OsKind::Linux),
2247 "OS must be resolved from the local cache with no network call",
2248 );
2249
2250 let id = cid("lin-svc", 0);
2252 let spec = make_spec(image, None);
2253 rt.create_container(&id, &spec).await.unwrap();
2254
2255 let calls = calls.lock().unwrap();
2256 assert_eq!(
2257 role_for(&calls, "create_container"),
2258 Some(Role::VzLinux),
2259 "a would-be-429 pull must still route the cached Linux image to VZ-Linux",
2260 );
2261 }
2262
2263 #[tokio::test]
2268 async fn pull_then_dispatch_resolves_macos_os_from_local_cache_routes_to_primary() {
2269 let tmp = tempfile::tempdir().unwrap();
2270 let cache_path = tmp.path().join("blobs.redb");
2271 let image = "ghcr.io/zlayer/macos-native:latest";
2272 seed_persistent_cache_with_os(&cache_path, image, "darwin").await;
2273
2274 let (rt, calls) = make_composite_with_vz_linux();
2275 let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2276
2277 rt.pull_image(image).await.unwrap();
2278 assert_eq!(
2279 rt.image_os.read().await.get(image).copied(),
2280 Some(OsKind::Macos),
2281 "pull_image must resolve macOS OS from the local persistent cache",
2282 );
2283
2284 let id = cid("mac-svc", 0);
2285 let spec = make_spec(image, None);
2286 rt.create_container(&id, &spec).await.unwrap();
2287
2288 let calls = calls.lock().unwrap();
2289 assert_eq!(
2290 role_for(&calls, "create_container"),
2291 Some(Role::Primary),
2292 "a macOS-native rootfs must route to primary even with VZ-Linux as default",
2293 );
2294 }
2295
2296 #[tokio::test]
2297 async fn dispatch_vm_label_forces_libkrun_delegate() {
2298 let (rt, calls) = make_composite_with_vz_linux();
2299 let id = cid("lin-svc", 0);
2300 let mut spec = make_spec(
2303 "docker.io/library/alpine:3.19",
2304 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Arm64)),
2305 );
2306 spec.labels
2307 .insert("com.zlayer.isolation".to_string(), "vm".to_string());
2308
2309 rt.create_container(&id, &spec).await.unwrap();
2310
2311 let calls = calls.lock().unwrap();
2312 assert_eq!(
2313 role_for(&calls, "create_container"),
2314 Some(Role::Delegate),
2315 "com.zlayer.isolation=vm must force the libkrun delegate even when VZ Linux is default"
2316 );
2317 }
2318
2319 #[tokio::test]
2320 async fn dispatch_unmarked_image_with_vz_delegate_falls_through_to_primary() {
2321 let (rt, calls) = make_composite_with_vz();
2322 let id = cid("mac-svc", 0);
2323 let spec = make_spec("ghcr.io/org/plain:1", None);
2326 rt.create_container(&id, &spec).await.unwrap();
2327
2328 let calls = calls.lock().unwrap();
2329 assert_eq!(
2330 role_for(&calls, "create_container"),
2331 Some(Role::Primary),
2332 "an unmarked image must fall through to primary even when a VZ delegate is attached"
2333 );
2334 }
2335
2336 #[tokio::test]
2337 async fn per_container_dispatch_cache_persists_through_start_stop() {
2338 let (rt, calls) = make_composite(true);
2339 let id = cid("win-svc", 0);
2340 let spec = make_spec(
2341 "mcr.microsoft.com/windows/nanoserver:ltsc2022",
2342 Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
2343 );
2344
2345 rt.create_container(&id, &spec).await.unwrap();
2346 rt.start_container(&id).await.unwrap();
2347 rt.stop_container(&id, Duration::from_secs(1))
2348 .await
2349 .unwrap();
2350 rt.remove_container(&id).await.unwrap();
2351
2352 let recorded = calls.lock().unwrap().clone();
2353 for method in [
2354 "create_container",
2355 "start_container",
2356 "stop_container",
2357 "remove_container",
2358 ] {
2359 assert_eq!(
2360 role_for(&recorded, method),
2361 Some(Role::Primary),
2362 "{method} should have dispatched to primary"
2363 );
2364 }
2365
2366 let after = rt
2368 .start_container(&id)
2369 .await
2370 .expect_err("lookup after remove should fail");
2371 assert!(
2372 matches!(after, AgentError::NotFound { .. }),
2373 "expected NotFound after remove, got {after:?}"
2374 );
2375 }
2376
2377 #[tokio::test]
2378 async fn pull_image_calls_both_runtimes() {
2379 let (rt, calls) = make_composite(true);
2380 rt.pull_image("docker.io/library/alpine:3.19")
2381 .await
2382 .unwrap();
2383
2384 let recorded = calls.lock().unwrap();
2385 let pull_calls: Vec<Role> = recorded
2386 .iter()
2387 .filter(|(_, m, _)| m == "pull_image")
2388 .map(|(r, _, _)| *r)
2389 .collect();
2390 assert!(
2391 pull_calls.contains(&Role::Primary),
2392 "primary should have been pulled: {pull_calls:?}",
2393 );
2394 assert!(
2395 pull_calls.contains(&Role::Delegate),
2396 "delegate should have been pulled: {pull_calls:?}",
2397 );
2398 }
2399
2400 #[tokio::test]
2401 async fn pull_image_delegate_error_does_not_fail() {
2402 let calls = Arc::new(StdMutex::new(Vec::new()));
2405 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
2406 let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2407 delegate.pull_image_error = Some("simulated delegate pull failure".to_string());
2408 let rt = CompositeRuntime::new(
2409 primary as Arc<dyn Runtime>,
2410 Some(Arc::new(delegate) as Arc<dyn Runtime>),
2411 );
2412
2413 rt.pull_image("docker.io/library/alpine:3.19")
2415 .await
2416 .unwrap();
2417
2418 let recorded = calls.lock().unwrap();
2419 let pull_calls: Vec<Role> = recorded
2420 .iter()
2421 .filter(|(_, m, _)| m == "pull_image")
2422 .map(|(r, _, _)| *r)
2423 .collect();
2424 assert!(
2425 pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
2426 "both runtimes should have been called: {pull_calls:?}",
2427 );
2428 }
2429
2430 #[tokio::test]
2431 async fn pull_image_primary_wrong_platform_does_not_fail() {
2432 let calls = Arc::new(StdMutex::new(Vec::new()));
2438 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2439 primary.pull_image_wrong_platform = Some(("windows", "linux"));
2440 let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2441 let rt = CompositeRuntime::new(
2442 Arc::new(primary) as Arc<dyn Runtime>,
2443 Some(Arc::new(delegate) as Arc<dyn Runtime>),
2444 );
2445
2446 rt.pull_image("docker.io/library/alpine:3.19")
2448 .await
2449 .expect("composite pull must tolerate WrongPlatform from primary");
2450
2451 let recorded = calls.lock().unwrap();
2452 let pull_calls: Vec<Role> = recorded
2453 .iter()
2454 .filter(|(_, m, _)| m == "pull_image")
2455 .map(|(r, _, _)| *r)
2456 .collect();
2457 assert!(
2458 pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
2459 "delegate must still be called when primary soft-skips: {pull_calls:?}",
2460 );
2461 }
2462
2463 #[tokio::test]
2464 async fn pull_image_with_policy_primary_wrong_platform_does_not_fail() {
2465 let calls = Arc::new(StdMutex::new(Vec::new()));
2470 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2471 primary.pull_image_wrong_platform = Some(("windows", "linux"));
2472 let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2473 let rt = CompositeRuntime::new(
2474 Arc::new(primary) as Arc<dyn Runtime>,
2475 Some(Arc::new(delegate) as Arc<dyn Runtime>),
2476 );
2477
2478 rt.pull_image_with_policy(
2479 "docker.io/library/alpine:3.19",
2480 PullPolicy::IfNotPresent,
2481 None,
2482 zlayer_spec::SourcePolicy::default(),
2483 )
2484 .await
2485 .expect("composite pull_image_with_policy must tolerate WrongPlatform from primary");
2486
2487 let recorded = calls.lock().unwrap();
2488 let pull_calls: Vec<Role> = recorded
2489 .iter()
2490 .filter(|(_, m, _)| m == "pull_image_with_policy")
2491 .map(|(r, _, _)| *r)
2492 .collect();
2493 assert!(
2494 pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
2495 "delegate must still be called when primary soft-skips: {pull_calls:?}",
2496 );
2497 }
2498
2499 #[tokio::test]
2500 async fn pull_image_primary_non_wrong_platform_error_still_fails() {
2501 let calls = Arc::new(StdMutex::new(Vec::new()));
2505 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2506 primary.pull_image_error = Some("simulated real failure".to_string());
2507 let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2508 let rt = CompositeRuntime::new(
2509 Arc::new(primary) as Arc<dyn Runtime>,
2510 Some(Arc::new(delegate) as Arc<dyn Runtime>),
2511 );
2512
2513 let err = rt
2514 .pull_image("docker.io/library/alpine:3.19")
2515 .await
2516 .expect_err("real primary error must propagate");
2517 assert!(
2518 matches!(err, AgentError::Internal(_)),
2519 "expected Internal, got {err:?}",
2520 );
2521 }
2522
2523 #[tokio::test]
2524 async fn list_images_merges_both() {
2525 let calls = Arc::new(StdMutex::new(Vec::new()));
2527 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2528 primary.list_images_response = vec![ImageInfo {
2529 reference: "primary/image:1".to_string(),
2530 digest: None,
2531 size_bytes: None,
2532 }];
2533 let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2534 delegate.list_images_response = vec![ImageInfo {
2535 reference: "delegate/image:1".to_string(),
2536 digest: None,
2537 size_bytes: None,
2538 }];
2539 let rt = CompositeRuntime::new(
2540 Arc::new(primary) as Arc<dyn Runtime>,
2541 Some(Arc::new(delegate) as Arc<dyn Runtime>),
2542 );
2543
2544 let merged = rt.list_images().await.unwrap();
2545 let refs: Vec<&str> = merged.iter().map(|i| i.reference.as_str()).collect();
2546 assert!(
2547 refs.contains(&"primary/image:1") && refs.contains(&"delegate/image:1"),
2548 "merged list should contain both entries, got {refs:?}",
2549 );
2550 }
2551
2552 #[tokio::test]
2560 async fn list_images_tolerates_primary_unsupported_and_uses_vz_linux() {
2561 let calls = Arc::new(StdMutex::new(Vec::new()));
2562 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2563 primary.list_images_error = Some("list_images is not supported".to_string());
2564 let mut vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2565 vz_linux.list_images_response = vec![ImageInfo {
2566 reference: "docker.io/library/alpine:latest".to_string(),
2567 digest: None,
2568 size_bytes: None,
2569 }];
2570
2571 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2572 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2573
2574 let images = rt
2575 .list_images()
2576 .await
2577 .expect("primary Unsupported must not fail the composite list_images");
2578 let refs: Vec<&str> = images.iter().map(|i| i.reference.as_str()).collect();
2579 assert_eq!(
2580 refs,
2581 vec!["docker.io/library/alpine:latest"],
2582 "should return the VZ-Linux delegate's images, got {refs:?}",
2583 );
2584 }
2585
2586 #[tokio::test]
2590 async fn list_images_errors_only_when_all_backends_fail() {
2591 let calls = Arc::new(StdMutex::new(Vec::new()));
2592 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2593 primary.list_images_error = Some("unsupported".to_string());
2594 let mut vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2595 vz_linux.list_images_error = Some("also unsupported".to_string());
2596
2597 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2598 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2599
2600 let err = rt.list_images().await.unwrap_err();
2601 assert!(
2602 matches!(err, AgentError::Unsupported(_)),
2603 "all-backends-fail should surface Unsupported, got {err:?}",
2604 );
2605 }
2606
2607 fn log_entry(stream: LogStream, message: &str) -> LogEntry {
2621 LogEntry {
2622 timestamp: chrono::Utc::now(),
2623 stream,
2624 source: zlayer_observability::logs::LogSource::Container("test".to_string()),
2625 message: message.to_string(),
2626 service: None,
2627 deployment: None,
2628 }
2629 }
2630
2631 async fn drain_logs(stream: LogsStream) -> String {
2633 use futures_util::StreamExt as _;
2634 let mut out = Vec::new();
2635 let mut s = stream;
2636 while let Some(item) = s.next().await {
2637 out.extend_from_slice(&item.expect("log chunk ok").bytes);
2638 }
2639 String::from_utf8(out).expect("utf8 log body")
2640 }
2641
2642 async fn drain_stats(stream: StatsStream) -> Vec<StatsSample> {
2644 use futures_util::StreamExt as _;
2645 let mut out = Vec::new();
2646 let mut s = stream;
2647 while let Some(item) = s.next().await {
2648 out.push(item.expect("stats sample ok"));
2649 }
2650 out
2651 }
2652
2653 async fn make_read_composite(owner: Role) -> (CompositeRuntime, ContainerId, CallLog) {
2659 let calls = Arc::new(StdMutex::new(Vec::new()));
2660 let logs = vec![
2661 log_entry(LogStream::Stdout, "hello stdout"),
2662 log_entry(LogStream::Stderr, "hello stderr"),
2663 ];
2664 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls))
2665 .with_stream_unsupported()
2666 .with_logs(logs.clone());
2667 let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls)).with_logs(logs);
2668 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2669 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2670
2671 let id = cid("read-svc", 0);
2672 let target = match owner {
2675 Role::Primary => DispatchTarget::Primary,
2676 Role::VzLinux => DispatchTarget::VzLinux,
2677 other => panic!("make_read_composite supports Primary/VzLinux, not {other:?}"),
2678 };
2679 rt.dispatch.write().await.insert(id.clone(), target);
2680 (rt, id, calls)
2681 }
2682
2683 #[tokio::test]
2684 async fn logs_stream_falls_back_to_snapshot_when_owner_has_no_stream() {
2685 let calls = Arc::new(StdMutex::new(Vec::new()));
2689 let logs = vec![
2690 log_entry(LogStream::Stdout, "hello stdout"),
2691 log_entry(LogStream::Stderr, "hello stderr"),
2692 ];
2693 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls))
2694 .with_stream_unsupported()
2695 .with_logs(logs);
2696 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None);
2697 let id = cid("read-svc", 0);
2698 rt.dispatch
2699 .write()
2700 .await
2701 .insert(id.clone(), DispatchTarget::Primary);
2702
2703 let stream = rt
2704 .logs_stream(&id, LogsStreamOptions::default())
2705 .await
2706 .expect("logs_stream must not 500 when snapshot reads work");
2707 let body = drain_logs(stream).await;
2708 assert!(
2709 body.contains("hello stdout") && body.contains("hello stderr"),
2710 "synthesised stream must carry the captured logs, got: {body:?}",
2711 );
2712 }
2713
2714 #[tokio::test]
2715 async fn logs_stream_routes_to_delegate_owner_native_stream() {
2716 let (rt, id, calls) = make_read_composite(Role::VzLinux).await;
2719 let stream = rt
2720 .logs_stream(&id, LogsStreamOptions::default())
2721 .await
2722 .expect("delegate-owned logs_stream must succeed");
2723 let body = drain_logs(stream).await;
2724 assert!(body.contains("hello stdout"), "got: {body:?}");
2725
2726 let log = calls.lock().expect("call-log mutex poisoned");
2727 assert_eq!(
2728 role_for(&log, "logs_stream"),
2729 Some(Role::VzLinux),
2730 "logs_stream must hit the owning delegate first, calls: {log:?}",
2731 );
2732 }
2733
2734 #[tokio::test]
2735 async fn get_logs_falls_back_across_backends() {
2736 let (rt, id, _calls) = make_read_composite(Role::Primary).await;
2740 let logs = rt.get_logs(&id).await.expect("get_logs must succeed");
2741 assert_eq!(logs.len(), 2, "owner snapshot logs should be returned");
2742 }
2743
2744 #[tokio::test]
2745 async fn stats_stream_falls_back_to_snapshot_when_owner_has_no_stream() {
2746 let calls = Arc::new(StdMutex::new(Vec::new()));
2751 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_stream_unsupported();
2752 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None);
2753 let id = cid("read-svc", 0);
2754 rt.dispatch
2755 .write()
2756 .await
2757 .insert(id.clone(), DispatchTarget::Primary);
2758
2759 let stream = rt
2760 .stats_stream(&id)
2761 .await
2762 .expect("stats_stream must not 500 when get_container_stats works");
2763 let samples = drain_stats(stream).await;
2764 assert_eq!(samples.len(), 1, "snapshot fallback yields one sample");
2765 assert!(
2766 samples[0].mem_used_bytes > 0,
2767 "synthesised sample must carry non-zero memory, got {:?}",
2768 samples[0],
2769 );
2770 assert_eq!(
2771 samples[0].cpu_total_ns, 1_000_000,
2772 "cpu microseconds must be scaled to nanoseconds in the synthesised sample",
2773 );
2774 }
2775
2776 #[tokio::test]
2777 async fn get_container_stats_tolerates_owner_miss_and_uses_other_backend() {
2778 let calls = Arc::new(StdMutex::new(Vec::new()));
2783 let primary =
2784 MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_stats_snapshot_unsupported();
2785 let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2786 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2787 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2788 let id = cid("read-svc", 0);
2789 rt.dispatch
2790 .write()
2791 .await
2792 .insert(id.clone(), DispatchTarget::Primary);
2793
2794 let stats = rt
2795 .get_container_stats(&id)
2796 .await
2797 .expect("owner Unsupported must fall back to the delegate, not 500");
2798 assert!(stats.memory_bytes > 0, "delegate stats should be returned");
2799
2800 let log = calls.lock().expect("call-log mutex poisoned");
2801 assert!(
2802 log.iter()
2803 .any(|(role, method, _)| *role == Role::Primary && method == "get_container_stats"),
2804 "primary must have been tried first, calls: {log:?}",
2805 );
2806 assert!(
2807 log.iter()
2808 .any(|(role, method, _)| *role == Role::VzLinux && method == "get_container_stats"),
2809 "delegate must have served the fallback, calls: {log:?}",
2810 );
2811 }
2812
2813 #[tokio::test]
2814 async fn reads_propagate_not_found_when_no_backend_owns_container() {
2815 let calls = Arc::new(StdMutex::new(Vec::new()));
2819 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_reads_not_found();
2820 let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls)).with_reads_not_found();
2821 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2822 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2823 let id = cid("read-svc", 0);
2824 rt.dispatch
2825 .write()
2826 .await
2827 .insert(id.clone(), DispatchTarget::Primary);
2828
2829 match rt.logs_stream(&id, LogsStreamOptions::default()).await {
2832 Err(AgentError::NotFound { .. }) => {}
2833 other => panic!(
2834 "all-not-found logs_stream must be NotFound (404), got {:?}",
2835 other.err(),
2836 ),
2837 }
2838 match rt.stats_stream(&id).await {
2839 Err(AgentError::NotFound { .. }) => {}
2840 other => panic!(
2841 "all-not-found stats_stream must be NotFound (404), got {:?}",
2842 other.err(),
2843 ),
2844 }
2845 let cl_err = rt.container_logs(&id, 10).await.unwrap_err();
2846 assert!(
2847 matches!(cl_err, AgentError::NotFound { .. }),
2848 "all-not-found container_logs must be NotFound (404), got {cl_err:?}",
2849 );
2850 }
2851
2852 #[tokio::test]
2853 async fn reads_on_undispatched_container_are_not_found() {
2854 let (rt, _calls) = make_composite(false);
2856 let id = cid("ghost", 0);
2857 match rt.logs_stream(&id, LogsStreamOptions::default()).await {
2858 Err(AgentError::NotFound { .. }) => {}
2859 other => panic!(
2860 "undispatched logs_stream must be NotFound, got {:?}",
2861 other.err()
2862 ),
2863 }
2864 }
2865
2866 #[tokio::test]
2872 async fn pull_image_fans_out_to_vz_linux() {
2873 let calls = Arc::new(StdMutex::new(Vec::new()));
2874 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2875 let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2876
2877 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2878 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2879
2880 rt.pull_image("docker.io/library/alpine:latest")
2881 .await
2882 .expect("pull should succeed");
2883
2884 let log = calls.lock().expect("call-log mutex poisoned");
2885 assert!(
2886 log.iter()
2887 .any(|(role, method, _)| *role == Role::VzLinux && method == "pull_image"),
2888 "pull_image must reach the VZ-Linux delegate, recorded calls: {log:?}",
2889 );
2890 }
2891
2892 #[tokio::test]
2893 async fn dispatch_lookup_unknown_container_errors() {
2894 let (rt, _calls) = make_composite(true);
2895 let id = cid("ghost", 0);
2896
2897 let err = rt.start_container(&id).await.unwrap_err();
2898 assert!(
2899 matches!(err, AgentError::NotFound { .. }),
2900 "expected NotFound for unknown container, got {err:?}"
2901 );
2902 }
2903
2904 async fn cached_os(rt: &CompositeRuntime, image: &str) -> Option<OsKind> {
2906 rt.image_os.read().await.get(image).copied()
2907 }
2908
2909 #[tokio::test]
2910 async fn apply_image_os_inspection_populates_cache_on_ok_some() {
2911 let (rt, _calls) = make_composite(true);
2915 let image = "docker.io/library/alpine:3.19";
2916
2917 rt.apply_image_os_inspection(image, Ok(Some(OsKind::Linux)))
2918 .await;
2919
2920 assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
2921 }
2922
2923 #[tokio::test]
2924 async fn apply_image_os_inspection_leaves_cache_untouched_on_ok_none() {
2925 let (rt, _calls) = make_composite(true);
2929 let image = "docker.io/library/nginx:1.25";
2930
2931 rt.apply_image_os_inspection(image, Ok(None)).await;
2932
2933 assert_eq!(cached_os(&rt, image).await, None);
2934 }
2935
2936 #[tokio::test]
2937 async fn apply_image_os_inspection_leaves_cache_untouched_on_err() {
2938 let (rt, _calls) = make_composite(true);
2941 let image = "docker.io/library/nginx:1.25";
2942
2943 rt.record_image_os(image, OsKind::Linux).await;
2946
2947 let err = zlayer_registry::RegistryError::NotFound {
2948 registry: "docker.io".to_string(),
2949 image: image.to_string(),
2950 };
2951 rt.apply_image_os_inspection(image, Err(err)).await;
2952
2953 assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
2955 }
2956
2957 #[tokio::test]
2958 async fn pull_image_inspection_failure_does_not_fail_pull() {
2959 let (rt, _calls) = make_composite(true);
2965 let image = "invalid.example.invalid/ghost:v1";
2966
2967 rt.pull_image(image).await.unwrap();
2968
2969 assert_eq!(
2970 cached_os(&rt, image).await,
2971 None,
2972 "failed inspection must not populate the image-OS cache"
2973 );
2974 }
2975
2976 #[tokio::test]
2977 async fn pull_image_with_policy_inspection_failure_does_not_fail_pull() {
2978 let (rt, _calls) = make_composite(true);
2981 let image = "invalid.example.invalid/ghost:v1";
2982
2983 rt.pull_image_with_policy(
2984 image,
2985 PullPolicy::IfNotPresent,
2986 None,
2987 zlayer_spec::SourcePolicy::default(),
2988 )
2989 .await
2990 .unwrap();
2991
2992 assert_eq!(cached_os(&rt, image).await, None);
2993 }
2994
2995 #[test]
2996 fn os_kind_from_oci_str_roundtrip() {
2997 for os in [OsKind::Linux, OsKind::Windows, OsKind::Macos] {
3002 assert_eq!(OsKind::from_oci_str(os.as_oci_str()), Some(os));
3003 }
3004 assert_eq!(OsKind::from_oci_str(""), None);
3005 assert_eq!(OsKind::from_oci_str("freebsd"), None);
3006 }
3007}