1pub mod backend;
13pub mod docker;
14pub mod error;
15pub mod heuristics;
16pub mod http;
17pub mod instance;
18
19use std::collections::HashMap;
20use std::sync::Arc;
21
22use axum::Router;
23use koi_common::capability::CapabilityStatus;
24use tokio::sync::{broadcast, mpsc, Mutex};
25use tokio_util::sync::CancellationToken;
26
27pub use backend::{RuntimeBackend, RuntimeBackendKind, RuntimeEvent};
28pub use error::RuntimeError;
29pub use instance::{Instance, InstanceState, KoiMetadata, PortMapping};
30
31const BROADCAST_CHANNEL_CAPACITY: usize = 256;
33
34#[derive(Debug, Clone)]
36pub struct RuntimeConfig {
37 pub backend_kind: RuntimeBackendKind,
39 pub socket_path: Option<String>,
41}
42
43impl Default for RuntimeConfig {
44 fn default() -> Self {
45 Self {
46 backend_kind: RuntimeBackendKind::Auto,
47 socket_path: None,
48 }
49 }
50}
51
52struct RuntimeState {
55 instances: Mutex<HashMap<String, Instance>>,
57 backend_name: Mutex<Option<String>>,
59 active: Mutex<bool>,
61 event_tx: broadcast::Sender<RuntimeEvent>,
63}
64
65pub struct RuntimeCore {
72 state: Arc<RuntimeState>,
73 config: RuntimeConfig,
74}
75
76impl RuntimeCore {
77 pub fn new(config: RuntimeConfig) -> Self {
79 Self {
80 state: Arc::new(RuntimeState {
81 instances: Mutex::new(HashMap::new()),
82 backend_name: Mutex::new(None),
83 active: Mutex::new(false),
84 event_tx: broadcast::channel(BROADCAST_CHANNEL_CAPACITY).0,
85 }),
86 config,
87 }
88 }
89
90 pub fn routes(&self) -> Router {
92 http::routes(Arc::new(RuntimeCore {
93 state: Arc::clone(&self.state),
94 config: self.config.clone(),
95 }))
96 }
97
98 pub fn subscribe(&self) -> broadcast::Receiver<RuntimeEvent> {
100 self.state.event_tx.subscribe()
101 }
102
103 pub async fn status(&self) -> http::RuntimeStatus {
105 let instances = self.state.instances.lock().await;
106 let backend = self.state.backend_name.lock().await;
107 let active = *self.state.active.lock().await;
108
109 http::RuntimeStatus {
110 active,
111 backend: backend.clone(),
112 instance_count: instances.len(),
113 }
114 }
115
116 pub async fn list_instances(&self) -> Result<Vec<Instance>, RuntimeError> {
118 let instances = self.state.instances.lock().await;
119 Ok(instances.values().cloned().collect())
120 }
121
122 pub async fn start_watching(&self, cancel: CancellationToken) -> Result<(), RuntimeError> {
132 let mut backend = self.create_backend()?;
133
134 backend.connect().await?;
135
136 *self.state.backend_name.lock().await = Some(backend.name().to_string());
138 *self.state.active.lock().await = true;
139
140 let existing = backend.list_instances().await?;
142 {
143 let mut instances = self.state.instances.lock().await;
144 for instance in &existing {
145 instances.insert(instance.id.clone(), instance.clone());
146 }
147 }
148
149 tracing::info!(
150 backend = backend.name(),
151 instances = existing.len(),
152 "Runtime adapter started, initial reconciliation complete"
153 );
154
155 for instance in existing {
157 let _ = self.state.event_tx.send(RuntimeEvent::Started(instance));
158 }
159
160 let state = Arc::clone(&self.state);
162 let (event_tx, mut event_rx) = mpsc::channel(256);
163
164 let watch_cancel = cancel.clone();
165 tokio::spawn(async move {
166 if let Err(e) = backend.watch(event_tx, watch_cancel).await {
167 tracing::error!(error = %e, "Runtime watch loop exited with error");
168 }
169 *state.active.lock().await = false;
170 tracing::info!("Runtime watch loop stopped");
171 });
172
173 let state = Arc::clone(&self.state);
175 tokio::spawn(async move {
176 while let Some(event) = event_rx.recv().await {
177 match &event {
178 RuntimeEvent::Started(instance) => {
179 let mut instances = state.instances.lock().await;
180 instances.insert(instance.id.clone(), instance.clone());
181 tracing::debug!(
182 name = %instance.name,
183 id = %instance.id,
184 "Instance tracked"
185 );
186 }
187 RuntimeEvent::Stopped { id, name } => {
188 let mut instances = state.instances.lock().await;
189 instances.remove(id.as_str());
190 tracing::debug!(name, id, "Instance untracked");
191 }
192 RuntimeEvent::Updated(instance) => {
193 let mut instances = state.instances.lock().await;
194 instances.insert(instance.id.clone(), instance.clone());
195 }
196 RuntimeEvent::BackendDisconnected { backend, reason } => {
197 tracing::warn!(backend, reason, "Backend disconnected");
198 }
199 RuntimeEvent::BackendReconnected { backend } => {
200 tracing::info!(backend, "Backend reconnected");
201 }
202 }
203 let _ = state.event_tx.send(event);
205 }
206 });
207
208 Ok(())
209 }
210
211 pub async fn capability_status(&self) -> CapabilityStatus {
213 let instances = self.state.instances.lock().await;
214 let backend = self.state.backend_name.lock().await;
215 let active = *self.state.active.lock().await;
216
217 CapabilityStatus {
218 name: "runtime".to_string(),
219 healthy: active,
220 summary: if active {
221 format!(
222 "{}: {} instances",
223 backend.as_deref().unwrap_or("none"),
224 instances.len()
225 )
226 } else {
227 "inactive".to_string()
228 },
229 }
230 }
231
232 fn create_backend(&self) -> Result<Box<dyn RuntimeBackend>, RuntimeError> {
234 match self.config.backend_kind {
235 RuntimeBackendKind::Docker => {
236 let backend = if let Some(ref path) = self.config.socket_path {
237 docker::DockerBackend::with_socket(path.clone())
238 } else {
239 docker::DockerBackend::new()
240 };
241 Ok(Box::new(backend))
242 }
243 RuntimeBackendKind::Podman => {
244 let backend = if let Some(ref path) = self.config.socket_path {
245 docker::DockerBackend::with_socket(path.clone())
246 } else {
247 docker::DockerBackend::podman()
248 };
249 Ok(Box::new(backend))
250 }
251 RuntimeBackendKind::Auto => self.auto_detect_backend(),
252 RuntimeBackendKind::Systemd => Err(RuntimeError::BackendUnavailable(
253 "systemd backend not yet implemented".into(),
254 )),
255 RuntimeBackendKind::Incus => Err(RuntimeError::BackendUnavailable(
256 "incus backend not yet implemented".into(),
257 )),
258 RuntimeBackendKind::Kubernetes => Err(RuntimeError::BackendUnavailable(
259 "kubernetes backend not yet implemented".into(),
260 )),
261 }
262 }
263
264 fn auto_detect_backend(&self) -> Result<Box<dyn RuntimeBackend>, RuntimeError> {
266 if docker::is_docker_available() {
267 tracing::info!("Auto-detected Docker runtime");
268 return Ok(Box::new(docker::DockerBackend::new()));
269 }
270
271 if docker::is_podman_available() {
272 tracing::info!("Auto-detected Podman runtime");
273 return Ok(Box::new(docker::DockerBackend::podman()));
274 }
275
276 Err(RuntimeError::BackendUnavailable(
277 "no supported runtime detected (checked: Docker, Podman)".into(),
278 ))
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use super::*;
285
286 #[tokio::test]
287 async fn runtime_core_default_status_is_inactive() {
288 let core = RuntimeCore::new(RuntimeConfig::default());
289 let status = core.status().await;
290 assert!(!status.active);
291 assert_eq!(status.instance_count, 0);
292 assert!(status.backend.is_none());
293 }
294
295 #[tokio::test]
296 async fn list_instances_empty_by_default() {
297 let core = RuntimeCore::new(RuntimeConfig::default());
298 let instances = core.list_instances().await.unwrap();
299 assert!(instances.is_empty());
300 }
301
302 #[test]
303 fn auto_backend_kind_display() {
304 assert_eq!(RuntimeBackendKind::Auto.to_string(), "auto");
305 assert_eq!(RuntimeBackendKind::Docker.to_string(), "docker");
306 }
307
308 #[test]
309 fn backend_kind_from_str() {
310 assert_eq!(
311 RuntimeBackendKind::from_str_loose("docker"),
312 Some(RuntimeBackendKind::Docker)
313 );
314 assert_eq!(
315 RuntimeBackendKind::from_str_loose("k8s"),
316 Some(RuntimeBackendKind::Kubernetes)
317 );
318 assert_eq!(RuntimeBackendKind::from_str_loose("unknown"), None);
319 }
320}