Skip to main content

koi_embedded/
lib.rs

1mod config;
2mod events;
3mod handle;
4pub(crate) mod http;
5
6use std::sync::Arc;
7
8use tokio::sync::broadcast;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11
12use koi_client::KoiClient;
13use koi_compose::bridges::{
14    AliasFeedbackBridge, CertmeshBridge, DnsBridge, MdnsBridge, ProxyBridge,
15};
16
17pub use config::{DnsConfigBuilder, KoiConfig, ServiceMode};
18pub use events::KoiEvent;
19pub use handle::{CertmeshHandle, DnsHandle, HealthHandle, KoiHandle, MdnsHandle, ProxyHandle};
20
21// Re-export types needed by downstream consumers (registration, discovery, DNS, proxy, health)
22pub use koi_common::firewall::{FirewallPort, FirewallProtocol};
23pub use koi_common::types::ServiceRecord;
24pub use koi_config::state::DnsEntry;
25pub use koi_health::{HealthCheck, HealthSnapshot, ServiceCheckKind};
26pub use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
27pub use koi_mdns::MdnsEvent;
28pub use koi_proxy::ProxyEntry;
29
30// Vault: general-purpose encrypted secret storage
31pub use koi_crypto::vault::{Vault, VaultError};
32
33// Runtime adapter re-exports
34pub use koi_runtime::{RuntimeBackendKind, RuntimeConfig};
35
36pub type Result<T> = std::result::Result<T, KoiError>;
37
38#[derive(Debug, thiserror::Error)]
39pub enum KoiError {
40    #[error("capability disabled: {0}")]
41    DisabledCapability(&'static str),
42    #[error("not available in client (remote) mode: {0}")]
43    RemoteUnsupported(&'static str),
44    #[error("mdns error: {0}")]
45    Mdns(#[from] koi_mdns::MdnsError),
46    #[error("dns error: {0}")]
47    Dns(#[from] koi_dns::DnsError),
48    #[error("health error: {0}")]
49    Health(#[from] koi_health::HealthError),
50    #[error("proxy error: {0}")]
51    Proxy(#[from] koi_proxy::ProxyError),
52    #[error("certmesh error: {0}")]
53    Certmesh(#[from] koi_certmesh::CertmeshError),
54    #[error("runtime error: {0}")]
55    Runtime(#[from] koi_runtime::RuntimeError),
56    #[error("client error: {0}")]
57    Client(#[from] koi_client::ClientError),
58    #[error("io error: {0}")]
59    Io(#[from] std::io::Error),
60}
61
62pub struct Builder {
63    config: KoiConfig,
64    event_handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
65    extra_firewall_ports: Vec<koi_common::firewall::FirewallPort>,
66}
67
68impl Builder {
69    pub fn new() -> Self {
70        Self {
71            config: KoiConfig::default(),
72            event_handler: None,
73            extra_firewall_ports: Vec::new(),
74        }
75    }
76
77    pub fn data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
78        self.config.data_dir = Some(path.into());
79        self
80    }
81
82    pub fn service_endpoint(mut self, endpoint: impl Into<String>) -> Self {
83        self.config.service_endpoint = endpoint.into();
84        self
85    }
86
87    pub fn service_mode(mut self, mode: ServiceMode) -> Self {
88        self.config.service_mode = mode;
89        self
90    }
91
92    pub fn http(mut self, enabled: bool) -> Self {
93        self.config.http_enabled = enabled;
94        self
95    }
96
97    pub fn mdns(mut self, enabled: bool) -> Self {
98        self.config.mdns_enabled = enabled;
99        self
100    }
101
102    pub fn dns<F>(mut self, configure: F) -> Self
103    where
104        F: FnOnce(DnsConfigBuilder) -> DnsConfigBuilder,
105    {
106        let builder = DnsConfigBuilder::new(self.config.dns_config.clone());
107        self.config.dns_config = configure(builder).build();
108        self
109    }
110
111    pub fn dns_enabled(mut self, enabled: bool) -> Self {
112        self.config.dns_enabled = enabled;
113        self
114    }
115
116    pub fn dns_auto_start(mut self, enabled: bool) -> Self {
117        self.config.dns_auto_start = enabled;
118        self
119    }
120
121    pub fn health(mut self, enabled: bool) -> Self {
122        self.config.health_enabled = enabled;
123        self
124    }
125
126    pub fn health_auto_start(mut self, enabled: bool) -> Self {
127        self.config.health_auto_start = enabled;
128        self
129    }
130
131    pub fn certmesh(mut self, enabled: bool) -> Self {
132        self.config.certmesh_enabled = enabled;
133        self
134    }
135
136    pub fn proxy(mut self, enabled: bool) -> Self {
137        self.config.proxy_enabled = enabled;
138        self
139    }
140
141    pub fn proxy_auto_start(mut self, enabled: bool) -> Self {
142        self.config.proxy_auto_start = enabled;
143        self
144    }
145
146    pub fn udp(mut self, enabled: bool) -> Self {
147        self.config.udp_enabled = enabled;
148        self
149    }
150
151    /// Enable the runtime adapter with the specified backend kind.
152    ///
153    /// Runtime is opt-in for embedded (unlike daemon where capabilities
154    /// are enabled by default).
155    pub fn runtime(mut self, kind: koi_runtime::RuntimeBackendKind) -> Self {
156        self.config.runtime_enabled = true;
157        self.config.runtime_backend = kind;
158        self
159    }
160
161    /// Enable the runtime adapter with auto-detection.
162    pub fn runtime_auto(mut self) -> Self {
163        self.config.runtime_enabled = true;
164        self.config.runtime_backend = koi_runtime::RuntimeBackendKind::Auto;
165        self
166    }
167
168    /// Translate discovered runtime (container) lifecycle events into mDNS/DNS/health/proxy
169    /// entries — the same orchestrator the daemon runs. Opt-in; requires the runtime
170    /// adapter (`runtime`/`runtime_auto`) to be enabled to have any effect.
171    pub fn orchestrator(mut self, enabled: bool) -> Self {
172        self.config.orchestrator_enabled = enabled;
173        self
174    }
175
176    /// Run the certmesh role-driven background loops (renewal, standby roster sync, member
177    /// heartbeat, failover/announce) — the same loops the daemon runs. Opt-in; requires
178    /// certmesh (`certmesh`) to be enabled. A clustered embedded CA host wants this; a leaf
179    /// does not. Enrollment approval auto-denies (no interactive console).
180    pub fn certmesh_background(mut self, enabled: bool) -> Self {
181        self.config.certmesh_background_enabled = enabled;
182        self
183    }
184
185    pub fn http_port(mut self, port: u16) -> Self {
186        self.config.http_port = port;
187        self
188    }
189
190    pub fn dashboard(mut self, enabled: bool) -> Self {
191        self.config.dashboard_enabled = enabled;
192        self
193    }
194
195    pub fn api_docs(mut self, enabled: bool) -> Self {
196        self.config.api_docs_enabled = enabled;
197        self
198    }
199
200    pub fn mdns_browser(mut self, enabled: bool) -> Self {
201        self.config.mdns_browser_enabled = enabled;
202        self
203    }
204
205    pub fn announce_http(mut self, enabled: bool) -> Self {
206        self.config.announce_http = enabled;
207        self
208    }
209
210    pub fn events<F>(mut self, handler: F) -> Self
211    where
212        F: Fn(KoiEvent) + Send + Sync + 'static,
213    {
214        self.event_handler = Some(Arc::new(handler));
215        self
216    }
217
218    /// Register additional firewall ports that the host application needs
219    /// opened (e.g. Moss discovery UDP, HTTP API).  These are merged with
220    /// the ports from enabled Koi capabilities when `ensure_firewall_rules`
221    /// is called.
222    pub fn extra_firewall_ports(mut self, ports: Vec<koi_common::firewall::FirewallPort>) -> Self {
223        self.extra_firewall_ports = ports;
224        self
225    }
226
227    /// Best-effort ensure that Windows Firewall inbound-allow rules exist
228    /// for every port required by the enabled capabilities **plus** any
229    /// extra ports registered by the host application.
230    ///
231    /// * Idempotent — safe to call on every startup.
232    /// * Non-fatal  — logs warnings but never fails the build.
233    /// * No-op on non-Windows platforms.
234    ///
235    /// `prefix` is used in the firewall rule display-names
236    /// (e.g. `"Zen Garden"` → `"Zen Garden mDNS (UDP 5353)"`).
237    pub fn ensure_firewall_rules(self, prefix: &str) -> Self {
238        let mut all_ports = self.config.firewall_ports();
239        all_ports.extend(self.extra_firewall_ports.iter().cloned());
240
241        let count = koi_common::firewall::ensure_firewall_rules(prefix, &all_ports);
242        if count > 0 {
243            tracing::info!(count, "Firewall rules ensured");
244        }
245        self
246    }
247
248    pub fn build(self) -> Result<KoiEmbedded> {
249        Ok(KoiEmbedded {
250            config: self.config,
251            event_handler: self.event_handler,
252        })
253    }
254}
255
256impl Default for Builder {
257    fn default() -> Self {
258        Self::new()
259    }
260}
261
262pub struct KoiEmbedded {
263    config: KoiConfig,
264    event_handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
265}
266
267impl KoiEmbedded {
268    pub async fn start(self) -> Result<KoiHandle> {
269        let cancel = CancellationToken::new();
270        let (event_tx, _) = broadcast::channel(256);
271        let mut tasks: Vec<JoinHandle<()>> = Vec::new();
272
273        if self.config.service_mode != ServiceMode::EmbeddedOnly {
274            let client = Arc::new(KoiClient::new(&self.config.service_endpoint));
275            match self.config.service_mode {
276                ServiceMode::ClientOnly => {
277                    tokio::task::spawn_blocking({
278                        let client = Arc::clone(&client);
279                        move || client.health()
280                    })
281                    .await
282                    .map_err(map_join_error)??;
283                    return Ok(KoiHandle::new_remote(client, event_tx, cancel, tasks));
284                }
285                ServiceMode::Auto => {
286                    let health = tokio::task::spawn_blocking({
287                        let client = Arc::clone(&client);
288                        move || client.health()
289                    })
290                    .await;
291                    if matches!(health, Ok(Ok(()))) {
292                        return Ok(KoiHandle::new_remote(client, event_tx, cancel, tasks));
293                    }
294                }
295                ServiceMode::EmbeddedOnly => {}
296            }
297        }
298
299        let mdns = if self.config.mdns_enabled {
300            Some(Arc::new(koi_mdns::MdnsCore::with_cancel(cancel.clone())?))
301        } else {
302            None
303        };
304
305        let certmesh = if self.config.certmesh_enabled {
306            let data_dir = self.config.data_dir.clone();
307            tokio::task::spawn_blocking(move || {
308                koi_compose::cores::init_certmesh_core(data_dir.as_deref())
309            })
310            .await
311            .map_err(|e| std::io::Error::other(format!("certmesh init: {e}")))?
312        } else {
313            None
314        };
315
316        // Integration bridges for cross-domain communication
317        let mdns_bridge: Option<Arc<dyn koi_common::integration::MdnsSnapshot>> =
318            if let Some(ref core) = mdns {
319                Some(MdnsBridge::spawn(core.clone()).await)
320            } else {
321                None
322            };
323
324        let certmesh_bridge: Option<Arc<dyn koi_common::integration::CertmeshSnapshot>> =
325            certmesh.as_ref().map(|core| {
326                CertmeshBridge::new(core.clone())
327                    as Arc<dyn koi_common::integration::CertmeshSnapshot>
328            });
329
330        let alias_feedback: Option<Arc<dyn koi_common::integration::AliasFeedback>> =
331            certmesh.as_ref().map(|core| {
332                AliasFeedbackBridge::new(core.clone())
333                    as Arc<dyn koi_common::integration::AliasFeedback>
334            });
335
336        let dns = if self.config.dns_enabled {
337            let mut dns_config = self.config.dns_config.clone();
338            // Pin the state path to the data dir captured at construction time
339            // so it is immune to KOI_DATA_DIR env var races in parallel tests.
340            if let Some(dir) = &self.config.data_dir {
341                dns_config.state_path = Some(dir.join("state").join("dns.json"));
342            }
343            let core = koi_dns::DnsCore::new(
344                dns_config,
345                mdns_bridge.clone(),
346                certmesh_bridge.clone(),
347                alias_feedback,
348            )
349            .await?;
350            Some(Arc::new(koi_dns::DnsRuntime::new(core)))
351        } else {
352            None
353        };
354
355        let proxy = if self.config.proxy_enabled {
356            let core = if let Some(dir) = &self.config.data_dir {
357                Arc::new(koi_proxy::ProxyCore::with_data_dir(dir)?)
358            } else {
359                Arc::new(koi_proxy::ProxyCore::new()?)
360            };
361            Some(Arc::new(koi_proxy::ProxyRuntime::new(core)))
362        } else {
363            None
364        };
365
366        let dns_bridge: Option<Arc<dyn koi_common::integration::DnsProbe>> = dns
367            .as_ref()
368            .map(|rt| DnsBridge::new(rt.clone()) as Arc<dyn koi_common::integration::DnsProbe>);
369
370        let proxy_bridge: Option<Arc<dyn koi_common::integration::ProxySnapshot>> =
371            proxy.as_ref().map(|rt| {
372                ProxyBridge::new(rt.core()) as Arc<dyn koi_common::integration::ProxySnapshot>
373            });
374
375        let health = if self.config.health_enabled {
376            let core = koi_health::HealthCore::new(
377                mdns_bridge.clone(),
378                dns_bridge,
379                certmesh_bridge,
380                proxy_bridge,
381            )
382            .await;
383            Some(Arc::new(koi_health::HealthRuntime::new(Arc::new(core))))
384        } else {
385            None
386        };
387
388        if let Some(runtime) = &dns {
389            if self.config.dns_auto_start {
390                let _ = runtime.start().await?;
391            }
392        }
393
394        if let Some(runtime) = &health {
395            if self.config.health_auto_start {
396                let _ = runtime.start().await?;
397            }
398        }
399
400        if let Some(runtime) = &proxy {
401            if self.config.proxy_auto_start {
402                runtime.start_all().await?;
403            }
404        }
405
406        let udp = if self.config.udp_enabled {
407            Some(Arc::new(koi_udp::UdpRuntime::new(cancel.clone())))
408        } else {
409            None
410        };
411
412        let runtime = if self.config.runtime_enabled {
413            let config = koi_runtime::RuntimeConfig {
414                backend_kind: self.config.runtime_backend,
415                socket_path: None,
416            };
417            let core = Arc::new(koi_runtime::RuntimeCore::new(config));
418            match core.start_watching(cancel.clone()).await {
419                Ok(()) => {
420                    tracing::info!("Runtime adapter started");
421                    Some(core)
422                }
423                Err(e) => {
424                    tracing::warn!(error = %e, "Runtime backend unavailable — continuing without runtime adapter");
425                    None
426                }
427            }
428        } else {
429            None
430        };
431
432        // Build dashboard state if enabled
433        let dashboard_state = if self.config.dashboard_enabled && self.config.http_enabled {
434            let started_at = std::time::Instant::now();
435            let snap_mdns = mdns.clone();
436            let snap_certmesh = certmesh.clone();
437            let snap_dns = dns.clone();
438            let snap_health = health.clone();
439            let snap_proxy = proxy.clone();
440            let snap_udp = udp.clone();
441            let snap_runtime = runtime.clone();
442
443            let snapshot_fn: koi_dashboard::dashboard::SnapshotFn = Arc::new(move || {
444                let m = snap_mdns.clone();
445                let cm = snap_certmesh.clone();
446                let d = snap_dns.clone();
447                let h = snap_health.clone();
448                let p = snap_proxy.clone();
449                let u = snap_udp.clone();
450                let rt = snap_runtime.clone();
451                Box::pin(async move { build_embedded_snapshot(m, cm, d, h, p, u, rt).await })
452            });
453
454            let (dash_event_tx, _) = broadcast::channel(256);
455            let ds = koi_dashboard::dashboard::DashboardState {
456                identity: koi_dashboard::dashboard::DashboardIdentity {
457                    version: env!("CARGO_PKG_VERSION").to_string(),
458                    platform: std::env::consts::OS.to_string(),
459                },
460                mode: "embedded",
461                snapshot_fn,
462                event_tx: dash_event_tx.clone(),
463                started_at,
464            };
465
466            // Spawn the single unified event forwarder (superset incl. runtime),
467            // shared with the daemon — no more inline copy here.
468            tasks.push(koi_dashboard::forward::spawn_event_forwarder(
469                koi_dashboard::forward::ForwarderCores {
470                    mdns: mdns.clone(),
471                    certmesh: certmesh.clone(),
472                    dns: dns.clone(),
473                    health: health.clone(),
474                    proxy: proxy.clone(),
475                    runtime: runtime.clone(),
476                },
477                dash_event_tx,
478                cancel.clone(),
479            ));
480
481            Some(ds)
482        } else {
483            None
484        };
485
486        // Build browser state if enabled (requires mDNS). The LAN-wide meta-browse is
487        // lazy — it starts on the first browser request, not here.
488        let browser_state = if self.config.mdns_browser_enabled && self.config.http_enabled {
489            if let Some(ref mdns_core) = mdns {
490                Some(koi_dashboard::browser::build_state(
491                    mdns_core.clone(),
492                    cancel.clone(),
493                ))
494            } else {
495                tracing::warn!("mdns_browser enabled but mDNS is disabled — skipping browser");
496                None
497            }
498        } else {
499            None
500        };
501
502        // Spawn embedded HTTP adapter if enabled
503        if self.config.http_enabled {
504            let http_port = self.config.http_port;
505            let http_cancel = cancel.clone();
506            let http_mdns = mdns.clone();
507            let http_dns = dns.clone();
508            let http_health = health.clone();
509            let http_certmesh = certmesh.clone();
510            let http_proxy = proxy.clone();
511            let http_udp = udp.clone();
512            let http_runtime = runtime.clone();
513            let http_api_docs = self.config.api_docs_enabled;
514            tasks.push(tokio::spawn(async move {
515                http::serve(
516                    http_port,
517                    http_mdns,
518                    http_dns,
519                    http_health,
520                    http_certmesh,
521                    http_proxy,
522                    http_udp,
523                    http_runtime,
524                    dashboard_state,
525                    browser_state,
526                    http_api_docs,
527                    http_cancel,
528                )
529                .await;
530            }));
531        }
532
533        // ── HTTP mDNS announcement (opt-in) ──
534        let http_announce_id =
535            if self.config.announce_http && self.config.http_enabled && self.config.mdns_enabled {
536                if let Some(ref mdns_core) = mdns {
537                    let hostname = hostname::get()
538                        .ok()
539                        .and_then(|os| os.into_string().ok())
540                        .unwrap_or_else(|| "unknown".to_string());
541
542                    let mut txt = std::collections::HashMap::new();
543                    txt.insert("path".to_string(), "/".to_string());
544                    txt.insert("version".to_string(), env!("CARGO_PKG_VERSION").to_string());
545                    txt.insert("api".to_string(), "v1".to_string());
546                    txt.insert(
547                        "dashboard".to_string(),
548                        self.config.dashboard_enabled.to_string(),
549                    );
550
551                    let payload = koi_mdns::protocol::RegisterPayload {
552                        name: format!("Koi ({hostname})"),
553                        service_type: "_http._tcp".to_string(),
554                        port: self.config.http_port,
555                        ip: None,
556                        lease_secs: None,
557                        txt,
558                    };
559                    match mdns_core.register(payload) {
560                        Ok(result) => {
561                            tracing::info!(
562                                id = %result.id,
563                                port = self.config.http_port,
564                                "HTTP server announced via mDNS"
565                            );
566                            Some(result.id)
567                        }
568                        Err(e) => {
569                            tracing::warn!(error = %e, "Failed to announce HTTP server via mDNS");
570                            None
571                        }
572                    }
573                } else {
574                    None
575                }
576            } else {
577                None
578            };
579
580        // ── Domain event → host KoiEvent forwarders ──
581        // One shared spawn helper instead of six copies of the streaming select! skeleton.
582        // Each domain core is present only when its capability is enabled, so `if let Some`
583        // is the only gate needed.
584        if let Some(core) = &mdns {
585            spawn_event_mapper(
586                core.subscribe(),
587                map_mdns_event,
588                event_tx.clone(),
589                self.event_handler.clone(),
590                cancel.clone(),
591                &mut tasks,
592            );
593        }
594        if let Some(runtime) = &health {
595            spawn_event_mapper(
596                runtime.core().subscribe(),
597                |e| Some(map_health_event(e)),
598                event_tx.clone(),
599                self.event_handler.clone(),
600                cancel.clone(),
601                &mut tasks,
602            );
603        }
604        if let Some(runtime) = &dns {
605            spawn_event_mapper(
606                runtime.core().subscribe(),
607                |e| Some(map_dns_event(e)),
608                event_tx.clone(),
609                self.event_handler.clone(),
610                cancel.clone(),
611                &mut tasks,
612            );
613        }
614        if let Some(core) = &certmesh {
615            spawn_event_mapper(
616                core.subscribe(),
617                |e| Some(map_certmesh_event(e)),
618                event_tx.clone(),
619                self.event_handler.clone(),
620                cancel.clone(),
621                &mut tasks,
622            );
623        }
624        if let Some(runtime_proxy) = &proxy {
625            spawn_event_mapper(
626                runtime_proxy.core().subscribe(),
627                |e| Some(map_proxy_event(e)),
628                event_tx.clone(),
629                self.event_handler.clone(),
630                cancel.clone(),
631                &mut tasks,
632            );
633        }
634        if let Some(runtime_core) = &runtime {
635            spawn_event_mapper(
636                runtime_core.subscribe(),
637                map_runtime_event,
638                event_tx.clone(),
639                self.event_handler.clone(),
640                cancel.clone(),
641                &mut tasks,
642            );
643        }
644
645        // ── Runtime orchestrator (opt-in) ──
646        // Translate container lifecycle events into mDNS/DNS/health/proxy entries — the
647        // same orchestrator the daemon runs. Off by default; a leaf host only wants events.
648        if self.config.orchestrator_enabled {
649            if let Some(ref runtime_core) = runtime {
650                tasks.push(koi_compose::orchestrator::spawn_orchestrator(
651                    runtime_core,
652                    koi_compose::orchestrator::OrchestrationTargets {
653                        mdns: mdns.clone(),
654                        dns: dns.clone(),
655                        health: health.clone(),
656                        proxy: proxy.clone(),
657                    },
658                    cancel.clone(),
659                ));
660            } else {
661                tracing::warn!(
662                    "orchestrator enabled but the runtime adapter is not — skipping orchestrator"
663                );
664            }
665        }
666
667        // ── Certmesh background tasks (opt-in) ──
668        // Renewal / roster sync / heartbeat / failover, same as the daemon. Off by default;
669        // a clustered embedded CA host opts in. No console, so enrollment auto-denies.
670        if self.config.certmesh_background_enabled {
671            if let Some(ref certmesh_core) = certmesh {
672                koi_compose::certmesh::spawn_enrollment_approval(
673                    certmesh_core,
674                    koi_compose::certmesh::deny_and_log_decider(),
675                    &cancel,
676                    &mut tasks,
677                )
678                .await;
679                koi_compose::certmesh::spawn_certmesh_background_tasks(
680                    certmesh_core,
681                    mdns.clone(),
682                    self.config.http_port,
683                    &cancel,
684                    &mut tasks,
685                );
686            } else {
687                tracing::warn!(
688                    "certmesh_background enabled but certmesh is not — skipping certmesh loops"
689                );
690            }
691        }
692
693        Ok(KoiHandle::new_embedded(
694            mdns,
695            dns,
696            health,
697            certmesh,
698            proxy,
699            udp,
700            runtime,
701            self.config.data_dir.clone(),
702            event_tx,
703            cancel,
704            tasks,
705            http_announce_id,
706        ))
707    }
708}
709
710fn map_mdns_event(event: MdnsEvent) -> Option<KoiEvent> {
711    match event {
712        MdnsEvent::Found(record) => Some(KoiEvent::MdnsFound(record)),
713        MdnsEvent::Resolved(record) => Some(KoiEvent::MdnsResolved(record)),
714        MdnsEvent::Removed { name, service_type } => {
715            Some(KoiEvent::MdnsRemoved { name, service_type })
716        }
717    }
718}
719
720fn map_health_event(event: koi_health::HealthEvent) -> KoiEvent {
721    match event {
722        koi_health::HealthEvent::StatusChanged { name, status } => {
723            KoiEvent::HealthChanged { name, status }
724        }
725    }
726}
727
728fn map_dns_event(event: koi_dns::DnsEvent) -> KoiEvent {
729    match event {
730        koi_dns::DnsEvent::EntryUpdated { name, ip } => KoiEvent::DnsEntryUpdated { name, ip },
731        koi_dns::DnsEvent::EntryRemoved { name } => KoiEvent::DnsEntryRemoved { name },
732    }
733}
734
735fn map_certmesh_event(event: koi_certmesh::CertmeshEvent) -> KoiEvent {
736    match event {
737        koi_certmesh::CertmeshEvent::MemberJoined {
738            hostname,
739            fingerprint,
740        } => KoiEvent::CertmeshMemberJoined {
741            hostname,
742            fingerprint,
743        },
744        koi_certmesh::CertmeshEvent::MemberRevoked { hostname } => {
745            KoiEvent::CertmeshMemberRevoked { hostname }
746        }
747        koi_certmesh::CertmeshEvent::Destroyed => KoiEvent::CertmeshDestroyed,
748    }
749}
750
751fn map_proxy_event(event: koi_proxy::ProxyEvent) -> KoiEvent {
752    match event {
753        koi_proxy::ProxyEvent::EntryUpdated { entry } => KoiEvent::ProxyEntryUpdated { entry },
754        koi_proxy::ProxyEvent::EntryRemoved { name } => KoiEvent::ProxyEntryRemoved { name },
755    }
756}
757
758fn map_runtime_event(event: koi_runtime::RuntimeEvent) -> Option<KoiEvent> {
759    match event {
760        koi_runtime::RuntimeEvent::Started(instance) => Some(KoiEvent::RuntimeInstanceStarted {
761            name: instance.name,
762            backend: instance.backend,
763        }),
764        koi_runtime::RuntimeEvent::Stopped { name, .. } => {
765            Some(KoiEvent::RuntimeInstanceStopped { name })
766        }
767        // Updated, BackendDisconnected, BackendReconnected are operational events
768        // not surfaced as KoiEvents (dashboard SSE covers them)
769        _ => None,
770    }
771}
772
773/// Spawn a task that maps a domain's broadcast events into the host `KoiEvent` stream until
774/// cancellation. One shared skeleton replaces the six near-identical per-domain `select!`
775/// loops that `start()` used to inline (the charter calls out duplicating that skeleton).
776///
777/// `map` returns `None` to drop an event (e.g. mDNS `Found`, which has no host-facing
778/// variant); event types that always map wrap their mapper as `|e| Some(map_x(e))`.
779fn spawn_event_mapper<E, F>(
780    mut rx: broadcast::Receiver<E>,
781    map: F,
782    tx: broadcast::Sender<KoiEvent>,
783    handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
784    cancel: CancellationToken,
785    tasks: &mut Vec<JoinHandle<()>>,
786) where
787    E: Clone + Send + 'static,
788    F: Fn(E) -> Option<KoiEvent> + Send + 'static,
789{
790    tasks.push(tokio::spawn(async move {
791        loop {
792            tokio::select! {
793                _ = cancel.cancelled() => break,
794                msg = rx.recv() => {
795                    let Ok(event) = msg else { continue; };
796                    if let Some(mapped) = map(event) {
797                        emit_event(&tx, handler.as_ref(), mapped);
798                    }
799                }
800            }
801        }
802    }));
803}
804
805fn emit_event(
806    tx: &broadcast::Sender<KoiEvent>,
807    handler: Option<&Arc<dyn Fn(KoiEvent) + Send + Sync>>,
808    event: KoiEvent,
809) {
810    if let Some(handler) = handler {
811        handler(event.clone());
812    }
813    let _ = tx.send(event);
814}
815
816pub(crate) fn map_join_error(err: tokio::task::JoinError) -> KoiError {
817    KoiError::Io(std::io::Error::other(err.to_string()))
818}
819
820/// Build a dashboard snapshot from the embedded domain cores.
821async fn build_embedded_snapshot(
822    mdns: Option<Arc<koi_mdns::MdnsCore>>,
823    certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
824    dns: Option<Arc<koi_dns::DnsRuntime>>,
825    health: Option<Arc<koi_health::HealthRuntime>>,
826    proxy: Option<Arc<koi_proxy::ProxyRuntime>>,
827    udp: Option<Arc<koi_udp::UdpRuntime>>,
828    runtime: Option<Arc<koi_runtime::RuntimeCore>>,
829) -> serde_json::Value {
830    // The capability ladder is assembled once in koi-compose, shared with `/v1/status` and
831    // the dashboard snapshot. The embedded snapshot includes `enabled` like the dashboard.
832    let cores = koi_compose::cores::Cores {
833        mdns,
834        certmesh,
835        dns,
836        health,
837        proxy,
838        udp,
839        runtime,
840    };
841    let capabilities: Vec<serde_json::Value> = koi_compose::status::assemble_capabilities(&cores)
842        .await
843        .into_iter()
844        .map(|c| {
845            serde_json::json!({
846                "name": c.status.name,
847                "enabled": c.enabled,
848                "healthy": c.status.healthy,
849                "summary": c.status.summary,
850            })
851        })
852        .collect();
853    serde_json::json!({ "capabilities": capabilities })
854}
855
856#[cfg(test)]
857mod tests {
858    use super::*;
859    use koi_common::types::ServiceRecord;
860    use std::collections::HashMap;
861
862    fn sample_record() -> ServiceRecord {
863        ServiceRecord {
864            name: "Test Service".to_string(),
865            service_type: "_http._tcp".to_string(),
866            host: Some("host.local".to_string()),
867            ip: Some("10.0.0.1".to_string()),
868            port: Some(8080),
869            txt: HashMap::new(),
870        }
871    }
872
873    // ── KoiError Display ───────────────────────────────────────────
874
875    #[test]
876    fn koi_error_disabled_capability_display() {
877        let err = KoiError::DisabledCapability("mdns");
878        assert_eq!(err.to_string(), "capability disabled: mdns");
879    }
880
881    #[test]
882    fn koi_error_io_from_impl() {
883        let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "file missing");
884        let err: KoiError = io_err.into();
885        assert!(matches!(err, KoiError::Io(_)));
886        assert!(err.to_string().contains("file missing"));
887    }
888
889    #[test]
890    fn koi_error_debug_does_not_panic() {
891        let err = KoiError::DisabledCapability("proxy");
892        let debug = format!("{err:?}");
893        assert!(debug.contains("DisabledCapability"));
894    }
895
896    // ── certmesh data-dir SSOT (custom data_dir honored end-to-end) ──
897
898    #[tokio::test]
899    async fn init_certmesh_core_honors_custom_data_dir_end_to_end() {
900        // The point of the path-SSOT refactor: a host that injects its own
901        // data_dir gets the CA created, discovered, and unlocked under THAT
902        // dir — never a split between the injected dir and an ambient default.
903        let base = koi_common::test::ensure_data_dir("koi-embedded-datadir-tests");
904        let data_dir = base.join("custom-pond");
905        let paths = koi_certmesh::CertmeshPaths::with_data_dir(data_dir.clone());
906
907        // Fresh machine: no CA yet. The uninitialized early-return must still
908        // carry the injected paths — this is the regression the dropped-paths
909        // bug (uninitialized branches dropping `paths`) used to fail.
910        let fresh =
911            koi_compose::cores::init_certmesh_core(Some(&data_dir)).expect("uninitialized core");
912        assert_eq!(
913            fresh.paths().data_dir(),
914            data_dir.as_path(),
915            "uninitialized core must keep the injected data_dir"
916        );
917
918        // Create a CA + roster UNDER the injected dir.
919        koi_certmesh::ca::create_ca("pond-pass-strong", &[7u8; 32], &paths)
920            .expect("create CA under injected dir");
921        let roster = koi_certmesh::roster::Roster::new(
922            koi_certmesh::profiles::TrustProfile::MyOrganization,
923            Some("ops".to_string()),
924        );
925        koi_certmesh::roster::save_roster(&roster, &paths.roster_path())
926            .expect("save roster under injected dir");
927
928        // Reopen on the same injected dir: the CA is discovered there and the
929        // core unlocks from it — proving the data root is honored end-to-end.
930        let reopened =
931            koi_compose::cores::init_certmesh_core(Some(&data_dir)).expect("locked core");
932        assert_eq!(reopened.paths().data_dir(), data_dir.as_path());
933        reopened
934            .unlock("pond-pass-strong")
935            .await
936            .expect("unlock CA from the injected data_dir");
937    }
938
939    // ── map_mdns_event ─────────────────────────────────────────────
940
941    #[test]
942    fn map_mdns_found() {
943        let record = sample_record();
944        let event = koi_mdns::MdnsEvent::Found(record.clone());
945        let mapped = map_mdns_event(event);
946        assert!(mapped.is_some());
947        match mapped.unwrap() {
948            KoiEvent::MdnsFound(r) => assert_eq!(r.name, "Test Service"),
949            other => panic!("expected MdnsFound, got {other:?}"),
950        }
951    }
952
953    #[test]
954    fn map_mdns_resolved() {
955        let record = sample_record();
956        let event = koi_mdns::MdnsEvent::Resolved(record);
957        let mapped = map_mdns_event(event);
958        assert!(mapped.is_some());
959        match mapped.unwrap() {
960            KoiEvent::MdnsResolved(r) => {
961                assert_eq!(r.port, Some(8080));
962                assert_eq!(r.service_type, "_http._tcp");
963            }
964            other => panic!("expected MdnsResolved, got {other:?}"),
965        }
966    }
967
968    #[test]
969    fn map_mdns_removed() {
970        let event = koi_mdns::MdnsEvent::Removed {
971            name: "Gone Service".to_string(),
972            service_type: "_http._tcp".to_string(),
973        };
974        let mapped = map_mdns_event(event);
975        assert!(mapped.is_some());
976        match mapped.unwrap() {
977            KoiEvent::MdnsRemoved { name, service_type } => {
978                assert_eq!(name, "Gone Service");
979                assert_eq!(service_type, "_http._tcp");
980            }
981            other => panic!("expected MdnsRemoved, got {other:?}"),
982        }
983    }
984
985    // ── map_health_event ───────────────────────────────────────────
986
987    #[test]
988    fn map_health_status_changed_up() {
989        let event = koi_health::HealthEvent::StatusChanged {
990            name: "api".to_string(),
991            status: koi_health::HealthStatus::Up,
992        };
993        let mapped = map_health_event(event);
994        match mapped {
995            KoiEvent::HealthChanged { name, status } => {
996                assert_eq!(name, "api");
997                assert!(matches!(status, koi_health::HealthStatus::Up));
998            }
999            other => panic!("expected HealthChanged, got {other:?}"),
1000        }
1001    }
1002
1003    #[test]
1004    fn map_health_status_changed_down() {
1005        let event = koi_health::HealthEvent::StatusChanged {
1006            name: "db".to_string(),
1007            status: koi_health::HealthStatus::Down,
1008        };
1009        let mapped = map_health_event(event);
1010        match mapped {
1011            KoiEvent::HealthChanged { name, status } => {
1012                assert_eq!(name, "db");
1013                assert!(matches!(status, koi_health::HealthStatus::Down));
1014            }
1015            other => panic!("expected HealthChanged, got {other:?}"),
1016        }
1017    }
1018
1019    // ── map_dns_event ──────────────────────────────────────────────
1020
1021    #[test]
1022    fn map_dns_entry_updated() {
1023        let event = koi_dns::DnsEvent::EntryUpdated {
1024            name: "grafana".to_string(),
1025            ip: "10.0.0.5".to_string(),
1026        };
1027        let mapped = map_dns_event(event);
1028        match mapped {
1029            KoiEvent::DnsEntryUpdated { name, ip } => {
1030                assert_eq!(name, "grafana");
1031                assert_eq!(ip, "10.0.0.5");
1032            }
1033            other => panic!("expected DnsEntryUpdated, got {other:?}"),
1034        }
1035    }
1036
1037    #[test]
1038    fn map_dns_entry_removed() {
1039        let event = koi_dns::DnsEvent::EntryRemoved {
1040            name: "old-host".to_string(),
1041        };
1042        let mapped = map_dns_event(event);
1043        match mapped {
1044            KoiEvent::DnsEntryRemoved { name } => {
1045                assert_eq!(name, "old-host");
1046            }
1047            other => panic!("expected DnsEntryRemoved, got {other:?}"),
1048        }
1049    }
1050
1051    // ── map_certmesh_event ─────────────────────────────────────────
1052
1053    #[test]
1054    fn map_certmesh_member_joined() {
1055        let event = koi_certmesh::CertmeshEvent::MemberJoined {
1056            hostname: "node-a".to_string(),
1057            fingerprint: "sha256:abc".to_string(),
1058        };
1059        let mapped = map_certmesh_event(event);
1060        match mapped {
1061            KoiEvent::CertmeshMemberJoined {
1062                hostname,
1063                fingerprint,
1064            } => {
1065                assert_eq!(hostname, "node-a");
1066                assert_eq!(fingerprint, "sha256:abc");
1067            }
1068            other => panic!("expected CertmeshMemberJoined, got {other:?}"),
1069        }
1070    }
1071
1072    #[test]
1073    fn map_certmesh_member_revoked() {
1074        let event = koi_certmesh::CertmeshEvent::MemberRevoked {
1075            hostname: "node-b".to_string(),
1076        };
1077        let mapped = map_certmesh_event(event);
1078        match mapped {
1079            KoiEvent::CertmeshMemberRevoked { hostname } => {
1080                assert_eq!(hostname, "node-b");
1081            }
1082            other => panic!("expected CertmeshMemberRevoked, got {other:?}"),
1083        }
1084    }
1085
1086    #[test]
1087    fn map_certmesh_destroyed() {
1088        let event = koi_certmesh::CertmeshEvent::Destroyed;
1089        let mapped = map_certmesh_event(event);
1090        assert!(matches!(mapped, KoiEvent::CertmeshDestroyed));
1091    }
1092
1093    // ── map_proxy_event ────────────────────────────────────────────
1094
1095    #[test]
1096    fn map_proxy_entry_updated() {
1097        let entry = koi_proxy::ProxyEntry {
1098            name: "web".to_string(),
1099            listen_port: 443,
1100            backend: "http://localhost:3000".to_string(),
1101            allow_remote: true,
1102        };
1103        let event = koi_proxy::ProxyEvent::EntryUpdated {
1104            entry: entry.clone(),
1105        };
1106        let mapped = map_proxy_event(event);
1107        match mapped {
1108            KoiEvent::ProxyEntryUpdated { entry } => {
1109                assert_eq!(entry.name, "web");
1110                assert_eq!(entry.listen_port, 443);
1111                assert!(entry.allow_remote);
1112            }
1113            other => panic!("expected ProxyEntryUpdated, got {other:?}"),
1114        }
1115    }
1116
1117    #[test]
1118    fn map_proxy_entry_removed() {
1119        let event = koi_proxy::ProxyEvent::EntryRemoved {
1120            name: "old-proxy".to_string(),
1121        };
1122        let mapped = map_proxy_event(event);
1123        match mapped {
1124            KoiEvent::ProxyEntryRemoved { name } => {
1125                assert_eq!(name, "old-proxy");
1126            }
1127            other => panic!("expected ProxyEntryRemoved, got {other:?}"),
1128        }
1129    }
1130
1131    // ── map_join_error ─────────────────────────────────────────────
1132
1133    #[test]
1134    fn map_join_error_produces_io_error() {
1135        // We can't easily create a real JoinError, but we can test the function
1136        // signature exists and the KoiError::Io variant wraps correctly.
1137        let io_err = std::io::Error::other("simulated join error");
1138        let koi_err = KoiError::Io(io_err);
1139        assert!(koi_err.to_string().contains("simulated join error"));
1140    }
1141
1142    // ── Builder defaults ───────────────────────────────────────────
1143
1144    #[test]
1145    fn builder_default_config() {
1146        let builder = Builder::new();
1147        let embedded = builder.build().expect("build should succeed");
1148        assert!(embedded.config.mdns_enabled);
1149        assert!(!embedded.config.http_enabled);
1150        assert_eq!(embedded.config.http_port, 5641);
1151    }
1152
1153    #[test]
1154    fn builder_default_trait() {
1155        let builder = Builder::default();
1156        let embedded = builder.build().expect("build should succeed");
1157        assert_eq!(embedded.config.service_endpoint, "http://127.0.0.1:5641");
1158    }
1159
1160    #[test]
1161    fn builder_fluent_overrides() {
1162        let embedded = Builder::new()
1163            .http(true)
1164            .mdns(false)
1165            .dns_enabled(false)
1166            .health(true)
1167            .certmesh(true)
1168            .proxy(true)
1169            .udp(true)
1170            .http_port(9000)
1171            .dashboard(true)
1172            .api_docs(true)
1173            .mdns_browser(true)
1174            .announce_http(true)
1175            .dns_auto_start(true)
1176            .health_auto_start(true)
1177            .proxy_auto_start(true)
1178            .service_endpoint("http://10.0.0.1:8080")
1179            .service_mode(ServiceMode::EmbeddedOnly)
1180            .data_dir("/tmp/koi-test")
1181            .build()
1182            .expect("build should succeed");
1183
1184        assert!(embedded.config.http_enabled);
1185        assert!(!embedded.config.mdns_enabled);
1186        assert!(!embedded.config.dns_enabled);
1187        assert!(embedded.config.health_enabled);
1188        assert!(embedded.config.certmesh_enabled);
1189        assert!(embedded.config.proxy_enabled);
1190        assert!(embedded.config.udp_enabled);
1191        assert_eq!(embedded.config.http_port, 9000);
1192        assert!(embedded.config.dashboard_enabled);
1193        assert!(embedded.config.api_docs_enabled);
1194        assert!(embedded.config.mdns_browser_enabled);
1195        assert!(embedded.config.announce_http);
1196        assert!(embedded.config.dns_auto_start);
1197        assert!(embedded.config.health_auto_start);
1198        assert!(embedded.config.proxy_auto_start);
1199        assert_eq!(embedded.config.service_endpoint, "http://10.0.0.1:8080");
1200        assert_eq!(embedded.config.service_mode, ServiceMode::EmbeddedOnly);
1201        assert_eq!(
1202            embedded.config.data_dir,
1203            Some(std::path::PathBuf::from("/tmp/koi-test"))
1204        );
1205    }
1206
1207    #[test]
1208    fn orchestrator_and_certmesh_background_are_opt_in() {
1209        // Default: both off (a leaf embedded host only wants the event stream).
1210        let default_cfg = Builder::new().build().expect("build should succeed");
1211        assert!(!default_cfg.config.orchestrator_enabled);
1212        assert!(!default_cfg.config.certmesh_background_enabled);
1213
1214        // Opt-in: both on when requested.
1215        let opted = Builder::new()
1216            .runtime_auto()
1217            .orchestrator(true)
1218            .certmesh(true)
1219            .certmesh_background(true)
1220            .build()
1221            .expect("build should succeed");
1222        assert!(opted.config.orchestrator_enabled);
1223        assert!(opted.config.certmesh_background_enabled);
1224    }
1225
1226    #[test]
1227    fn builder_dns_configure_closure() {
1228        let embedded = Builder::new()
1229            .dns(|b| b.port(5353).zone("home").local_ttl(120))
1230            .build()
1231            .expect("build should succeed");
1232
1233        assert_eq!(embedded.config.dns_config.port, 5353);
1234        assert_eq!(embedded.config.dns_config.zone, "home");
1235        assert_eq!(embedded.config.dns_config.local_ttl, 120);
1236    }
1237
1238    #[test]
1239    fn builder_event_handler() {
1240        use std::sync::atomic::{AtomicBool, Ordering};
1241        let called = Arc::new(AtomicBool::new(false));
1242        let called_clone = called.clone();
1243
1244        let embedded = Builder::new()
1245            .events(move |_event| {
1246                called_clone.store(true, Ordering::SeqCst);
1247            })
1248            .build()
1249            .expect("build should succeed");
1250
1251        assert!(embedded.event_handler.is_some());
1252    }
1253
1254    #[test]
1255    fn builder_extra_firewall_ports() {
1256        use koi_common::firewall::{FirewallPort, FirewallProtocol};
1257        let extra = vec![FirewallPort::new("Custom", FirewallProtocol::Tcp, 12345)];
1258        let _builder = Builder::new().extra_firewall_ports(extra);
1259        // Just verifying the method compiles and does not panic.
1260    }
1261
1262    // ── Result type alias ──────────────────────────────────────────
1263
1264    #[test]
1265    fn result_type_works_with_ok() {
1266        let result: Result<i32> = Ok(42);
1267        assert_eq!(result.unwrap(), 42);
1268    }
1269
1270    #[test]
1271    fn result_type_works_with_err() {
1272        let result: Result<i32> = Err(KoiError::DisabledCapability("test"));
1273        assert!(result.is_err());
1274    }
1275}