#![cfg_attr(docsrs, feature(doc_auto_cfg))]
pub mod backend;
#[cfg(feature = "docker")]
pub mod docker;
pub mod error;
pub mod heuristics;
pub mod http;
pub mod instance;
use std::collections::HashMap;
use std::sync::Arc;
use axum::Router;
use koi_common::capability::CapabilityStatus;
use tokio::sync::{broadcast, mpsc, Mutex};
use tokio_util::sync::CancellationToken;
pub use backend::{RuntimeBackend, RuntimeBackendKind, RuntimeEvent};
pub use error::RuntimeError;
pub use instance::{Instance, InstanceState, KoiMetadata, PortMapping};
/// Capacity handed to `broadcast::channel` when `RuntimeCore::new` creates the
/// channel that delivers `RuntimeEvent`s to subscribers.
const BROADCAST_CHANNEL_CAPACITY: usize = 256;
/// Settings that `RuntimeCore` uses to choose and construct its runtime backend.
#[derive(Debug, Clone)]
pub struct RuntimeConfig {
/// Which backend to use. `RuntimeBackendKind::Auto` delegates to auto-detection.
pub backend_kind: RuntimeBackendKind,
/// Optional socket path. When set, the explicitly selected Docker and Podman
/// backends are built with `DockerBackend::with_socket`; the auto-detection
/// path does not read this field.
pub socket_path: Option<String>,
}
impl Default for RuntimeConfig {
    /// Default configuration: auto-detect the backend and use no socket path override.
    fn default() -> Self {
        let backend_kind = RuntimeBackendKind::Auto;
        let socket_path = None;
        Self {
            backend_kind,
            socket_path,
        }
    }
}
/// Mutable state shared (behind an `Arc`) between `RuntimeCore` handles and the
/// background tasks spawned by `start_watching`.
struct RuntimeState {
/// Tracked instances, keyed by `Instance::id`.
instances: Mutex<HashMap<String, Instance>>,
/// Name reported by the backend chosen in `start_watching`; `None` until then.
backend_name: Mutex<Option<String>>,
/// Set to `true` by `start_watching` and back to `false` when the watch task exits.
active: Mutex<bool>,
/// Sending half of the broadcast channel that `subscribe` hands receivers for.
event_tx: broadcast::Sender<RuntimeEvent>,
}
/// Entry point of the runtime adapter: owns the configuration and a handle to
/// the shared state that the HTTP routes and background tasks also use.
pub struct RuntimeCore {
/// Shared, reference-counted runtime state.
state: Arc<RuntimeState>,
/// Configuration used when a backend is created.
config: RuntimeConfig,
}
impl RuntimeCore {
/// Creates an idle core: no tracked instances, no backend name, `active == false`.
///
/// No backend is contacted here; that happens in `start_watching`.
pub fn new(config: RuntimeConfig) -> Self {
    // Only the sender is kept; receivers are created on demand by `subscribe`.
    let (event_tx, _) = broadcast::channel(BROADCAST_CHANNEL_CAPACITY);
    let state = RuntimeState {
        instances: Mutex::new(HashMap::new()),
        backend_name: Mutex::new(None),
        active: Mutex::new(false),
        event_tx,
    };
    Self {
        state: Arc::new(state),
        config,
    }
}
/// Builds the HTTP router for this runtime.
///
/// The router receives its own `RuntimeCore` handle that shares this core's
/// state and carries a clone of its configuration.
pub fn routes(&self) -> Router {
    let handle = RuntimeCore {
        state: Arc::clone(&self.state),
        config: self.config.clone(),
    };
    http::routes(Arc::new(handle))
}
/// Returns a new receiver on the runtime's broadcast channel of `RuntimeEvent`s.
pub fn subscribe(&self) -> broadcast::Receiver<RuntimeEvent> {
self.state.event_tx.subscribe()
}
/// Reports whether the runtime is active, the backend name (if any), and the
/// number of tracked instances.
pub async fn status(&self) -> http::RuntimeStatus {
    let shared = &self.state;
    // Locks are taken in the order instances -> backend_name -> active.
    let tracked = shared.instances.lock().await;
    let backend_name = shared.backend_name.lock().await;
    let is_active = *shared.active.lock().await;

    http::RuntimeStatus {
        active: is_active,
        backend: backend_name.clone(),
        instance_count: tracked.len(),
    }
}
/// Returns a cloned copy of every currently tracked instance.
///
/// The visible implementation always yields `Ok`.
pub async fn list_instances(&self) -> Result<Vec<Instance>, RuntimeError> {
    let tracked = self.state.instances.lock().await;
    let mut snapshot = Vec::with_capacity(tracked.len());
    snapshot.extend(tracked.values().cloned());
    Ok(snapshot)
}
/// Connects to the configured backend, seeds the instance table, and spawns
/// the background tasks that keep it up to date.
///
/// Steps, in order:
/// 1. Create the backend selected by the configuration and connect to it.
/// 2. List the instances that already exist (initial reconciliation).
/// 3. Record the backend name, mark the runtime active, and store the listed
///    instances.
/// 4. Broadcast a `RuntimeEvent::Started` for each pre-existing instance.
/// 5. Spawn one task that runs `backend.watch(..)` and clears the `active`
///    flag when it returns, and one task that applies every received event to
///    the instance table and re-broadcasts it.
///
/// `cancel` is handed to the backend's `watch` call.
///
/// # Errors
///
/// Propagates the error from backend creation, `connect`, or the initial
/// `list_instances` call. In each of those cases no shared state has been
/// modified and no task has been spawned.
pub async fn start_watching(&self, cancel: CancellationToken) -> Result<(), RuntimeError> {
    let mut backend = self.create_backend()?;
    backend.connect().await?;

    // Reconcile before publishing any state: if the listing fails we return
    // early, and the core must not be left reporting `active == true` with no
    // watch task around to ever reset the flag.
    let existing = backend.list_instances().await?;

    *self.state.backend_name.lock().await = Some(backend.name().to_string());
    *self.state.active.lock().await = true;
    {
        let mut instances = self.state.instances.lock().await;
        for instance in &existing {
            instances.insert(instance.id.clone(), instance.clone());
        }
    }
    tracing::info!(
        backend = backend.name(),
        instances = existing.len(),
        "Runtime adapter started, initial reconciliation complete"
    );

    // Announce pre-existing instances; the send result is deliberately ignored.
    for instance in existing {
        let _ = self.state.event_tx.send(RuntimeEvent::Started(instance));
    }

    let (event_tx, mut event_rx) = mpsc::channel(256);

    // Watch task: owns the backend and feeds events into the mpsc channel.
    let state = Arc::clone(&self.state);
    tokio::spawn(async move {
        if let Err(e) = backend.watch(event_tx, cancel).await {
            tracing::error!(error = %e, "Runtime watch loop exited with error");
        }
        *state.active.lock().await = false;
        tracing::info!("Runtime watch loop stopped");
    });

    // Consumer task: mirrors each event into the instance table, then
    // re-broadcasts it. It ends once the watch task drops its sender.
    let state = Arc::clone(&self.state);
    tokio::spawn(async move {
        while let Some(event) = event_rx.recv().await {
            match &event {
                RuntimeEvent::Started(instance) => {
                    let mut instances = state.instances.lock().await;
                    instances.insert(instance.id.clone(), instance.clone());
                    tracing::debug!(
                        name = %instance.name,
                        id = %instance.id,
                        "Instance tracked"
                    );
                }
                RuntimeEvent::Stopped { id, name } => {
                    let mut instances = state.instances.lock().await;
                    instances.remove(id.as_str());
                    tracing::debug!(name, id, "Instance untracked");
                }
                RuntimeEvent::Updated(instance) => {
                    let mut instances = state.instances.lock().await;
                    instances.insert(instance.id.clone(), instance.clone());
                }
                RuntimeEvent::BackendDisconnected { backend, reason } => {
                    tracing::warn!(backend, reason, "Backend disconnected");
                }
                RuntimeEvent::BackendReconnected { backend } => {
                    tracing::info!(backend, "Backend reconnected");
                }
            }
            let _ = state.event_tx.send(event);
        }
    });
    Ok(())
}
/// Summarises the runtime as a `CapabilityStatus` named `"runtime"`.
///
/// `healthy` mirrors the `active` flag. While active, the summary is
/// `"<backend or none>: <n> instances"`; otherwise it is `"inactive"`.
pub async fn capability_status(&self) -> CapabilityStatus {
    let shared = &self.state;
    // Locks are taken in the order instances -> backend_name -> active.
    let tracked = shared.instances.lock().await;
    let backend_name = shared.backend_name.lock().await;
    let is_active = *shared.active.lock().await;

    let summary = if !is_active {
        "inactive".to_string()
    } else {
        let label = backend_name.as_deref().unwrap_or("none");
        format!("{}: {} instances", label, tracked.len())
    };

    CapabilityStatus {
        name: "runtime".to_string(),
        healthy: is_active,
        summary,
    }
}
/// Instantiates (without connecting) the backend selected by `config.backend_kind`.
///
/// * `Auto` defers to `auto_detect_backend`.
/// * `Docker` / `Podman` need the `docker` cargo feature; a configured
///   `socket_path` takes precedence over the per-runtime default constructor.
/// * `Systemd`, `Incus` and `Kubernetes` always yield `BackendUnavailable`.
fn create_backend(&self) -> Result<Box<dyn RuntimeBackend>, RuntimeError> {
    // Single place that shapes the "cannot provide this backend" result.
    let unavailable = |reason: &'static str| -> Result<Box<dyn RuntimeBackend>, RuntimeError> {
        Err(RuntimeError::BackendUnavailable(reason.into()))
    };

