use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use futures_util::Stream;
use std::net::IpAddr;
use std::pin::Pin;
use std::time::Duration;
use tokio::task::JoinHandle;
use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
use zlayer_spec::{PullPolicy, RegistryAuth, ServiceSpec};
/// Lifecycle state of a container as tracked by a [`Runtime`] backend.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ContainerState {
    /// Known to the runtime but not started yet (`MockRuntime::create_container` sets this).
    Pending,
    /// Starting up. NOTE(review): exact meaning is backend-specific — confirm per runtime.
    Initializing,
    /// Running (`MockRuntime::start_container` sets this).
    Running,
    /// Presumably a stop was requested but the container has not exited yet — confirm.
    Stopping,
    /// Terminated with the given exit code.
    Exited { code: i32 },
    /// The runtime reported a failure, described by `reason`.
    Failed { reason: String },
}
/// Identity of one replica of a service.
///
/// Displays as `<service>-rep-<replica>`, for example `web-rep-0`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ContainerId {
    /// Name of the owning service.
    pub service: String,
    /// Index of this replica within the service.
    pub replica: u32,
}

impl std::fmt::Display for ContainerId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { service, replica } = self;
        write!(f, "{service}-rep-{replica}")
    }
}
/// Runtime-side bookkeeping record for a single container replica.
pub struct Container {
    /// Identity of this replica.
    pub id: ContainerId,
    /// Last known lifecycle state.
    pub state: ContainerState,
    /// Process id, when known. NOTE(review): host vs. namespace pid is backend-specific — confirm.
    pub pid: Option<u32>,
    /// Handle to a spawned task associated with the container, if any.
    /// NOTE(review): presumably the task driving the container process — confirm with backends.
    pub task: Option<JoinHandle<std::io::Result<()>>>,
    /// Address assigned on the overlay network, if any.
    pub overlay_ip: Option<IpAddr>,
    /// Handle to a health-monitoring task, if one was spawned.
    pub health_monitor: Option<JoinHandle<()>>,
    /// Port override for this container, if any (cf. `Runtime::get_container_port_override`).
    pub port_override: Option<u16>,
}
/// Summary of a locally available image, as returned by `Runtime::list_images`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ImageInfo {
    /// Image reference (name and tag or digest) as reported by the backend.
    pub reference: String,
    /// Content digest, if the backend reports one.
    pub digest: Option<String>,
    /// Image size in bytes, if known.
    pub size_bytes: Option<u64>,
}
/// Result of `Runtime::prune_images`.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PruneResult {
    /// Identifiers of the images that were deleted. NOTE(review): reference vs. digest form is
    /// backend-specific — confirm.
    pub deleted: Vec<String>,
    /// Space freed by the prune; presumably in bytes — confirm with backends.
    pub space_reclaimed: u64,
}
/// Why a waited-on container stopped. Serialized in `snake_case`
/// (`"exited"`, `"signal"`, `"oom_killed"`, `"runtime_error"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WaitReason {
    /// The process exited on its own.
    Exited,
    /// The process was terminated by a signal.
    Signal,
    /// The process was killed for exceeding its memory limit.
    OomKilled,
    /// The runtime itself reported an error while waiting.
    RuntimeError,
}
/// Detailed result of `Runtime::wait_outcome`.
#[derive(Debug, Clone)]
pub struct WaitOutcome {
    /// Exit code of the container process.
    pub exit_code: i32,
    /// Classification of how the container stopped.
    pub reason: WaitReason,
    /// Name of the terminating signal when known, e.g. as produced by
    /// `signal_name_from_exit_code`.
    pub signal: Option<String>,
    /// When the container finished, if the backend reports it.
    pub finished_at: Option<chrono::DateTime<chrono::Utc>>,
}
impl WaitOutcome {
    /// Builds the outcome for a plain exit carrying `exit_code`.
    ///
    /// The reason is [`WaitReason::Exited`], and neither a signal name nor a finish
    /// timestamp is recorded.
    #[must_use]
    pub fn exited(exit_code: i32) -> Self {
        let reason = WaitReason::Exited;
        Self {
            reason,
            exit_code,
            finished_at: None,
            signal: None,
        }
    }
}
/// Best-effort name of the signal that terminated a process, decoded from its exit code.
///
/// Shells and container runtimes report "terminated by signal N" as exit code `128 + N`.
/// Signal numbers follow the Linux numbering used on x86/x86_64 and ARM, which is the
/// platform containers run on. The previous table mixed in BSD numbering (17 => SIGSTOP).
/// MIPS, SPARC and Alpha number some signals differently.
///
/// Returns:
/// * `None` when `exit_code <= 128` (a normal exit, including 128 itself);
/// * the canonical `SIG*` name for standard signals 1–31;
/// * `signal_{N}` for any other `N` (e.g. real-time signals).
#[must_use]
pub fn signal_name_from_exit_code(exit_code: i32) -> Option<String> {
    if exit_code <= 128 {
        return None;
    }
    // Cannot underflow: exit_code > 128 here.
    let n = exit_code - 128;
    let name = match n {
        1 => "SIGHUP",
        2 => "SIGINT",
        3 => "SIGQUIT",
        4 => "SIGILL",
        5 => "SIGTRAP",
        6 => "SIGABRT",
        7 => "SIGBUS",
        8 => "SIGFPE",
        9 => "SIGKILL",
        10 => "SIGUSR1",
        11 => "SIGSEGV",
        12 => "SIGUSR2",
        13 => "SIGPIPE",
        14 => "SIGALRM",
        15 => "SIGTERM",
        16 => "SIGSTKFLT",
        17 => "SIGCHLD",
        18 => "SIGCONT",
        19 => "SIGSTOP",
        20 => "SIGTSTP",
        21 => "SIGTTIN",
        22 => "SIGTTOU",
        23 => "SIGURG",
        24 => "SIGXCPU",
        25 => "SIGXFSZ",
        26 => "SIGVTALRM",
        27 => "SIGPROF",
        28 => "SIGWINCH",
        29 => "SIGIO",
        30 => "SIGPWR",
        31 => "SIGSYS",
        _ => return Some(format!("signal_{n}")),
    };
    Some(name.to_string())
}
/// One event emitted by `Runtime::exec_stream`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecEvent {
    /// A chunk of standard output.
    Stdout(String),
    /// A chunk of standard error.
    Stderr(String),
    /// The command's exit code; the default `exec_stream` emits it last.
    Exit(i32),
}
/// Boxed, pinned, `Send` stream of [`ExecEvent`]s returned by `Runtime::exec_stream`.
pub type ExecEventStream = Pin<Box<dyn Stream<Item = ExecEvent> + Send>>;
/// A container's attachment to one network, as reported by `Runtime::inspect_detailed`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct NetworkAttachmentDetail {
    /// Network name.
    pub network: String,
    /// DNS aliases of the container on this network.
    pub aliases: Vec<String>,
    /// IPv4 address on this network, as text, if assigned.
    pub ipv4: Option<String>,
}
/// Health-check state reported by `Runtime::inspect_detailed`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HealthDetail {
    /// Health status string. NOTE(review): value set is backend-defined (e.g. "healthy") — confirm.
    pub status: String,
    /// Number of consecutive failed checks, if reported.
    pub failing_streak: Option<u32>,
    /// Output of the most recent check, if reported.
    pub last_output: Option<String>,
}
/// Extra inspection data for a container.
///
/// The default value (all fields empty) is what `Runtime::inspect_detailed` returns
/// for backends that do not override it.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ContainerInspectDetails {
    /// Published port mappings.
    pub ports: Vec<zlayer_spec::PortMapping>,
    /// Network attachments.
    pub networks: Vec<NetworkAttachmentDetail>,
    /// Primary IPv4 address as text, if known.
    pub ipv4: Option<String>,
    /// Health-check state, if the container has a health check.
    pub health: Option<HealthDetail>,
    /// Exit code, if the container has exited.
    pub exit_code: Option<i32>,
}
#[async_trait::async_trait]
pub trait Runtime: Send + Sync {
async fn pull_image(&self, image: &str) -> Result<()>;
async fn pull_image_with_policy(
&self,
image: &str,
policy: PullPolicy,
auth: Option<&RegistryAuth>,
) -> Result<()>;
async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()>;
async fn start_container(&self, id: &ContainerId) -> Result<()>;
async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()>;
async fn remove_container(&self, id: &ContainerId) -> Result<()>;
async fn container_state(&self, id: &ContainerId) -> Result<ContainerState>;
async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>>;
async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)>;
async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
let (exit, stdout, stderr) = self.exec(id, cmd).await?;
let mut events: Vec<ExecEvent> = Vec::with_capacity(3);
if !stdout.is_empty() {
events.push(ExecEvent::Stdout(stdout));
}
if !stderr.is_empty() {
events.push(ExecEvent::Stderr(stderr));
}
events.push(ExecEvent::Exit(exit));
Ok(Box::pin(futures_util::stream::iter(events)))
}
async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats>;
async fn wait_container(&self, id: &ContainerId) -> Result<i32>;
async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
let exit_code = self.wait_container(id).await?;
Ok(WaitOutcome::exited(exit_code))
}
async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>>;
async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>>;
async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>>;
async fn get_container_port_override(&self, _id: &ContainerId) -> Result<Option<u16>> {
Ok(None)
}
#[cfg(target_os = "windows")]
async fn get_container_namespace_id(
&self,
_id: &ContainerId,
) -> Result<Option<windows::core::GUID>> {
Ok(None)
}
async fn sync_container_volumes(&self, _id: &ContainerId) -> Result<()> {
Ok(())
}
async fn list_images(&self) -> Result<Vec<ImageInfo>> {
Err(AgentError::Unsupported(
"list_images is not supported by this runtime".into(),
))
}
async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
Err(AgentError::Unsupported(
"remove_image is not supported by this runtime".into(),
))
}
async fn prune_images(&self) -> Result<PruneResult> {
Err(AgentError::Unsupported(
"prune_images is not supported by this runtime".into(),
))
}
async fn kill_container(&self, _id: &ContainerId, _signal: Option<&str>) -> Result<()> {
Err(AgentError::Unsupported(
"kill_container is not supported by this runtime".into(),
))
}
async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
Err(AgentError::Unsupported(
"tag_image is not supported by this runtime".into(),
))
}
async fn inspect_detailed(&self, _id: &ContainerId) -> Result<ContainerInspectDetails> {
Ok(ContainerInspectDetails::default())
}
}
/// Normalises a user-supplied signal name and checks it against the allow-list.
///
/// Surrounding whitespace is ignored, matching is case-insensitive and the `SIG` prefix
/// is optional, so `"kill"`, `" KILL "` and `"SIGKILL"` all yield `"SIGKILL"`.
///
/// # Errors
///
/// Returns `AgentError::InvalidSpec` when the input is empty or whitespace-only, or
/// names a signal other than SIGKILL, SIGTERM, SIGINT, SIGHUP, SIGUSR1 or SIGUSR2.
pub fn validate_signal(signal: &str) -> Result<String> {
    const ALLOWED: [&str; 6] = ["SIGKILL", "SIGTERM", "SIGINT", "SIGHUP", "SIGUSR1", "SIGUSR2"];

    let name = signal.trim();
    if name.is_empty() {
        return Err(AgentError::InvalidSpec(
            "signal must not be empty".to_string(),
        ));
    }

    let mut canonical = name.to_ascii_uppercase();
    if !canonical.starts_with("SIG") {
        canonical.insert_str(0, "SIG");
    }

    if ALLOWED.contains(&canonical.as_str()) {
        Ok(canonical)
    } else {
        Err(AgentError::InvalidSpec(format!(
            "unsupported signal '{canonical}'; allowed: SIGKILL, SIGTERM, SIGINT, SIGHUP, SIGUSR1, SIGUSR2"
        )))
    }
}
/// Settings handed to a container so it can authenticate against the agent API.
///
/// `Debug` is implemented by hand so that `jwt_secret` is never printed. A derived
/// `Debug` would leak the secret into any `{:?}` log or error message.
#[derive(Clone)]
pub struct ContainerAuthContext {
    /// Base URL of the API the container talks to.
    pub api_url: String,
    /// Secret used for JWTs. Sensitive: redacted in `Debug` output.
    pub jwt_secret: String,
    /// Path of the socket exposed to the container.
    pub socket_path: String,
}

