#![allow(
clippy::type_complexity,
clippy::must_use_candidate,
clippy::default_trait_access,
clippy::unused_self,
clippy::needless_pass_by_value,
clippy::bind_instead_of_map,
clippy::map_unwrap_or,
clippy::needless_return,
clippy::unnecessary_debug_formatting,
clippy::used_underscore_binding,
clippy::no_effect_underscore_binding,
clippy::needless_raw_string_hashes
)]
use std::collections::HashMap;
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures_util::StreamExt;
use oci_client::manifest::OciImageManifest;
use tokio::sync::{Mutex, RwLock};
use tokio::time::timeout as tokio_timeout;
use tracing::instrument;
use windows::core::GUID;
use zlayer_hns::attach::{self as hns_attach, EndpointAttachment};
use zlayer_observability::logs::LogEntry;
use zlayer_overlay::ipnet;
use zlayer_spec::{PullPolicy, RegistryAuth as SpecRegistryAuth, ServiceSpec};
use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use crate::runtime::{
ContainerId, ContainerInspectDetails, ContainerState, ExecEvent, ExecEventStream, ImageInfo,
PruneResult, Runtime, WaitOutcome, WaitReason,
};
use crate::windows::{scratch, unpacker};
use zlayer_hcs::enumerate;
use zlayer_hcs::events::{self, HcsEventKind};
use zlayer_hcs::schema::{
ComputeSystem as HcsDoc, Container as HcsContainer, ContainerMemory, ContainerProcessor,
ProcessParameters, SchemaVersion, Statistics, Storage as HcsStorage,
};
use zlayer_hcs::system::ComputeSystem;
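/// Owner tag stamped on every compute system and HCN endpoint this runtime
/// creates, so orphaned resources can later be found and reaped by owner.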
pub const OWNER_TAG: &str = "zlayer";
const OVERLAY_NETWORK_NAME: &str = "zlayer-overlay";
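/// Windows container isolation mode: process isolation shares the host
/// kernel, while Hyper-V isolation runs the container in a lightweight
/// utility VM.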
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationMode {
#[default]
Process,
Hyperv,
}
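/// Configuration for the HCS runtime: on-disk storage layout, default
/// isolation and scratch-disk size, and the overlay CIDRs used for HCN
/// networking.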
#[derive(Debug, Clone)]
pub struct HcsConfig {
pub storage_root: PathBuf,
pub default_isolation: IsolationMode,
pub default_scratch_size_gb: u64,
pub cluster_cidr: String,
pub slice_cidr: Option<ipnet::IpNet>,
}
impl Default for HcsConfig {
fn default() -> Self {
let dirs = zlayer_paths::ZLayerDirs::system_default();
Self {
storage_root: std::env::var("ZLAYER_HCS_STORAGE_ROOT")
.map_or_else(|_| dirs.containers().join("hcs"), PathBuf::from),
default_isolation: IsolationMode::default(),
default_scratch_size_gb: 20,
cluster_cidr: "10.200.0.0/16".to_string(),
slice_cidr: None,
}
}
}
#[derive(Debug)]
struct CachedImage {
unpacked: unpacker::UnpackedImage,
}
#[derive(Debug)]
struct ContainerEntry {
system: ComputeSystem,
scratch_layer: Option<scratch::WritableLayer>,
hcs_id: String,
last_exit_code: Arc<RwLock<Option<i32>>>,
network_attachment: Option<EndpointAttachment>,
}
#[derive(Debug)]
struct OverlayNetwork {
id: GUID,
#[allow(dead_code)]
subnet: String,
_network: zlayer_hns::network::Network,
}
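/// Container runtime backed by the Windows Host Compute Service (HCS), with
/// optional per-container HCN Transparent overlay networking.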
pub struct HcsRuntime {
config: HcsConfig,
containers: RwLock<HashMap<String, ContainerEntry>>,
images: RwLock<HashMap<String, CachedImage>>,
registry: Arc<zlayer_registry::ImagePuller>,
auth_resolver: zlayer_core::AuthResolver,
overlay_network: Arc<Mutex<Option<OverlayNetwork>>>,
next_container_ip: Arc<Mutex<Option<IpAddr>>>,
next_container_dns: Arc<Mutex<Option<(Option<IpAddr>, Option<String>)>>>,
}
impl std::fmt::Debug for HcsRuntime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HcsRuntime")
.field("config", &self.config)
.finish_non_exhaustive()
}
}
impl HcsRuntime {
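/// Builds a runtime whose image puller uses a blob cache configured from the
/// environment.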
pub async fn new(config: HcsConfig) -> Result<Self> {
let cache_type = zlayer_registry::CacheType::from_env().map_err(|e| {
AgentError::Configuration(format!("failed to configure HCS blob cache from env: {e}"))
})?;
let blob_cache = cache_type.build().await.map_err(|e| {
AgentError::Configuration(format!("failed to open HCS blob cache: {e}"))
})?;
let registry = Arc::new(zlayer_registry::ImagePuller::with_cache(blob_cache));
Self::new_with_registry(config, registry)
}
pub fn new_with_registry(
config: HcsConfig,
registry: Arc<zlayer_registry::ImagePuller>,
) -> Result<Self> {
std::fs::create_dir_all(&config.storage_root).map_err(|e| {
AgentError::Configuration(format!(
"failed to create HCS storage root {:?}: {e}",
config.storage_root
))
})?;
std::fs::create_dir_all(config.storage_root.join("images")).map_err(|e| {
AgentError::Configuration(format!("failed to create HCS image cache dir: {e}"))
})?;
std::fs::create_dir_all(config.storage_root.join("scratch")).map_err(|e| {
AgentError::Configuration(format!("failed to create HCS scratch dir: {e}"))
})?;
Ok(Self {
config,
containers: RwLock::new(HashMap::new()),
images: RwLock::new(HashMap::new()),
registry,
auth_resolver: zlayer_core::AuthResolver::new(zlayer_core::AuthConfig::default()),
overlay_network: Arc::new(Mutex::new(None)),
next_container_ip: Arc::new(Mutex::new(None)),
next_container_dns: Arc::new(Mutex::new(None)),
})
}
fn hcs_id(id: &ContainerId) -> String {
id.to_string()
}
fn image_layer_dir(&self, image: &str) -> PathBuf {
let safe = image.replace(['/', ':', '@'], "_");
self.config.storage_root.join("images").join(safe)
}
fn scratch_dir(&self, hcs_id: &str) -> PathBuf {
self.config.storage_root.join("scratch").join(hcs_id)
}
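/// Lazily creates the shared HCN Transparent overlay network on first use;
/// subsequent calls return the cached network ID.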
async fn ensure_overlay_network(&self, slice_cidr: ipnet::IpNet) -> Result<GUID> {
let mut guard = self.overlay_network.lock().await;
if let Some(net) = guard.as_ref() {
return Ok(net.id);
}
let net_id = GUID::new().map_err(|e| {
AgentError::Internal(format!("GUID::new for overlay network failed: {e}"))
})?;
let uplink = zlayer_hns::adapter::find_primary_adapter()
.map_err(|e| AgentError::Internal(format!("find_primary_adapter: {e}")))?;
let subnet_str = slice_cidr.to_string();
let subnet_for_create = subnet_str.clone();
let uplink_for_create = uplink.clone();
let network = tokio::task::spawn_blocking(move || {
zlayer_hns::network::Network::create_transparent(
net_id,
OVERLAY_NETWORK_NAME,
&subnet_for_create,
&uplink_for_create,
)
})
.await
.map_err(|e| AgentError::Internal(format!("spawn_blocking join failed: {e}")))?
.map_err(|e| AgentError::Internal(format!("HcnCreateNetwork(zlayer-overlay): {e}")))?;
*guard = Some(OverlayNetwork {
id: net_id,
subnet: subnet_str.clone(),
_network: network,
});
tracing::info!(
network_id = %format!("{net_id:?}"),
subnet = %subnet_str,
uplink = %uplink,
"created HCN Transparent overlay network"
);
Ok(net_id)
}
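/// Stashes the IP that the next `create_container` call should assign to its
/// HCN endpoint; the value is consumed (taken) by that call.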
pub async fn set_next_container_ip(&self, ip: IpAddr) {
*self.next_container_ip.lock().await = Some(ip);
}
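/// Stashes DNS settings for the next `create_container` call, consumed the
/// same way as `set_next_container_ip`.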
pub async fn set_next_container_dns(
&self,
dns_server: Option<IpAddr>,
dns_domain: Option<String>,
) {
*self.next_container_dns.lock().await = Some((dns_server, dns_domain));
}
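/// Deletes HCN endpoints tagged with [`OWNER_TAG`] whose container is no
/// longer tracked, along with their namespaces when one can be resolved.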
#[allow(dead_code)]
pub async fn reconcile_orphans(&self) -> Result<()> {
let live: std::collections::HashSet<String> =
self.containers.read().await.keys().cloned().collect();
let owned = tokio::task::spawn_blocking(|| hns_attach::list_owned_endpoints(OWNER_TAG))
.await
.map_err(|e| AgentError::Internal(format!("spawn_blocking join failed: {e}")))?
.map_err(|e| AgentError::Internal(format!("list_owned_endpoints: {e}")))?;
for (endpoint_id, name) in owned {
let prefix = format!("{OWNER_TAG}-");
let container_id = name.strip_prefix(&prefix).unwrap_or(name.as_str());
if live.contains(container_id) {
continue;
}
let ep_id = endpoint_id;
let namespace_id = match tokio::task::spawn_blocking(move || {
zlayer_hns::endpoint::Endpoint::open(ep_id).and_then(|ep| ep.query_properties("{}"))
})
.await
{
Ok(Ok(props)) => props
.host_compute_namespace
.as_deref()
.and_then(parse_guid_loose),
Ok(Err(e)) => {
tracing::warn!(
endpoint_id = %format!("{ep_id:?}"),
error = %e,
"reconcile: failed to query orphan endpoint properties"
);
None
}
Err(e) => {
tracing::warn!(
endpoint_id = %format!("{ep_id:?}"),
error = %e,
"reconcile: spawn_blocking join failed"
);
None
}
};
let res = tokio::task::spawn_blocking(move || match namespace_id {
Some(ns) => hns_attach::delete_endpoint_and_namespace(ep_id, ns),
None => zlayer_hns::endpoint::Endpoint::delete(ep_id),
})
.await;
match res {
Ok(Ok(())) => {
tracing::info!(
endpoint_id = %format!("{ep_id:?}"),
container_id = %container_id,
"reconcile: reaped orphan HCN endpoint"
);
}
Ok(Err(e)) => {
tracing::warn!(
endpoint_id = %format!("{ep_id:?}"),
error = %e,
"reconcile: failed to delete orphan endpoint"
);
}
Err(e) => {
tracing::warn!(
endpoint_id = %format!("{ep_id:?}"),
error = %e,
"reconcile: spawn_blocking join failed during delete"
);
}
}
}
Ok(())
}
fn manifest_to_descriptors(
manifest: &OciImageManifest,
) -> Vec<unpacker::ResolvedLayerDescriptor> {
manifest
.layers
.iter()
.map(|l| unpacker::ResolvedLayerDescriptor {
digest: l.digest.clone(),
media_type: l.media_type.clone(),
size: l.size,
urls: l.urls.clone().unwrap_or_default(),
})
.collect()
}
async fn do_pull(
&self,
image: &str,
_policy: PullPolicy,
_auth: Option<&SpecRegistryAuth>,
) -> Result<()> {
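// NOTE: `_policy` and `_auth` are accepted but not yet honored here: a cached
// image always short-circuits the pull, and credentials come from the
// node-level AuthResolver.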
{
let cache = self.images.read().await;
if cache.contains_key(image) {
tracing::debug!(image = %image, "HCS image cache hit, skipping pull");
return Ok(());
}
}
let auth = self.auth_resolver.resolve(image);
let (manifest, _digest) = self
.registry
.pull_manifest(image, &auth)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("manifest pull: {e}"),
})?;
let descriptors = Self::manifest_to_descriptors(&manifest);
let dest_root = self.image_layer_dir(image);
let unpacked = unpacker::unpack_windows_image(
self.registry.as_ref(),
image,
&auth,
&descriptors,
&dest_root,
)
.await
.map_err(|e| AgentError::PullFailed {
image: image.to_string(),
reason: format!("unpack: {e}"),
})?;
let mut cache = self.images.write().await;
cache.insert(image.to_string(), CachedImage { unpacked });
Ok(())
}
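/// Translates a `ServiceSpec` into an HCS v2 ComputeSystem document:
/// fractional CPUs round up to a whole processor count, memory strings are
/// parsed and rounded up to MiB, and HCN namespaces are wired in when present.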
fn build_compute_system_doc(
&self,
hcs_id: &str,
spec: &ServiceSpec,
scratch_layer: &scratch::WritableLayer,
parent_layers: Vec<zlayer_hcs::schema::Layer>,
namespace_ids: Vec<String>,
) -> HcsDoc {
let processor = spec.resources.cpu.and_then(|cpu| {
let count = cpu.ceil();
#[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
let count_u32 = if count.is_finite() && count >= 1.0 {
count as u32
} else {
return None;
};
Some(ContainerProcessor {
count: Some(count_u32),
maximum: None,
weight: None,
})
});
let memory = spec.resources.memory.as_ref().and_then(|mem_str| {
crate::bundle::parse_memory_string(mem_str)
.ok()
.map(|bytes| {
let mib = bytes.div_ceil(1024 * 1024);
ContainerMemory {
size_in_mb: Some(mib),
}
})
});
let storage = HcsStorage {
layers: parent_layers,
path: Some(scratch_layer.layer_path().to_string_lossy().into_owned()),
};
let networking = if namespace_ids.is_empty() {
None
} else {
Some(zlayer_hcs::schema::ContainerNetworking {
allow_unqualified_dns_query: None,
dns_search_list: Vec::new(),
namespace: namespace_ids,
network_shared_container_name: None,
})
};
let container = HcsContainer {
storage: Some(storage),
networking,
mapped_directories: Vec::new(),
mapped_pipes: Vec::new(),
hostname: spec.hostname.clone(),
processor,
memory,
};
HcsDoc {
owner: OWNER_TAG.to_string(),
schema_version: SchemaVersion::default(),
hosting_system_id: String::new(),
container: Some(container),
virtual_machine: None,
should_terminate_on_last_handle_closed: Some(true),
}
.apply_service_id(hcs_id)
}
async fn resolve_parent_chain(&self, image: &str) -> Result<Vec<zlayer_hcs::schema::Layer>> {
let cache = self.images.read().await;
let entry = cache.get(image).ok_or_else(|| AgentError::CreateFailed {
id: image.to_string(),
reason: format!("image '{image}' not pulled before create_container"),
})?;
Ok(entry.unpacked.chain.0.clone())
}
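/// Subscribes to HCS lifecycle events and records the exit code into `sink`
/// when the system exits (or `-1` on service disconnect).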
fn spawn_exit_watcher(
&self,
hcs_id: String,
system_raw: windows::Win32::System::HostComputeSystem::HCS_SYSTEM,
sink: Arc<RwLock<Option<i32>>>,
) {
let (_sub, mut stream) = match events::subscribe(system_raw) {
Ok(pair) => pair,
Err(e) => {
tracing::warn!(
hcs_id = %hcs_id,
error = %e,
"failed to subscribe to HCS lifecycle events; exit code will be unknown"
);
return;
}
};
tokio::spawn(async move {
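// Move the subscription into the task so the callback registration stays
// alive until the stream ends.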
let _sub = _sub;
while let Some(evt) = stream.next().await {
if matches!(evt.kind, HcsEventKind::SystemExited) {
let code = extract_exit_code(&evt.detail_json).unwrap_or(0);
*sink.write().await = Some(code);
break;
}
if matches!(evt.kind, HcsEventKind::ServiceDisconnect) {
*sink.write().await = Some(-1);
break;
}
}
});
}
}
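/// Parses a GUID that may or may not be wrapped in braces, since HCN returns
/// namespace IDs in `{...}` form.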
fn parse_guid_loose(s: &str) -> Option<GUID> {
let bare = s.trim_matches(|c: char| c == '{' || c == '}');
GUID::try_from(bare).ok()
}
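/// Pulls a top-level `ExitCode` field out of an HCS event's detail JSON.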
fn extract_exit_code(detail_json: &str) -> Option<i32> {
if detail_json.trim().is_empty() {
return None;
}
let v: serde_json::Value = serde_json::from_str(detail_json).ok()?;
v.get("ExitCode")
.and_then(serde_json::Value::as_i64)
.and_then(|n| {
#[allow(clippy::cast_possible_truncation)]
Some(n as i32)
})
}
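/// Hook for stamping a service identity onto the document. The current impl
/// is a no-op: the HCS ID is already passed to `HcsCreateComputeSystem`
/// directly.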
trait ApplyServiceId {
fn apply_service_id(self, hcs_id: &str) -> Self;
}
impl ApplyServiceId for HcsDoc {
fn apply_service_id(self, hcs_id: &str) -> Self {
let _ = hcs_id;
self
}
}
#[async_trait]
impl Runtime for HcsRuntime {
#[instrument(skip(self), fields(otel.name = "image.pull", container.image.name = %image))]
async fn pull_image(&self, image: &str) -> Result<()> {
self.do_pull(image, PullPolicy::IfNotPresent, None).await
}
#[instrument(
skip(self, auth),
fields(otel.name = "image.pull", container.image.name = %image, pull_policy = ?policy)
)]
async fn pull_image_with_policy(
&self,
image: &str,
policy: PullPolicy,
auth: Option<&SpecRegistryAuth>,
) -> Result<()> {
if matches!(policy, PullPolicy::Never) {
let cache = self.images.read().await;
return if cache.contains_key(image) {
Ok(())
} else {
Err(AgentError::PullFailed {
image: image.to_string(),
reason: "pull_policy=never and image not cached locally".to_string(),
})
};
}
self.do_pull(image, policy, auth).await
}
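/// Creates (but does not start) a compute system: pulls the image if it is
/// not cached, builds a scratch layer, best-effort attaches an overlay
/// endpoint (degrading to no network on failure), and spawns an exit watcher.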
#[instrument(
skip(self, spec),
fields(
otel.name = "container.create",
container.id = %id,
service.name = %id.service,
service.replica = %id.replica,
container.image.name = %spec.image.name,
)
)]
async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
let hcs_id = Self::hcs_id(id);
let image_name = spec.image.name.to_string();
{
let cache = self.images.read().await;
if !cache.contains_key(&image_name) {
drop(cache);
self.do_pull(&image_name, spec.image.pull_policy, None)
.await?;
}
}
let parent_layers = self.resolve_parent_chain(&image_name).await?;
let scratch_dir = self.scratch_dir(&hcs_id);
let chain = crate::windows::wclayer::LayerChain::new(parent_layers.clone());
let scratch_layer = scratch::create(
&scratch_dir,
&chain,
self.config.default_scratch_size_gb,
false,
)
.map_err(|e| AgentError::CreateFailed {
id: hcs_id.clone(),
reason: format!("scratch layer create: {e}"),
})?;
let slice_cidr = self.config.slice_cidr;
let allocated_ip = self.next_container_ip.lock().await.take();
let dns_config = self.next_container_dns.lock().await.take();
let cluster_cidr = self.config.cluster_cidr.clone();
let network_attachment = match (slice_cidr, allocated_ip) {
(Some(slice), Some(ip)) => match self.ensure_overlay_network(slice).await {
Ok(net_id) => {
let cid_for_attach = hcs_id.clone();
let prefix_length = slice.prefix_len();
let cluster_cidr_owned = cluster_cidr;
let (dns_server, dns_domain) = dns_config.unwrap_or((None, None));
match tokio::task::spawn_blocking(move || {
EndpointAttachment::create_overlay(
net_id,
OWNER_TAG,
cid_for_attach.as_str(),
ip,
prefix_length,
&cluster_cidr_owned,
dns_server,
dns_domain.as_deref(),
)
})
.await
{
Ok(Ok(att)) => Some(att),
Ok(Err(e)) => {
tracing::warn!(
hcs_id = %hcs_id,
error = %e,
"HCN overlay endpoint attach failed; starting container without network"
);
None
}
Err(e) => {
tracing::warn!(
hcs_id = %hcs_id,
error = %e,
"spawn_blocking join for overlay endpoint attach failed; starting container without network"
);
None
}
}
}
Err(e) => {
tracing::warn!(
hcs_id = %hcs_id,
error = %e,
"HCN Transparent overlay network unavailable; starting container without network"
);
None
}
},
(None, _) => {
tracing::warn!(
hcs_id = %hcs_id,
"HcsConfig.slice_cidr is None (node has no assigned slice yet); starting container without network"
);
None
}
(Some(_), None) => {
tracing::warn!(
hcs_id = %hcs_id,
"no container IP stashed via set_next_container_ip; starting container without network"
);
None
}
};
let namespace_strs: Vec<String> = network_attachment
.as_ref()
.map(|att| vec![format!("{:?}", att.namespace_id())])
.unwrap_or_default();
let doc = self.build_compute_system_doc(
&hcs_id,
spec,
&scratch_layer,
parent_layers,
namespace_strs,
);
let doc_json = serde_json::to_string(&doc).map_err(|e| AgentError::CreateFailed {
id: hcs_id.clone(),
reason: format!("serialize ComputeSystem doc: {e}"),
})?;
let system = ComputeSystem::create(&hcs_id, &doc_json)
.await
.map_err(|e| AgentError::CreateFailed {
id: hcs_id.clone(),
reason: format!("HcsCreateComputeSystem: {e}"),
})?;
let sink: Arc<RwLock<Option<i32>>> = Arc::new(RwLock::new(None));
self.spawn_exit_watcher(hcs_id.clone(), *system.raw(), sink.clone());
let entry = ContainerEntry {
system,
scratch_layer: Some(scratch_layer),
hcs_id: hcs_id.clone(),
last_exit_code: sink,
network_attachment,
};
self.containers.write().await.insert(hcs_id, entry);
Ok(())
}
#[instrument(skip(self), fields(otel.name = "container.start", container.id = %id))]
async fn start_container(&self, id: &ContainerId) -> Result<()> {
let hcs_id = Self::hcs_id(id);
let containers = self.containers.read().await;
let entry = containers
.get(&hcs_id)
.ok_or_else(|| AgentError::NotFound {
container: hcs_id.clone(),
reason: "no HCS entry for container".to_string(),
})?;
entry
.system
.start("")
.await
.map_err(|e| AgentError::StartFailed {
id: hcs_id.clone(),
reason: format!("HcsStartComputeSystem: {e}"),
})
}
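/// Attempts a graceful shutdown bounded by `timeout`, escalating to a hard
/// terminate if the shutdown fails or the timeout elapses.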
#[instrument(skip(self), fields(otel.name = "container.stop", container.id = %id))]
async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
let hcs_id = Self::hcs_id(id);
let containers = self.containers.read().await;
let entry = containers
.get(&hcs_id)
.ok_or_else(|| AgentError::NotFound {
container: hcs_id.clone(),
reason: "no HCS entry for container".to_string(),
})?;
let opts_json = format!(r#"{{"TimeoutSeconds":{}}}"#, timeout.as_secs().max(1));
match tokio_timeout(timeout, entry.system.shutdown(&opts_json)).await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => {
tracing::warn!(
hcs_id = %hcs_id,
error = %e,
"graceful shutdown failed; escalating to terminate"
);
entry
.system
.terminate("")
.await
.map_err(|e| AgentError::Internal(format!("HcsTerminateComputeSystem: {e}")))
}
Err(_elapsed) => {
tracing::warn!(
hcs_id = %hcs_id,
"graceful shutdown timed out; escalating to terminate"
);
entry
.system
.terminate("")
.await
.map_err(|e| AgentError::Internal(format!("HcsTerminateComputeSystem: {e}")))
}
}
}
#[instrument(skip(self), fields(otel.name = "container.remove", container.id = %id))]
async fn remove_container(&self, id: &ContainerId) -> Result<()> {
let hcs_id = Self::hcs_id(id);
let mut containers = self.containers.write().await;
let Some(mut entry) = containers.remove(&hcs_id) else {
return Err(AgentError::NotFound {
container: hcs_id,
reason: "no HCS entry for container".to_string(),
});
};
if let Err(e) = entry.system.terminate("").await {
tracing::debug!(
hcs_id = %entry.hcs_id,
error = %e,
"terminate during remove failed (container may already be stopped)"
);
}
if let Some(scratch_layer) = entry.scratch_layer.take() {
scratch_layer
.detach_and_destroy()
.map_err(|e| AgentError::Internal(format!("scratch teardown: {e}")))?;
}
if let Some(attachment) = entry.network_attachment.take() {
let hcs_id_for_log = entry.hcs_id.clone();
let res = tokio::task::spawn_blocking(move || attachment.teardown()).await;
match res {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::warn!(
hcs_id = %hcs_id_for_log,
error = %e,
"HCN attachment teardown failed; endpoint may leak until next reconcile"
);
}
Err(e) => {
tracing::warn!(
hcs_id = %hcs_id_for_log,
error = %e,
"spawn_blocking join failed during HCN teardown"
);
}
}
}
drop(entry);
Ok(())
}
async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
let hcs_id = Self::hcs_id(id);
let containers = self.containers.read().await;
let Some(entry) = containers.get(&hcs_id) else {
return Err(AgentError::NotFound {
container: hcs_id,
reason: "no HCS entry for container".to_string(),
});
};
if let Some(code) = *entry.last_exit_code.read().await {
return Ok(ContainerState::Exited { code });
}
Ok(ContainerState::Running)
}
async fn container_logs(&self, _id: &ContainerId, _tail: usize) -> Result<Vec<LogEntry>> {
Err(AgentError::Unsupported(
"container_logs is not yet wired for the HCS runtime; use `zlayer exec` to inspect logs inside the container".to_string(),
))
}
async fn exec(&self, id: &ContainerId, cmd: &[String]) -> Result<(i32, String, String)> {
use zlayer_hcs::process::ComputeProcess;
if cmd.is_empty() {
return Err(AgentError::InvalidSpec(
"exec command must not be empty".to_string(),
));
}
let hcs_id = Self::hcs_id(id);
let containers = self.containers.read().await;
let entry = containers
.get(&hcs_id)
.ok_or_else(|| AgentError::NotFound {
container: hcs_id.clone(),
reason: "no HCS entry for container".to_string(),
})?;
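// Naive join: arguments containing spaces are not quoted or escaped.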
let command_line = cmd.join(" ");
let params = ProcessParameters {
command_line,
working_directory: String::new(),
environment: Default::default(),
emulate_console: Some(false),
create_std_in_pipe: Some(false),
create_std_out_pipe: Some(true),
create_std_err_pipe: Some(true),
console_size: None,
user: None,
};
let system_handle = entry.system.raw();
let process = ComputeProcess::spawn(system_handle, &params)
.await
.map_err(|e| AgentError::Internal(format!("HcsCreateProcess: {e}")))?;
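// Poll process status every 100ms for up to 60s. Stdout/stderr pipes are
// requested above but not yet read back, so output is returned empty.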
for _ in 0..600 {
let raw_props = process
.properties(r#"{"PropertyTypes":["ProcessStatus"]}"#)
.await
.map_err(|e| AgentError::Internal(format!("HcsGetProcessProperties: {e}")))?;
if let Some(code) = extract_process_exit_code(&raw_props) {
return Ok((code, String::new(), String::new()));
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
Err(AgentError::Timeout {
timeout: Duration::from_secs(60),
})
}
async fn exec_stream(&self, id: &ContainerId, cmd: &[String]) -> Result<ExecEventStream> {
let (exit, stdout, stderr) = self.exec(id, cmd).await?;
let mut events: Vec<ExecEvent> = Vec::with_capacity(3);
if !stdout.is_empty() {
events.push(ExecEvent::Stdout(stdout));
}
if !stderr.is_empty() {
events.push(ExecEvent::Stderr(stderr));
}
events.push(ExecEvent::Exit(exit));
Ok(Box::pin(futures_util::stream::iter(events)))
}
async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
let hcs_id = Self::hcs_id(id);
let containers = self.containers.read().await;
let entry = containers
.get(&hcs_id)
.ok_or_else(|| AgentError::NotFound {
container: hcs_id.clone(),
reason: "no HCS entry for container".to_string(),
})?;
let raw = entry
.system
.read_statistics()
.await
.map_err(|e| AgentError::Internal(format!("HcsGetComputeSystemProperties: {e}")))?;
Ok(translate_stats(&raw))
}
async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
let hcs_id = Self::hcs_id(id);
let sink = {
let containers = self.containers.read().await;
let entry = containers
.get(&hcs_id)
.ok_or_else(|| AgentError::NotFound {
container: hcs_id.clone(),
reason: "no HCS entry for container".to_string(),
})?;
entry.last_exit_code.clone()
};
loop {
if let Some(code) = *sink.read().await {
return Ok(code);
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
async fn wait_outcome(&self, id: &ContainerId) -> Result<WaitOutcome> {
let exit_code = self.wait_container(id).await?;
let reason = if exit_code == -1 {
WaitReason::RuntimeError
} else {
WaitReason::Exited
};
Ok(WaitOutcome {
exit_code,
reason,
signal: None,
finished_at: Some(chrono::Utc::now()),
})
}
async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
self.container_logs(id, usize::MAX).await
}
async fn get_container_pid(&self, _id: &ContainerId) -> Result<Option<u32>> {
Ok(None)
}
async fn get_container_namespace_id(
&self,
id: &ContainerId,
) -> Result<Option<windows::core::GUID>> {
let hcs_id = Self::hcs_id(id);
let entries = self.containers.read().await;
Ok(entries.get(&hcs_id).and_then(|e| {
e.network_attachment
.as_ref()
.map(EndpointAttachment::namespace_id)
}))
}
async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
let hcs_id = Self::hcs_id(id);
let containers = self.containers.read().await;
let Some(entry) = containers.get(&hcs_id) else {
return Err(AgentError::NotFound {
container: hcs_id,
reason: "no HCS entry for container".to_string(),
});
};
let Some(ip_str) = entry
.network_attachment
.as_ref()
.and_then(|a| a.ip().map(str::to_string))
else {
return Ok(None);
};
match ip_str.parse::<IpAddr>() {
Ok(ip) => Ok(Some(ip)),
Err(e) => {
tracing::warn!(
hcs_id = %hcs_id,
ip = %ip_str,
error = %e,
"HCN endpoint returned unparseable IP"
);
Ok(None)
}
}
}
async fn list_images(&self) -> Result<Vec<ImageInfo>> {
let cache = self.images.read().await;
Ok(cache
.keys()
.map(|reference| ImageInfo {
reference: reference.clone(),
digest: None,
size_bytes: None,
})
.collect())
}
async fn remove_image(&self, image: &str, _force: bool) -> Result<()> {
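// NOTE: `tag_image` clones the layer chain, so destroying these layer
// directories also invalidates any other tag that shares them.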
let mut cache = self.images.write().await;
if let Some(entry) = cache.remove(image) {
for layer in &entry.unpacked.chain.0 {
let path = std::path::Path::new(&layer.path);
if let Err(e) = crate::windows::wclayer::destroy_layer(path) {
tracing::warn!(layer = %layer.path, error = %e, "destroy_layer failed");
}
}
}
Ok(())
}
async fn prune_images(&self) -> Result<PruneResult> {
Ok(PruneResult::default())
}
async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
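// HCS has no per-signal delivery; validate the signal name for parity with
// other runtimes, then hard-terminate regardless of which signal was asked.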
let _ = crate::runtime::validate_signal(signal.unwrap_or("SIGKILL"))?;
let hcs_id = Self::hcs_id(id);
let containers = self.containers.read().await;
let entry = containers
.get(&hcs_id)
.ok_or_else(|| AgentError::NotFound {
container: hcs_id.clone(),
reason: "no HCS entry for container".to_string(),
})?;
entry
.system
.terminate("")
.await
.map_err(|e| AgentError::Internal(format!("HcsTerminateComputeSystem: {e}")))
}
async fn tag_image(&self, source: &str, target: &str) -> Result<()> {
let mut cache = self.images.write().await;
let Some(entry) = cache.get(source) else {
return Err(AgentError::NotFound {
container: source.to_string(),
reason: "source image not cached".to_string(),
});
};
let cloned = CachedImage {
unpacked: entry.unpacked.clone(),
};
cache.insert(target.to_string(), cloned);
Ok(())
}
async fn inspect_detailed(&self, id: &ContainerId) -> Result<ContainerInspectDetails> {
let hcs_id = Self::hcs_id(id);
let last_exit_code_lock = {
let containers = self.containers.read().await;
let entry = containers
.get(&hcs_id)
.ok_or_else(|| AgentError::NotFound {
container: hcs_id.clone(),
reason: "no HCS entry for container".to_string(),
})?;
Arc::clone(&entry.last_exit_code)
};
let exit_code = *last_exit_code_lock.read().await;
Ok(ContainerInspectDetails {
ports: Vec::new(),
networks: Vec::new(),
ipv4: None,
health: None,
exit_code,
})
}
}
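/// Converts raw HCS statistics to `ContainerStats`: CPU time in 100ns ticks
/// becomes microseconds (divide by 10), and a missing memory limit is
/// reported as `u64::MAX`.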
fn translate_stats(raw: &Statistics) -> ContainerStats {
let cpu_usage_usec = raw
.processor
.as_ref()
.map(|p| p.total_runtime_100ns / 10)
.unwrap_or(0);
let memory_bytes = raw
.memory
.as_ref()
.map(|m| m.memory_usage_private_working_set_bytes)
.unwrap_or(0);
ContainerStats {
cpu_usage_usec,
memory_bytes,
memory_limit: u64::MAX,
timestamp: Instant::now(),
}
}
fn extract_process_exit_code(raw_json: &str) -> Option<i32> {
let v: serde_json::Value = serde_json::from_str(raw_json).ok()?;
let status = v
.get("ProcessStatus")
.and_then(|s| s.get("ExitCode"))
.or_else(|| v.get("ExitCode"))?;
status.as_i64().map(|n| {
#[allow(clippy::cast_possible_truncation)]
let truncated = n as i32;
truncated
})
}
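/// Lists the IDs of all compute systems owned by [`OWNER_TAG`].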
pub async fn list_owned_systems() -> Result<Vec<String>> {
let systems = enumerate::list_by_owner(OWNER_TAG)
.await
.map_err(|e| AgentError::Internal(format!("HcsEnumerateComputeSystems: {e}")))?;
Ok(systems.into_iter().map(|s| s.id).collect())
}
#[cfg(test)]
mod tests {
use super::*;
use zlayer_hcs::schema::{MemoryStats, ProcessorStats};
#[test]
fn translate_stats_converts_100ns_to_usec_and_private_working_set_to_bytes() {
let raw = Statistics {
timestamp: None,
container_start_time: None,
uptime_100ns: 0,
processor: Some(ProcessorStats {
total_runtime_100ns: 12_345_000,
runtime_user_100ns: 0,
runtime_kernel_100ns: 0,
}),
memory: Some(MemoryStats {
memory_usage_commit_bytes: 0,
memory_usage_commit_peak_bytes: 0,
memory_usage_private_working_set_bytes: 256 * 1024 * 1024,
}),
storage: None,
};
let stats = translate_stats(&raw);
assert_eq!(stats.cpu_usage_usec, 1_234_500);
assert_eq!(stats.memory_bytes, 256 * 1024 * 1024);
assert_eq!(stats.memory_limit, u64::MAX);
}
#[test]
fn translate_stats_defaults_zero_when_fields_missing() {
let raw = Statistics::default();
let stats = translate_stats(&raw);
assert_eq!(stats.cpu_usage_usec, 0);
assert_eq!(stats.memory_bytes, 0);
assert_eq!(stats.memory_limit, u64::MAX);
}
#[test]
fn extract_exit_code_reads_json_payload() {
assert_eq!(extract_exit_code(r#"{"ExitCode":42}"#), Some(42));
assert_eq!(extract_exit_code(""), None);
assert_eq!(extract_exit_code("not json"), None);
assert_eq!(extract_exit_code(r#"{"NoExitCode":1}"#), None);
}
#[test]
fn extract_process_exit_code_handles_nested_and_flat() {
assert_eq!(
extract_process_exit_code(r#"{"ProcessStatus":{"ExitCode":7}}"#),
Some(7)
);
assert_eq!(extract_process_exit_code(r#"{"ExitCode":9}"#), Some(9));
assert_eq!(extract_process_exit_code(r#"{}"#), None);
}
#[test]
fn hcs_config_default_sets_overlay_networking_fields() {
let cfg = HcsConfig::default();
assert_eq!(cfg.cluster_cidr, "10.200.0.0/16");
assert!(
cfg.slice_cidr.is_none(),
"slice_cidr must be None until the node joins the cluster and the leader hands out a slice"
);
}
}