1use std::collections::HashMap;
37use std::net::IpAddr;
38use std::sync::Arc;
39use std::time::Duration;
40
41use async_trait::async_trait;
42use tokio::sync::RwLock;
43use zlayer_observability::logs::{LogEntry, LogStream};
44use zlayer_spec::{OsKind, PullPolicy, RegistryAuth, ServiceSpec};
45
46use crate::cgroups_stats::ContainerStats;
47use crate::error::{AgentError, Result};
48use crate::runtime::{
49 ContainerId, ContainerInspectDetails, ContainerState, ExecEventStream, ImageInfo, LogChannel,
50 LogChunk, LogsStream, LogsStreamOptions, OverlayAttachKind, PruneResult, Runtime, StatsSample,
51 StatsStream, WaitCondition, WaitOutcome,
52};
53
/// Names the backend of a `CompositeRuntime` that owns a container.
///
/// `CompositeRuntime::select_for` picks one of these in `create_container`.
/// The choice is stored in the dispatch table, and later per-container calls
/// are routed to the backend it names.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DispatchTarget {
    /// The primary runtime, which is always configured.
    Primary,
    /// The optional delegate runtime (called the "WSL2 delegate" in routing errors).
    Delegate,
    /// The optional VZ runtime.
    Vz,
    /// The optional Linux-on-VZ runtime.
    VzLinux,
}
69
/// A [`Runtime`] that routes each container to one of several backends.
///
/// The backends are:
/// - a mandatory `primary`;
/// - an optional `delegate`;
/// - an optional `vz` runtime;
/// - an optional `vz_linux` runtime.
///
/// The backend is chosen when the container is created. The choice is kept in
/// `dispatch`, so later calls for the same container reach the same backend.
pub struct CompositeRuntime {
    /// Always-present backend. It is the final fallback when no other routing
    /// rule applies, and it is the first backend tried for pulls.
    primary: Arc<dyn Runtime>,
    /// Optional backend for `vm`/`libkrun` isolation and, when `vz_linux` is
    /// absent, for Linux images. Routing errors call it the "WSL2 delegate".
    delegate: Option<Arc<dyn Runtime>>,
    /// Optional backend selected by the `vz` isolation label or a VZ runtime
    /// marker on the image.
    vz: Option<Arc<dyn Runtime>>,
    /// Optional backend selected by the `vz-linux` label, a Linux-VZ runtime
    /// marker, or a Linux OS. When configured it is also the default target.
    vz_linux: Option<Arc<dyn Runtime>>,
    /// Container id -> backend chosen for it at `create_container` time.
    dispatch: Arc<RwLock<HashMap<ContainerId, DispatchTarget>>>,
    /// Image reference -> OS read from its manifest, recorded after pulls.
    /// NOTE(review): writes are keyed by the pulled image string, but lookups
    /// use `spec.image.name.to_string()`. The two forms must agree; confirm.
    image_os: Arc<RwLock<HashMap<String, OsKind>>>,
    /// Image reference -> runtime marker string, recorded after pulls.
    image_runtime: Arc<RwLock<HashMap<String, String>>>,
    /// Local blob-cache directories probed, in order, to resolve an image's
    /// OS and runtime marker.
    os_inspect_cache_paths: Vec<std::path::PathBuf>,
}
116
117impl CompositeRuntime {
118 #[must_use]
124 pub fn new(primary: Arc<dyn Runtime>, delegate: Option<Arc<dyn Runtime>>) -> Self {
125 Self {
126 primary,
127 delegate,
128 vz: None,
129 vz_linux: None,
130 dispatch: Arc::new(RwLock::new(HashMap::new())),
131 image_os: Arc::new(RwLock::new(HashMap::new())),
132 image_runtime: Arc::new(RwLock::new(HashMap::new())),
133 os_inspect_cache_paths: Vec::new(),
134 }
135 }
136
137 #[must_use]
145 pub fn with_os_inspect_cache_path(self, path: Option<std::path::PathBuf>) -> Self {
146 self.with_os_inspect_cache_paths(path.into_iter().collect())
147 }
148
149 #[must_use]
161 pub fn with_os_inspect_cache_paths(mut self, paths: Vec<std::path::PathBuf>) -> Self {
162 self.os_inspect_cache_paths = paths;
163 self
164 }
165
166 async fn inspect_image_os(
186 &self,
187 image: &str,
188 ) -> std::result::Result<Option<OsKind>, zlayer_registry::RegistryError> {
189 for path in &self.os_inspect_cache_paths {
190 match zlayer_registry::CacheType::persistent_at(path)
191 .build()
192 .await
193 {
194 Ok(cache) => {
195 match zlayer_registry::fetch_image_os_in_cache_only(image, cache, None).await {
196 Ok(Some(os)) => return Ok(Some(os)),
197 Ok(None) => {
198 tracing::trace!(
199 image,
200 cache = %path.display(),
201 "image OS not resolvable from this local cache; trying next",
202 );
203 }
204 Err(e) => return Err(e),
205 }
206 }
207 Err(e) => {
208 tracing::debug!(
209 image,
210 cache = %path.display(),
211 error = %e,
212 "failed to open OS-inspect blob cache; trying next",
213 );
214 }
215 }
216 }
217 Ok(None)
222 }
223
224 async fn inspect_image_runtime_marker(
227 &self,
228 image: &str,
229 auth: Option<&RegistryAuth>,
230 ) -> std::result::Result<Option<String>, zlayer_registry::RegistryError> {
231 for path in &self.os_inspect_cache_paths {
232 match zlayer_registry::CacheType::persistent_at(path)
233 .build()
234 .await
235 {
236 Ok(cache) => {
237 match zlayer_registry::fetch_image_runtime_marker_in_cache_only(
238 image, cache, None,
239 )
240 .await
241 {
242 Ok(Some(marker)) => return Ok(Some(marker)),
243 Ok(None) => {
244 tracing::trace!(
245 image,
246 cache = %path.display(),
247 "runtime marker not resolvable from this local cache; trying next",
248 );
249 }
250 Err(e) => return Err(e),
251 }
252 }
253 Err(e) => {
254 tracing::debug!(
255 image,
256 cache = %path.display(),
257 error = %e,
258 "failed to open marker-inspect blob cache; trying next",
259 );
260 }
261 }
262 }
263 zlayer_registry::fetch_image_runtime_marker(image, auth).await
264 }
265
266 #[must_use]
269 pub fn with_vz_delegate(mut self, vz: Option<Arc<dyn Runtime>>) -> Self {
270 self.vz = vz;
271 self
272 }
273
274 #[must_use]
280 pub fn with_vz_linux_delegate(mut self, vz_linux: Option<Arc<dyn Runtime>>) -> Self {
281 self.vz_linux = vz_linux;
282 self
283 }
284
285 #[must_use]
287 pub fn primary(&self) -> &Arc<dyn Runtime> {
288 &self.primary
289 }
290
291 #[must_use]
293 pub fn delegate(&self) -> Option<&Arc<dyn Runtime>> {
294 self.delegate.as_ref()
295 }
296
297 pub(crate) async fn record_image_os(&self, image: &str, os: OsKind) {
303 self.image_os.write().await.insert(image.to_string(), os);
304 }
305
306 pub(crate) async fn record_image_runtime(&self, image: &str, marker: String) {
309 self.image_runtime
310 .write()
311 .await
312 .insert(image.to_string(), marker);
313 }
314
315 async fn apply_image_runtime_inspection(
320 &self,
321 image: &str,
322 result: std::result::Result<Option<String>, zlayer_registry::RegistryError>,
323 ) {
324 match result {
325 Ok(Some(marker)) => {
326 tracing::debug!(image, marker, "cached image runtime marker for dispatch");
327 self.record_image_runtime(image, marker).await;
328 }
329 Ok(None) => {}
330 Err(e) => {
331 tracing::trace!(
332 image,
333 error = %e,
334 "failed to inspect image runtime marker — dispatch unaffected",
335 );
336 }
337 }
338 }
339
340 async fn apply_image_os_inspection(
357 &self,
358 image: &str,
359 result: std::result::Result<Option<OsKind>, zlayer_registry::RegistryError>,
360 ) {
361 match result {
362 Ok(Some(os)) => {
363 self.record_image_os(image, os).await;
364 tracing::debug!(image, ?os, "cached image OS for dispatch");
365 }
366 Ok(None) => {
367 tracing::trace!(
368 image,
369 "image manifest has no OS field — dispatch will fall through to primary",
370 );
371 }
372 Err(e) => {
373 tracing::warn!(
374 image,
375 error = %e,
376 "failed to inspect image manifest OS — dispatch will fall through to primary",
377 );
378 }
379 }
380 }
381
382 async fn select_for(&self, service: &str, spec: &ServiceSpec) -> Result<DispatchTarget> {
409 if let Some(label) = spec.labels.get("com.zlayer.isolation") {
416 if self.vz.is_some() && label.eq_ignore_ascii_case("vz") {
417 return Ok(DispatchTarget::Vz);
418 }
419 if self.vz_linux.is_some() && label.eq_ignore_ascii_case("vz-linux") {
420 return Ok(DispatchTarget::VzLinux);
421 }
422 if label.eq_ignore_ascii_case("vm") || label.eq_ignore_ascii_case("libkrun") {
423 if self.delegate.is_some() {
427 return Ok(DispatchTarget::Delegate);
428 }
429 }
430 if label.eq_ignore_ascii_case("sandbox") || label.eq_ignore_ascii_case("seatbelt") {
431 return Ok(DispatchTarget::Primary);
432 }
433 }
434
435 if self.vz.is_some()
441 && self
442 .image_runtime
443 .read()
444 .await
445 .get(&spec.image.name.to_string())
446 .is_some_and(|m| m.eq_ignore_ascii_case(zlayer_registry::ZLAYER_RUNTIME_VZ))
447 {
448 return Ok(DispatchTarget::Vz);
449 }
450
451 if self.vz_linux.is_some()
454 && self
455 .image_runtime
456 .read()
457 .await
458 .get(&spec.image.name.to_string())
459 .is_some_and(|m| m.eq_ignore_ascii_case(zlayer_registry::ZLAYER_RUNTIME_LINUX_VZ))
460 {
461 return Ok(DispatchTarget::VzLinux);
462 }
463
464 if let Some(platform) = &spec.platform {
465 let target = match platform.os {
466 OsKind::Windows | OsKind::Macos => DispatchTarget::Primary,
467 OsKind::Linux if self.vz_linux.is_some() => DispatchTarget::VzLinux,
470 OsKind::Linux => DispatchTarget::Delegate,
471 };
472 if matches!(target, DispatchTarget::Delegate) && self.delegate.is_none() {
473 return Err(AgentError::RouteToPeer {
474 service: service.to_string(),
475 required_os: OsKind::Linux.as_oci_str().to_string(),
476 reason: "spec.platform.os = linux but this node has no WSL2 delegate \
477 configured; enable `--install-wsl yes` on this node or add a Linux \
478 peer to the cluster"
479 .to_string(),
480 });
481 }
482 return Ok(target);
483 }
484
485 if let Some(os) = self
486 .image_os
487 .read()
488 .await
489 .get(&spec.image.name.to_string())
490 .copied()
491 {
492 return match os {
493 OsKind::Linux => {
494 if self.vz_linux.is_some() {
495 Ok(DispatchTarget::VzLinux)
497 } else if self.delegate.is_some() {
498 Ok(DispatchTarget::Delegate)
499 } else {
500 Err(AgentError::RouteToPeer {
505 service: service.to_string(),
506 required_os: OsKind::Linux.as_oci_str().to_string(),
507 reason: format!(
508 "image '{}' manifest reports os=linux but this node has no WSL2 \
509 delegate configured; enable `--install-wsl yes` on this node or \
510 add a Linux peer to the cluster",
511 spec.image.name
512 ),
513 })
514 }
515 }
516 OsKind::Windows | OsKind::Macos => Ok(DispatchTarget::Primary),
517 };
518 }
519
520 if self.vz_linux.is_some() {
535 return Ok(DispatchTarget::VzLinux);
536 }
537
538 Ok(DispatchTarget::Primary)
539 }
540
541 async fn lookup(&self, id: &ContainerId) -> Result<Arc<dyn Runtime>> {
543 let target =
544 self.dispatch
545 .read()
546 .await
547 .get(id)
548 .copied()
549 .ok_or_else(|| AgentError::NotFound {
550 container: id.to_string(),
551 reason: "no dispatch record in CompositeRuntime".to_string(),
552 })?;
553 Ok(self.runtime_for(target).clone())
554 }
555
556 fn runtime_for(&self, t: DispatchTarget) -> &Arc<dyn Runtime> {
563 match t {
564 DispatchTarget::Primary => &self.primary,
565 DispatchTarget::Delegate => self
566 .delegate
567 .as_ref()
568 .expect("delegate target requires delegate to exist"),
569 DispatchTarget::Vz => self.vz.as_ref().unwrap_or(&self.primary),
572 DispatchTarget::VzLinux => self.vz_linux.as_ref().unwrap_or(&self.primary),
575 }
576 }
577
578 async fn read_backends(
590 &self,
591 id: &ContainerId,
592 ) -> Result<Vec<(&'static str, Arc<dyn Runtime>)>> {
593 let owner =
594 self.dispatch
595 .read()
596 .await
597 .get(id)
598 .copied()
599 .ok_or_else(|| AgentError::NotFound {
600 container: id.to_string(),
601 reason: "no dispatch record in CompositeRuntime".to_string(),
602 })?;
603
604 let all: [(DispatchTarget, Option<&Arc<dyn Runtime>>); 4] = [
608 (DispatchTarget::Primary, Some(&self.primary)),
609 (DispatchTarget::Delegate, self.delegate.as_ref()),
610 (DispatchTarget::Vz, self.vz.as_ref()),
611 (DispatchTarget::VzLinux, self.vz_linux.as_ref()),
612 ];
613
614 let label_for = |t: DispatchTarget| match t {
615 DispatchTarget::Primary => "primary",
616 DispatchTarget::Delegate => "delegate",
617 DispatchTarget::Vz => "vz",
618 DispatchTarget::VzLinux => "vz_linux",
619 };
620
621 let mut out: Vec<(&'static str, Arc<dyn Runtime>)> =
622 vec![(label_for(owner), self.runtime_for(owner).clone())];
623 for (target, rt) in all {
624 if target != owner {
625 if let Some(rt) = rt {
626 out.push((label_for(target), rt.clone()));
627 }
628 }
629 }
630 Ok(out)
631 }
632}
633
/// Collects the errors seen while a read-only call falls through the backends
/// returned by `CompositeRuntime::read_backends`.
///
/// If no backend succeeds, one representative error can then be reported.
/// `NotFound` errors are tracked separately from all other errors. See
/// `into_error` for which one is reported.
#[derive(Default)]
struct ReadMissAccumulator {
    /// Most recently recorded error that was not `AgentError::NotFound`.
    soft_err: Option<AgentError>,
    /// Most recently recorded `AgentError::NotFound` error.
    not_found: Option<AgentError>,
}
654
655impl ReadMissAccumulator {
656 fn record(&mut self, e: AgentError) {
657 if matches!(e, AgentError::NotFound { .. }) {
658 self.not_found = Some(e);
659 } else {
660 self.soft_err = Some(e);
661 }
662 }
663
664 fn into_error(self, what: &str) -> AgentError {
670 self.soft_err
671 .or(self.not_found)
672 .unwrap_or_else(|| AgentError::Unsupported(format!("no backend could serve {what}")))
673 }
674}
675
676fn one_shot_logs_stream(entries: Vec<LogEntry>, opts: &LogsStreamOptions) -> LogsStream {
685 use futures_util::stream;
686
687 let want_stdout = opts.stdout || !opts.stderr;
691 let want_stderr = opts.stderr || !opts.stdout;
692 let timestamps = opts.timestamps;
693
694 let chunks: Vec<Result<LogChunk>> = entries
695 .into_iter()
696 .filter_map(|e| {
697 let channel = match e.stream {
698 LogStream::Stdout => LogChannel::Stdout,
699 LogStream::Stderr => LogChannel::Stderr,
700 };
701 let keep = match channel {
702 LogChannel::Stdout => want_stdout,
703 LogChannel::Stderr => want_stderr,
704 LogChannel::Stdin => false,
705 };
706 if !keep {
707 return None;
708 }
709 let mut bytes = e.message.into_bytes();
710 bytes.push(b'\n');
711 Some(Ok(LogChunk {
712 stream: channel,
713 bytes: bytes::Bytes::from(bytes),
714 timestamp: timestamps.then_some(e.timestamp),
715 }))
716 })
717 .collect();
718
719 Box::pin(stream::iter(chunks))
720}
721
722fn one_shot_stats_stream(stats: &ContainerStats) -> StatsStream {
732 use futures_util::stream;
733
734 let sample = StatsSample {
735 cpu_total_ns: stats.cpu_usage_usec.saturating_mul(1_000),
736 cpu_system_ns: 0,
737 online_cpus: 1,
738 mem_used_bytes: stats.memory_bytes,
739 mem_limit_bytes: stats.memory_limit,
740 net_rx_bytes: 0,
741 net_tx_bytes: 0,
742 blkio_read_bytes: 0,
743 blkio_write_bytes: 0,
744 pids_current: 0,
745 pids_limit: None,
746 timestamp: chrono::Utc::now(),
747 };
748 Box::pin(stream::iter(vec![Ok(sample)]))
749}
750
/// `Runtime` implementation that fans each call out to the right backend.
///
/// - Per-container lifecycle calls (start/stop/exec/wait/inspect/...) go to
///   the single backend recorded in `dispatch` at `create_container` time.
///   They fail with `AgentError::NotFound` when no record exists.
/// - Read-only calls (logs, stats and their streaming variants) try the
///   owning backend first and then every other configured backend.
/// - Image operations are broadcast or fall back across backends, as
///   described on each method.
#[async_trait]
impl Runtime for CompositeRuntime {
    /// Pulls `image` into every configured backend, then refreshes the
    /// `image_os` / `image_runtime` dispatch caches for it.
    ///
    /// - The primary is pulled first. Only `AgentError::WrongPlatform` from it
    ///   is tolerated; any other primary error is returned immediately.
    /// - Delegate and VZ pull failures are logged at debug level and ignored.
    /// - The runtime marker is inspected with no registry credentials (`None`).
    ///
    /// NOTE(review): after a primary `WrongPlatform` this still returns
    /// `Ok(())` even if no other backend pulled the image. Confirm this is
    /// intended.
    async fn pull_image(&self, image: &str) -> Result<()> {
        if let Err(e) = self.primary.pull_image(image).await {
            if matches!(e, AgentError::WrongPlatform { .. }) {
                tracing::debug!(
                    image,
                    error = %e,
                    "primary runtime cannot service image (wrong platform); delegating",
                );
            } else {
                return Err(e);
            }
        }
        // Best effort: a delegate failure never fails the pull.
        if let Some(delegate) = &self.delegate {
            if let Err(e) = delegate.pull_image(image).await {
                tracing::debug!(
                    image,
                    error = %e,
                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
                );
            }
        }
        // Best effort for each configured VZ backend, in the order vz, vz_linux.
        for (label, rt) in [
            self.vz.as_ref().map(|r| ("vz", r)),
            self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
        ]
        .into_iter()
        .flatten()
        {
            if let Err(e) = rt.pull_image(image).await {
                tracing::debug!(
                    image,
                    runtime = label,
                    error = %e,
                    "vz delegate failed to pull image (likely wrong OS); continuing",
                );
            }
        }

        // Refresh the routing caches that `select_for` consults.
        let os_result = self.inspect_image_os(image).await;
        self.apply_image_os_inspection(image, os_result).await;
        let marker_result = self.inspect_image_runtime_marker(image, None).await;
        self.apply_image_runtime_inspection(image, marker_result)
            .await;

        Ok(())
    }

    /// Same flow as `pull_image`, but forwards `policy`, `auth` and `source`
    /// to every backend.
    ///
    /// `auth` is also used for the runtime-marker inspection.
    async fn pull_image_with_policy(
        &self,
        image: &str,
        policy: PullPolicy,
        auth: Option<&RegistryAuth>,
        source: zlayer_spec::SourcePolicy,
    ) -> Result<()> {
        // Only WrongPlatform from the primary is tolerated.
        if let Err(e) = self
            .primary
            .pull_image_with_policy(image, policy, auth, source)
            .await
        {
            if matches!(e, AgentError::WrongPlatform { .. }) {
                tracing::debug!(
                    image,
                    error = %e,
                    "primary runtime cannot service image (wrong platform); delegating",
                );
            } else {
                return Err(e);
            }
        }
        if let Some(delegate) = &self.delegate {
            if let Err(e) = delegate
                .pull_image_with_policy(image, policy, auth, source)
                .await
            {
                tracing::debug!(
                    image,
                    error = %e,
                    "delegate runtime failed to pull image (likely wrong OS); continuing with primary result",
                );
            }
        }
        for (label, rt) in [
            self.vz.as_ref().map(|r| ("vz", r)),
            self.vz_linux.as_ref().map(|r| ("vz_linux", r)),
        ]
        .into_iter()
        .flatten()
        {
            if let Err(e) = rt.pull_image_with_policy(image, policy, auth, source).await {
                tracing::debug!(
                    image,
                    runtime = label,
                    error = %e,
                    "vz delegate failed to pull image (likely wrong OS); continuing",
                );
            }
        }

        let os_result = self.inspect_image_os(image).await;
        self.apply_image_os_inspection(image, os_result).await;
        let marker_result = self.inspect_image_runtime_marker(image, auth).await;
        self.apply_image_runtime_inspection(image, marker_result)
            .await;

        Ok(())
    }

    /// Picks a backend via `select_for`, records it for `id`, and creates the
    /// container there.
    ///
    /// The record is written before the backend call and removed again if
    /// creation fails.
    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
        let target = self.select_for(&id.service, spec).await?;
        {
            let mut dispatch = self.dispatch.write().await;
            dispatch.insert(id.clone(), target);
        }
        let rt = self.runtime_for(target).clone();
        match rt.create_container(id, spec).await {
            Ok(()) => Ok(()),
            Err(e) => {
                // Roll back so a failed create leaves no dangling record.
                self.dispatch.write().await.remove(id);
                Err(e)
            }
        }
    }

    /// Forwards to the owning backend.
    async fn start_container(&self, id: &ContainerId) -> Result<()> {
        let rt = self.lookup(id).await?;
        rt.start_container(id).await
    }

    /// Forwards to the owning backend.
    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
        let rt = self.lookup(id).await?;
        rt.stop_container(id, timeout).await
    }

    /// Removes the container on its owning backend and forgets its dispatch
    /// record.
    ///
    /// The record is dropped even when the backend's remove fails.
    /// NOTE(review): after a failed remove the container is no longer
    /// reachable through this composite. Confirm this is intended.
    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
        let rt = self.lookup(id).await?;
        let res = rt.remove_container(id).await;
        self.dispatch.write().await.remove(id);
        res
    }

    /// Forwards to the owning backend.
    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
        let rt = self.lookup(id).await?;
        rt.container_state(id).await
    }

    /// Tries each backend from `read_backends` (owner first) until one serves
    /// the logs.
    ///
    /// Misses are logged at warn level. If every backend misses, the error is
    /// chosen by `ReadMissAccumulator`.
    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
        let backends = self.read_backends(id).await?;
        let mut misses = ReadMissAccumulator::default();
        for (label, rt) in backends {
            match rt.container_logs(id, tail).await {
                Ok(logs) => return Ok(logs),
                Err(e) => {
                    tracing::warn!(
                        container = %id,
                        runtime = label,
                        error = %e,
                        "composite container_logs: backend could not serve logs; trying next backend",
                    );
                    misses.record(e);
                }
            }
        }
        Err(misses.into_error("container_logs"))
    }

    /// Forwards to the owning backend.
    async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
        let rt = self.lookup(id).await?;
        rt.exec(id, cmd).await
    }

    /// Forwards to the owning backend.
    async fn exec_with_opts(
        &self,
        id: &ContainerId,
        opts: &crate::runtime::ExecOptions,
    ) -> Result<(i32, String, String)> {
        let rt = self.lookup(id).await?;
        rt.exec_with_opts(id, opts).await
    }

    /// Forwards to the owning backend.
    async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
        let rt = self.lookup(id).await?;
        rt.exec_stream(id, cmd).await
    }

    /// Tries each backend from `read_backends` (owner first) until one serves
    /// a stats snapshot.
    ///
    /// Misses are logged at warn level and accumulated.
    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
        let backends = self.read_backends(id).await?;
        let mut misses = ReadMissAccumulator::default();
        for (label, rt) in backends {
            match rt.get_container_stats(id).await {
                Ok(stats) => return Ok(stats),
                Err(e) => {
                    tracing::warn!(
                        container = %id,
                        runtime = label,
                        error = %e,
                        "composite get_container_stats: backend could not serve stats; \
                         trying next backend",
                    );
                    misses.record(e);
                }
            }
        }
        Err(misses.into_error("get_container_stats"))
    }

    /// Forwards to the owning backend.
    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
        let rt = self.lookup(id).await?;
        rt.wait_container(id).await
    }

    /// Forwards to the owning backend.
    async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
        let rt = self.lookup(id).await?;
        rt.wait_outcome(id).await
    }

    /// Forwards to the owning backend.
    async fn wait_outcome_with_condition(
        &self,
        id: &ContainerId,
        condition: WaitCondition,
    ) -> Result<WaitOutcome> {
        let rt = self.lookup(id).await?;
        rt.wait_outcome_with_condition(id, condition).await
    }

    /// Forwards to the owning backend.
    ///
    /// The dispatch record stays keyed by the original `id`.
    async fn rename_container(&self, id: &ContainerId, new_name: &str) -> Result<()> {
        let rt = self.lookup(id).await?;
        rt.rename_container(id, new_name).await
    }

    /// Tries each backend from `read_backends` (owner first) until one serves
    /// the logs.
    ///
    /// Misses are logged at warn level and accumulated.
    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
        let backends = self.read_backends(id).await?;
        let mut misses = ReadMissAccumulator::default();
        for (label, rt) in backends {
            match rt.get_logs(id).await {
                Ok(logs) => return Ok(logs),
                Err(e) => {
                    tracing::warn!(
                        container = %id,
                        runtime = label,
                        error = %e,
                        "composite get_logs: backend could not serve logs; trying next backend",
                    );
                    misses.record(e);
                }
            }
        }
        Err(misses.into_error("get_logs"))
    }

    /// Returns a log stream for `id`, in two passes over `read_backends`.
    ///
    /// 1. Every backend's native `logs_stream` is tried first.
    /// 2. If none succeeds, each backend's `container_logs` snapshot is tried
    ///    and wrapped with `one_shot_logs_stream`.
    ///
    /// Errors from both passes feed the same accumulator.
    async fn logs_stream(&self, id: &ContainerId, opts: LogsStreamOptions) -> Result<LogsStream> {
        let backends = self.read_backends(id).await?;
        let mut misses = ReadMissAccumulator::default();
        // Pass 1: native streams, owner first.
        for (label, rt) in &backends {
            match rt.logs_stream(id, opts.clone()).await {
                Ok(stream) => return Ok(stream),
                Err(e) => {
                    tracing::warn!(
                        container = %id,
                        runtime = label,
                        error = %e,
                        "composite logs_stream: backend has no native log stream; \
                         falling back to a one-shot snapshot",
                    );
                    misses.record(e);
                }
            }
        }

        // Pass 2: snapshot fallback. A tail that is unset, or does not fit in
        // `usize`, becomes 1000 entries.
        let tail = opts
            .tail
            .map_or(1000, |n| usize::try_from(n).unwrap_or(1000));
        for (label, rt) in &backends {
            match rt.container_logs(id, tail).await {
                Ok(entries) => {
                    return Ok(one_shot_logs_stream(entries, &opts));
                }
                Err(e) => {
                    tracing::warn!(
                        container = %id,
                        runtime = label,
                        error = %e,
                        "composite logs_stream: backend snapshot fallback failed; trying next",
                    );
                    misses.record(e);
                }
            }
        }
        Err(misses.into_error("container logs"))
    }

    /// Returns a stats stream for `id`, in two passes over `read_backends`.
    ///
    /// 1. Every backend's native `stats_stream` is tried first.
    /// 2. If none succeeds, each backend's `get_container_stats` snapshot is
    ///    tried and wrapped with `one_shot_stats_stream`.
    async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
        let backends = self.read_backends(id).await?;
        let mut misses = ReadMissAccumulator::default();
        // Pass 1: native streams, owner first.
        for (label, rt) in &backends {
            match rt.stats_stream(id).await {
                Ok(stream) => return Ok(stream),
                Err(e) => {
                    tracing::warn!(
                        container = %id,
                        runtime = label,
                        error = %e,
                        "composite stats_stream: backend has no native stats stream; \
                         falling back to a one-shot sample",
                    );
                    misses.record(e);
                }
            }
        }

        // Pass 2: single-sample fallback from a snapshot.
        for (label, rt) in &backends {
            match rt.get_container_stats(id).await {
                Ok(stats) => return Ok(one_shot_stats_stream(&stats)),
                Err(e) => {
                    tracing::warn!(
                        container = %id,
                        runtime = label,
                        error = %e,
                        "composite stats_stream: backend sample fallback failed; trying next",
                    );
                    misses.record(e);
                }
            }
        }
        Err(misses.into_error("container stats"))
    }

    /// Forwards to the owning backend.
    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
        let rt = self.lookup(id).await?;
        rt.get_container_pid(id).await
    }

    /// Reports the Linux-VZ backend's attach kind when it is configured,
    /// otherwise the primary's.
    ///
    /// The answer is per composite, not per container.
    fn overlay_attach_kind(&self) -> OverlayAttachKind {
        self.vz_linux.as_ref().map_or_else(
            || self.primary.overlay_attach_kind(),
            |vz| vz.overlay_attach_kind(),
        )
    }

    /// Forwards to the owning backend.
    async fn push_overlay_config(
        &self,
        id: &ContainerId,
        config: &zlayer_types::overlayd::GuestOverlayConfig,
    ) -> Result<()> {
        let rt = self.lookup(id).await?;
        rt.push_overlay_config(id, config).await
    }

    /// Forwards to the owning backend.
    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
        let rt = self.lookup(id).await?;
        rt.get_container_ip(id).await
    }

    /// Forwards to the owning backend.
    async fn get_container_port_override(&self, id: &ContainerId) -> Result<Option<u16>> {
        let rt = self.lookup(id).await?;
        rt.get_container_port_override(id).await
    }

    /// Forwards to the owning backend (Windows builds only).
    #[cfg(target_os = "windows")]
    async fn get_container_namespace_id(
        &self,
        id: &ContainerId,
    ) -> Result<Option<windows::core::GUID>> {
        let rt = self.lookup(id).await?;
        rt.get_container_namespace_id(id).await
    }

    /// Forwards to the owning backend.
    async fn sync_container_volumes(&self, id: &ContainerId) -> Result<()> {
        let rt = self.lookup(id).await?;
        rt.sync_container_volumes(id).await
    }

    /// Merges the image lists of all configured backends.
    ///
    /// - Backends are queried in the order primary, delegate, vz, vz_linux.
    /// - Entries are de-duplicated by `reference`; the first backend to report
    ///   a reference wins.
    /// - A failing backend is logged at debug level and skipped.
    /// - The call succeeds if at least one backend succeeded.
    /// - Otherwise the last backend error is returned, or `Unsupported` if
    ///   there was none.
    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
        let mut out: Vec<ImageInfo> = Vec::new();
        let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
        let mut any_ok = false;
        let mut last_err: Option<AgentError> = None;

        for (label, rt) in [
            Some(("primary", &self.primary)),
            self.delegate.as_ref().map(|d| ("delegate", d)),
            self.vz.as_ref().map(|d| ("vz", d)),
            self.vz_linux.as_ref().map(|d| ("vz_linux", d)),
        ]
        .into_iter()
        .flatten()
        {
            match rt.list_images().await {
                Ok(images) => {
                    any_ok = true;
                    for img in images {
                        if seen.insert(img.reference.clone()) {
                            out.push(img);
                        }
                    }
                }
                Err(e) => {
                    tracing::debug!(
                        runtime = label,
                        error = %e,
                        "composite list_images: backend returned an error; skipping it",
                    );
                    last_err = Some(e);
                }
            }
        }

        if any_ok {
            Ok(out)
        } else {
            Err(last_err.unwrap_or_else(|| {
                AgentError::Unsupported("no runtime implements list_images".into())
            }))
        }
    }

    /// Removes `image` on the primary, falling back to the delegate if the
    /// primary fails.
    ///
    /// When both fail, the primary's error is returned.
    /// NOTE(review): the VZ backends are not consulted here, although
    /// `list_images` reports their images. Confirm this is intended.
    async fn remove_image(&self, image: &str, force: bool) -> Result<()> {
        match self.primary.remove_image(image, force).await {
            Ok(()) => Ok(()),
            Err(primary_err) => {
                if let Some(delegate) = &self.delegate {
                    match delegate.remove_image(image, force).await {
                        Ok(()) => Ok(()),
                        Err(delegate_err) => {
                            tracing::debug!(
                                image,
                                %delegate_err,
                                "delegate remove_image also failed; returning primary error",
                            );
                            Err(primary_err)
                        }
                    }
                } else {
                    Err(primary_err)
                }
            }
        }
    }

    /// Prunes images on the primary and the delegate and sums the results.
    ///
    /// - A primary `Unsupported` error is tolerated only when a delegate exists.
    /// - Any other primary error is returned.
    /// - A delegate failure is logged at warn level, and the primary's result
    ///   is returned alone.
    ///
    /// The VZ backends are not pruned.
    async fn prune_images(&self) -> Result<PruneResult> {
        let mut result = match self.primary.prune_images().await {
            Ok(r) => r,
            Err(AgentError::Unsupported(reason)) if self.delegate.is_some() => {
                tracing::debug!(
                    %reason,
                    "primary runtime does not support prune_images; relying on delegate",
                );
                PruneResult::default()
            }
            Err(e) => return Err(e),
        };
        if let Some(delegate) = &self.delegate {
            match delegate.prune_images().await {
                Ok(extra) => {
                    result.deleted.extend(extra.deleted);
                    result.space_reclaimed =
                        result.space_reclaimed.saturating_add(extra.space_reclaimed);
                }
                Err(e) => tracing::warn!(
                    error = %e,
                    "delegate runtime prune_images failed; returning primary result only",
                ),
            }
        }
        Ok(result)
    }

    /// Forwards to the owning backend.
    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
        let rt = self.lookup(id).await?;
        rt.kill_container(id, signal).await
    }

    /// Tags on the primary, falling back to the delegate if the primary fails.
    ///
    /// When both fail, the primary's error is returned. The VZ backends are
    /// not consulted.
    async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
        match self.primary.tag_image(source, target).await {
            Ok(()) => Ok(()),
            Err(primary_err) => {
                if let Some(delegate) = &self.delegate {
                    match delegate.tag_image(source, target).await {
                        Ok(()) => Ok(()),
                        Err(delegate_err) => {
                            tracing::debug!(
                                source,
                                target,
                                %delegate_err,
                                "delegate tag_image also failed; returning primary error",
                            );
                            Err(primary_err)
                        }
                    }
                } else {
                    Err(primary_err)
                }
            }
        }
    }

    /// Forwards to the owning backend.
    async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
        let rt = self.lookup(id).await?;
        rt.inspect_detailed(id).await
    }
}
1326
1327#[cfg(test)]
1328mod tests {
1329 use super::*;
1330 use crate::cgroups_stats::ContainerStats;
1331 use std::sync::Mutex as StdMutex;
1332 use zlayer_spec::{ArchKind, DeploymentSpec, TargetPlatform};
1333
1334 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
1337 enum Role {
1338 Primary,
1339 Delegate,
1340 Vz,
1341 VzLinux,
1342 }
1343
1344 type CallRecord = (Role, String, Option<ContainerId>);
1346 type CallLog = Arc<StdMutex<Vec<CallRecord>>>;
1348
1349 struct MockRuntime {
1356 role: Role,
1357 calls: CallLog,
1358 list_images_response: Vec<ImageInfo>,
1359 list_images_error: Option<String>,
1363 pull_image_error: Option<String>,
1364 pull_image_wrong_platform: Option<(&'static str, &'static str)>,
1369 stream_unsupported: bool,
1375 reads_not_found: bool,
1381 logs_response: Vec<LogEntry>,
1385 stats_snapshot_unsupported: bool,
1390 prune_images_response: Option<PruneResult>,
1395 }
1396
1397 impl MockRuntime {
1398 fn new(role: Role, calls: CallLog) -> Self {
1399 Self {
1400 role,
1401 calls,
1402 list_images_response: Vec::new(),
1403 list_images_error: None,
1404 pull_image_error: None,
1405 pull_image_wrong_platform: None,
1406 stream_unsupported: false,
1407 reads_not_found: false,
1408 logs_response: Vec::new(),
1409 stats_snapshot_unsupported: false,
1410 prune_images_response: None,
1411 }
1412 }
1413
1414 fn with_stream_unsupported(mut self) -> Self {
1416 self.stream_unsupported = true;
1417 self
1418 }
1419
1420 fn with_reads_not_found(mut self) -> Self {
1422 self.reads_not_found = true;
1423 self
1424 }
1425
1426 fn with_logs(mut self, logs: Vec<LogEntry>) -> Self {
1428 self.logs_response = logs;
1429 self
1430 }
1431
1432 fn with_stats_snapshot_unsupported(mut self) -> Self {
1434 self.stats_snapshot_unsupported = true;
1435 self
1436 }
1437
1438 fn with_prune_result(mut self, result: PruneResult) -> Self {
1440 self.prune_images_response = Some(result);
1441 self
1442 }
1443
1444 fn build_wrong_platform_error(&self, image: &str) -> Option<AgentError> {
1445 self.pull_image_wrong_platform
1446 .map(|(expected, actual)| AgentError::WrongPlatform {
1447 runtime: match self.role {
1448 Role::Primary => "primary-mock".to_string(),
1449 Role::Delegate => "delegate-mock".to_string(),
1450 Role::Vz => "vz-mock".to_string(),
1451 Role::VzLinux => "vz-linux-mock".to_string(),
1452 },
1453 expected: expected.to_string(),
1454 actual: actual.to_string(),
1455 image: image.to_string(),
1456 })
1457 }
1458
1459 fn record(&self, method: &str, id: Option<&ContainerId>) {
1460 self.calls
1461 .lock()
1462 .expect("mock call-log mutex poisoned")
1463 .push((self.role, method.to_string(), id.cloned()));
1464 }
1465 }
1466
1467 #[async_trait]
1468 impl Runtime for MockRuntime {
1469 async fn pull_image(&self, image: &str) -> Result<()> {
1470 self.record("pull_image", None);
1471 if let Some(err) = self.build_wrong_platform_error(image) {
1472 return Err(err);
1473 }
1474 if let Some(msg) = &self.pull_image_error {
1475 return Err(AgentError::Internal(msg.clone()));
1476 }
1477 Ok(())
1478 }
1479
1480 async fn pull_image_with_policy(
1481 &self,
1482 image: &str,
1483 _policy: PullPolicy,
1484 _auth: Option<&RegistryAuth>,
1485 _source: zlayer_spec::SourcePolicy,
1486 ) -> Result<()> {
1487 self.record("pull_image_with_policy", None);
1488 if let Some(err) = self.build_wrong_platform_error(image) {
1489 return Err(err);
1490 }
1491 if let Some(msg) = &self.pull_image_error {
1492 return Err(AgentError::Internal(msg.clone()));
1493 }
1494 Ok(())
1495 }
1496
1497 async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
1498 self.record("create_container", Some(id));
1499 Ok(())
1500 }
1501
1502 async fn start_container(&self, id: &ContainerId) -> Result<()> {
1503 self.record("start_container", Some(id));
1504 Ok(())
1505 }
1506
1507 async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
1508 self.record("stop_container", Some(id));
1509 Ok(())
1510 }
1511
1512 async fn remove_container(&self, id: &ContainerId) -> Result<()> {
1513 self.record("remove_container", Some(id));
1514 Ok(())
1515 }
1516
1517 async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
1518 self.record("container_state", Some(id));
1519 Ok(ContainerState::Running)
1520 }
1521
1522 async fn container_logs(&self, id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
1523 self.record("container_logs", Some(id));
1524 if self.reads_not_found {
1525 return Err(mock_not_found());
1526 }
1527 Ok(self.logs_response.clone())
1528 }
1529
1530 async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
1531 self.record("exec", Some(id));
1532 Ok((0, String::new(), String::new()))
1533 }
1534
1535 async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
1536 self.record("get_container_stats", Some(id));
1537 if self.reads_not_found {
1538 return Err(mock_not_found());
1539 }
1540 if self.stats_snapshot_unsupported {
1541 return Err(AgentError::Unsupported("mock has no snapshot stats".into()));
1542 }
1543 Ok(ContainerStats {
1544 cpu_usage_usec: 1_000,
1545 memory_bytes: 4096,
1546 memory_limit: 8192,
1547 timestamp: std::time::Instant::now(),
1548 })
1549 }
1550
1551 async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
1552 self.record("wait_container", Some(id));
1553 Ok(0)
1554 }
1555
1556 async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
1557 self.record("get_logs", Some(id));
1558 if self.reads_not_found {
1559 return Err(mock_not_found());
1560 }
1561 Ok(self.logs_response.clone())
1562 }
1563
1564 async fn logs_stream(
1565 &self,
1566 id: &ContainerId,
1567 _opts: LogsStreamOptions,
1568 ) -> Result<LogsStream> {
1569 self.record("logs_stream", Some(id));
1570 if self.reads_not_found {
1571 return Err(mock_not_found());
1572 }
1573 if self.stream_unsupported {
1574 return Err(AgentError::Unsupported("mock has no log stream".into()));
1575 }
1576 Ok(one_shot_logs_stream(
1578 self.logs_response.clone(),
1579 &LogsStreamOptions::default(),
1580 ))
1581 }
1582
1583 async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
1584 use futures_util::stream;
1585 self.record("stats_stream", Some(id));
1586 if self.reads_not_found {
1587 return Err(mock_not_found());
1588 }
1589 if self.stream_unsupported {
1590 return Err(AgentError::Unsupported("mock has no stats stream".into()));
1591 }
1592 Ok(Box::pin(stream::iter(vec![Ok(StatsSample {
1593 cpu_total_ns: 0,
1594 cpu_system_ns: 0,
1595 online_cpus: 1,
1596 mem_used_bytes: 4096,
1597 mem_limit_bytes: 8192,
1598 net_rx_bytes: 0,
1599 net_tx_bytes: 0,
1600 blkio_read_bytes: 0,
1601 blkio_write_bytes: 0,
1602 pids_current: 0,
1603 pids_limit: None,
1604 timestamp: chrono::Utc::now(),
1605 })])))
1606 }
1607
1608 async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
1609 self.record("get_container_pid", Some(id));
1610 Ok(None)
1611 }
1612
1613 async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
1614 self.record("get_container_ip", Some(id));
1615 Ok(None)
1616 }
1617
1618 async fn list_images(&self) -> Result<Vec<ImageInfo>> {
1619 self.record("list_images", None);
1620 if let Some(msg) = &self.list_images_error {
1621 return Err(AgentError::Unsupported(msg.clone()));
1622 }
1623 Ok(self.list_images_response.clone())
1624 }
1625
1626 async fn prune_images(&self) -> Result<PruneResult> {
1627 self.record("prune_images", None);
1628 match &self.prune_images_response {
1629 Some(result) => Ok(result.clone()),
1630 None => Err(AgentError::Unsupported(
1631 "mock runtime does not support prune_images".into(),
1632 )),
1633 }
1634 }
1635 }
1636
1637 fn make_spec(image: &str, platform: Option<TargetPlatform>) -> ServiceSpec {
1641 let yaml = format!(
1642 r"
1643version: v1
1644deployment: test
1645services:
1646 test:
1647 rtype: service
1648 image:
1649 name: {image}
1650 endpoints:
1651 - name: http
1652 protocol: http
1653 port: 8080
1654"
1655 );
1656 let mut spec = serde_yaml::from_str::<DeploymentSpec>(&yaml)
1657 .expect("valid deployment yaml")
1658 .services
1659 .remove("test")
1660 .expect("service 'test' present");
1661 spec.platform = platform;
1662 spec
1663 }
1664
1665 fn cid(service: &str, replica: u32) -> ContainerId {
1666 ContainerId::new(service.to_string(), replica)
1667 }
1668
1669 fn make_composite(with_delegate: bool) -> (CompositeRuntime, CallLog) {
1670 let calls = Arc::new(StdMutex::new(Vec::new()));
1671 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
1672 let delegate = if with_delegate {
1673 Some(Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>)
1674 } else {
1675 None
1676 };
1677 (
1678 CompositeRuntime::new(primary as Arc<dyn Runtime>, delegate),
1679 calls,
1680 )
1681 }
1682
1683 fn role_for(calls: &[CallRecord], method: &str) -> Option<Role> {
1684 calls
1685 .iter()
1686 .find(|(_, m, _)| m == method)
1687 .map(|(role, _, _)| *role)
1688 }
1689
1690 fn mock_not_found() -> AgentError {
1692 AgentError::NotFound {
1693 container: "mock".to_string(),
1694 reason: "mock backend does not own this container".to_string(),
1695 }
1696 }
1697
1698 #[tokio::test]
1699 async fn dispatch_windows_spec_goes_to_primary() {
1700 let (rt, calls) = make_composite(true);
1701 let id = cid("win-svc", 0);
1702 let spec = make_spec(
1703 "mcr.microsoft.com/windows/nanoserver:ltsc2022",
1704 Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
1705 );
1706
1707 rt.create_container(&id, &spec).await.unwrap();
1708 rt.start_container(&id).await.unwrap();
1709
1710 let calls = calls.lock().unwrap();
1711 assert_eq!(
1712 role_for(&calls, "create_container"),
1713 Some(Role::Primary),
1714 "create_container should hit primary for Windows spec"
1715 );
1716 assert_eq!(
1717 role_for(&calls, "start_container"),
1718 Some(Role::Primary),
1719 "start_container should hit primary for Windows spec"
1720 );
1721 }
1722
1723 #[tokio::test]
1724 async fn dispatch_linux_spec_goes_to_delegate() {
1725 let (rt, calls) = make_composite(true);
1726 let id = cid("lin-svc", 0);
1727 let spec = make_spec(
1728 "docker.io/library/alpine:3.19",
1729 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
1730 );
1731
1732 rt.create_container(&id, &spec).await.unwrap();
1733 rt.start_container(&id).await.unwrap();
1734
1735 let calls = calls.lock().unwrap();
1736 assert_eq!(
1737 role_for(&calls, "create_container"),
1738 Some(Role::Delegate),
1739 "create_container should hit delegate for Linux spec"
1740 );
1741 assert_eq!(
1742 role_for(&calls, "start_container"),
1743 Some(Role::Delegate),
1744 "start_container should hit delegate for Linux spec"
1745 );
1746 }
1747
1748 #[tokio::test]
1749 async fn dispatch_linux_without_delegate_errors() {
1750 let (rt, _calls) = make_composite(false);
1754 let id = cid("lin-svc", 0);
1755 let spec = make_spec(
1756 "docker.io/library/alpine:3.19",
1757 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Amd64)),
1758 );
1759
1760 let err = rt.create_container(&id, &spec).await.unwrap_err();
1761 match err {
1762 AgentError::RouteToPeer {
1763 service,
1764 required_os,
1765 reason,
1766 } => {
1767 assert_eq!(service, "lin-svc");
1768 assert_eq!(required_os, "linux");
1769 assert!(
1770 reason.contains("--install-wsl") && reason.contains("Linux peer"),
1771 "reason must name both remediations, got: {reason}"
1772 );
1773 }
1774 other => panic!("expected RouteToPeer, got {other:?}"),
1775 }
1776 }
1777
1778 #[tokio::test]
1779 async fn dispatch_linux_image_cache_without_delegate_routes_to_peer() {
1780 let (rt, _calls) = make_composite(false);
1785 let id = cid("svc", 0);
1786 let image = "docker.io/library/nginx:1.25";
1787 rt.record_image_os(image, OsKind::Linux).await;
1788
1789 let spec = make_spec(image, None);
1790 let err = rt.create_container(&id, &spec).await.unwrap_err();
1791 match err {
1792 AgentError::RouteToPeer {
1793 service,
1794 required_os,
1795 reason,
1796 } => {
1797 assert_eq!(service, "svc");
1798 assert_eq!(required_os, "linux");
1799 assert!(
1800 reason.contains(image),
1801 "reason should mention the image name, got: {reason}"
1802 );
1803 assert!(
1804 reason.contains("--install-wsl") && reason.contains("Linux peer"),
1805 "reason must name both remediations, got: {reason}"
1806 );
1807 }
1808 other => panic!("expected RouteToPeer, got {other:?}"),
1809 }
1810 }
1811
1812 #[tokio::test]
1813 async fn dispatch_macos_spec_goes_to_primary() {
1814 let (rt, calls) = make_composite(true);
1815 let id = cid("mac-svc", 0);
1816 let spec = make_spec(
1817 "ghcr.io/zlayer/macos:latest",
1818 Some(TargetPlatform::new(OsKind::Macos, ArchKind::Arm64)),
1819 );
1820
1821 rt.create_container(&id, &spec).await.unwrap();
1822
1823 let calls = calls.lock().unwrap();
1824 assert_eq!(
1825 role_for(&calls, "create_container"),
1826 Some(Role::Primary),
1827 "create_container should hit primary for Macos spec"
1828 );
1829 }
1830
1831 #[tokio::test]
1832 async fn dispatch_no_platform_no_image_os_falls_through_to_primary() {
1833 let (rt, calls) = make_composite(true);
1834 let id = cid("svc", 0);
1835 let spec = make_spec("docker.io/library/nginx:1.25", None);
1836
1837 rt.create_container(&id, &spec).await.unwrap();
1838
1839 let calls = calls.lock().unwrap();
1840 assert_eq!(
1841 role_for(&calls, "create_container"),
1842 Some(Role::Primary),
1843 "fall-through should pick primary when both platform and image-OS cache are unknown"
1844 );
1845 }
1846
1847 #[tokio::test]
1848 async fn dispatch_uses_image_os_cache_when_platform_missing() {
1849 let (rt, calls) = make_composite(true);
1850 let id = cid("svc", 0);
1851 let image = "docker.io/library/nginx:1.25";
1852 rt.record_image_os(image, OsKind::Linux).await;
1853
1854 let spec = make_spec(image, None);
1855 rt.create_container(&id, &spec).await.unwrap();
1856
1857 let calls = calls.lock().unwrap();
1858 assert_eq!(
1859 role_for(&calls, "create_container"),
1860 Some(Role::Delegate),
1861 "image-OS cache should route Linux images to the delegate"
1862 );
1863 }
1864
1865 fn make_composite_with_vz() -> (CompositeRuntime, CallLog) {
1868 let calls = Arc::new(StdMutex::new(Vec::new()));
1869 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
1870 let delegate =
1871 Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
1872 let vz = Arc::new(MockRuntime::new(Role::Vz, Arc::clone(&calls))) as Arc<dyn Runtime>;
1873 let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
1874 .with_vz_delegate(Some(vz));
1875 (rt, calls)
1876 }
1877
1878 #[tokio::test]
1879 async fn dispatch_vz_bundle_annotation_auto_routes_to_vz() {
1880 let (rt, calls) = make_composite_with_vz();
1881 let id = cid("mac-svc", 0);
1882 let image = "ghcr.io/org/macos-vz:sequoia";
1883 rt.record_image_runtime(image, "vz".to_string()).await;
1885
1886 let spec = make_spec(image, None);
1887 rt.create_container(&id, &spec).await.unwrap();
1888
1889 let calls = calls.lock().unwrap();
1890 assert_eq!(
1891 role_for(&calls, "create_container"),
1892 Some(Role::Vz),
1893 "a com.zlayer.runtime=vz bundle should auto-route to the VZ runtime"
1894 );
1895 }
1896
1897 #[tokio::test]
1898 async fn dispatch_vz_label_forces_vz() {
1899 let (rt, calls) = make_composite_with_vz();
1900 let id = cid("mac-svc", 0);
1901 let mut spec = make_spec("ghcr.io/org/whatever:1", None);
1902 spec.labels
1903 .insert("com.zlayer.isolation".to_string(), "vz".to_string());
1904
1905 rt.create_container(&id, &spec).await.unwrap();
1906
1907 let calls = calls.lock().unwrap();
1908 assert_eq!(
1909 role_for(&calls, "create_container"),
1910 Some(Role::Vz),
1911 "an explicit com.zlayer.isolation=vz label should force the VZ runtime"
1912 );
1913 }
1914
1915 #[tokio::test]
1916 async fn dispatch_sandbox_label_overrides_vz_bundle() {
1917 let (rt, calls) = make_composite_with_vz();
1918 let id = cid("mac-svc", 0);
1919 let image = "ghcr.io/org/macos-vz:sequoia";
1920 rt.record_image_runtime(image, "vz".to_string()).await;
1921
1922 let mut spec = make_spec(image, None);
1923 spec.labels
1924 .insert("com.zlayer.isolation".to_string(), "sandbox".to_string());
1925 rt.create_container(&id, &spec).await.unwrap();
1926
1927 let calls = calls.lock().unwrap();
1928 assert_eq!(
1929 role_for(&calls, "create_container"),
1930 Some(Role::Primary),
1931 "com.zlayer.isolation=sandbox should opt out of VZ auto-detect (force the sandbox)"
1932 );
1933 }
1934
1935 fn make_composite_with_vz_linux() -> (CompositeRuntime, CallLog) {
1938 let calls = Arc::new(StdMutex::new(Vec::new()));
1939 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
1940 let delegate =
1941 Arc::new(MockRuntime::new(Role::Delegate, Arc::clone(&calls))) as Arc<dyn Runtime>;
1942 let vz_linux =
1943 Arc::new(MockRuntime::new(Role::VzLinux, Arc::clone(&calls))) as Arc<dyn Runtime>;
1944 let rt = CompositeRuntime::new(primary as Arc<dyn Runtime>, Some(delegate))
1945 .with_vz_linux_delegate(Some(vz_linux));
1946 (rt, calls)
1947 }
1948
1949 #[tokio::test]
1950 async fn dispatch_vz_linux_label_forces_vz_linux() {
1951 let (rt, calls) = make_composite_with_vz_linux();
1952 let id = cid("lin-svc", 0);
1953 let mut spec = make_spec("docker.io/library/alpine:3.19", None);
1954 spec.labels
1955 .insert("com.zlayer.isolation".to_string(), "vz-linux".to_string());
1956
1957 rt.create_container(&id, &spec).await.unwrap();
1958
1959 let calls = calls.lock().unwrap();
1960 assert_eq!(
1961 role_for(&calls, "create_container"),
1962 Some(Role::VzLinux),
1963 "com.zlayer.isolation=vz-linux must force the VZ Linux runtime"
1964 );
1965 }
1966
1967 #[tokio::test]
1968 async fn dispatch_vz_linux_marker_auto_routes_to_vz_linux() {
1969 let (rt, calls) = make_composite_with_vz_linux();
1970 let id = cid("lin-svc", 0);
1971 let image = "ghcr.io/org/linux-vz:bookworm";
1972 rt.record_image_runtime(image, "vz-linux".to_string()).await;
1973
1974 let spec = make_spec(image, None);
1975 rt.create_container(&id, &spec).await.unwrap();
1976
1977 let calls = calls.lock().unwrap();
1978 assert_eq!(
1979 role_for(&calls, "create_container"),
1980 Some(Role::VzLinux),
1981 "a com.zlayer.runtime=vz-linux marker should auto-route to the VZ Linux runtime"
1982 );
1983 }
1984
1985 #[tokio::test]
1986 async fn dispatch_linux_platform_with_vz_linux_routes_to_vz_linux() {
1987 let (rt, calls) = make_composite_with_vz_linux();
1988 let id = cid("lin-svc", 0);
1989 let spec = make_spec(
1992 "docker.io/library/alpine:3.19",
1993 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Arm64)),
1994 );
1995
1996 rt.create_container(&id, &spec).await.unwrap();
1997
1998 let calls = calls.lock().unwrap();
1999 assert_eq!(
2000 role_for(&calls, "create_container"),
2001 Some(Role::VzLinux),
2002 "a Linux platform spec must default to the VZ Linux runtime when present"
2003 );
2004 }
2005
2006 #[tokio::test]
2007 async fn dispatch_linux_image_os_with_vz_linux_routes_to_vz_linux() {
2008 let (rt, calls) = make_composite_with_vz_linux();
2009 let id = cid("lin-svc", 0);
2010 let image = "docker.io/library/nginx:1.25";
2011 rt.record_image_os(image, OsKind::Linux).await;
2012
2013 let spec = make_spec(image, None);
2014 rt.create_container(&id, &spec).await.unwrap();
2015
2016 let calls = calls.lock().unwrap();
2017 assert_eq!(
2018 role_for(&calls, "create_container"),
2019 Some(Role::VzLinux),
2020 "a Linux image-OS cache hit must default to the VZ Linux runtime when present"
2021 );
2022 }
2023
2024 #[tokio::test]
2025 async fn dispatch_macos_image_os_with_vz_linux_routes_to_primary() {
2026 let (rt, calls) = make_composite_with_vz_linux();
2030 let id = cid("mac-svc", 0);
2031 let image = "ghcr.io/zlayer/macos-native:latest";
2032 rt.record_image_os(image, OsKind::Macos).await;
2033
2034 let spec = make_spec(image, None);
2035 rt.create_container(&id, &spec).await.unwrap();
2036
2037 let calls = calls.lock().unwrap();
2038 assert_eq!(
2039 role_for(&calls, "create_container"),
2040 Some(Role::Primary),
2041 "image_os == Macos must route to primary even when VZ-Linux is the default",
2042 );
2043 }
2044
2045 #[tokio::test]
2046 async fn dispatch_unknown_os_with_vz_linux_defaults_to_vz_linux() {
2047 let (rt, calls) = make_composite_with_vz_linux();
2053 let id = cid("svc", 0);
2054 let spec = make_spec("docker.io/library/whatever:latest", None);
2055
2056 rt.create_container(&id, &spec).await.unwrap();
2057
2058 let calls = calls.lock().unwrap();
2059 assert_eq!(
2060 role_for(&calls, "create_container"),
2061 Some(Role::VzLinux),
2062 "an unknown-OS image must default to VZ-Linux when the delegate is present",
2063 );
2064 }
2065
2066 #[tokio::test]
2067 async fn dispatch_unknown_os_without_vz_linux_falls_through_to_primary() {
2068 let (rt, calls) = make_composite(true);
2072 let id = cid("svc", 0);
2073 let spec = make_spec("docker.io/library/whatever:latest", None);
2074
2075 rt.create_container(&id, &spec).await.unwrap();
2076
2077 let calls = calls.lock().unwrap();
2078 assert_eq!(
2079 role_for(&calls, "create_container"),
2080 Some(Role::Primary),
2081 "without a VZ-Linux delegate an unknown-OS image keeps the primary fallthrough",
2082 );
2083 }
2084
2085 async fn seed_persistent_linux_cache(path: &std::path::Path, image: &str) {
2089 seed_persistent_cache_with_os(path, image, "linux").await;
2090 }
2091
2092 async fn seed_persistent_cache_with_os(path: &std::path::Path, image: &str, os: &str) {
2095 let cache = zlayer_registry::CacheType::persistent_at(path)
2096 .build()
2097 .await
2098 .expect("open persistent blob cache");
2099
2100 let config_json = serde_json::json!({
2101 "architecture": "arm64",
2102 "os": os,
2103 "config": {},
2104 });
2105 let config_bytes = serde_json::to_vec(&config_json).unwrap();
2106 let config_digest = zlayer_registry::compute_digest(&config_bytes);
2107 cache.put(&config_digest, &config_bytes).await.unwrap();
2108
2109 let manifest = zlayer_registry::OciImageManifest {
2110 schema_version: 2,
2111 media_type: Some("application/vnd.oci.image.manifest.v1+json".to_string()),
2112 artifact_type: None,
2113 config: oci_client::manifest::OciDescriptor {
2114 media_type: "application/vnd.oci.image.config.v1+json".to_string(),
2115 digest: config_digest.clone(),
2116 size: i64::try_from(config_bytes.len()).unwrap(),
2117 urls: None,
2118 annotations: None,
2119 },
2120 layers: vec![],
2121 annotations: None,
2122 subject: None,
2123 };
2124 let manifest_bytes = serde_json::to_vec(&manifest).unwrap();
2125 let manifest_digest = zlayer_registry::compute_digest(&manifest_bytes);
2126 cache
2127 .put(&zlayer_registry::manifest_cache_key(image), &manifest_bytes)
2128 .await
2129 .unwrap();
2130 cache
2131 .put(
2132 &zlayer_registry::manifest_digest_cache_key(image),
2133 manifest_digest.as_bytes(),
2134 )
2135 .await
2136 .unwrap();
2137 }
2138
2139 #[tokio::test]
2145 async fn pull_then_dispatch_resolves_linux_os_from_local_cache_routes_to_vz_linux() {
2146 let tmp = tempfile::tempdir().unwrap();
2147 let cache_path = tmp.path().join("blobs.redb");
2148 let image = "docker.io/library/alpine:latest";
2149 seed_persistent_linux_cache(&cache_path, image).await;
2150
2151 let (rt, calls) = make_composite_with_vz_linux();
2152 let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2153
2154 rt.pull_image(image).await.unwrap();
2156
2157 assert_eq!(
2159 rt.image_os.read().await.get(image).copied(),
2160 Some(OsKind::Linux),
2161 "pull_image must resolve Linux OS from the local persistent cache",
2162 );
2163
2164 let id = cid("lin-svc", 0);
2166 let spec = make_spec(image, None);
2167 rt.create_container(&id, &spec).await.unwrap();
2168
2169 let calls = calls.lock().unwrap();
2170 assert_eq!(
2171 role_for(&calls, "create_container"),
2172 Some(Role::VzLinux),
2173 "a Linux image whose OS came from the local cache must route to VZ-Linux",
2174 );
2175 }
2176
2177 #[tokio::test]
2184 async fn bare_ref_spec_resolves_os_from_qualified_seeded_cache_routes_to_vz_linux() {
2185 let tmp = tempfile::tempdir().unwrap();
2186 let cache_path = tmp.path().join("blobs.redb");
2187 seed_persistent_linux_cache(&cache_path, "docker.io/library/alpine:latest").await;
2189
2190 let (rt, calls) = make_composite_with_vz_linux();
2191 let rt = rt.with_os_inspect_cache_paths(vec![cache_path]);
2192
2193 let bare = "alpine:latest";
2196 rt.pull_image(bare).await.unwrap();
2197
2198 assert_eq!(
2199 rt.image_os.read().await.get(bare).copied(),
2200 Some(OsKind::Linux),
2201 "bare-ref inspect must resolve Linux from the qualified-seeded cache",
2202 );
2203
2204 let id = cid("lin-svc", 0);
2205 let spec = make_spec(bare, None);
2206 rt.create_container(&id, &spec).await.unwrap();
2207
2208 let calls = calls.lock().unwrap();
2209 assert_eq!(
2210 role_for(&calls, "create_container"),
2211 Some(Role::VzLinux),
2212 "bare-ref Linux image routes to VZ-Linux via the canonical-key cache hit",
2213 );
2214 }
2215
2216 #[tokio::test]
2222 async fn os_resolves_from_second_cache_when_first_is_empty() {
2223 let tmp = tempfile::tempdir().unwrap();
2224 let empty_cache = tmp.path().join("vz-linux-blobs.redb");
2225 let primary_cache = tmp.path().join("primary-blobs.redb");
2226 zlayer_registry::CacheType::persistent_at(&empty_cache)
2228 .build()
2229 .await
2230 .unwrap();
2231 seed_persistent_linux_cache(&primary_cache, "docker.io/library/alpine:latest").await;
2233
2234 let (rt, calls) = make_composite_with_vz_linux();
2235 let rt = rt.with_os_inspect_cache_paths(vec![empty_cache, primary_cache]);
2236
2237 let bare = "alpine:latest";
2238 rt.pull_image(bare).await.unwrap();
2239
2240 assert_eq!(
2241 rt.image_os.read().await.get(bare).copied(),
2242 Some(OsKind::Linux),
2243 "OS must resolve from the second cache after the first misses (no network)",
2244 );
2245
2246 let id = cid("lin-svc", 0);
2247 let spec = make_spec(bare, None);
2248 rt.create_container(&id, &spec).await.unwrap();
2249
2250 let calls = calls.lock().unwrap();
2251 assert_eq!(role_for(&calls, "create_container"), Some(Role::VzLinux),);
2252 }
2253
2254 #[tokio::test]
2266 async fn pull_with_network_429_still_dispatches_via_local_cache() {
2267 let tmp = tempfile::tempdir().unwrap();
2268 let cache_path = tmp.path().join("blobs.redb");
2269 let image = "registry.invalid.example/library/alpine:latest";
2272 seed_persistent_linux_cache(&cache_path, image).await;
2273
2274 let (rt, calls) = make_composite_with_vz_linux();
2275 let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2276
2277 rt.pull_image(image).await.unwrap();
2281 assert_eq!(
2282 rt.image_os.read().await.get(image).copied(),
2283 Some(OsKind::Linux),
2284 "OS must be resolved from the local cache with no network call",
2285 );
2286
2287 let id = cid("lin-svc", 0);
2289 let spec = make_spec(image, None);
2290 rt.create_container(&id, &spec).await.unwrap();
2291
2292 let calls = calls.lock().unwrap();
2293 assert_eq!(
2294 role_for(&calls, "create_container"),
2295 Some(Role::VzLinux),
2296 "a would-be-429 pull must still route the cached Linux image to VZ-Linux",
2297 );
2298 }
2299
2300 #[tokio::test]
2305 async fn pull_then_dispatch_resolves_macos_os_from_local_cache_routes_to_primary() {
2306 let tmp = tempfile::tempdir().unwrap();
2307 let cache_path = tmp.path().join("blobs.redb");
2308 let image = "ghcr.io/zlayer/macos-native:latest";
2309 seed_persistent_cache_with_os(&cache_path, image, "darwin").await;
2310
2311 let (rt, calls) = make_composite_with_vz_linux();
2312 let rt = rt.with_os_inspect_cache_path(Some(cache_path));
2313
2314 rt.pull_image(image).await.unwrap();
2315 assert_eq!(
2316 rt.image_os.read().await.get(image).copied(),
2317 Some(OsKind::Macos),
2318 "pull_image must resolve macOS OS from the local persistent cache",
2319 );
2320
2321 let id = cid("mac-svc", 0);
2322 let spec = make_spec(image, None);
2323 rt.create_container(&id, &spec).await.unwrap();
2324
2325 let calls = calls.lock().unwrap();
2326 assert_eq!(
2327 role_for(&calls, "create_container"),
2328 Some(Role::Primary),
2329 "a macOS-native rootfs must route to primary even with VZ-Linux as default",
2330 );
2331 }
2332
2333 #[tokio::test]
2334 async fn dispatch_vm_label_forces_libkrun_delegate() {
2335 let (rt, calls) = make_composite_with_vz_linux();
2336 let id = cid("lin-svc", 0);
2337 let mut spec = make_spec(
2340 "docker.io/library/alpine:3.19",
2341 Some(TargetPlatform::new(OsKind::Linux, ArchKind::Arm64)),
2342 );
2343 spec.labels
2344 .insert("com.zlayer.isolation".to_string(), "vm".to_string());
2345
2346 rt.create_container(&id, &spec).await.unwrap();
2347
2348 let calls = calls.lock().unwrap();
2349 assert_eq!(
2350 role_for(&calls, "create_container"),
2351 Some(Role::Delegate),
2352 "com.zlayer.isolation=vm must force the libkrun delegate even when VZ Linux is default"
2353 );
2354 }
2355
2356 #[tokio::test]
2357 async fn dispatch_unmarked_image_with_vz_delegate_falls_through_to_primary() {
2358 let (rt, calls) = make_composite_with_vz();
2359 let id = cid("mac-svc", 0);
2360 let spec = make_spec("ghcr.io/org/plain:1", None);
2363 rt.create_container(&id, &spec).await.unwrap();
2364
2365 let calls = calls.lock().unwrap();
2366 assert_eq!(
2367 role_for(&calls, "create_container"),
2368 Some(Role::Primary),
2369 "an unmarked image must fall through to primary even when a VZ delegate is attached"
2370 );
2371 }
2372
2373 #[tokio::test]
2374 async fn per_container_dispatch_cache_persists_through_start_stop() {
2375 let (rt, calls) = make_composite(true);
2376 let id = cid("win-svc", 0);
2377 let spec = make_spec(
2378 "mcr.microsoft.com/windows/nanoserver:ltsc2022",
2379 Some(TargetPlatform::new(OsKind::Windows, ArchKind::Amd64)),
2380 );
2381
2382 rt.create_container(&id, &spec).await.unwrap();
2383 rt.start_container(&id).await.unwrap();
2384 rt.stop_container(&id, Duration::from_secs(1))
2385 .await
2386 .unwrap();
2387 rt.remove_container(&id).await.unwrap();
2388
2389 let recorded = calls.lock().unwrap().clone();
2390 for method in [
2391 "create_container",
2392 "start_container",
2393 "stop_container",
2394 "remove_container",
2395 ] {
2396 assert_eq!(
2397 role_for(&recorded, method),
2398 Some(Role::Primary),
2399 "{method} should have dispatched to primary"
2400 );
2401 }
2402
2403 let after = rt
2405 .start_container(&id)
2406 .await
2407 .expect_err("lookup after remove should fail");
2408 assert!(
2409 matches!(after, AgentError::NotFound { .. }),
2410 "expected NotFound after remove, got {after:?}"
2411 );
2412 }
2413
2414 #[tokio::test]
2415 async fn pull_image_calls_both_runtimes() {
2416 let (rt, calls) = make_composite(true);
2417 rt.pull_image("docker.io/library/alpine:3.19")
2418 .await
2419 .unwrap();
2420
2421 let recorded = calls.lock().unwrap();
2422 let pull_calls: Vec<Role> = recorded
2423 .iter()
2424 .filter(|(_, m, _)| m == "pull_image")
2425 .map(|(r, _, _)| *r)
2426 .collect();
2427 assert!(
2428 pull_calls.contains(&Role::Primary),
2429 "primary should have been pulled: {pull_calls:?}",
2430 );
2431 assert!(
2432 pull_calls.contains(&Role::Delegate),
2433 "delegate should have been pulled: {pull_calls:?}",
2434 );
2435 }
2436
2437 #[tokio::test]
2438 async fn pull_image_delegate_error_does_not_fail() {
2439 let calls = Arc::new(StdMutex::new(Vec::new()));
2442 let primary = Arc::new(MockRuntime::new(Role::Primary, Arc::clone(&calls)));
2443 let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2444 delegate.pull_image_error = Some("simulated delegate pull failure".to_string());
2445 let rt = CompositeRuntime::new(
2446 primary as Arc<dyn Runtime>,
2447 Some(Arc::new(delegate) as Arc<dyn Runtime>),
2448 );
2449
2450 rt.pull_image("docker.io/library/alpine:3.19")
2452 .await
2453 .unwrap();
2454
2455 let recorded = calls.lock().unwrap();
2456 let pull_calls: Vec<Role> = recorded
2457 .iter()
2458 .filter(|(_, m, _)| m == "pull_image")
2459 .map(|(r, _, _)| *r)
2460 .collect();
2461 assert!(
2462 pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
2463 "both runtimes should have been called: {pull_calls:?}",
2464 );
2465 }
2466
2467 #[tokio::test]
2468 async fn pull_image_primary_wrong_platform_does_not_fail() {
2469 let calls = Arc::new(StdMutex::new(Vec::new()));
2475 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2476 primary.pull_image_wrong_platform = Some(("windows", "linux"));
2477 let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2478 let rt = CompositeRuntime::new(
2479 Arc::new(primary) as Arc<dyn Runtime>,
2480 Some(Arc::new(delegate) as Arc<dyn Runtime>),
2481 );
2482
2483 rt.pull_image("docker.io/library/alpine:3.19")
2485 .await
2486 .expect("composite pull must tolerate WrongPlatform from primary");
2487
2488 let recorded = calls.lock().unwrap();
2489 let pull_calls: Vec<Role> = recorded
2490 .iter()
2491 .filter(|(_, m, _)| m == "pull_image")
2492 .map(|(r, _, _)| *r)
2493 .collect();
2494 assert!(
2495 pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
2496 "delegate must still be called when primary soft-skips: {pull_calls:?}",
2497 );
2498 }
2499
2500 #[tokio::test]
2501 async fn pull_image_with_policy_primary_wrong_platform_does_not_fail() {
2502 let calls = Arc::new(StdMutex::new(Vec::new()));
2507 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2508 primary.pull_image_wrong_platform = Some(("windows", "linux"));
2509 let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2510 let rt = CompositeRuntime::new(
2511 Arc::new(primary) as Arc<dyn Runtime>,
2512 Some(Arc::new(delegate) as Arc<dyn Runtime>),
2513 );
2514
2515 rt.pull_image_with_policy(
2516 "docker.io/library/alpine:3.19",
2517 PullPolicy::IfNotPresent,
2518 None,
2519 zlayer_spec::SourcePolicy::default(),
2520 )
2521 .await
2522 .expect("composite pull_image_with_policy must tolerate WrongPlatform from primary");
2523
2524 let recorded = calls.lock().unwrap();
2525 let pull_calls: Vec<Role> = recorded
2526 .iter()
2527 .filter(|(_, m, _)| m == "pull_image_with_policy")
2528 .map(|(r, _, _)| *r)
2529 .collect();
2530 assert!(
2531 pull_calls.contains(&Role::Primary) && pull_calls.contains(&Role::Delegate),
2532 "delegate must still be called when primary soft-skips: {pull_calls:?}",
2533 );
2534 }
2535
2536 #[tokio::test]
2537 async fn pull_image_primary_non_wrong_platform_error_still_fails() {
2538 let calls = Arc::new(StdMutex::new(Vec::new()));
2542 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2543 primary.pull_image_error = Some("simulated real failure".to_string());
2544 let delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2545 let rt = CompositeRuntime::new(
2546 Arc::new(primary) as Arc<dyn Runtime>,
2547 Some(Arc::new(delegate) as Arc<dyn Runtime>),
2548 );
2549
2550 let err = rt
2551 .pull_image("docker.io/library/alpine:3.19")
2552 .await
2553 .expect_err("real primary error must propagate");
2554 assert!(
2555 matches!(err, AgentError::Internal(_)),
2556 "expected Internal, got {err:?}",
2557 );
2558 }
2559
2560 #[tokio::test]
2561 async fn list_images_merges_both() {
2562 let calls = Arc::new(StdMutex::new(Vec::new()));
2564 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2565 primary.list_images_response = vec![ImageInfo {
2566 reference: "primary/image:1".to_string(),
2567 digest: None,
2568 size_bytes: None,
2569 }];
2570 let mut delegate = MockRuntime::new(Role::Delegate, Arc::clone(&calls));
2571 delegate.list_images_response = vec![ImageInfo {
2572 reference: "delegate/image:1".to_string(),
2573 digest: None,
2574 size_bytes: None,
2575 }];
2576 let rt = CompositeRuntime::new(
2577 Arc::new(primary) as Arc<dyn Runtime>,
2578 Some(Arc::new(delegate) as Arc<dyn Runtime>),
2579 );
2580
2581 let merged = rt.list_images().await.unwrap();
2582 let refs: Vec<&str> = merged.iter().map(|i| i.reference.as_str()).collect();
2583 assert!(
2584 refs.contains(&"primary/image:1") && refs.contains(&"delegate/image:1"),
2585 "merged list should contain both entries, got {refs:?}",
2586 );
2587 }
2588
2589 #[tokio::test]
2597 async fn list_images_tolerates_primary_unsupported_and_uses_vz_linux() {
2598 let calls = Arc::new(StdMutex::new(Vec::new()));
2599 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2600 primary.list_images_error = Some("list_images is not supported".to_string());
2601 let mut vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2602 vz_linux.list_images_response = vec![ImageInfo {
2603 reference: "docker.io/library/alpine:latest".to_string(),
2604 digest: None,
2605 size_bytes: None,
2606 }];
2607
2608 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2609 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2610
2611 let images = rt
2612 .list_images()
2613 .await
2614 .expect("primary Unsupported must not fail the composite list_images");
2615 let refs: Vec<&str> = images.iter().map(|i| i.reference.as_str()).collect();
2616 assert_eq!(
2617 refs,
2618 vec!["docker.io/library/alpine:latest"],
2619 "should return the VZ-Linux delegate's images, got {refs:?}",
2620 );
2621 }
2622
2623 #[tokio::test]
2627 async fn list_images_errors_only_when_all_backends_fail() {
2628 let calls = Arc::new(StdMutex::new(Vec::new()));
2629 let mut primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2630 primary.list_images_error = Some("unsupported".to_string());
2631 let mut vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2632 vz_linux.list_images_error = Some("also unsupported".to_string());
2633
2634 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2635 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2636
2637 let err = rt.list_images().await.unwrap_err();
2638 assert!(
2639 matches!(err, AgentError::Unsupported(_)),
2640 "all-backends-fail should surface Unsupported, got {err:?}",
2641 );
2642 }
2643
2644 #[tokio::test]
2650 async fn prune_images_tolerates_primary_unsupported_and_uses_delegate() {
2651 let calls = Arc::new(StdMutex::new(Vec::new()));
2652 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2654 let delegate =
2655 MockRuntime::new(Role::Delegate, Arc::clone(&calls)).with_prune_result(PruneResult {
2656 deleted: vec![
2657 "docker.io/library/alpine:3.19".to_string(),
2658 "docker.io/library/nginx:1.25".to_string(),
2659 ],
2660 space_reclaimed: 4096,
2661 });
2662
2663 let rt = CompositeRuntime::new(
2664 Arc::new(primary) as Arc<dyn Runtime>,
2665 Some(Arc::new(delegate) as Arc<dyn Runtime>),
2666 );
2667
2668 let result = rt
2669 .prune_images()
2670 .await
2671 .expect("primary Unsupported must not fail the composite prune_images");
2672 assert_eq!(
2673 result.deleted,
2674 vec![
2675 "docker.io/library/alpine:3.19".to_string(),
2676 "docker.io/library/nginx:1.25".to_string(),
2677 ],
2678 "should return the delegate's deleted images, got {:?}",
2679 result.deleted,
2680 );
2681 assert_eq!(
2682 result.space_reclaimed, 4096,
2683 "should return the delegate's reclaimed bytes",
2684 );
2685
2686 let calls = calls.lock().unwrap();
2687 assert_eq!(
2688 role_for(&calls, "prune_images"),
2689 Some(Role::Primary),
2690 "primary prune_images must still be attempted first",
2691 );
2692 assert!(
2693 calls
2694 .iter()
2695 .any(|(role, m, _)| *role == Role::Delegate && m == "prune_images"),
2696 "delegate prune_images must be invoked after the primary miss",
2697 );
2698 }
2699
2700 fn log_entry(stream: LogStream, message: &str) -> LogEntry {
2714 LogEntry {
2715 timestamp: chrono::Utc::now(),
2716 stream,
2717 source: zlayer_observability::logs::LogSource::Container("test".to_string()),
2718 message: message.to_string(),
2719 service: None,
2720 deployment: None,
2721 }
2722 }
2723
2724 async fn drain_logs(stream: LogsStream) -> String {
2726 use futures_util::StreamExt as _;
2727 let mut out = Vec::new();
2728 let mut s = stream;
2729 while let Some(item) = s.next().await {
2730 out.extend_from_slice(&item.expect("log chunk ok").bytes);
2731 }
2732 String::from_utf8(out).expect("utf8 log body")
2733 }
2734
2735 async fn drain_stats(stream: StatsStream) -> Vec<StatsSample> {
2737 use futures_util::StreamExt as _;
2738 let mut out = Vec::new();
2739 let mut s = stream;
2740 while let Some(item) = s.next().await {
2741 out.push(item.expect("stats sample ok"));
2742 }
2743 out
2744 }
2745
2746 async fn make_read_composite(owner: Role) -> (CompositeRuntime, ContainerId, CallLog) {
2752 let calls = Arc::new(StdMutex::new(Vec::new()));
2753 let logs = vec![
2754 log_entry(LogStream::Stdout, "hello stdout"),
2755 log_entry(LogStream::Stderr, "hello stderr"),
2756 ];
2757 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls))
2758 .with_stream_unsupported()
2759 .with_logs(logs.clone());
2760 let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls)).with_logs(logs);
2761 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2762 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2763
2764 let id = cid("read-svc", 0);
2765 let target = match owner {
2768 Role::Primary => DispatchTarget::Primary,
2769 Role::VzLinux => DispatchTarget::VzLinux,
2770 other => panic!("make_read_composite supports Primary/VzLinux, not {other:?}"),
2771 };
2772 rt.dispatch.write().await.insert(id.clone(), target);
2773 (rt, id, calls)
2774 }
2775
2776 #[tokio::test]
2777 async fn logs_stream_falls_back_to_snapshot_when_owner_has_no_stream() {
2778 let calls = Arc::new(StdMutex::new(Vec::new()));
2782 let logs = vec![
2783 log_entry(LogStream::Stdout, "hello stdout"),
2784 log_entry(LogStream::Stderr, "hello stderr"),
2785 ];
2786 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls))
2787 .with_stream_unsupported()
2788 .with_logs(logs);
2789 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None);
2790 let id = cid("read-svc", 0);
2791 rt.dispatch
2792 .write()
2793 .await
2794 .insert(id.clone(), DispatchTarget::Primary);
2795
2796 let stream = rt
2797 .logs_stream(&id, LogsStreamOptions::default())
2798 .await
2799 .expect("logs_stream must not 500 when snapshot reads work");
2800 let body = drain_logs(stream).await;
2801 assert!(
2802 body.contains("hello stdout") && body.contains("hello stderr"),
2803 "synthesised stream must carry the captured logs, got: {body:?}",
2804 );
2805 }
2806
2807 #[tokio::test]
2808 async fn logs_stream_routes_to_delegate_owner_native_stream() {
2809 let (rt, id, calls) = make_read_composite(Role::VzLinux).await;
2812 let stream = rt
2813 .logs_stream(&id, LogsStreamOptions::default())
2814 .await
2815 .expect("delegate-owned logs_stream must succeed");
2816 let body = drain_logs(stream).await;
2817 assert!(body.contains("hello stdout"), "got: {body:?}");
2818
2819 let log = calls.lock().expect("call-log mutex poisoned");
2820 assert_eq!(
2821 role_for(&log, "logs_stream"),
2822 Some(Role::VzLinux),
2823 "logs_stream must hit the owning delegate first, calls: {log:?}",
2824 );
2825 }
2826
2827 #[tokio::test]
2828 async fn get_logs_falls_back_across_backends() {
2829 let (rt, id, _calls) = make_read_composite(Role::Primary).await;
2833 let logs = rt.get_logs(&id).await.expect("get_logs must succeed");
2834 assert_eq!(logs.len(), 2, "owner snapshot logs should be returned");
2835 }
2836
2837 #[tokio::test]
2838 async fn stats_stream_falls_back_to_snapshot_when_owner_has_no_stream() {
2839 let calls = Arc::new(StdMutex::new(Vec::new()));
2844 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_stream_unsupported();
2845 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None);
2846 let id = cid("read-svc", 0);
2847 rt.dispatch
2848 .write()
2849 .await
2850 .insert(id.clone(), DispatchTarget::Primary);
2851
2852 let stream = rt
2853 .stats_stream(&id)
2854 .await
2855 .expect("stats_stream must not 500 when get_container_stats works");
2856 let samples = drain_stats(stream).await;
2857 assert_eq!(samples.len(), 1, "snapshot fallback yields one sample");
2858 assert!(
2859 samples[0].mem_used_bytes > 0,
2860 "synthesised sample must carry non-zero memory, got {:?}",
2861 samples[0],
2862 );
2863 assert_eq!(
2864 samples[0].cpu_total_ns, 1_000_000,
2865 "cpu microseconds must be scaled to nanoseconds in the synthesised sample",
2866 );
2867 }
2868
2869 #[tokio::test]
2870 async fn get_container_stats_tolerates_owner_miss_and_uses_other_backend() {
2871 let calls = Arc::new(StdMutex::new(Vec::new()));
2876 let primary =
2877 MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_stats_snapshot_unsupported();
2878 let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2879 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2880 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2881 let id = cid("read-svc", 0);
2882 rt.dispatch
2883 .write()
2884 .await
2885 .insert(id.clone(), DispatchTarget::Primary);
2886
2887 let stats = rt
2888 .get_container_stats(&id)
2889 .await
2890 .expect("owner Unsupported must fall back to the delegate, not 500");
2891 assert!(stats.memory_bytes > 0, "delegate stats should be returned");
2892
2893 let log = calls.lock().expect("call-log mutex poisoned");
2894 assert!(
2895 log.iter()
2896 .any(|(role, method, _)| *role == Role::Primary && method == "get_container_stats"),
2897 "primary must have been tried first, calls: {log:?}",
2898 );
2899 assert!(
2900 log.iter()
2901 .any(|(role, method, _)| *role == Role::VzLinux && method == "get_container_stats"),
2902 "delegate must have served the fallback, calls: {log:?}",
2903 );
2904 }
2905
2906 #[tokio::test]
2907 async fn reads_propagate_not_found_when_no_backend_owns_container() {
2908 let calls = Arc::new(StdMutex::new(Vec::new()));
2912 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls)).with_reads_not_found();
2913 let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls)).with_reads_not_found();
2914 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2915 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2916 let id = cid("read-svc", 0);
2917 rt.dispatch
2918 .write()
2919 .await
2920 .insert(id.clone(), DispatchTarget::Primary);
2921
2922 match rt.logs_stream(&id, LogsStreamOptions::default()).await {
2925 Err(AgentError::NotFound { .. }) => {}
2926 other => panic!(
2927 "all-not-found logs_stream must be NotFound (404), got {:?}",
2928 other.err(),
2929 ),
2930 }
2931 match rt.stats_stream(&id).await {
2932 Err(AgentError::NotFound { .. }) => {}
2933 other => panic!(
2934 "all-not-found stats_stream must be NotFound (404), got {:?}",
2935 other.err(),
2936 ),
2937 }
2938 let cl_err = rt.container_logs(&id, 10).await.unwrap_err();
2939 assert!(
2940 matches!(cl_err, AgentError::NotFound { .. }),
2941 "all-not-found container_logs must be NotFound (404), got {cl_err:?}",
2942 );
2943 }
2944
2945 #[tokio::test]
2946 async fn reads_on_undispatched_container_are_not_found() {
2947 let (rt, _calls) = make_composite(false);
2949 let id = cid("ghost", 0);
2950 match rt.logs_stream(&id, LogsStreamOptions::default()).await {
2951 Err(AgentError::NotFound { .. }) => {}
2952 other => panic!(
2953 "undispatched logs_stream must be NotFound, got {:?}",
2954 other.err()
2955 ),
2956 }
2957 }
2958
2959 #[tokio::test]
2965 async fn pull_image_fans_out_to_vz_linux() {
2966 let calls = Arc::new(StdMutex::new(Vec::new()));
2967 let primary = MockRuntime::new(Role::Primary, Arc::clone(&calls));
2968 let vz_linux = MockRuntime::new(Role::VzLinux, Arc::clone(&calls));
2969
2970 let rt = CompositeRuntime::new(Arc::new(primary) as Arc<dyn Runtime>, None)
2971 .with_vz_linux_delegate(Some(Arc::new(vz_linux) as Arc<dyn Runtime>));
2972
2973 rt.pull_image("docker.io/library/alpine:latest")
2974 .await
2975 .expect("pull should succeed");
2976
2977 let log = calls.lock().expect("call-log mutex poisoned");
2978 assert!(
2979 log.iter()
2980 .any(|(role, method, _)| *role == Role::VzLinux && method == "pull_image"),
2981 "pull_image must reach the VZ-Linux delegate, recorded calls: {log:?}",
2982 );
2983 }
2984
2985 #[tokio::test]
2986 async fn dispatch_lookup_unknown_container_errors() {
2987 let (rt, _calls) = make_composite(true);
2988 let id = cid("ghost", 0);
2989
2990 let err = rt.start_container(&id).await.unwrap_err();
2991 assert!(
2992 matches!(err, AgentError::NotFound { .. }),
2993 "expected NotFound for unknown container, got {err:?}"
2994 );
2995 }
2996
2997 async fn cached_os(rt: &CompositeRuntime, image: &str) -> Option<OsKind> {
2999 rt.image_os.read().await.get(image).copied()
3000 }
3001
3002 #[tokio::test]
3003 async fn apply_image_os_inspection_populates_cache_on_ok_some() {
3004 let (rt, _calls) = make_composite(true);
3008 let image = "docker.io/library/alpine:3.19";
3009
3010 rt.apply_image_os_inspection(image, Ok(Some(OsKind::Linux)))
3011 .await;
3012
3013 assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
3014 }
3015
3016 #[tokio::test]
3017 async fn apply_image_os_inspection_leaves_cache_untouched_on_ok_none() {
3018 let (rt, _calls) = make_composite(true);
3022 let image = "docker.io/library/nginx:1.25";
3023
3024 rt.apply_image_os_inspection(image, Ok(None)).await;
3025
3026 assert_eq!(cached_os(&rt, image).await, None);
3027 }
3028
3029 #[tokio::test]
3030 async fn apply_image_os_inspection_leaves_cache_untouched_on_err() {
3031 let (rt, _calls) = make_composite(true);
3034 let image = "docker.io/library/nginx:1.25";
3035
3036 rt.record_image_os(image, OsKind::Linux).await;
3039
3040 let err = zlayer_registry::RegistryError::NotFound {
3041 registry: "docker.io".to_string(),
3042 image: image.to_string(),
3043 };
3044 rt.apply_image_os_inspection(image, Err(err)).await;
3045
3046 assert_eq!(cached_os(&rt, image).await, Some(OsKind::Linux));
3048 }
3049
3050 #[tokio::test]
3051 async fn pull_image_inspection_failure_does_not_fail_pull() {
3052 let (rt, _calls) = make_composite(true);
3058 let image = "invalid.example.invalid/ghost:v1";
3059
3060 rt.pull_image(image).await.unwrap();
3061
3062 assert_eq!(
3063 cached_os(&rt, image).await,
3064 None,
3065 "failed inspection must not populate the image-OS cache"
3066 );
3067 }
3068
3069 #[tokio::test]
3070 async fn pull_image_with_policy_inspection_failure_does_not_fail_pull() {
3071 let (rt, _calls) = make_composite(true);
3074 let image = "invalid.example.invalid/ghost:v1";
3075
3076 rt.pull_image_with_policy(
3077 image,
3078 PullPolicy::IfNotPresent,
3079 None,
3080 zlayer_spec::SourcePolicy::default(),
3081 )
3082 .await
3083 .unwrap();
3084
3085 assert_eq!(cached_os(&rt, image).await, None);
3086 }
3087
3088 #[test]
3089 fn os_kind_from_oci_str_roundtrip() {
3090 for os in [OsKind::Linux, OsKind::Windows, OsKind::Macos] {
3095 assert_eq!(OsKind::from_oci_str(os.as_oci_str()), Some(os));
3096 }
3097 assert_eq!(OsKind::from_oci_str(""), None);
3098 assert_eq!(OsKind::from_oci_str("freebsd"), None);
3099 }
3100}