impl std::fmt::Debug for ContainerAuthContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ContainerAuthContext")
            .field("api_url", &self.api_url)
            .field("jwt_secret", &"<redacted>")
            .field("socket_path", &self.socket_path)
            .finish()
    }
}
/// In-memory stand-in for a real container runtime.
///
/// Containers live in a map guarded by an async `RwLock`, and every operation is simulated.
pub struct MockRuntime {
    /// Containers registered through `create_container`, keyed by id.
    containers: tokio::sync::RwLock<std::collections::HashMap<ContainerId, Container>>,
}

impl MockRuntime {
    /// Creates a mock runtime that knows about no containers.
    #[must_use]
    pub fn new() -> Self {
        let empty = std::collections::HashMap::new();
        Self {
            containers: tokio::sync::RwLock::new(empty),
        }
    }
}

impl Default for MockRuntime {
    /// Same as [`MockRuntime::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait::async_trait]
impl Runtime for MockRuntime {
    /// Delegates to `pull_image_with_policy` with `PullPolicy::IfNotPresent` and no auth.
    async fn pull_image(&self, image: &str) -> Result<()> {
        self.pull_image_with_policy(image, PullPolicy::IfNotPresent, None)
            .await
    }

    /// Simulates a registry pull with a fixed 100 ms delay; always succeeds.
    async fn pull_image_with_policy(
        &self,
        _image: &str,
        _policy: PullPolicy,
        _auth: Option<&RegistryAuth>,
    ) -> Result<()> {
        tokio::time::sleep(Duration::from_millis(100)).await;
        Ok(())
    }

