1mod config;
2mod events;
3mod handle;
4pub(crate) mod http;
5mod serve;
6pub mod testkit;
7
8use std::sync::Arc;
9
10use tokio::sync::broadcast;
11use tokio::task::JoinHandle;
12use tokio_util::sync::CancellationToken;
13
14use koi_client::KoiClient;
15use koi_compose::bridges::{
16 AliasFeedbackBridge, CertmeshBridge, DnsBridge, MdnsBridge, ProxyBridge,
17};
18
19pub use config::{DnsConfigBuilder, KoiConfig, ServiceMode};
20pub use events::KoiEvent;
21pub use handle::{
22 CertmeshHandle, DnsHandle, HealthHandle, KoiHandle, MdnsHandle, ProxyHandle,
23 DEFAULT_DISCOVER_WINDOW,
24};
25
26pub use koi_common::firewall::{FirewallPort, FirewallProtocol};
28pub use koi_certmesh::PeerClient;
30pub use koi_common::diagnosis::{CheckStatus, DiagnosisCheck, DiagnosisStatus, TrustDiagnosis};
31pub use koi_common::peer::Peer;
32pub use koi_common::posture::{Posture, PostureLevel};
33pub use koi_common::sealed::{Confidentiality, Opened, Sealed};
34pub use koi_common::types::ServiceRecord;
35pub use koi_config::state::DnsEntry;
36pub use koi_health::{HealthCheck, HealthSnapshot, ServiceCheckKind};
37pub use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
38pub use koi_mdns::MdnsEvent;
39pub use koi_proxy::ProxyEntry;
40pub use serve::serve_adaptive;
42
43pub use koi_crypto::vault::{Vault, VaultError};
45
46pub use koi_runtime::{RuntimeBackendKind, RuntimeConfig};
48
49pub type Result<T> = std::result::Result<T, KoiError>;
50
51#[derive(Debug, thiserror::Error)]
52pub enum KoiError {
53 #[error("capability disabled: {0}")]
54 DisabledCapability(&'static str),
55 #[error("not available in client (remote) mode: {0}")]
56 RemoteUnsupported(&'static str),
57 #[error("mdns error: {0}")]
58 Mdns(#[from] koi_mdns::MdnsError),
59 #[error("dns error: {0}")]
60 Dns(#[from] koi_dns::DnsError),
61 #[error("health error: {0}")]
62 Health(#[from] koi_health::HealthError),
63 #[error("proxy error: {0}")]
64 Proxy(#[from] koi_proxy::ProxyError),
65 #[error("certmesh error: {0}")]
66 Certmesh(#[from] koi_certmesh::CertmeshError),
67 #[error("runtime error: {0}")]
68 Runtime(#[from] koi_runtime::RuntimeError),
69 #[error("client error: {0}")]
70 Client(#[from] koi_client::ClientError),
71 #[error("io error: {0}")]
72 Io(#[from] std::io::Error),
73}
74
75pub struct Builder {
76 config: KoiConfig,
77 event_handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
78 extra_firewall_ports: Vec<koi_common::firewall::FirewallPort>,
79}
80
81impl Builder {
82 pub fn new() -> Self {
83 Self {
84 config: KoiConfig::default(),
85 event_handler: None,
86 extra_firewall_ports: Vec::new(),
87 }
88 }
89
90 pub fn data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
91 self.config.data_dir = Some(path.into());
92 self
93 }
94
95 pub fn service_endpoint(mut self, endpoint: impl Into<String>) -> Self {
96 self.config.service_endpoint = endpoint.into();
97 self
98 }
99
100 pub fn service_mode(mut self, mode: ServiceMode) -> Self {
101 self.config.service_mode = mode;
102 self
103 }
104
105 pub fn http(mut self, enabled: bool) -> Self {
106 self.config.http_enabled = enabled;
107 self
108 }
109
110 pub fn mdns(mut self, enabled: bool) -> Self {
111 self.config.mdns_enabled = enabled;
112 self
113 }
114
115 pub fn dns<F>(mut self, configure: F) -> Self
116 where
117 F: FnOnce(DnsConfigBuilder) -> DnsConfigBuilder,
118 {
119 let builder = DnsConfigBuilder::new(self.config.dns_config.clone());
120 self.config.dns_config = configure(builder).build();
121 self
122 }
123
124 pub fn dns_enabled(mut self, enabled: bool) -> Self {
125 self.config.dns_enabled = enabled;
126 self
127 }
128
129 pub fn dns_auto_start(mut self, enabled: bool) -> Self {
130 self.config.dns_auto_start = enabled;
131 self
132 }
133
134 pub fn health(mut self, enabled: bool) -> Self {
135 self.config.health_enabled = enabled;
136 self
137 }
138
139 pub fn health_auto_start(mut self, enabled: bool) -> Self {
140 self.config.health_auto_start = enabled;
141 self
142 }
143
144 pub fn certmesh(mut self, enabled: bool) -> Self {
145 self.config.certmesh_enabled = enabled;
146 self
147 }
148
149 pub fn proxy(mut self, enabled: bool) -> Self {
150 self.config.proxy_enabled = enabled;
151 self
152 }
153
154 pub fn proxy_auto_start(mut self, enabled: bool) -> Self {
155 self.config.proxy_auto_start = enabled;
156 self
157 }
158
159 pub fn udp(mut self, enabled: bool) -> Self {
160 self.config.udp_enabled = enabled;
161 self
162 }
163
164 pub fn runtime(mut self, kind: koi_runtime::RuntimeBackendKind) -> Self {
169 self.config.runtime_enabled = true;
170 self.config.runtime_backend = kind;
171 self
172 }
173
174 pub fn runtime_auto(mut self) -> Self {
176 self.config.runtime_enabled = true;
177 self.config.runtime_backend = koi_runtime::RuntimeBackendKind::Auto;
178 self
179 }
180
181 pub fn orchestrator(mut self, enabled: bool) -> Self {
185 self.config.orchestrator_enabled = enabled;
186 self
187 }
188
189 pub fn certmesh_background(mut self, enabled: bool) -> Self {
194 self.config.certmesh_background_enabled = enabled;
195 self
196 }
197
198 pub fn http_port(mut self, port: u16) -> Self {
199 self.config.http_port = port;
200 self
201 }
202
203 pub fn dashboard(mut self, enabled: bool) -> Self {
204 self.config.dashboard_enabled = enabled;
205 self
206 }
207
208 pub fn api_docs(mut self, enabled: bool) -> Self {
209 self.config.api_docs_enabled = enabled;
210 self
211 }
212
213 pub fn mdns_browser(mut self, enabled: bool) -> Self {
214 self.config.mdns_browser_enabled = enabled;
215 self
216 }
217
218 pub fn announce_http(mut self, enabled: bool) -> Self {
219 self.config.announce_http = enabled;
220 self
221 }
222
223 pub fn events<F>(mut self, handler: F) -> Self
224 where
225 F: Fn(KoiEvent) + Send + Sync + 'static,
226 {
227 self.event_handler = Some(Arc::new(handler));
228 self
229 }
230
231 pub fn extra_firewall_ports(mut self, ports: Vec<koi_common::firewall::FirewallPort>) -> Self {
236 self.extra_firewall_ports = ports;
237 self
238 }
239
240 pub fn ensure_firewall_rules(self, prefix: &str) -> Self {
251 let mut all_ports = self.config.firewall_ports();
252 all_ports.extend(self.extra_firewall_ports.iter().cloned());
253
254 let count = koi_common::firewall::ensure_firewall_rules(prefix, &all_ports);
255 if count > 0 {
256 tracing::info!(count, "Firewall rules ensured");
257 }
258 self
259 }
260
261 pub fn build(self) -> Result<KoiEmbedded> {
262 Ok(KoiEmbedded {
263 config: self.config,
264 event_handler: self.event_handler,
265 })
266 }
267}
268
269impl Default for Builder {
270 fn default() -> Self {
271 Self::new()
272 }
273}
274
275pub struct KoiEmbedded {
276 config: KoiConfig,
277 event_handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
278}
279
280impl KoiEmbedded {
281 pub async fn start(self) -> Result<KoiHandle> {
282 let cancel = CancellationToken::new();
283 let (event_tx, _) = broadcast::channel(256);
284 let mut tasks: Vec<JoinHandle<()>> = Vec::new();
285
286 if self.config.service_mode != ServiceMode::EmbeddedOnly {
287 let client = Arc::new(KoiClient::new(&self.config.service_endpoint));
288 match self.config.service_mode {
289 ServiceMode::ClientOnly => {
290 tokio::task::spawn_blocking({
291 let client = Arc::clone(&client);
292 move || client.health()
293 })
294 .await
295 .map_err(map_join_error)??;
296 return Ok(KoiHandle::new_remote(client, event_tx, cancel, tasks));
297 }
298 ServiceMode::Auto => {
299 let health = tokio::task::spawn_blocking({
300 let client = Arc::clone(&client);
301 move || client.health()
302 })
303 .await;
304 if matches!(health, Ok(Ok(()))) {
305 return Ok(KoiHandle::new_remote(client, event_tx, cancel, tasks));
306 }
307 }
308 ServiceMode::EmbeddedOnly => {}
309 }
310 }
311
312 let mdns = if self.config.mdns_enabled {
313 Some(Arc::new(koi_mdns::MdnsCore::with_cancel(cancel.clone())?))
314 } else {
315 None
316 };
317
318 let certmesh = if self.config.certmesh_enabled {
319 let data_dir = self.config.data_dir.clone();
320 tokio::task::spawn_blocking(move || {
321 koi_compose::cores::init_certmesh_core(data_dir.as_deref())
322 })
323 .await
324 .map_err(|e| std::io::Error::other(format!("certmesh init: {e}")))?
325 } else {
326 None
327 };
328
329 let mdns_bridge: Option<Arc<dyn koi_common::integration::MdnsSnapshot>> =
331 if let Some(ref core) = mdns {
332 Some(MdnsBridge::spawn(core.clone()).await)
333 } else {
334 None
335 };
336
337 let certmesh_bridge: Option<Arc<dyn koi_common::integration::CertmeshSnapshot>> =
338 certmesh.as_ref().map(|core| {
339 CertmeshBridge::new(core.clone())
340 as Arc<dyn koi_common::integration::CertmeshSnapshot>
341 });
342
343 let alias_feedback: Option<Arc<dyn koi_common::integration::AliasFeedback>> =
344 certmesh.as_ref().map(|core| {
345 AliasFeedbackBridge::new(core.clone())
346 as Arc<dyn koi_common::integration::AliasFeedback>
347 });
348
349 let dns = if self.config.dns_enabled {
350 let mut dns_config = self.config.dns_config.clone();
351 if let Some(dir) = &self.config.data_dir {
354 dns_config.state_path = Some(dir.join("state").join("dns.json"));
355 }
356 let core = koi_dns::DnsCore::new(
357 dns_config,
358 mdns_bridge.clone(),
359 certmesh_bridge.clone(),
360 alias_feedback,
361 )
362 .await?;
363 Some(Arc::new(koi_dns::DnsRuntime::new(core)))
364 } else {
365 None
366 };
367
368 let proxy = if self.config.proxy_enabled {
369 let core = if let Some(dir) = &self.config.data_dir {
370 Arc::new(koi_proxy::ProxyCore::with_data_dir(dir)?)
371 } else {
372 Arc::new(koi_proxy::ProxyCore::new()?)
373 };
374 Some(Arc::new(koi_proxy::ProxyRuntime::new(core)))
375 } else {
376 None
377 };
378
379 let dns_bridge: Option<Arc<dyn koi_common::integration::DnsProbe>> = dns
380 .as_ref()
381 .map(|rt| DnsBridge::new(rt.clone()) as Arc<dyn koi_common::integration::DnsProbe>);
382
383 let proxy_bridge: Option<Arc<dyn koi_common::integration::ProxySnapshot>> =
384 proxy.as_ref().map(|rt| {
385 ProxyBridge::new(rt.core()) as Arc<dyn koi_common::integration::ProxySnapshot>
386 });
387
388 let health = if self.config.health_enabled {
389 let core = koi_health::HealthCore::new(
390 mdns_bridge.clone(),
391 dns_bridge,
392 certmesh_bridge,
393 proxy_bridge,
394 )
395 .await;
396 Some(Arc::new(koi_health::HealthRuntime::new(Arc::new(core))))
397 } else {
398 None
399 };
400
401 if let Some(runtime) = &dns {
402 if self.config.dns_auto_start {
403 let _ = runtime.start().await?;
404 }
405 }
406
407 if let Some(runtime) = &health {
408 if self.config.health_auto_start {
409 let _ = runtime.start().await?;
410 }
411 }
412
413 if let Some(runtime) = &proxy {
414 if self.config.proxy_auto_start {
415 runtime.start_all().await?;
416 }
417 }
418
419 let udp = if self.config.udp_enabled {
420 Some(Arc::new(koi_udp::UdpRuntime::new(cancel.clone())))
421 } else {
422 None
423 };
424
425 let runtime = if self.config.runtime_enabled {
426 let config = koi_runtime::RuntimeConfig {
427 backend_kind: self.config.runtime_backend,
428 socket_path: None,
429 };
430 let core = Arc::new(koi_runtime::RuntimeCore::new(config));
431 match core.start_watching(cancel.clone()).await {
432 Ok(()) => {
433 tracing::info!("Runtime adapter started");
434 Some(core)
435 }
436 Err(e) => {
437 tracing::warn!(error = %e, "Runtime backend unavailable — continuing without runtime adapter");
438 None
439 }
440 }
441 } else {
442 None
443 };
444
445 let dashboard_state = if self.config.dashboard_enabled && self.config.http_enabled {
447 let started_at = std::time::Instant::now();
448 let snap_mdns = mdns.clone();
449 let snap_certmesh = certmesh.clone();
450 let snap_dns = dns.clone();
451 let snap_health = health.clone();
452 let snap_proxy = proxy.clone();
453 let snap_udp = udp.clone();
454 let snap_runtime = runtime.clone();
455
456 let snapshot_fn: koi_dashboard::dashboard::SnapshotFn = Arc::new(move || {
457 let m = snap_mdns.clone();
458 let cm = snap_certmesh.clone();
459 let d = snap_dns.clone();
460 let h = snap_health.clone();
461 let p = snap_proxy.clone();
462 let u = snap_udp.clone();
463 let rt = snap_runtime.clone();
464 Box::pin(async move { build_embedded_snapshot(m, cm, d, h, p, u, rt).await })
465 });
466
467 let (dash_event_tx, _) = broadcast::channel(256);
468 let ds = koi_dashboard::dashboard::DashboardState {
469 identity: koi_dashboard::dashboard::DashboardIdentity {
470 version: env!("CARGO_PKG_VERSION").to_string(),
471 platform: std::env::consts::OS.to_string(),
472 },
473 mode: "embedded",
474 snapshot_fn,
475 event_tx: dash_event_tx.clone(),
476 started_at,
477 };
478
479 tasks.push(koi_dashboard::forward::spawn_event_forwarder(
482 koi_dashboard::forward::ForwarderCores {
483 mdns: mdns.clone(),
484 certmesh: certmesh.clone(),
485 dns: dns.clone(),
486 health: health.clone(),
487 proxy: proxy.clone(),
488 runtime: runtime.clone(),
489 },
490 dash_event_tx,
491 cancel.clone(),
492 ));
493
494 Some(ds)
495 } else {
496 None
497 };
498
499 let browser_state = if self.config.mdns_browser_enabled && self.config.http_enabled {
502 if let Some(ref mdns_core) = mdns {
503 Some(koi_dashboard::browser::build_state(
504 mdns_core.clone(),
505 cancel.clone(),
506 ))
507 } else {
508 tracing::warn!("mdns_browser enabled but mDNS is disabled — skipping browser");
509 None
510 }
511 } else {
512 None
513 };
514
515 if self.config.http_enabled {
517 let http_port = self.config.http_port;
518 let http_cancel = cancel.clone();
519 let http_mdns = mdns.clone();
520 let http_dns = dns.clone();
521 let http_health = health.clone();
522 let http_certmesh = certmesh.clone();
523 let http_proxy = proxy.clone();
524 let http_udp = udp.clone();
525 let http_runtime = runtime.clone();
526 let http_api_docs = self.config.api_docs_enabled;
527 tasks.push(tokio::spawn(async move {
528 http::serve(
529 http_port,
530 http_mdns,
531 http_dns,
532 http_health,
533 http_certmesh,
534 http_proxy,
535 http_udp,
536 http_runtime,
537 dashboard_state,
538 browser_state,
539 http_api_docs,
540 http_cancel,
541 )
542 .await;
543 }));
544 }
545
546 let http_announce_id =
548 if self.config.announce_http && self.config.http_enabled && self.config.mdns_enabled {
549 if let Some(ref mdns_core) = mdns {
550 let hostname = hostname::get()
551 .ok()
552 .and_then(|os| os.into_string().ok())
553 .unwrap_or_else(|| "unknown".to_string());
554
555 let mut txt = std::collections::HashMap::new();
556 txt.insert("path".to_string(), "/".to_string());
557 txt.insert("version".to_string(), env!("CARGO_PKG_VERSION").to_string());
558 txt.insert("api".to_string(), "v1".to_string());
559 txt.insert(
560 "dashboard".to_string(),
561 self.config.dashboard_enabled.to_string(),
562 );
563
564 if let Some(ref core) = certmesh {
568 let id = core.local_identity().await;
569 koi_common::peer::stamp(
570 &mut txt,
571 core.posture(),
572 id.as_ref().map(|i| i.ca_fingerprint.as_str()),
573 id.as_ref().map(|i| i.renewal.expires_at),
574 );
575 }
576
577 let payload = koi_mdns::protocol::RegisterPayload {
578 name: format!("Koi ({hostname})"),
579 service_type: "_http._tcp".to_string(),
580 port: self.config.http_port,
581 ip: None,
582 lease_secs: None,
583 txt,
584 };
585 match mdns_core.register(payload) {
586 Ok(result) => {
587 tracing::info!(
588 id = %result.id,
589 port = self.config.http_port,
590 "HTTP server announced via mDNS"
591 );
592 Some(result.id)
593 }
594 Err(e) => {
595 tracing::warn!(error = %e, "Failed to announce HTTP server via mDNS");
596 None
597 }
598 }
599 } else {
600 None
601 }
602 } else {
603 None
604 };
605
606 if let Some(core) = &mdns {
611 spawn_event_mapper(
612 core.subscribe(),
613 map_mdns_event,
614 event_tx.clone(),
615 self.event_handler.clone(),
616 cancel.clone(),
617 &mut tasks,
618 );
619 }
620 if let Some(runtime) = &health {
621 spawn_event_mapper(
622 runtime.core().subscribe(),
623 |e| Some(map_health_event(e)),
624 event_tx.clone(),
625 self.event_handler.clone(),
626 cancel.clone(),
627 &mut tasks,
628 );
629 }
630 if let Some(runtime) = &dns {
631 spawn_event_mapper(
632 runtime.core().subscribe(),
633 |e| Some(map_dns_event(e)),
634 event_tx.clone(),
635 self.event_handler.clone(),
636 cancel.clone(),
637 &mut tasks,
638 );
639 }
640 if let Some(core) = &certmesh {
641 spawn_event_mapper(
642 core.subscribe(),
643 |e| Some(map_certmesh_event(e)),
644 event_tx.clone(),
645 self.event_handler.clone(),
646 cancel.clone(),
647 &mut tasks,
648 );
649 spawn_posture_watcher(
653 core.watch_posture(),
654 event_tx.clone(),
655 self.event_handler.clone(),
656 cancel.clone(),
657 &mut tasks,
658 );
659 }
660 if let Some(runtime_proxy) = &proxy {
661 spawn_event_mapper(
662 runtime_proxy.core().subscribe(),
663 |e| Some(map_proxy_event(e)),
664 event_tx.clone(),
665 self.event_handler.clone(),
666 cancel.clone(),
667 &mut tasks,
668 );
669 }
670 if let Some(runtime_core) = &runtime {
671 spawn_event_mapper(
672 runtime_core.subscribe(),
673 map_runtime_event,
674 event_tx.clone(),
675 self.event_handler.clone(),
676 cancel.clone(),
677 &mut tasks,
678 );
679 }
680
681 if self.config.orchestrator_enabled {
685 if let Some(ref runtime_core) = runtime {
686 tasks.push(koi_compose::orchestrator::spawn_orchestrator(
687 runtime_core,
688 koi_compose::orchestrator::OrchestrationTargets {
689 mdns: mdns.clone(),
690 dns: dns.clone(),
691 health: health.clone(),
692 proxy: proxy.clone(),
693 },
694 cancel.clone(),
695 ));
696 } else {
697 tracing::warn!(
698 "orchestrator enabled but the runtime adapter is not — skipping orchestrator"
699 );
700 }
701 }
702
703 if self.config.certmesh_background_enabled {
707 if let Some(ref certmesh_core) = certmesh {
708 koi_compose::certmesh::spawn_enrollment_approval(
709 certmesh_core,
710 koi_compose::certmesh::deny_and_log_decider(),
711 &cancel,
712 &mut tasks,
713 )
714 .await;
715 koi_compose::certmesh::spawn_certmesh_background_tasks(
716 certmesh_core,
717 &cancel,
718 &mut tasks,
719 );
720 } else {
721 tracing::warn!(
722 "certmesh_background enabled but certmesh is not — skipping certmesh loops"
723 );
724 }
725 }
726
727 Ok(KoiHandle::new_embedded(
728 mdns,
729 dns,
730 health,
731 certmesh,
732 proxy,
733 udp,
734 runtime,
735 self.config.data_dir.clone(),
736 event_tx,
737 cancel,
738 tasks,
739 http_announce_id,
740 ))
741 }
742}
743
744fn map_mdns_event(event: MdnsEvent) -> Option<KoiEvent> {
745 match event {
746 MdnsEvent::Found(record) => Some(KoiEvent::MdnsFound(record)),
747 MdnsEvent::Resolved(record) => Some(KoiEvent::MdnsResolved(record)),
748 MdnsEvent::Removed { name, service_type } => {
749 Some(KoiEvent::MdnsRemoved { name, service_type })
750 }
751 }
752}
753
754fn map_health_event(event: koi_health::HealthEvent) -> KoiEvent {
755 match event {
756 koi_health::HealthEvent::StatusChanged { name, status } => {
757 KoiEvent::HealthChanged { name, status }
758 }
759 }
760}
761
762fn map_dns_event(event: koi_dns::DnsEvent) -> KoiEvent {
763 match event {
764 koi_dns::DnsEvent::EntryUpdated { name, ip } => KoiEvent::DnsEntryUpdated { name, ip },
765 koi_dns::DnsEvent::EntryRemoved { name } => KoiEvent::DnsEntryRemoved { name },
766 }
767}
768
769fn map_certmesh_event(event: koi_certmesh::CertmeshEvent) -> KoiEvent {
770 match event {
771 koi_certmesh::CertmeshEvent::MemberJoined {
772 hostname,
773 fingerprint,
774 } => KoiEvent::CertmeshMemberJoined {
775 hostname,
776 fingerprint,
777 },
778 koi_certmesh::CertmeshEvent::MemberRevoked { hostname } => {
779 KoiEvent::CertmeshMemberRevoked { hostname }
780 }
781 koi_certmesh::CertmeshEvent::Destroyed => KoiEvent::CertmeshDestroyed,
782 }
783}
784
785fn map_proxy_event(event: koi_proxy::ProxyEvent) -> KoiEvent {
786 match event {
787 koi_proxy::ProxyEvent::EntryUpdated { entry } => KoiEvent::ProxyEntryUpdated { entry },
788 koi_proxy::ProxyEvent::EntryRemoved { name } => KoiEvent::ProxyEntryRemoved { name },
789 }
790}
791
792fn map_runtime_event(event: koi_runtime::RuntimeEvent) -> Option<KoiEvent> {
793 match event {
794 koi_runtime::RuntimeEvent::Started(instance) => Some(KoiEvent::RuntimeInstanceStarted {
795 name: instance.name,
796 backend: instance.backend,
797 }),
798 koi_runtime::RuntimeEvent::Stopped { name, .. } => {
799 Some(KoiEvent::RuntimeInstanceStopped { name })
800 }
801 _ => None,
804 }
805}
806
807fn spawn_event_mapper<E, F>(
814 mut rx: broadcast::Receiver<E>,
815 map: F,
816 tx: broadcast::Sender<KoiEvent>,
817 handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
818 cancel: CancellationToken,
819 tasks: &mut Vec<JoinHandle<()>>,
820) where
821 E: Clone + Send + 'static,
822 F: Fn(E) -> Option<KoiEvent> + Send + 'static,
823{
824 tasks.push(tokio::spawn(async move {
825 loop {
826 tokio::select! {
827 _ = cancel.cancelled() => break,
828 msg = rx.recv() => {
829 let Ok(event) = msg else { continue; };
830 if let Some(mapped) = map(event) {
831 emit_event(&tx, handler.as_ref(), mapped);
832 }
833 }
834 }
835 }
836 }));
837}
838
839fn spawn_posture_watcher(
845 mut rx: tokio::sync::watch::Receiver<koi_common::posture::Posture>,
846 tx: broadcast::Sender<KoiEvent>,
847 handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
848 cancel: CancellationToken,
849 tasks: &mut Vec<JoinHandle<()>>,
850) {
851 tasks.push(tokio::spawn(async move {
852 let mut last = *rx.borrow_and_update();
853 loop {
854 tokio::select! {
855 _ = cancel.cancelled() => break,
856 res = rx.changed() => {
857 if res.is_err() {
858 break; }
860 let to = *rx.borrow_and_update();
861 if to != last {
862 emit_event(&tx, handler.as_ref(), KoiEvent::PostureChanged { from: last, to });
863 last = to;
864 }
865 }
866 }
867 }
868 }));
869}
870
871fn emit_event(
872 tx: &broadcast::Sender<KoiEvent>,
873 handler: Option<&Arc<dyn Fn(KoiEvent) + Send + Sync>>,
874 event: KoiEvent,
875) {
876 if let Some(handler) = handler {
877 handler(event.clone());
878 }
879 let _ = tx.send(event);
880}
881
882pub(crate) fn map_join_error(err: tokio::task::JoinError) -> KoiError {
883 KoiError::Io(std::io::Error::other(err.to_string()))
884}
885
886async fn build_embedded_snapshot(
888 mdns: Option<Arc<koi_mdns::MdnsCore>>,
889 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
890 dns: Option<Arc<koi_dns::DnsRuntime>>,
891 health: Option<Arc<koi_health::HealthRuntime>>,
892 proxy: Option<Arc<koi_proxy::ProxyRuntime>>,
893 udp: Option<Arc<koi_udp::UdpRuntime>>,
894 runtime: Option<Arc<koi_runtime::RuntimeCore>>,
895) -> serde_json::Value {
896 let cores = koi_compose::cores::Cores {
899 mdns,
900 certmesh,
901 dns,
902 health,
903 proxy,
904 udp,
905 runtime,
906 mdns_snapshot: None,
907 };
908 let capabilities: Vec<serde_json::Value> = koi_compose::status::assemble_capabilities(&cores)
909 .await
910 .into_iter()
911 .map(|c| {
912 serde_json::json!({
913 "name": c.status.name,
914 "enabled": c.enabled,
915 "healthy": c.status.healthy,
916 "summary": c.status.summary,
917 })
918 })
919 .collect();
920 serde_json::json!({ "capabilities": capabilities })
921}
922
923#[cfg(test)]
924mod tests {
925 use super::*;
926 use koi_common::types::ServiceRecord;
927 use std::collections::HashMap;
928
929 fn sample_record() -> ServiceRecord {
930 ServiceRecord {
931 name: "Test Service".to_string(),
932 service_type: "_http._tcp".to_string(),
933 host: Some("host.local".to_string()),
934 ip: Some("10.0.0.1".to_string()),
935 port: Some(8080),
936 txt: HashMap::new(),
937 }
938 }
939
940 #[test]
943 fn koi_error_disabled_capability_display() {
944 let err = KoiError::DisabledCapability("mdns");
945 assert_eq!(err.to_string(), "capability disabled: mdns");
946 }
947
948 #[test]
949 fn koi_error_io_from_impl() {
950 let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "file missing");
951 let err: KoiError = io_err.into();
952 assert!(matches!(err, KoiError::Io(_)));
953 assert!(err.to_string().contains("file missing"));
954 }
955
956 #[test]
957 fn koi_error_debug_does_not_panic() {
958 let err = KoiError::DisabledCapability("proxy");
959 let debug = format!("{err:?}");
960 assert!(debug.contains("DisabledCapability"));
961 }
962
963 #[tokio::test]
966 async fn init_certmesh_core_honors_custom_data_dir_end_to_end() {
967 let base = koi_common::test::ensure_data_dir("koi-embedded-datadir-tests");
971 let data_dir = base.join("custom-pond");
972 let paths = koi_certmesh::CertmeshPaths::with_data_dir(data_dir.clone());
973
974 let fresh =
978 koi_compose::cores::init_certmesh_core(Some(&data_dir)).expect("uninitialized core");
979 assert_eq!(
980 fresh.paths().data_dir(),
981 data_dir.as_path(),
982 "uninitialized core must keep the injected data_dir"
983 );
984
985 koi_certmesh::ca::create_ca("pond-pass-strong", &[7u8; 32], &paths)
987 .expect("create CA under injected dir");
988 let roster = koi_certmesh::roster::Roster::new(false, true, Some("ops".to_string()));
990 koi_certmesh::roster::save_roster(&roster, &paths.roster_path())
991 .expect("save roster under injected dir");
992
993 let reopened =
996 koi_compose::cores::init_certmesh_core(Some(&data_dir)).expect("locked core");
997 assert_eq!(reopened.paths().data_dir(), data_dir.as_path());
998 reopened
999 .unlock("pond-pass-strong")
1000 .await
1001 .expect("unlock CA from the injected data_dir");
1002 }
1003
1004 #[test]
1007 fn map_mdns_found() {
1008 let record = sample_record();
1009 let event = koi_mdns::MdnsEvent::Found(record.clone());
1010 let mapped = map_mdns_event(event);
1011 assert!(mapped.is_some());
1012 match mapped.unwrap() {
1013 KoiEvent::MdnsFound(r) => assert_eq!(r.name, "Test Service"),
1014 other => panic!("expected MdnsFound, got {other:?}"),
1015 }
1016 }
1017
1018 #[test]
1019 fn map_mdns_resolved() {
1020 let record = sample_record();
1021 let event = koi_mdns::MdnsEvent::Resolved(record);
1022 let mapped = map_mdns_event(event);
1023 assert!(mapped.is_some());
1024 match mapped.unwrap() {
1025 KoiEvent::MdnsResolved(r) => {
1026 assert_eq!(r.port, Some(8080));
1027 assert_eq!(r.service_type, "_http._tcp");
1028 }
1029 other => panic!("expected MdnsResolved, got {other:?}"),
1030 }
1031 }
1032
1033 #[test]
1034 fn map_mdns_removed() {
1035 let event = koi_mdns::MdnsEvent::Removed {
1036 name: "Gone Service".to_string(),
1037 service_type: "_http._tcp".to_string(),
1038 };
1039 let mapped = map_mdns_event(event);
1040 assert!(mapped.is_some());
1041 match mapped.unwrap() {
1042 KoiEvent::MdnsRemoved { name, service_type } => {
1043 assert_eq!(name, "Gone Service");
1044 assert_eq!(service_type, "_http._tcp");
1045 }
1046 other => panic!("expected MdnsRemoved, got {other:?}"),
1047 }
1048 }
1049
1050 #[test]
1053 fn map_health_status_changed_up() {
1054 let event = koi_health::HealthEvent::StatusChanged {
1055 name: "api".to_string(),
1056 status: koi_health::HealthStatus::Up,
1057 };
1058 let mapped = map_health_event(event);
1059 match mapped {
1060 KoiEvent::HealthChanged { name, status } => {
1061 assert_eq!(name, "api");
1062 assert!(matches!(status, koi_health::HealthStatus::Up));
1063 }
1064 other => panic!("expected HealthChanged, got {other:?}"),
1065 }
1066 }
1067
1068 #[test]
1069 fn map_health_status_changed_down() {
1070 let event = koi_health::HealthEvent::StatusChanged {
1071 name: "db".to_string(),
1072 status: koi_health::HealthStatus::Down,
1073 };
1074 let mapped = map_health_event(event);
1075 match mapped {
1076 KoiEvent::HealthChanged { name, status } => {
1077 assert_eq!(name, "db");
1078 assert!(matches!(status, koi_health::HealthStatus::Down));
1079 }
1080 other => panic!("expected HealthChanged, got {other:?}"),
1081 }
1082 }
1083
1084 #[test]
1087 fn map_dns_entry_updated() {
1088 let event = koi_dns::DnsEvent::EntryUpdated {
1089 name: "grafana".to_string(),
1090 ip: "10.0.0.5".to_string(),
1091 };
1092 let mapped = map_dns_event(event);
1093 match mapped {
1094 KoiEvent::DnsEntryUpdated { name, ip } => {
1095 assert_eq!(name, "grafana");
1096 assert_eq!(ip, "10.0.0.5");
1097 }
1098 other => panic!("expected DnsEntryUpdated, got {other:?}"),
1099 }
1100 }
1101
1102 #[test]
1103 fn map_dns_entry_removed() {
1104 let event = koi_dns::DnsEvent::EntryRemoved {
1105 name: "old-host".to_string(),
1106 };
1107 let mapped = map_dns_event(event);
1108 match mapped {
1109 KoiEvent::DnsEntryRemoved { name } => {
1110 assert_eq!(name, "old-host");
1111 }
1112 other => panic!("expected DnsEntryRemoved, got {other:?}"),
1113 }
1114 }
1115
1116 #[test]
1119 fn map_certmesh_member_joined() {
1120 let event = koi_certmesh::CertmeshEvent::MemberJoined {
1121 hostname: "node-a".to_string(),
1122 fingerprint: "sha256:abc".to_string(),
1123 };
1124 let mapped = map_certmesh_event(event);
1125 match mapped {
1126 KoiEvent::CertmeshMemberJoined {
1127 hostname,
1128 fingerprint,
1129 } => {
1130 assert_eq!(hostname, "node-a");
1131 assert_eq!(fingerprint, "sha256:abc");
1132 }
1133 other => panic!("expected CertmeshMemberJoined, got {other:?}"),
1134 }
1135 }
1136
1137 #[test]
1138 fn map_certmesh_member_revoked() {
1139 let event = koi_certmesh::CertmeshEvent::MemberRevoked {
1140 hostname: "node-b".to_string(),
1141 };
1142 let mapped = map_certmesh_event(event);
1143 match mapped {
1144 KoiEvent::CertmeshMemberRevoked { hostname } => {
1145 assert_eq!(hostname, "node-b");
1146 }
1147 other => panic!("expected CertmeshMemberRevoked, got {other:?}"),
1148 }
1149 }
1150
1151 #[test]
1152 fn map_certmesh_destroyed() {
1153 let event = koi_certmesh::CertmeshEvent::Destroyed;
1154 let mapped = map_certmesh_event(event);
1155 assert!(matches!(mapped, KoiEvent::CertmeshDestroyed));
1156 }
1157
1158 #[tokio::test]
1159 async fn posture_watcher_emits_upgrade_and_degrade() {
1160 use koi_common::posture::Posture;
1161 let (tx_p, rx_p) = tokio::sync::watch::channel(Posture::OPEN);
1162 let (ev_tx, mut ev_rx) = broadcast::channel(16);
1163 let cancel = CancellationToken::new();
1164 let mut tasks = Vec::new();
1165 spawn_posture_watcher(rx_p, ev_tx, None, cancel.clone(), &mut tasks);
1166 tokio::task::yield_now().await;
1170
1171 tx_p.send(Posture::new(true, false)).unwrap();
1173 let ev = tokio::time::timeout(std::time::Duration::from_secs(1), ev_rx.recv())
1174 .await
1175 .expect("event arrives")
1176 .expect("recv ok");
1177 assert!(
1178 matches!(ev, KoiEvent::PostureChanged { from, to } if !from.signed && to.signed),
1179 "expected upgrade, got {ev:?}"
1180 );
1181
1182 tx_p.send(Posture::OPEN).unwrap();
1184 let ev = tokio::time::timeout(std::time::Duration::from_secs(1), ev_rx.recv())
1185 .await
1186 .expect("event arrives")
1187 .expect("recv ok");
1188 assert!(
1189 matches!(ev, KoiEvent::PostureChanged { from, to } if from.signed && !to.signed),
1190 "expected degrade, got {ev:?}"
1191 );
1192
1193 cancel.cancel();
1194 for t in tasks {
1195 let _ = t.await;
1196 }
1197 }
1198
1199 #[test]
1202 fn map_proxy_entry_updated() {
1203 let entry = koi_proxy::ProxyEntry {
1204 name: "web".to_string(),
1205 listen_port: 443,
1206 backend: "http://localhost:3000".to_string(),
1207 allow_remote: true,
1208 };
1209 let event = koi_proxy::ProxyEvent::EntryUpdated {
1210 entry: entry.clone(),
1211 };
1212 let mapped = map_proxy_event(event);
1213 match mapped {
1214 KoiEvent::ProxyEntryUpdated { entry } => {
1215 assert_eq!(entry.name, "web");
1216 assert_eq!(entry.listen_port, 443);
1217 assert!(entry.allow_remote);
1218 }
1219 other => panic!("expected ProxyEntryUpdated, got {other:?}"),
1220 }
1221 }
1222
1223 #[test]
1224 fn map_proxy_entry_removed() {
1225 let event = koi_proxy::ProxyEvent::EntryRemoved {
1226 name: "old-proxy".to_string(),
1227 };
1228 let mapped = map_proxy_event(event);
1229 match mapped {
1230 KoiEvent::ProxyEntryRemoved { name } => {
1231 assert_eq!(name, "old-proxy");
1232 }
1233 other => panic!("expected ProxyEntryRemoved, got {other:?}"),
1234 }
1235 }
1236
1237 #[test]
1240 fn map_join_error_produces_io_error() {
1241 let io_err = std::io::Error::other("simulated join error");
1244 let koi_err = KoiError::Io(io_err);
1245 assert!(koi_err.to_string().contains("simulated join error"));
1246 }
1247
1248 #[test]
1251 fn builder_default_config() {
1252 let builder = Builder::new();
1253 let embedded = builder.build().expect("build should succeed");
1254 assert!(embedded.config.mdns_enabled);
1255 assert!(!embedded.config.http_enabled);
1256 assert_eq!(embedded.config.http_port, 5641);
1257 }
1258
1259 #[test]
1260 fn builder_default_trait() {
1261 let builder = Builder::default();
1262 let embedded = builder.build().expect("build should succeed");
1263 assert_eq!(embedded.config.service_endpoint, "http://127.0.0.1:5641");
1264 }
1265
1266 #[test]
1267 fn builder_fluent_overrides() {
1268 let embedded = Builder::new()
1269 .http(true)
1270 .mdns(false)
1271 .dns_enabled(false)
1272 .health(true)
1273 .certmesh(true)
1274 .proxy(true)
1275 .udp(true)
1276 .http_port(9000)
1277 .dashboard(true)
1278 .api_docs(true)
1279 .mdns_browser(true)
1280 .announce_http(true)
1281 .dns_auto_start(true)
1282 .health_auto_start(true)
1283 .proxy_auto_start(true)
1284 .service_endpoint("http://10.0.0.1:8080")
1285 .service_mode(ServiceMode::EmbeddedOnly)
1286 .data_dir("/tmp/koi-test")
1287 .build()
1288 .expect("build should succeed");
1289
1290 assert!(embedded.config.http_enabled);
1291 assert!(!embedded.config.mdns_enabled);
1292 assert!(!embedded.config.dns_enabled);
1293 assert!(embedded.config.health_enabled);
1294 assert!(embedded.config.certmesh_enabled);
1295 assert!(embedded.config.proxy_enabled);
1296 assert!(embedded.config.udp_enabled);
1297 assert_eq!(embedded.config.http_port, 9000);
1298 assert!(embedded.config.dashboard_enabled);
1299 assert!(embedded.config.api_docs_enabled);
1300 assert!(embedded.config.mdns_browser_enabled);
1301 assert!(embedded.config.announce_http);
1302 assert!(embedded.config.dns_auto_start);
1303 assert!(embedded.config.health_auto_start);
1304 assert!(embedded.config.proxy_auto_start);
1305 assert_eq!(embedded.config.service_endpoint, "http://10.0.0.1:8080");
1306 assert_eq!(embedded.config.service_mode, ServiceMode::EmbeddedOnly);
1307 assert_eq!(
1308 embedded.config.data_dir,
1309 Some(std::path::PathBuf::from("/tmp/koi-test"))
1310 );
1311 }
1312
1313 #[test]
1314 fn orchestrator_and_certmesh_background_are_opt_in() {
1315 let default_cfg = Builder::new().build().expect("build should succeed");
1317 assert!(!default_cfg.config.orchestrator_enabled);
1318 assert!(!default_cfg.config.certmesh_background_enabled);
1319
1320 let opted = Builder::new()
1322 .runtime_auto()
1323 .orchestrator(true)
1324 .certmesh(true)
1325 .certmesh_background(true)
1326 .build()
1327 .expect("build should succeed");
1328 assert!(opted.config.orchestrator_enabled);
1329 assert!(opted.config.certmesh_background_enabled);
1330 }
1331
1332 #[test]
1333 fn builder_dns_configure_closure() {
1334 let embedded = Builder::new()
1335 .dns(|b| b.port(5353).zone("home").local_ttl(120))
1336 .build()
1337 .expect("build should succeed");
1338
1339 assert_eq!(embedded.config.dns_config.port, 5353);
1340 assert_eq!(embedded.config.dns_config.zone, "home");
1341 assert_eq!(embedded.config.dns_config.local_ttl, 120);
1342 }
1343
1344 #[test]
1345 fn builder_event_handler() {
1346 use std::sync::atomic::{AtomicBool, Ordering};
1347 let called = Arc::new(AtomicBool::new(false));
1348 let called_clone = called.clone();
1349
1350 let embedded = Builder::new()
1351 .events(move |_event| {
1352 called_clone.store(true, Ordering::SeqCst);
1353 })
1354 .build()
1355 .expect("build should succeed");
1356
1357 assert!(embedded.event_handler.is_some());
1358 }
1359
1360 #[test]
1361 fn builder_extra_firewall_ports() {
1362 use koi_common::firewall::{FirewallPort, FirewallProtocol};
1363 let extra = vec![FirewallPort::new("Custom", FirewallProtocol::Tcp, 12345)];
1364 let _builder = Builder::new().extra_firewall_ports(extra);
1365 }
1367
1368 #[test]
1371 fn result_type_works_with_ok() {
1372 let result: Result<i32> = Ok(42);
1373 assert!(matches!(result, Ok(42)));
1374 }
1375
1376 #[test]
1377 fn result_type_works_with_err() {
1378 let result: Result<i32> = Err(KoiError::DisabledCapability("test"));
1379 assert!(result.is_err());
1380 }
1381}