    match self.config.backend_kind {
        RuntimeBackendKind::Auto => self.auto_detect_backend(),
        #[cfg(feature = "docker")]
        RuntimeBackendKind::Docker => {
            let backend = match &self.config.socket_path {
                Some(path) => docker::DockerBackend::with_socket(path.clone()),
                None => docker::DockerBackend::new(),
            };
            Ok(Box::new(backend))
        }
        #[cfg(feature = "docker")]
        RuntimeBackendKind::Podman => {
            let backend = match &self.config.socket_path {
                Some(path) => docker::DockerBackend::with_socket(path.clone()),
                None => docker::DockerBackend::podman(),
            };
            Ok(Box::new(backend))
        }
        #[cfg(not(feature = "docker"))]
        RuntimeBackendKind::Docker | RuntimeBackendKind::Podman => unavailable(
            "docker backend not compiled in — rebuild with the `docker` feature \
             (koi-embedded: features = [\"docker\"]); the koi binary ships it by default",
        ),
        RuntimeBackendKind::Systemd => unavailable("systemd backend not yet implemented"),
        RuntimeBackendKind::Incus => unavailable("incus backend not yet implemented"),
        RuntimeBackendKind::Kubernetes => unavailable("kubernetes backend not yet implemented"),
    }
}
/// Picks the first available container runtime, trying Docker and then Podman.
///
/// Detection only exists when the crate is compiled with the `docker`
/// feature; the configured `socket_path` is not consulted here.
///
/// # Errors
///
/// Returns `RuntimeError::BackendUnavailable` when neither runtime is
/// detected, or when the `docker` feature is not compiled in.
fn auto_detect_backend(&self) -> Result<Box<dyn RuntimeBackend>, RuntimeError> {
    #[cfg(feature = "docker")]
    {
        if docker::is_docker_available() {
            tracing::info!("Auto-detected Docker runtime");
            return Ok(Box::new(docker::DockerBackend::new()));
        }
        if docker::is_podman_available() {
            tracing::info!("Auto-detected Podman runtime");
            return Ok(Box::new(docker::DockerBackend::podman()));
        }
    }
    Err(RuntimeError::BackendUnavailable(
        if cfg!(feature = "docker") {
            "no supported runtime detected (checked: Docker, Podman)"
        } else {
            // The remedy is to enable the feature, matching the hint given by
            // `create_backend` for explicitly requested Docker/Podman backends.
            "no runtime backend compiled in (build with the `docker` feature)"
        }
        .into(),
    ))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built core is inactive, has no backend, and tracks nothing.
    #[tokio::test]
    async fn runtime_core_default_status_is_inactive() {
        let runtime = RuntimeCore::new(RuntimeConfig::default());
        let snapshot = runtime.status().await;
        assert!(!snapshot.active);
        assert_eq!(snapshot.instance_count, 0);
        assert_eq!(snapshot.backend, None);
    }

    /// Without the `docker` feature, explicitly requesting Docker must fail
    /// with `BackendUnavailable` and mention "docker" in the message.
    #[cfg(not(feature = "docker"))]
    #[tokio::test]
    async fn docker_backend_unavailable_without_feature() {
        let config = RuntimeConfig {
            backend_kind: RuntimeBackendKind::Docker,
            socket_path: None,
        };
        let runtime = RuntimeCore::new(config);
        let outcome = runtime.start_watching(CancellationToken::new()).await;
        let err = outcome.expect_err("docker backend must be unavailable without the feature");
        assert!(matches!(err, RuntimeError::BackendUnavailable(_)));
        assert!(err.to_string().contains("docker"));
    }

    /// Listing instances on a fresh core succeeds and yields nothing.
    #[tokio::test]
    async fn list_instances_empty_by_default() {
        let runtime = RuntimeCore::new(RuntimeConfig::default());
        let listed = runtime.list_instances().await.unwrap();
        assert_eq!(listed.len(), 0);
    }

    /// Backend kinds render as their lowercase names.
    #[test]
    fn auto_backend_kind_display() {
        let auto = RuntimeBackendKind::Auto.to_string();
        let docker = RuntimeBackendKind::Docker.to_string();
        assert_eq!(auto, "auto");
        assert_eq!(docker, "docker");
    }

    /// Loose parsing accepts known names and aliases and rejects unknown input.
    #[test]
    fn backend_kind_from_str() {
        assert!(matches!(
            RuntimeBackendKind::from_str_loose("docker"),
            Some(RuntimeBackendKind::Docker)
        ));
        assert!(matches!(
            RuntimeBackendKind::from_str_loose("k8s"),
            Some(RuntimeBackendKind::Kubernetes)
        ));
        assert!(RuntimeBackendKind::from_str_loose("unknown").is_none());
    }
}