    /// Registers `id` as `Pending`, replacing any existing entry with the same id.
    async fn create_container(&self, id: &ContainerId, _spec: &ServiceSpec) -> Result<()> {
        let mut containers = self.containers.write().await;
        containers.insert(
            id.clone(),
            Container {
                id: id.clone(),
                state: ContainerState::Pending,
                pid: None,
                task: None,
                overlay_ip: None,
                health_monitor: None,
                port_override: None,
            },
        );
        Ok(())
    }

    /// Marks a known container `Running`; unknown ids are silently ignored.
    async fn start_container(&self, id: &ContainerId) -> Result<()> {
        let mut containers = self.containers.write().await;
        if let Some(container) = containers.get_mut(id) {
            container.state = ContainerState::Running;
            // No real process exists; report our own pid so callers get a plausible value.
            container.pid = Some(std::process::id());
        }
        Ok(())
    }

    /// Marks a known container `Exited { code: 0 }`; unknown ids are silently ignored.
    async fn stop_container(&self, id: &ContainerId, _timeout: Duration) -> Result<()> {
        let mut containers = self.containers.write().await;
        if let Some(container) = containers.get_mut(id) {
            container.state = ContainerState::Exited { code: 0 };
        }
        Ok(())
    }

    /// Forgets `id`; removing an unknown id is not an error.
    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
        let mut containers = self.containers.write().await;
        containers.remove(id);
        Ok(())
    }

