use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use futures_util::Stream;
use std::collections::VecDeque;
use std::future::Future;
use std::net::IpAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
use zlayer_spec::{PullPolicy, RegistryAuth, ServiceSpec};
/// Lifecycle state of a managed container.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContainerState {
    /// Created but not yet started.
    Pending,
    /// Runtime is preparing the container (image/network/volumes).
    Initializing,
    /// Main process is running.
    Running,
    /// Stop has been requested; shutdown in progress.
    Stopping,
    /// Main process exited with the given exit code.
    Exited { code: i32 },
    /// Container could not run; `reason` is a human-readable explanation.
    Failed { reason: String },
}
/// Identity of a container replica: owning service name plus replica index.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ContainerId {
    /// Name of the owning service.
    pub service: String,
    /// Replica index within the service.
    pub replica: u32,
}
impl std::fmt::Display for ContainerId {
    /// Renders as `<service>-rep-<replica>`, e.g. `web-rep-0`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { service, replica } = self;
        write!(f, "{service}-rep-{replica}")
    }
}
/// In-memory record of a running (or recently run) container replica.
pub struct Container {
    pub id: ContainerId,
    pub state: ContainerState,
    /// Host PID of the main process, once known.
    pub pid: Option<u32>,
    /// Background task driving the container, if this runtime spawns one.
    pub task: Option<JoinHandle<std::io::Result<()>>>,
    /// IP assigned on the overlay network, if attached.
    pub overlay_ip: Option<IpAddr>,
    /// Background health-check task, if configured.
    pub health_monitor: Option<JoinHandle<()>>,
    /// Host port override, if the runtime remapped the service port.
    pub port_override: Option<u16>,
}
/// Summary of a locally stored image.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ImageInfo {
    /// Image reference as pulled (e.g. `repo/name:tag`).
    pub reference: String,
    /// Content digest, when the backend reports one.
    pub digest: Option<String>,
    pub size_bytes: Option<u64>,
}
/// Result of pruning unused images.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PruneResult {
    /// Identifiers of the images that were removed.
    pub deleted: Vec<String>,
    /// Bytes reclaimed by the prune.
    pub space_reclaimed: u64,
}
/// Why a waited-on container stopped.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WaitReason {
    /// Normal process exit.
    Exited,
    /// Terminated by a signal.
    Signal,
    /// Killed by the kernel OOM killer.
    OomKilled,
    /// The runtime itself failed.
    RuntimeError,
}
/// Docker-style wait condition (`condition` query parameter), serialized in
/// kebab-case to match the wire format.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum WaitCondition {
    /// Wait until the container is not running (the Docker default).
    #[default]
    NotRunning,
    /// Wait for the next exit event.
    NextExit,
    /// Wait until the container is removed.
    Removed,
}
impl WaitCondition {
    /// Wire-format string for Docker-compatible APIs.
    #[must_use]
    pub const fn as_wire_str(self) -> &'static str {
        match self {
            Self::NotRunning => "not-running",
            Self::NextExit => "next-exit",
            Self::Removed => "removed",
        }
    }
    /// Parses a wire-format string. An empty string maps to the default
    /// condition (`NotRunning`); unknown values yield `None`.
    #[must_use]
    pub fn from_wire_str(s: &str) -> Option<Self> {
        if s.is_empty() || s == "not-running" {
            Some(Self::NotRunning)
        } else if s == "next-exit" {
            Some(Self::NextExit)
        } else if s == "removed" {
            Some(Self::Removed)
        } else {
            None
        }
    }
}
/// Detailed result of waiting for a container to stop.
#[derive(Debug, Clone)]
pub struct WaitOutcome {
    pub exit_code: i32,
    pub reason: WaitReason,
    /// Name of the terminating signal, when `reason` is `Signal`.
    pub signal: Option<String>,
    /// When the container finished, if the backend reports it.
    pub finished_at: Option<chrono::DateTime<chrono::Utc>>,
}
impl WaitOutcome {
    /// Builds the outcome for a plain process exit: reason fixed to
    /// [`WaitReason::Exited`], with no signal or timestamp information.
    #[must_use]
    pub fn exited(exit_code: i32) -> Self {
        Self {
            reason: WaitReason::Exited,
            signal: None,
            finished_at: None,
            exit_code,
        }
    }
}
/// Maps a shell-style exit code (`128 + signal_number`) back to the signal's
/// name, e.g. `137 -> Some("SIGKILL")`.
///
/// Returns `None` for exit codes `<= 128` — those are normal exits (128
/// itself included) and carry no signal information. Signal numbers outside
/// the table yield the placeholder `signal_<n>`.
#[must_use]
pub fn signal_name_from_exit_code(exit_code: i32) -> Option<String> {
    if exit_code <= 128 {
        return None;
    }
    let n = exit_code - 128;
    // Linux (x86/ARM) signal numbering: containers run Linux, where SIGCHLD
    // is 17, SIGCONT is 18 and SIGSTOP is 19. (The previous 17 => SIGSTOP
    // entry was the BSD/macOS convention and disagreed with the rest of this
    // otherwise-Linux table.)
    let name = match n {
        1 => "SIGHUP",
        2 => "SIGINT",
        3 => "SIGQUIT",
        4 => "SIGILL",
        6 => "SIGABRT",
        7 => "SIGBUS",
        8 => "SIGFPE",
        9 => "SIGKILL",
        10 => "SIGUSR1",
        11 => "SIGSEGV",
        12 => "SIGUSR2",
        13 => "SIGPIPE",
        14 => "SIGALRM",
        15 => "SIGTERM",
        17 => "SIGCHLD",
        18 => "SIGCONT",
        19 => "SIGSTOP",
        _ => return Some(format!("signal_{n}")),
    };
    Some(name.to_string())
}
/// One event from a streaming exec session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecEvent {
    Stdout(String),
    Stderr(String),
    /// Terminal event carrying the exit code.
    Exit(i32),
}
/// Boxed stream of [`ExecEvent`]s.
pub type ExecEventStream = Pin<Box<dyn Stream<Item = ExecEvent> + Send>>;
/// Options for an exec session, mirroring `docker exec` flags.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
#[allow(clippy::struct_excessive_bools)]
pub struct ExecOptions {
    pub command: Vec<String>,
    /// Extra environment entries in `KEY=VALUE` form.
    pub env: Vec<String>,
    pub working_dir: Option<String>,
    pub user: Option<String>,
    pub privileged: bool,
    pub tty: bool,
    pub attach_stdin: bool,
    pub attach_stdout: bool,
    pub attach_stderr: bool,
}
/// Marker trait for a bidirectional async byte stream (PTY transport).
pub trait ExecDuplex: AsyncRead + AsyncWrite + Send + Unpin {}
// Blanket impl: anything readable+writable+Send+Unpin qualifies.
impl<T> ExecDuplex for T where T: AsyncRead + AsyncWrite + Send + Unpin + ?Sized {}
/// Owned duplex stream attached to an exec PTY.
pub type ExecPtyStream = Box<dyn ExecDuplex + 'static>;
/// Future resolving to the exec session's exit code.
pub type ExecExitFuture = Pin<Box<dyn Future<Output = Result<i32>> + Send>>;
/// Live handle to a PTY exec session.
pub struct ExecHandle {
    /// Bidirectional byte stream (stdin/stdout of the session).
    pub stream: ExecPtyStream,
    /// Channel for terminal resize events as `(cols, rows)` — assumed order;
    /// TODO confirm against the runtime implementations.
    pub resize: tokio::sync::mpsc::Sender<(u16, u16)>,
    /// Resolves when the session ends.
    pub exit: ExecExitFuture,
}
/// Which stdio stream a log chunk belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LogChannel {
    Stdin,
    Stdout,
    Stderr,
}
/// A raw chunk of container output, with the source timestamp when known.
#[derive(Debug, Clone)]
pub struct LogChunk {
    pub stream: LogChannel,
    pub bytes: bytes::Bytes,
    pub timestamp: Option<chrono::DateTime<chrono::Utc>>,
}
/// Options for streaming logs, mirroring `docker logs` flags.
#[derive(Debug, Clone, Default)]
#[allow(clippy::struct_excessive_bools)]
pub struct LogsStreamOptions {
    /// Keep the stream open and follow new output.
    pub follow: bool,
    /// Only yield the last N lines.
    pub tail: Option<u64>,
    /// Lower time bound — presumably a Unix timestamp in seconds; confirm
    /// against callers.
    pub since: Option<i64>,
    /// Upper time bound (same unit as `since`).
    pub until: Option<i64>,
    pub timestamps: bool,
    pub stdout: bool,
    pub stderr: bool,
}
/// One sample of container resource-usage counters.
#[derive(Debug, Clone)]
pub struct StatsSample {
    pub cpu_total_ns: u64,
    pub cpu_system_ns: u64,
    pub online_cpus: u32,
    pub mem_used_bytes: u64,
    pub mem_limit_bytes: u64,
    pub net_rx_bytes: u64,
    pub net_tx_bytes: u64,
    pub blkio_read_bytes: u64,
    pub blkio_write_bytes: u64,
    pub pids_current: u64,
    pub pids_limit: Option<u64>,
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
/// Progress event emitted while pulling an image.
#[derive(Debug, Clone)]
pub enum PullProgress {
    /// Intermediate status update (per-layer when `id` is set).
    Status {
        id: Option<String>,
        status: String,
        progress: Option<String>,
        current: Option<u64>,
        total: Option<u64>,
    },
    /// Terminal event: the pull completed.
    Done {
        reference: String,
        digest: Option<String>,
    },
}
// Boxed fallible stream aliases used throughout the Runtime trait.
pub type LogsStream = Pin<Box<dyn Stream<Item = Result<LogChunk>> + Send + 'static>>;
pub type StatsStream = Pin<Box<dyn Stream<Item = Result<StatsSample>> + Send + 'static>>;
pub type PullProgressStream = Pin<Box<dyn Stream<Item = Result<PullProgress>> + Send + 'static>>;
pub type ArchiveStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send + 'static>>;
/// Metadata about a path inside a container (archive HEAD result).
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PathStat {
    pub name: String,
    pub size: i64,
    pub mode: u32,
    pub mtime: Option<String>,
    /// Symlink target; empty when the path is not a symlink.
    pub link_target: String,
}
/// Options for writing an archive into a container filesystem.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ArchivePutOptions {
    /// Refuse to replace a directory with a non-directory (and vice versa).
    pub no_overwrite_dir_non_dir: bool,
    /// Preserve uid/gid from the archive entries.
    pub copy_uid_gid: bool,
}
/// One network a container is attached to.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NetworkAttachmentDetail {
    pub network: String,
    pub aliases: Vec<String>,
    pub ipv4: Option<String>,
}
/// Health-check status as reported by the runtime.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HealthDetail {
    pub status: String,
    pub failing_streak: Option<u32>,
    pub last_output: Option<String>,
}
/// Extended inspect data beyond the basic lifecycle state.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerInspectDetails {
    pub ports: Vec<zlayer_spec::PortMapping>,
    pub networks: Vec<NetworkAttachmentDetail>,
    pub ipv4: Option<String>,
    pub health: Option<HealthDetail>,
    pub exit_code: Option<i32>,
}
/// Minimal listing entry for a runtime-managed container.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RuntimeContainerSummary {
    /// Backend-native container identifier.
    pub runtime_id: String,
    /// Value of the zlayer container-id label, when present.
    pub zlayer_container_id_label: Option<String>,
}
/// `docker top`-style process listing.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerTopOutput {
    pub titles: Vec<String>,
    pub processes: Vec<Vec<String>>,
}
/// Kind of filesystem change reported by a container diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FilesystemChangeKind {
    Modified,
    Added,
    Deleted,
}
impl FilesystemChangeKind {
    /// Numeric kind used by the Docker API (`0` modify, `1` add, `2` delete).
    #[must_use]
    pub const fn as_docker_kind(self) -> u8 {
        match self {
            Self::Modified => 0,
            Self::Added => 1,
            Self::Deleted => 2,
        }
    }
}
/// One entry of a container filesystem diff.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct FilesystemChangeEntry {
    pub path: String,
    pub kind: FilesystemChangeKind,
}
/// One published port of a container.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PortMappingEntry {
    pub container_port: u16,
    /// Transport protocol, e.g. `tcp` or `udp`.
    pub protocol: String,
    pub host_ip: Option<String>,
    pub host_port: Option<u16>,
}
/// Result of pruning stopped containers.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ContainerPruneResult {
    pub deleted: Vec<String>,
    pub space_reclaimed: u64,
}
/// Restart-policy fields of a `container update` request.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerRestartPolicyUpdate {
    pub name: Option<String>,
    pub maximum_retry_count: Option<i64>,
}
/// Resource limits for a `container update` request. All fields optional;
/// `None` means "leave unchanged".
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerResourceUpdate {
    pub cpu_shares: Option<i64>,
    pub memory: Option<i64>,
    pub cpu_period: Option<i64>,
    pub cpu_quota: Option<i64>,
    pub cpu_realtime_period: Option<i64>,
    pub cpu_realtime_runtime: Option<i64>,
    pub cpuset_cpus: Option<String>,
    pub cpuset_mems: Option<String>,
    pub memory_reservation: Option<i64>,
    pub memory_swap: Option<i64>,
    pub kernel_memory: Option<i64>,
    pub blkio_weight: Option<u16>,
    pub pids_limit: Option<i64>,
    pub restart_policy: Option<ContainerRestartPolicyUpdate>,
}
impl ContainerResourceUpdate {
    /// Returns `true` when no field is set, i.e. applying this update would
    /// be a no-op. Uses exhaustive destructuring so adding a field without
    /// updating this check becomes a compile error.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        let Self {
            cpu_shares,
            memory,
            cpu_period,
            cpu_quota,
            cpu_realtime_period,
            cpu_realtime_runtime,
            cpuset_cpus,
            cpuset_mems,
            memory_reservation,
            memory_swap,
            kernel_memory,
            blkio_weight,
            pids_limit,
            restart_policy,
        } = self;
        cpu_shares.is_none()
            && memory.is_none()
            && cpu_period.is_none()
            && cpu_quota.is_none()
            && cpu_realtime_period.is_none()
            && cpu_realtime_runtime.is_none()
            && cpuset_cpus.is_none()
            && cpuset_mems.is_none()
            && memory_reservation.is_none()
            && memory_swap.is_none()
            && kernel_memory.is_none()
            && blkio_weight.is_none()
            && pids_limit.is_none()
            && restart_policy.is_none()
    }
}
/// Result of a `container update` call; `warnings` come from the backend.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerUpdateOutcome {
    pub warnings: Vec<String>,
}
/// Docker-style image inspect payload, flattened.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ImageInspectInfo {
    pub id: Option<String>,
    pub repo_tags: Vec<String>,
    pub repo_digests: Vec<String>,
    pub parent: Option<String>,
    pub comment: Option<String>,
    pub created: Option<String>,
    pub container: Option<String>,
    pub docker_version: Option<String>,
    pub author: Option<String>,
    pub architecture: Option<String>,
    pub os: Option<String>,
    pub size: Option<u64>,
    pub layers: Vec<String>,
    pub env: Vec<String>,
    pub cmd: Vec<String>,
    pub entrypoint: Vec<String>,
    pub working_dir: Option<String>,
    pub user: Option<String>,
    pub labels: std::collections::BTreeMap<String, String>,
}
/// One layer of `docker history` output.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ImageHistoryEntry {
    pub id: String,
    /// Creation time as a Unix timestamp.
    pub created: i64,
    pub created_by: String,
    pub tags: Vec<String>,
    pub size: u64,
    pub comment: String,
}
/// One hit from a registry image search.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ImageSearchResult {
    pub name: String,
    pub description: String,
    pub star_count: u64,
    pub official: bool,
    pub automated: bool,
}
/// Result of committing a container to an image.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct CommitOutcome {
    pub id: String,
}
/// Options for `container commit`, mirroring the Docker API.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CommitOptions {
    pub repo: Option<String>,
    pub tag: Option<String>,
    pub comment: Option<String>,
    pub author: Option<String>,
    /// Pause the container while committing.
    pub pause: bool,
    /// Dockerfile-style instructions applied to the committed image.
    pub changes: Option<String>,
}
/// Boxed stream of exported image/filesystem tar bytes.
pub type ImageExportStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes>> + Send + 'static>>;
/// Progress event emitted while loading images from a tar archive.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum LoadProgress {
    /// Intermediate status update.
    Status {
        #[serde(default, skip_serializing_if = "Option::is_none")]
        id: Option<String>,
        status: String,
    },
    /// Terminal event listing the loaded image references.
    Done {
        references: Vec<String>,
    },
}
/// Boxed stream of [`LoadProgress`] events.
pub type LoadProgressStream = Pin<Box<dyn Stream<Item = Result<LoadProgress>> + Send + 'static>>;
/// Backend-agnostic container runtime interface.
///
/// Only the core lifecycle methods are required. Every other method has a
/// default implementation that either degrades gracefully (empty list,
/// default struct) or returns [`AgentError::Unsupported`], so backends can
/// opt in to capabilities incrementally.
#[async_trait::async_trait]
pub trait Runtime: Send + Sync {
    /// Pulls `image` with the backend's default policy and no credentials.
    async fn pull_image(&self, image: &str) -> Result<()>;
    /// Pulls `image` honoring an explicit policy and optional registry auth.
    async fn pull_image_with_policy(
        &self,
        image: &str,
        policy: PullPolicy,
        auth: Option<&RegistryAuth>,
    ) -> Result<()>;
    /// Creates (but does not start) a container for `spec`.
    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()>;
    /// Starts a previously created container.
    async fn start_container(&self, id: &ContainerId) -> Result<()>;
    /// Stops a container, waiting up to `timeout` before force-killing.
    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()>;
    /// Removes a container and its runtime resources.
    async fn remove_container(&self, id: &ContainerId) -> Result<()>;
    /// Reports the container's current lifecycle state.
    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState>;
    /// Returns up to `tail` of the most recent log entries.
    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>>;
    /// Runs `cmd` in the container; returns `(exit_code, stdout, stderr)`.
    async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)>;
    /// Streaming variant of [`Self::exec`]. The default runs `exec` to
    /// completion and replays the buffered output as up to three events.
    async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
        let (exit, stdout, stderr) = self.exec(id, cmd).await?;
        let mut events: Vec<ExecEvent> = Vec::with_capacity(3);
        if !stdout.is_empty() {
            events.push(ExecEvent::Stdout(stdout));
        }
        if !stderr.is_empty() {
            events.push(ExecEvent::Stderr(stderr));
        }
        events.push(ExecEvent::Exit(exit));
        Ok(Box::pin(futures_util::stream::iter(events)))
    }
    /// Interactive exec with a PTY. Unsupported by default.
    async fn exec_pty(&self, _id: &ContainerId, _opts: ExecOptions) -> Result<ExecHandle> {
        Err(AgentError::Unsupported(
            "exec_pty is not supported by this runtime".into(),
        ))
    }
    /// Returns a point-in-time resource usage snapshot.
    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats>;
    /// Blocks until the container exits; returns its exit code.
    async fn wait_container(&self, id: &ContainerId) -> Result<i32>;
    /// Richer wait result. The default wraps [`Self::wait_container`] in a
    /// plain `Exited` outcome with no signal/timestamp information.
    async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
        let exit_code = self.wait_container(id).await?;
        Ok(WaitOutcome::exited(exit_code))
    }
    /// Wait honoring a Docker-style condition. The default ignores the
    /// condition and delegates to [`Self::wait_outcome`].
    async fn wait_outcome_with_condition(
        &self,
        id: &ContainerId,
        _condition: WaitCondition,
    ) -> Result<WaitOutcome> {
        self.wait_outcome(id).await
    }
    /// Renames a container. Unsupported by default.
    async fn rename_container(&self, _id: &ContainerId, _new_name: &str) -> Result<()> {
        Err(AgentError::Unsupported(
            "rename_container is not supported by this runtime".into(),
        ))
    }
    /// Returns the full log history as structured entries.
    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>>;
    /// Host PID of the container's main process, if running.
    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>>;
    /// IP address of the container on its network, if assigned.
    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>>;
    /// Host port override for the service port; `None` by default.
    async fn get_container_port_override(&self, _id: &ContainerId) -> Result<Option<u16>> {
        Ok(None)
    }
    /// Windows-only: HNS network namespace GUID, when the backend knows it.
    #[cfg(target_os = "windows")]
    async fn get_container_namespace_id(
        &self,
        _id: &ContainerId,
    ) -> Result<Option<windows::core::GUID>> {
        Ok(None)
    }
    /// Flushes volume data for the container; a no-op by default.
    async fn sync_container_volumes(&self, _id: &ContainerId) -> Result<()> {
        Ok(())
    }
    /// Streams raw log chunks. Unsupported by default.
    async fn logs_stream(&self, _id: &ContainerId, _opts: LogsStreamOptions) -> Result<LogsStream> {
        Err(AgentError::Unsupported(
            "logs_stream is not supported by this runtime".into(),
        ))
    }
    /// Streams periodic stats samples. Unsupported by default.
    async fn stats_stream(&self, _id: &ContainerId) -> Result<StatsStream> {
        Err(AgentError::Unsupported(
            "stats_stream is not supported by this runtime".into(),
        ))
    }
    /// Streams pull progress events. Unsupported by default.
    async fn pull_image_stream(
        &self,
        _image: &str,
        _auth: Option<&RegistryAuth>,
    ) -> Result<PullProgressStream> {
        Err(AgentError::Unsupported(
            "pull_image_stream is not supported by this runtime".into(),
        ))
    }
    /// Lists locally stored images. Unsupported by default.
    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
        Err(AgentError::Unsupported(
            "list_images is not supported by this runtime".into(),
        ))
    }
    /// Removes an image. Unsupported by default.
    async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
        Err(AgentError::Unsupported(
            "remove_image is not supported by this runtime".into(),
        ))
    }
    /// Prunes unused images. Unsupported by default.
    async fn prune_images(&self) -> Result<PruneResult> {
        Err(AgentError::Unsupported(
            "prune_images is not supported by this runtime".into(),
        ))
    }
    /// Sends a signal to the container's main process. Unsupported by default.
    async fn kill_container(&self, _id: &ContainerId, _signal: Option<&str>) -> Result<()> {
        Err(AgentError::Unsupported(
            "kill_container is not supported by this runtime".into(),
        ))
    }
    /// Tags an existing image with a new reference. Unsupported by default.
    async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
        Err(AgentError::Unsupported(
            "tag_image is not supported by this runtime".into(),
        ))
    }
    /// Detailed image inspect. Unsupported by default.
    async fn inspect_image_native(&self, _image: &str) -> Result<ImageInspectInfo> {
        Err(AgentError::Unsupported(
            "inspect_image_native is not supported by this runtime".into(),
        ))
    }
    /// Layer history of an image. Unsupported by default.
    async fn image_history(&self, _image: &str) -> Result<Vec<ImageHistoryEntry>> {
        Err(AgentError::Unsupported(
            "image_history is not supported by this runtime".into(),
        ))
    }
    /// Registry image search. Unsupported by default.
    async fn search_images(&self, _term: &str, _limit: u32) -> Result<Vec<ImageSearchResult>> {
        Err(AgentError::Unsupported(
            "search_images is not supported by this runtime".into(),
        ))
    }
    /// Exports images as a tar stream. Unsupported by default.
    async fn save_images(&self, _names: &[String]) -> Result<ImageExportStream> {
        Err(AgentError::Unsupported(
            "save_images is not supported by this runtime".into(),
        ))
    }
    /// Loads images from a tar archive. Unsupported by default.
    async fn load_images(
        &self,
        _tar_bytes: bytes::Bytes,
        _quiet: bool,
    ) -> Result<LoadProgressStream> {
        Err(AgentError::Unsupported(
            "load_images is not supported by this runtime".into(),
        ))
    }
    /// Imports a filesystem tarball as a new image. Unsupported by default.
    async fn import_image(
        &self,
        _tar_bytes: bytes::Bytes,
        _repo: Option<&str>,
        _tag: Option<&str>,
    ) -> Result<String> {
        Err(AgentError::Unsupported(
            "import_image is not supported by this runtime".into(),
        ))
    }
    /// Exports a container's filesystem as a tar stream. Unsupported by default.
    async fn export_container_fs(&self, _id: &ContainerId) -> Result<ImageExportStream> {
        Err(AgentError::Unsupported(
            "export_container_fs is not supported by this runtime".into(),
        ))
    }
    /// Commits a container's current state to an image. Unsupported by default.
    async fn commit_container(
        &self,
        _id: &ContainerId,
        _opts: &CommitOptions,
    ) -> Result<CommitOutcome> {
        Err(AgentError::Unsupported(
            "commit_container is not supported by this runtime".into(),
        ))
    }
    /// Extended inspect data; defaults to an empty payload rather than
    /// an error so callers can treat it as best-effort.
    async fn inspect_detailed(&self, _id: &ContainerId) -> Result<ContainerInspectDetails> {
        Ok(ContainerInspectDetails::default())
    }
    /// Pauses a container. Unsupported by default.
    async fn pause_container(&self, _id: &ContainerId) -> Result<()> {
        Err(AgentError::Unsupported(
            "pause_container is not supported by this runtime".into(),
        ))
    }
    /// Unpauses a container. Unsupported by default.
    async fn unpause_container(&self, _id: &ContainerId) -> Result<()> {
        Err(AgentError::Unsupported(
            "unpause_container is not supported by this runtime".into(),
        ))
    }
    /// Updates resource limits on a running container. Unsupported by default.
    async fn update_container_resources(
        &self,
        _id: &ContainerId,
        _update: &ContainerResourceUpdate,
    ) -> Result<ContainerUpdateOutcome> {
        Err(AgentError::Unsupported(
            "update_container_resources is not supported by this runtime".into(),
        ))
    }
    /// `docker top`-style process listing. Unsupported by default.
    async fn top_container(
        &self,
        _id: &ContainerId,
        _ps_args: &[String],
    ) -> Result<ContainerTopOutput> {
        Err(AgentError::Unsupported(
            "top_container is not supported by this runtime".into(),
        ))
    }
    /// Filesystem diff against the image. Unsupported by default.
    async fn changes_container(&self, _id: &ContainerId) -> Result<Vec<FilesystemChangeEntry>> {
        Err(AgentError::Unsupported(
            "changes_container is not supported by this runtime".into(),
        ))
    }
    /// Published port mappings. Unsupported by default.
    async fn port_mappings_container(&self, _id: &ContainerId) -> Result<Vec<PortMappingEntry>> {
        Err(AgentError::Unsupported(
            "port_mappings_container is not supported by this runtime".into(),
        ))
    }
    /// Prunes stopped containers. Unsupported by default.
    async fn prune_containers(&self) -> Result<ContainerPruneResult> {
        Err(AgentError::Unsupported(
            "prune_containers is not supported by this runtime".into(),
        ))
    }
    /// Lists runtime-managed containers; defaults to an empty list.
    async fn list_containers(&self) -> Result<Vec<RuntimeContainerSummary>> {
        Ok(Vec::new())
    }
    /// Reads a path out of the container as a tar stream. Unsupported by default.
    async fn archive_get(&self, _id: &ContainerId, _path: &str) -> Result<ArchiveStream> {
        Err(AgentError::Unsupported(
            "archive_get is not supported by this runtime".into(),
        ))
    }
    /// Writes a tar archive into the container. Unsupported by default.
    async fn archive_put(
        &self,
        _id: &ContainerId,
        _path: &str,
        _tar_bytes: bytes::Bytes,
        _opts: ArchivePutOptions,
    ) -> Result<()> {
        Err(AgentError::Unsupported(
            "archive_put is not supported by this runtime".into(),
        ))
    }
    /// Stats a path inside the container. Unsupported by default.
    async fn archive_head(&self, _id: &ContainerId, _path: &str) -> Result<PathStat> {
        Err(AgentError::Unsupported(
            "archive_head is not supported by this runtime".into(),
        ))
    }
}
/// Normalizes and validates a user-supplied signal name.
///
/// Trims whitespace, upper-cases, and prefixes `SIG` when missing (so
/// `"term"` becomes `"SIGTERM"`). Only a small allow-list of signals is
/// accepted; anything else — including the empty string — is rejected with
/// [`AgentError::InvalidSpec`].
pub fn validate_signal(signal: &str) -> Result<String> {
    const ALLOWED: [&str; 6] = ["SIGKILL", "SIGTERM", "SIGINT", "SIGHUP", "SIGUSR1", "SIGUSR2"];
    let trimmed = signal.trim();
    if trimmed.is_empty() {
        return Err(AgentError::InvalidSpec(
            "signal must not be empty".to_string(),
        ));
    }
    let upper = trimmed.to_ascii_uppercase();
    let canonical = if upper.starts_with("SIG") {
        upper
    } else {
        format!("SIG{upper}")
    };
    if ALLOWED.contains(&canonical.as_str()) {
        Ok(canonical)
    } else {
        Err(AgentError::InvalidSpec(format!(
            "unsupported signal '{canonical}'; allowed: SIGKILL, SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2"
        )))
    }
}
/// Credentials and endpoints injected into containers for agent callbacks.
#[derive(Debug, Clone)]
pub struct ContainerAuthContext {
    pub api_url: String,
    pub jwt_secret: String,
    pub socket_path: String,
}
/// In-memory [`Runtime`] for tests: tracks containers in a map and yields
/// pre-queued items from the streaming endpoints.
pub struct MockRuntime {
    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
    // Items queued here are drained by the corresponding stream method.
    pub logs_to_yield:
        Arc<Mutex<std::collections::HashMap<ContainerId, VecDeque<Result<LogChunk>>>>>,
    pub stats_to_yield:
        Arc<Mutex<std::collections::HashMap<ContainerId, VecDeque<Result<StatsSample>>>>>,
    pub pull_progress_to_yield:
        Arc<Mutex<std::collections::HashMap<String, VecDeque<Result<PullProgress>>>>>,
}
impl MockRuntime {
    /// Creates an empty mock runtime: no containers, nothing queued.
    #[must_use]
    pub fn new() -> Self {
        Self {
            containers: tokio::sync::RwLock::new(std::collections::HashMap::new()),
            logs_to_yield: Arc::new(Mutex::new(std::collections::HashMap::new())),
            stats_to_yield: Arc::new(Mutex::new(std::collections::HashMap::new())),
            pull_progress_to_yield: Arc::new(Mutex::new(std::collections::HashMap::new())),
        }
    }
    /// Queues a chunk for the next `logs_stream` call on `id`.
    pub async fn enqueue_log_chunk(&self, id: &ContainerId, chunk: LogChunk) {
        let mut queues = self.logs_to_yield.lock().await;
        queues.entry(id.clone()).or_default().push_back(Ok(chunk));
    }
    /// Queues an error for the next `logs_stream` call on `id`.
    pub async fn enqueue_log_error(&self, id: &ContainerId, err: AgentError) {
        let mut queues = self.logs_to_yield.lock().await;
        queues.entry(id.clone()).or_default().push_back(Err(err));
    }
    /// Queues a sample for the next `stats_stream` call on `id`.
    pub async fn enqueue_stats_sample(&self, id: &ContainerId, sample: StatsSample) {
        let mut queues = self.stats_to_yield.lock().await;
        queues.entry(id.clone()).or_default().push_back(Ok(sample));
    }
    /// Queues a progress event for the next `pull_image_stream` on `image`.
    pub async fn enqueue_pull_progress(&self, image: &str, progress: PullProgress) {
        let mut queues = self.pull_progress_to_yield.lock().await;
        queues
            .entry(image.to_string())
            .or_default()
            .push_back(Ok(progress));
    }
}
impl Default for MockRuntime {
fn default() -> Self {
Self::new()
}
}
/// Mock [`Runtime`]: all operations succeed against the in-memory map,
/// and the streaming endpoints drain the pre-queued items.
#[async_trait::async_trait]
impl Runtime for MockRuntime {
    /// Delegates to [`Self::pull_image_with_policy`] with `IfNotPresent` and
    /// no registry auth. (Parameter renamed from `_image`: it is used, and
    /// the trait declares it `image`.)
    async fn pull_image(&self, image: &str) -> Result<()> {
        self.pull_image_with_policy(image, PullPolicy::IfNotPresent, None)
            .await
    }
    /// Simulates a pull by sleeping 100 ms; never fails.
    async fn pull_image_with_policy(
        &self,
        _image: &str,
        _policy: PullPolicy,
        _auth: Option<&RegistryAuth>,
    ) -> Result<()> {
        tokio::time::sleep(Duration::from_millis(100)).await;
        Ok(())
    }
    /// Registers a container in `Pending` state; the spec is ignored.
    async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
        let mut containers = self.containers.write().await;
        containers.insert(
            id.clone(),
            Container {
                id: id.clone(),
                state: ContainerState::Pending,
                pid: None,
                task: None,
                overlay_ip: None,
                health_monitor: None,
                port_override: None,
            },
        );
        Ok(())
    }
    /// Marks the container `Running`, using the agent's own PID as a
    /// stand-in. Unknown ids are silently ignored.
    async fn start_container(&self, id: &ContainerId) -> Result<()> {
        let mut containers = self.containers.write().await;
        if let Some(container) = containers.get_mut(id) {
            container.state = ContainerState::Running;
            container.pid = Some(std::process::id());
        }
        Ok(())
    }
    /// Marks the container `Exited { code: 0 }`; the timeout is ignored.
    async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
        let mut containers = self.containers.write().await;
        if let Some(container) = containers.get_mut(id) {
            container.state = ContainerState::Exited { code: 0 };
        }
        Ok(())
    }
    /// Forgets the container; removing an unknown id is a no-op.
    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
        let mut containers = self.containers.write().await;
        containers.remove(id);
        Ok(())
    }
    /// Returns the stored state, or `NotFound` for unknown ids.
    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
        let containers = self.containers.read().await;
        containers
            .get(id)
            .map(|c| c.state.clone())
            .ok_or_else(|| AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
    }
    /// Returns the last `tail` of two fixed mock log lines; `id` is ignored.
    async fn container_logs(&self, _id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
        let entries = vec![
            LogEntry {
                timestamp: chrono::Utc::now(),
                stream: LogStream::Stdout,
                message: "mock log line 1".to_string(),
                source: LogSource::Container("mock".to_string()),
                service: None,
                deployment: None,
            },
            LogEntry {
                timestamp: chrono::Utc::now(),
                stream: LogStream::Stderr,
                message: "mock error line".to_string(),
                source: LogSource::Container("mock".to_string()),
                service: None,
                deployment: None,
            },
        ];
        // Keep only the last `tail` entries, like `docker logs --tail`.
        let skip = entries.len().saturating_sub(tail);
        Ok(entries.into_iter().skip(skip).collect())
    }
    /// Echoes the command back on stdout with exit code 0.
    async fn exec(&self, _id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
        Ok((0, cmd.join(" "), String::new()))
    }
    /// Fixed stats for known containers, `NotFound` otherwise.
    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
        let containers = self.containers.read().await;
        if containers.contains_key(id) {
            Ok(ContainerStats {
                cpu_usage_usec: 1_000_000,
                memory_bytes: 50 * 1024 * 1024,
                memory_limit: 256 * 1024 * 1024,
                timestamp: std::time::Instant::now(),
            })
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }
    /// Returns the recorded exit code for exited/failed containers;
    /// otherwise simulates a short wait and reports 0.
    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
        let containers = self.containers.read().await;
        if let Some(container) = containers.get(id) {
            match &container.state {
                ContainerState::Exited { code } => Ok(*code),
                ContainerState::Failed { .. } => Ok(1),
                _ => {
                    // Release the read lock before sleeping.
                    drop(containers);
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok(0)
                }
            }
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }
    /// Three canned log lines tagged with the container name.
    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
        let containers = self.containers.read().await;
        if containers.contains_key(id) {
            let container_name = id.to_string();
            Ok(vec![
                LogEntry {
                    timestamp: chrono::Utc::now(),
                    stream: LogStream::Stdout,
                    message: format!("[{container_name}] Container started"),
                    source: LogSource::Container(container_name.clone()),
                    service: None,
                    deployment: None,
                },
                LogEntry {
                    timestamp: chrono::Utc::now(),
                    stream: LogStream::Stdout,
                    message: format!("[{container_name}] Executing command..."),
                    source: LogSource::Container(container_name.clone()),
                    service: None,
                    deployment: None,
                },
                LogEntry {
                    timestamp: chrono::Utc::now(),
                    stream: LogStream::Stdout,
                    message: format!("[{container_name}] Command completed successfully"),
                    source: LogSource::Container(container_name),
                    service: None,
                    deployment: None,
                },
            ])
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }
    /// Stored PID (set by `start_container`), or `NotFound`.
    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
        let containers = self.containers.read().await;
        if let Some(container) = containers.get(id) {
            Ok(container.pid)
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }
    /// Deterministic fake address `172.17.0.(replica + 2)` for known ids.
    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
        let containers = self.containers.read().await;
        if containers.contains_key(id) {
            // Truncation is acceptable for a mock; replicas are small in tests.
            #[allow(clippy::cast_possible_truncation)]
            let last_octet = (id.replica + 2) as u8;
            Ok(Some(IpAddr::V4(std::net::Ipv4Addr::new(
                172, 17, 0, last_octet,
            ))))
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }
    /// Always empty.
    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
        Ok(Vec::new())
    }
    /// Always succeeds.
    async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
        Ok(())
    }
    /// Always reports nothing reclaimed.
    async fn prune_images(&self) -> Result<PruneResult> {
        Ok(PruneResult::default())
    }
    /// Validates the signal (defaulting to SIGKILL), then marks the
    /// container as exited with the conventional 137 (128 + SIGKILL) code.
    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
        let _canonical = validate_signal(signal.unwrap_or("SIGKILL"))?;
        let mut containers = self.containers.write().await;
        let container = containers.get_mut(id).ok_or_else(|| AgentError::NotFound {
            container: id.to_string(),
            reason: "container not found".to_string(),
        })?;
        container.state = ContainerState::Exited { code: 137 };
        Ok(())
    }
    /// Always succeeds.
    async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
        Ok(())
    }
    /// Drains the queued log items for `id`; with `follow` the stream then
    /// stays open forever (pending), mirroring `docker logs -f`.
    async fn logs_stream(&self, id: &ContainerId, opts: LogsStreamOptions) -> Result<LogsStream> {
        use futures_util::StreamExt;
        let queued: Vec<Result<LogChunk>> = {
            let mut guard = self.logs_to_yield.lock().await;
            guard.remove(id).map(Vec::from).unwrap_or_default()
        };
        let head = futures_util::stream::iter(queued);
        if opts.follow {
            let tail = futures_util::stream::pending::<Result<LogChunk>>();
            Ok(Box::pin(head.chain(tail)))
        } else {
            Ok(Box::pin(head))
        }
    }
    /// Drains the queued stats samples for `id` as a finite stream.
    async fn stats_stream(&self, id: &ContainerId) -> Result<StatsStream> {
        let queued: Vec<Result<StatsSample>> = {
            let mut guard = self.stats_to_yield.lock().await;
            guard.remove(id).map(Vec::from).unwrap_or_default()
        };
        Ok(Box::pin(futures_util::stream::iter(queued)))
    }
    /// Drains the queued pull-progress events for `image` as a finite stream.
    async fn pull_image_stream(
        &self,
        image: &str,
        _auth: Option<&RegistryAuth>,
    ) -> Result<PullProgressStream> {
        let queued: Vec<Result<PullProgress>> = {
            let mut guard = self.pull_progress_to_yield.lock().await;
            guard.remove(image).map(Vec::from).unwrap_or_default()
        };
        Ok(Box::pin(futures_util::stream::iter(queued)))
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Basic lifecycle: pull, create, start, then observe Running.
#[tokio::test]
async fn test_mock_runtime() {
    let runtime = MockRuntime::new();
    let id = ContainerId {
        service: "test".to_string(),
        replica: 1,
    };
    runtime.pull_image("test:latest").await.unwrap();
    runtime.create_container(&id, &mock_spec()).await.unwrap();
    runtime.start_container(&id).await.unwrap();
    let state = runtime.container_state(&id).await.unwrap();
    assert_eq!(state, ContainerState::Running);
}
// Canonical names, SIG-prefixing, case folding, and trimming all accepted.
#[test]
fn validate_signal_accepts_known_signals() {
    assert_eq!(validate_signal("SIGKILL").unwrap(), "SIGKILL");
    assert_eq!(validate_signal("SIGTERM").unwrap(), "SIGTERM");
    assert_eq!(validate_signal("SIGINT").unwrap(), "SIGINT");
    assert_eq!(validate_signal("SIGHUP").unwrap(), "SIGHUP");
    assert_eq!(validate_signal("SIGUSR1").unwrap(), "SIGUSR1");
    assert_eq!(validate_signal("SIGUSR2").unwrap(), "SIGUSR2");
    assert_eq!(validate_signal("KILL").unwrap(), "SIGKILL");
    assert_eq!(validate_signal("term").unwrap(), "SIGTERM");
    assert_eq!(validate_signal(" INT ").unwrap(), "SIGINT");
}
// Empty/whitespace input and signals outside the allow-list are rejected.
#[test]
fn validate_signal_rejects_unknown_or_empty() {
    assert!(matches!(
        validate_signal(""),
        Err(AgentError::InvalidSpec(_))
    ));
    assert!(matches!(
        validate_signal("   "),
        Err(AgentError::InvalidSpec(_))
    ));
    assert!(matches!(
        validate_signal("SIGSEGV"),
        Err(AgentError::InvalidSpec(_))
    ));
    assert!(matches!(
        validate_signal("NOPE"),
        Err(AgentError::InvalidSpec(_))
    ));
    assert!(matches!(
        validate_signal("SIGPIPE"),
        Err(AgentError::InvalidSpec(_))
    ));
}
// kill with no explicit signal behaves as SIGKILL (exit code 137).
#[tokio::test]
async fn mock_kill_container_defaults_to_sigkill() {
    let runtime = MockRuntime::new();
    let id = ContainerId {
        service: "kill-me".to_string(),
        replica: 0,
    };
    runtime.create_container(&id, &mock_spec()).await.unwrap();
    runtime.start_container(&id).await.unwrap();
    runtime.kill_container(&id, None).await.unwrap();
    let state = runtime.container_state(&id).await.unwrap();
    assert!(
        matches!(state, ContainerState::Exited { code: 137 }),
        "expected Exited(137), got {state:?}"
    );
}
#[test]
fn wait_reason_serializes_as_snake_case() {
    assert_eq!(
        serde_json::to_string(&WaitReason::Exited).unwrap(),
        "\"exited\""
    );
    assert_eq!(
        serde_json::to_string(&WaitReason::Signal).unwrap(),
        "\"signal\""
    );
    assert_eq!(
        serde_json::to_string(&WaitReason::OomKilled).unwrap(),
        "\"oom_killed\""
    );
    assert_eq!(
        serde_json::to_string(&WaitReason::RuntimeError).unwrap(),
        "\"runtime_error\""
    );
}
#[test]
fn wait_reason_deserialize_roundtrip() {
    for variant in [
        WaitReason::Exited,
        WaitReason::Signal,
        WaitReason::OomKilled,
        WaitReason::RuntimeError,
    ] {
        let s = serde_json::to_string(&variant).unwrap();
        let back: WaitReason = serde_json::from_str(&s).unwrap();
        assert_eq!(variant, back, "roundtrip failed for {variant:?}");
    }
}
#[test]
fn signal_name_from_exit_code_known_signals() {
    assert_eq!(signal_name_from_exit_code(137).as_deref(), Some("SIGKILL"));
    assert_eq!(signal_name_from_exit_code(143).as_deref(), Some("SIGTERM"));
    assert_eq!(signal_name_from_exit_code(130).as_deref(), Some("SIGINT"));
    assert_eq!(signal_name_from_exit_code(129).as_deref(), Some("SIGHUP"));
    assert_eq!(signal_name_from_exit_code(139).as_deref(), Some("SIGSEGV"));
}
// Codes <= 128 carry no signal; unknown signal numbers get a placeholder.
#[test]
fn signal_name_from_exit_code_handles_unknown_and_normal() {
    assert_eq!(signal_name_from_exit_code(0), None);
    assert_eq!(signal_name_from_exit_code(1), None);
    assert_eq!(signal_name_from_exit_code(128), None);
    assert_eq!(
        signal_name_from_exit_code(128 + 99).as_deref(),
        Some("signal_99")
    );
}
// Default wait_outcome wraps wait_container in a bare Exited outcome.
#[tokio::test]
async fn default_wait_outcome_delegates_to_wait_container() {
    let runtime = MockRuntime::new();
    let id = ContainerId {
        service: "wait-test".to_string(),
        replica: 0,
    };
    runtime.create_container(&id, &mock_spec()).await.unwrap();
    runtime.start_container(&id).await.unwrap();
    let outcome = runtime.wait_outcome(&id).await.unwrap();
    assert_eq!(outcome.exit_code, 0);
    assert_eq!(outcome.reason, WaitReason::Exited);
    assert!(outcome.signal.is_none());
    assert!(outcome.finished_at.is_none());
}
// Signal validation happens before the container is touched.
#[tokio::test]
async fn mock_kill_container_rejects_bogus_signal() {
    let runtime = MockRuntime::new();
    let id = ContainerId {
        service: "kill-me".to_string(),
        replica: 0,
    };
    runtime.create_container(&id, &mock_spec()).await.unwrap();
    runtime.start_container(&id).await.unwrap();
    let err = runtime
        .kill_container(&id, Some("SIGFOO"))
        .await
        .unwrap_err();
    assert!(
        matches!(err, AgentError::InvalidSpec(_)),
        "expected InvalidSpec, got {err:?}"
    );
}
// Minimal Runtime implementing only the required methods, used to exercise
// the trait's default implementations. The required methods are never
// called by those tests, so they simply panic.
struct BareRuntime;
#[async_trait::async_trait]
impl Runtime for BareRuntime {
    async fn pull_image(&self, _image: &str) -> Result<()> {
        unimplemented!()
    }
    async fn pull_image_with_policy(
        &self,
        _image: &str,
        _policy: PullPolicy,
        _auth: Option<&RegistryAuth>,
    ) -> Result<()> {
        unimplemented!()
    }
    async fn create_container(&self, _id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
        unimplemented!()
    }
    async fn start_container(&self, _id: &ContainerId) -> Result<()> {
        unimplemented!()
    }
    async fn stop_container(&self, _id: &ContainerId, _timeout: Duration) -> Result<()> {
        unimplemented!()
    }
    async fn remove_container(&self, _id: &ContainerId) -> Result<()> {
        unimplemented!()
    }
    async fn container_state(&self, _id: &ContainerId) -> Result<ContainerState> {
        unimplemented!()
    }
    async fn container_logs(&self, _id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
        unimplemented!()
    }
    async fn exec(&self, _id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
        unimplemented!()
    }
    async fn get_container_stats(&self, _id: &ContainerId) -> Result<ContainerStats> {
        unimplemented!()
    }
    async fn wait_container(&self, _id: &ContainerId) -> Result<i32> {
        unimplemented!()
    }
    async fn get_logs(&self, _id: &ContainerId) -> Result<Vec<LogEntry>> {
        unimplemented!()
    }
    async fn get_container_pid(&self, _id: &ContainerId) -> Result<Option<u32>> {
        unimplemented!()
    }
    async fn get_container_ip(&self, _id: &ContainerId) -> Result<Option<IpAddr>> {
        unimplemented!()
    }
}
#[tokio::test]
async fn default_logs_stream_is_unsupported() {
    let rt = BareRuntime;
    let cid = ContainerId {
        service: String::from("stream-test"),
        replica: 0,
    };
    // Without an override, the provided logs_stream must report Unsupported.
    let result = rt.logs_stream(&cid, LogsStreamOptions::default()).await;
    match result {
        Err(AgentError::Unsupported(_)) => {}
        Err(other) => panic!("expected Unsupported, got {other:?}"),
        Ok(_) => panic!("expected Err(Unsupported), got Ok"),
    }
}
#[tokio::test]
async fn default_stats_stream_is_unsupported() {
    let rt = BareRuntime;
    let cid = ContainerId {
        service: String::from("stream-test"),
        replica: 0,
    };
    // The provided stats_stream has no backing impl here, so it must fail.
    let result = rt.stats_stream(&cid).await;
    match result {
        Err(AgentError::Unsupported(_)) => {}
        Err(other) => panic!("expected Unsupported, got {other:?}"),
        Ok(_) => panic!("expected Err(Unsupported), got Ok"),
    }
}
#[tokio::test]
async fn default_pull_image_stream_is_unsupported() {
    let rt = BareRuntime;
    // Pulls are keyed by image reference; no container id is needed.
    let result = rt.pull_image_stream("alpine:latest", None).await;
    match result {
        Err(AgentError::Unsupported(_)) => {}
        Err(other) => panic!("expected Unsupported, got {other:?}"),
        Ok(_) => panic!("expected Err(Unsupported), got Ok"),
    }
}
#[tokio::test]
async fn default_archive_get_is_unsupported() {
    let rt = BareRuntime;
    let cid = ContainerId {
        service: String::from("archive-test"),
        replica: 0,
    };
    // The default archive_get must refuse rather than fabricate content.
    let result = rt.archive_get(&cid, "/etc/hosts").await;
    match result {
        Err(AgentError::Unsupported(_)) => {}
        Err(other) => panic!("expected Unsupported, got {other:?}"),
        Ok(_) => panic!("expected Err(Unsupported), got Ok"),
    }
}
#[tokio::test]
async fn default_archive_put_is_unsupported() {
    let rt = BareRuntime;
    let cid = ContainerId {
        service: String::from("archive-test"),
        replica: 0,
    };
    // Even an empty payload must be rejected by the default archive_put.
    let payload = bytes::Bytes::from_static(&[]);
    let failure = rt
        .archive_put(&cid, "/tmp", payload, ArchivePutOptions::default())
        .await
        .unwrap_err();
    assert!(matches!(failure, AgentError::Unsupported(_)));
}
#[tokio::test]
async fn default_archive_head_is_unsupported() {
    let rt = BareRuntime;
    let cid = ContainerId {
        service: String::from("archive-test"),
        replica: 0,
    };
    // Stat-style archive lookups are likewise unimplemented by default.
    let failure = rt.archive_head(&cid, "/etc/hosts").await.unwrap_err();
    assert!(matches!(failure, AgentError::Unsupported(_)));
}
#[tokio::test]
async fn default_exec_pty_is_unsupported() {
    let rt = BareRuntime;
    let cid = ContainerId {
        service: String::from("exec-pty"),
        replica: 0,
    };
    // Interactive exec requires runtime support; the default refuses.
    let result = rt.exec_pty(&cid, ExecOptions::default()).await;
    match result {
        Err(AgentError::Unsupported(_)) => {}
        Err(other) => panic!("expected Unsupported, got {other:?}"),
        Ok(_) => panic!("expected Err(Unsupported), got Ok"),
    }
}
#[tokio::test]
async fn default_inspect_image_native_is_unsupported() {
    let rt = BareRuntime;
    // Native inspection is optional; the provided method must decline.
    let failure = rt.inspect_image_native("alpine").await.unwrap_err();
    assert!(matches!(failure, AgentError::Unsupported(_)));
}
#[tokio::test]
async fn default_image_history_is_unsupported() {
    let rt = BareRuntime;
    // Layer history is optional; the provided method must decline.
    let failure = rt.image_history("alpine").await.unwrap_err();
    assert!(matches!(failure, AgentError::Unsupported(_)));
}
#[tokio::test]
async fn default_search_images_is_unsupported() {
    let rt = BareRuntime;
    // Registry search is optional; the provided method must decline.
    let failure = rt.search_images("nginx", 10).await.unwrap_err();
    assert!(matches!(failure, AgentError::Unsupported(_)));
}
#[tokio::test]
async fn default_save_images_is_unsupported() {
    let rt = BareRuntime;
    let images = vec!["alpine".to_string()];
    // Tarball export of images is optional; the default declines.
    let result = rt.save_images(&images).await;
    match result {
        Err(AgentError::Unsupported(_)) => {}
        Err(other) => panic!("expected Unsupported, got {other:?}"),
        Ok(_) => panic!("expected Err(Unsupported), got Ok"),
    }
}
#[tokio::test]
async fn default_load_images_is_unsupported() {
    let rt = BareRuntime;
    // Loading from a (here, empty) tarball is optional; the default declines.
    let payload = bytes::Bytes::from_static(&[]);
    let result = rt.load_images(payload, false).await;
    match result {
        Err(AgentError::Unsupported(_)) => {}
        Err(other) => panic!("expected Unsupported, got {other:?}"),
        Ok(_) => panic!("expected Err(Unsupported), got Ok"),
    }
}
#[tokio::test]
async fn default_import_image_is_unsupported() {
    let rt = BareRuntime;
    // Filesystem import is optional; the default declines.
    let payload = bytes::Bytes::from_static(&[]);
    let failure = rt.import_image(payload, None, None).await.unwrap_err();
    assert!(matches!(failure, AgentError::Unsupported(_)));
}
#[tokio::test]
async fn default_export_container_fs_is_unsupported() {
    let rt = BareRuntime;
    let cid = ContainerId {
        service: String::from("export"),
        replica: 0,
    };
    // Rootfs export is optional; the default declines.
    let result = rt.export_container_fs(&cid).await;
    match result {
        Err(AgentError::Unsupported(_)) => {}
        Err(other) => panic!("expected Unsupported, got {other:?}"),
        Ok(_) => panic!("expected Err(Unsupported), got Ok"),
    }
}
#[tokio::test]
async fn default_commit_container_is_unsupported() {
    let rt = BareRuntime;
    let cid = ContainerId {
        service: String::from("commit"),
        replica: 0,
    };
    // Committing a container to an image is optional; the default declines.
    let failure = rt
        .commit_container(&cid, &CommitOptions::default())
        .await
        .unwrap_err();
    assert!(matches!(failure, AgentError::Unsupported(_)));
}
#[test]
fn load_progress_serializes_with_kind_discriminator() {
    // Status events are internally tagged with kind = "status".
    let value = serde_json::to_value(LoadProgress::Status {
        id: Some(String::from("abc")),
        status: String::from("Loading layer"),
    })
    .unwrap();
    assert_eq!(value["kind"], "status");
    assert_eq!(value["status"], "Loading layer");
    // Completion events carry kind = "done" plus the loaded references.
    let value = serde_json::to_value(LoadProgress::Done {
        references: vec![String::from("alpine:latest")],
    })
    .unwrap();
    assert_eq!(value["kind"], "done");
    assert_eq!(value["references"], serde_json::json!(["alpine:latest"]));
}
#[test]
fn commit_options_default_is_no_op_pause_false() {
    // The default commit is a no-op description: no repo/tag metadata,
    // no Dockerfile changes, and the container is not paused.
    let defaults = CommitOptions::default();
    assert!(defaults.repo.is_none());
    assert!(defaults.tag.is_none());
    assert!(defaults.comment.is_none());
    assert!(defaults.author.is_none());
    assert!(defaults.changes.is_none());
    assert!(!defaults.pause);
}
#[test]
fn image_inspect_info_default_round_trips_via_serde() {
    // Serialize the default value and parse it back; nothing may be lost.
    let original = ImageInspectInfo::default();
    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: ImageInspectInfo = serde_json::from_str(&encoded).unwrap();
    assert_eq!(original, decoded);
}
#[tokio::test]
async fn mock_logs_stream_yields_queued_items_in_order() {
use futures_util::StreamExt;
let runtime = MockRuntime::new();
let id = ContainerId {
service: "logs-order".to_string(),
replica: 0,
};
let make_chunk = |s: &str, ch: LogChannel| LogChunk {
stream: ch,
bytes: bytes::Bytes::copy_from_slice(s.as_bytes()),
timestamp: None,
};
runtime
.enqueue_log_chunk(&id, make_chunk("first", LogChannel::Stdout))
.await;
runtime
.enqueue_log_chunk(&id, make_chunk("second", LogChannel::Stderr))
.await;
runtime
.enqueue_log_chunk(&id, make_chunk("third", LogChannel::Stdout))
.await;
let opts = LogsStreamOptions {
follow: false,
..LogsStreamOptions::default()
};
let mut stream = runtime.logs_stream(&id, opts).await.unwrap();
let mut got = Vec::new();
while let Some(item) = stream.next().await {
let chunk = item.unwrap();
got.push((
chunk.stream,
String::from_utf8(chunk.bytes.to_vec()).unwrap(),
));
}
assert_eq!(
got,
vec![
(LogChannel::Stdout, "first".to_string()),
(LogChannel::Stderr, "second".to_string()),
(LogChannel::Stdout, "third".to_string()),
]
);
}
#[tokio::test]
async fn mock_logs_stream_empty_queue_ends_immediately_when_not_follow() {
    use futures_util::StreamExt;
    let rt = MockRuntime::new();
    let cid = ContainerId {
        service: String::from("logs-empty"),
        replica: 0,
    };
    let opts = LogsStreamOptions {
        follow: false,
        ..LogsStreamOptions::default()
    };
    let mut stream = rt.logs_stream(&cid, opts).await.unwrap();
    // With nothing queued and follow disabled the stream must close at once;
    // the timeout only guards against a hang, it does not assert timing.
    let first = tokio::time::timeout(Duration::from_millis(500), stream.next())
        .await
        .expect("stream did not terminate; expected immediate close on empty queue");
    assert!(
        first.is_none(),
        "expected stream to be exhausted, got Some(_)"
    );
}
#[tokio::test]
async fn mock_stats_stream_yields_queued_samples_in_order() {
    use futures_util::StreamExt;
    let rt = MockRuntime::new();
    let cid = ContainerId {
        service: String::from("stats-order"),
        replica: 0,
    };
    let stamp = chrono::Utc::now();
    // Samples are zeroed except for the CPU counter, which identifies them.
    let sample = |cpu_total_ns: u64| StatsSample {
        cpu_total_ns,
        cpu_system_ns: 0,
        online_cpus: 1,
        mem_used_bytes: 0,
        mem_limit_bytes: 0,
        net_rx_bytes: 0,
        net_tx_bytes: 0,
        blkio_read_bytes: 0,
        blkio_write_bytes: 0,
        pids_current: 0,
        pids_limit: None,
        timestamp: stamp,
    };
    for cpu in [100, 200, 300] {
        rt.enqueue_stats_sample(&cid, sample(cpu)).await;
    }
    let mut stream = rt.stats_stream(&cid).await.unwrap();
    let mut observed = Vec::new();
    while let Some(item) = stream.next().await {
        observed.push(item.unwrap().cpu_total_ns);
    }
    assert_eq!(observed, vec![100, 200, 300]);
}
#[tokio::test]
async fn mock_pull_image_stream_yields_queued_progress_in_order() {
    use futures_util::StreamExt;
    let rt = MockRuntime::new();
    let image = "alpine:latest";
    // Three queued events: layer discovered, download progress, completion.
    let queued = vec![
        PullProgress::Status {
            id: Some(String::from("layer-1")),
            status: String::from("Pulling fs layer"),
            progress: None,
            current: None,
            total: None,
        },
        PullProgress::Status {
            id: Some(String::from("layer-1")),
            status: String::from("Downloading"),
            progress: Some(String::from("[==> ] 1MB/4MB")),
            current: Some(1024 * 1024),
            total: Some(4 * 1024 * 1024),
        },
        PullProgress::Done {
            reference: image.to_string(),
            digest: Some(String::from("sha256:deadbeef")),
        },
    ];
    for event in queued {
        rt.enqueue_pull_progress(image, event).await;
    }
    let mut stream = rt.pull_image_stream(image, None).await.unwrap();
    let mut events = Vec::new();
    while let Some(item) = stream.next().await {
        events.push(item.unwrap());
    }
    assert_eq!(events.len(), 3);
    // Events must come back in exactly the order they were queued.
    match &events[0] {
        PullProgress::Status { status, .. } => assert_eq!(status, "Pulling fs layer"),
        done @ PullProgress::Done { .. } => panic!("expected Status, got {done:?}"),
    }
    match &events[1] {
        PullProgress::Status {
            status,
            current,
            total,
            ..
        } => {
            assert_eq!(status, "Downloading");
            assert_eq!(*current, Some(1024 * 1024));
            assert_eq!(*total, Some(4 * 1024 * 1024));
        }
        done @ PullProgress::Done { .. } => panic!("expected Status, got {done:?}"),
    }
    match &events[2] {
        PullProgress::Done { reference, digest } => {
            assert_eq!(reference, image);
            assert_eq!(digest.as_deref(), Some("sha256:deadbeef"));
        }
        status @ PullProgress::Status { .. } => panic!("expected Done, got {status:?}"),
    }
}
#[test]
fn log_channel_serializes_as_snake_case() {
    // Each variant must serialize to its lowercase (snake_case) name.
    let cases = [
        (LogChannel::Stdin, "\"stdin\""),
        (LogChannel::Stdout, "\"stdout\""),
        (LogChannel::Stderr, "\"stderr\""),
    ];
    for (channel, expected) in cases {
        assert_eq!(serde_json::to_string(&channel).unwrap(), expected);
    }
}
/// Builds a minimal `ServiceSpec` for tests by parsing a one-service
/// deployment from YAML and extracting its single entry.
fn mock_spec() -> ServiceSpec {
    use zlayer_spec::*;
    let yaml = r"
version: v1
deployment: test
services:
  test:
    rtype: service
    image:
      name: test:latest
    endpoints:
      - name: http
        protocol: http
        port: 8080
";
    let mut deployment: DeploymentSpec = serde_yaml::from_str(yaml).unwrap();
    deployment.services.remove("test").unwrap()
}
}