    /// Returns the recorded state, or `NotFound` for unknown ids.
    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
        let containers = self.containers.read().await;
        containers
            .get(id)
            .map(|c| c.state.clone())
            .ok_or_else(|| AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
    }

    /// Returns the last `tail` of two canned entries (one stdout, one stderr) for any id.
    async fn container_logs(&self, _id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
        let entries = vec![
            LogEntry {
                timestamp: chrono::Utc::now(),
                stream: LogStream::Stdout,
                message: "mock log line 1".to_string(),
                source: LogSource::Container("mock".to_string()),
                service: None,
                deployment: None,
            },
            LogEntry {
                timestamp: chrono::Utc::now(),
                stream: LogStream::Stderr,
                message: "mock error line".to_string(),
                source: LogSource::Container("mock".to_string()),
                service: None,
                deployment: None,
            },
        ];
        let skip = entries.len().saturating_sub(tail);
        Ok(entries.into_iter().skip(skip).collect())
    }

    /// Echoes the command: exit code 0, stdout = `cmd` joined by spaces, empty stderr.
    async fn exec(&self, _id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
        Ok((0, cmd.join(" "), String::new()))
    }

    /// Returns fixed statistics (1 s CPU, 50 MiB used of a 256 MiB limit) for known ids.
    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
        let containers = self.containers.read().await;
        if containers.contains_key(id) {
            Ok(ContainerStats {
                cpu_usage_usec: 1_000_000,
                memory_bytes: 50 * 1024 * 1024,
                memory_limit: 256 * 1024 * 1024,
                timestamp: std::time::Instant::now(),
            })
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }

    /// Returns the recorded exit code (1 for `Failed`). For a container that has not
    /// exited, waits 50 ms and reports 0 without changing its state.
    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
        let containers = self.containers.read().await;
        if let Some(container) = containers.get(id) {
            match &container.state {
                ContainerState::Exited { code } => Ok(*code),
                ContainerState::Failed { .. } => Ok(1),
                _ => {
                    // Release the lock before sleeping so other tasks are not blocked.
                    drop(containers);
                    tokio::time::sleep(Duration::from_millis(50)).await;
                    Ok(0)
                }
            }
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }

    /// Returns three canned stdout lines tagged with the container's display name.
    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
        let containers = self.containers.read().await;
        if containers.contains_key(id) {
            let container_name = id.to_string();
            Ok(vec![
                LogEntry {
                    timestamp: chrono::Utc::now(),
                    stream: LogStream::Stdout,
                    message: format!("[{container_name}] Container started"),
                    source: LogSource::Container(container_name.clone()),
                    service: None,
                    deployment: None,
                },
                LogEntry {
                    timestamp: chrono::Utc::now(),
                    stream: LogStream::Stdout,
                    message: format!("[{container_name}] Executing command..."),
                    source: LogSource::Container(container_name.clone()),
                    service: None,
                    deployment: None,
                },
                LogEntry {
                    timestamp: chrono::Utc::now(),
                    stream: LogStream::Stdout,
                    message: format!("[{container_name}] Command completed successfully"),
                    source: LogSource::Container(container_name),
                    service: None,
                    deployment: None,
                },
            ])
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }

    /// Returns the pid recorded by `start_container` (`None` before start).
    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
        let containers = self.containers.read().await;
        if let Some(container) = containers.get(id) {
            Ok(container.pid)
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }

    /// Returns a fake, deterministic address `172.17.0.<replica + 2>` for known ids.
    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
        let containers = self.containers.read().await;
        if containers.contains_key(id) {
            // `wrapping_add` avoids a debug-build overflow panic near u32::MAX. The
            // truncation to u8 is deliberate: the octet simply wraps for huge replica numbers.
            #[allow(clippy::cast_possible_truncation)]
            let last_octet = id.replica.wrapping_add(2) as u8;
            Ok(Some(IpAddr::V4(std::net::Ipv4Addr::new(
                172, 17, 0, last_octet,
            ))))
        } else {
            Err(AgentError::NotFound {
                container: id.to_string(),
                reason: "container not found".to_string(),
            })
        }
    }

    /// Reports no images.
    async fn list_images(&self) -> Result<Vec<ImageInfo>> {
        Ok(Vec::new())
    }

    /// Pretends to remove the image.
    async fn remove_image(&self, _image: &str, _force: bool) -> Result<()> {
        Ok(())
    }

    /// Reports an empty prune.
    async fn prune_images(&self) -> Result<PruneResult> {
        Ok(PruneResult::default())
    }

    /// Validates `signal` (default `SIGKILL`) and marks the container as exited with
    /// `128 + signal number`, the conventional status of a signal-terminated process
    /// (137 for SIGKILL, 143 for SIGTERM).
    ///
    /// # Errors
    ///
    /// `InvalidSpec` if `validate_signal` rejects the signal; `NotFound` for unknown ids.
    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
        let canonical = validate_signal(signal.unwrap_or("SIGKILL"))?;
        // Linux signal numbers for every name `validate_signal` accepts.
        let signo = match canonical.as_str() {
            "SIGHUP" => 1,
            "SIGINT" => 2,
            "SIGUSR1" => 10,
            "SIGUSR2" => 12,
            "SIGTERM" => 15,
            // SIGKILL (the only remaining allowed name).
            _ => 9,
        };
        let mut containers = self.containers.write().await;
        let container = containers.get_mut(id).ok_or_else(|| AgentError::NotFound {
            container: id.to_string(),
            reason: "container not found".to_string(),
        })?;
        container.state = ContainerState::Exited { code: 128 + signo };
        Ok(())
    }

    /// Pretends to tag the image.
    async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic lifecycle: pull, create and start leave the container `Running`.
    #[tokio::test]
    async fn test_mock_runtime() {
        let runtime = MockRuntime::new();
        let id = ContainerId {
            service: "test".to_string(),
            replica: 1,
        };
        runtime.pull_image("test:latest").await.unwrap();
        runtime.create_container(&id, &mock_spec()).await.unwrap();
        runtime.start_container(&id).await.unwrap();
        let state = runtime.container_state(&id).await.unwrap();
        assert_eq!(state, ContainerState::Running);
    }

    /// Canonical names, bare names, lowercase and padded input are all accepted.
    #[test]
    fn validate_signal_accepts_known_signals() {
        assert_eq!(validate_signal("SIGKILL").unwrap(), "SIGKILL");
        assert_eq!(validate_signal("SIGTERM").unwrap(), "SIGTERM");
        assert_eq!(validate_signal("SIGINT").unwrap(), "SIGINT");
        assert_eq!(validate_signal("SIGHUP").unwrap(), "SIGHUP");
        assert_eq!(validate_signal("SIGUSR1").unwrap(), "SIGUSR1");
        assert_eq!(validate_signal("SIGUSR2").unwrap(), "SIGUSR2");
        assert_eq!(validate_signal("KILL").unwrap(), "SIGKILL");
        assert_eq!(validate_signal("term").unwrap(), "SIGTERM");
        assert_eq!(validate_signal(" INT ").unwrap(), "SIGINT");
    }

    /// Empty/blank input and signals outside the allow-list are rejected.
    #[test]
    fn validate_signal_rejects_unknown_or_empty() {
        assert!(matches!(
            validate_signal(""),
            Err(AgentError::InvalidSpec(_))
        ));
        assert!(matches!(
            validate_signal(" "),
            Err(AgentError::InvalidSpec(_))
        ));
        assert!(matches!(
            validate_signal("SIGSEGV"),
            Err(AgentError::InvalidSpec(_))
        ));
        assert!(matches!(
            validate_signal("NOPE"),
            Err(AgentError::InvalidSpec(_))
        ));
        assert!(matches!(
            validate_signal("SIGPIPE"),
            Err(AgentError::InvalidSpec(_))
        ));
    }

    /// Without an explicit signal the mock behaves like SIGKILL (exit code 137).
    #[tokio::test]
    async fn mock_kill_container_defaults_to_sigkill() {
        let runtime = MockRuntime::new();
        let id = ContainerId {
            service: "kill-me".to_string(),
            replica: 0,
        };
        runtime.create_container(&id, &mock_spec()).await.unwrap();
        runtime.start_container(&id).await.unwrap();
        runtime.kill_container(&id, None).await.unwrap();
        let state = runtime.container_state(&id).await.unwrap();
        assert!(
            matches!(state, ContainerState::Exited { code: 137 }),
            "expected Exited(137), got {state:?}"
        );
    }

    #[test]
    fn wait_reason_serializes_as_snake_case() {
        assert_eq!(
            serde_json::to_string(&WaitReason::Exited).unwrap(),
            "\"exited\""
        );
        assert_eq!(
            serde_json::to_string(&WaitReason::Signal).unwrap(),
            "\"signal\""
        );
        assert_eq!(
            serde_json::to_string(&WaitReason::OomKilled).unwrap(),
            "\"oom_killed\""
        );
        assert_eq!(
            serde_json::to_string(&WaitReason::RuntimeError).unwrap(),
            "\"runtime_error\""
        );
    }

    #[test]
    fn wait_reason_deserialize_roundtrip() {
        for variant in [
            WaitReason::Exited,
            WaitReason::Signal,
            WaitReason::OomKilled,
            WaitReason::RuntimeError,
        ] {
            let s = serde_json::to_string(&variant).unwrap();
            let back: WaitReason = serde_json::from_str(&s).unwrap();
            assert_eq!(variant, back, "roundtrip failed for {variant:?}");
        }
    }

    #[test]
    fn signal_name_from_exit_code_known_signals() {
        assert_eq!(signal_name_from_exit_code(137).as_deref(), Some("SIGKILL"));
        assert_eq!(signal_name_from_exit_code(143).as_deref(), Some("SIGTERM"));
        assert_eq!(signal_name_from_exit_code(130).as_deref(), Some("SIGINT"));
        assert_eq!(signal_name_from_exit_code(129).as_deref(), Some("SIGHUP"));
        assert_eq!(signal_name_from_exit_code(139).as_deref(), Some("SIGSEGV"));
    }

    #[test]
    fn signal_name_from_exit_code_handles_unknown_and_normal() {
        assert_eq!(signal_name_from_exit_code(0), None);
        assert_eq!(signal_name_from_exit_code(1), None);
        assert_eq!(signal_name_from_exit_code(128), None);
        assert_eq!(
            signal_name_from_exit_code(128 + 99).as_deref(),
            Some("signal_99")
        );
    }

    /// The trait's default `wait_outcome` wraps `wait_container`'s exit code as `Exited`.
    #[tokio::test]
    async fn default_wait_outcome_delegates_to_wait_container() {
        let runtime = MockRuntime::new();
        let id = ContainerId {
            service: "wait-test".to_string(),
            replica: 0,
        };
        runtime.create_container(&id, &mock_spec()).await.unwrap();
        runtime.start_container(&id).await.unwrap();
        let outcome = runtime.wait_outcome(&id).await.unwrap();
        assert_eq!(outcome.exit_code, 0);
        assert_eq!(outcome.reason, WaitReason::Exited);
        assert!(outcome.signal.is_none());
        assert!(outcome.finished_at.is_none());
    }

    #[tokio::test]
    async fn mock_kill_container_rejects_bogus_signal() {
        let runtime = MockRuntime::new();
        let id = ContainerId {
            service: "kill-me".to_string(),
            replica: 0,
        };
        runtime.create_container(&id, &mock_spec()).await.unwrap();
        runtime.start_container(&id).await.unwrap();
        let err = runtime
            .kill_container(&id, Some("SIGFOO"))
            .await
            .unwrap_err();
        assert!(
            matches!(err, AgentError::InvalidSpec(_)),
            "expected InvalidSpec, got {err:?}"
        );
    }

    /// The trait's default `exec_stream` replays non-empty stdout, skips empty stderr,
    /// and ends with the exit code.
    #[tokio::test]
    async fn default_exec_stream_replays_buffered_exec_output() {
        use futures_util::StreamExt;

        let runtime = MockRuntime::new();
        let id = ContainerId {
            service: "exec-test".to_string(),
            replica: 0,
        };
        let cmd = vec!["echo".to_string(), "hi".to_string()];
        let events: Vec<ExecEvent> = runtime
            .exec_stream(&id, &cmd)
            .await
            .unwrap()
            .collect()
            .await;
        assert_eq!(
            events,
            vec![ExecEvent::Stdout("echo hi".to_string()), ExecEvent::Exit(0)]
        );
    }

    /// `container_logs` keeps only the newest `tail` entries.
    #[tokio::test]
    async fn mock_container_logs_respects_tail() {
        let runtime = MockRuntime::new();
        let id = ContainerId {
            service: "logs".to_string(),
            replica: 0,
        };
        let last = runtime.container_logs(&id, 1).await.unwrap();
        assert_eq!(last.len(), 1);
        assert_eq!(last[0].message, "mock error line");
        assert_eq!(runtime.container_logs(&id, 10).await.unwrap().len(), 2);
        assert!(runtime.container_logs(&id, 0).await.unwrap().is_empty());
    }

    /// Minimal single-service spec used by the lifecycle tests.
    fn mock_spec() -> ServiceSpec {
        use zlayer_spec::*;
        // YAML nesting is spelled out with explicit spaces, so reformatting this file
        // (which re-indents raw string literals) cannot flatten the document.
        const YAML: &str = concat!(
            "version: v1\n",
            "deployment: test\n",
            "services:\n",
            "  test:\n",
            "    rtype: service\n",
            "    image:\n",
            "      name: test:latest\n",
            "    endpoints:\n",
            "      - name: http\n",
            "        protocol: http\n",
            "        port: 8080\n",
        );
        serde_yaml::from_str::<DeploymentSpec>(YAML)
            .unwrap()
            .services
            .remove("test")
            .unwrap()
    }
}