1mod config;
2mod events;
3mod handle;
4mod serve;
5pub mod testkit;
6
7use std::sync::Arc;
8
9use tokio::sync::broadcast;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use koi_client::KoiClient;
14
15pub use config::{DnsConfigBuilder, KoiConfig, ServiceMode};
16pub use events::KoiEvent;
17pub use handle::{
18 CertmeshHandle, DnsHandle, HealthHandle, KoiHandle, MdnsHandle, ProxyHandle,
19 DEFAULT_DISCOVER_WINDOW,
20};
21
22pub use koi_common::firewall::{FirewallPort, FirewallProtocol};
24pub use koi_certmesh::PeerClient;
26pub use koi_common::diagnosis::{CheckStatus, DiagnosisCheck, DiagnosisStatus, TrustDiagnosis};
27pub use koi_common::peer::Peer;
28pub use koi_common::posture::{Posture, PostureLevel};
29pub use koi_common::sealed::{Confidentiality, Opened, Sealed};
30pub use koi_common::types::ServiceRecord;
31pub use koi_config::state::DnsEntry;
32pub use koi_health::{HealthCheck, HealthSnapshot, ServiceCheckKind};
33pub use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
34pub use koi_mdns::MdnsEvent;
35pub use koi_proxy::ProxyEntry;
36pub use serve::serve_adaptive;
38
39pub use koi_crypto::vault::{Vault, VaultError};
41
42pub use koi_runtime::{RuntimeBackendKind, RuntimeConfig};
44
/// Crate-wide result alias whose error type is fixed to [`KoiError`].
pub type Result<T> = std::result::Result<T, KoiError>;
46
/// Errors returned by this crate's public API.
///
/// Variants carrying `#[from]` wrap an error type of one of the underlying
/// crates and gain a `From` conversion from the `thiserror` derive, so those
/// errors can be propagated with `?`.
#[derive(Debug, thiserror::Error)]
pub enum KoiError {
    /// Rendered as `capability disabled: <payload>`; the payload is a static
    /// label supplied by whoever constructs the error.
    #[error("capability disabled: {0}")]
    DisabledCapability(&'static str),
    /// Rendered as `not available in client (remote) mode: <payload>`.
    #[error("not available in client (remote) mode: {0}")]
    RemoteUnsupported(&'static str),
    /// Wraps a [`koi_mdns::MdnsError`].
    #[error("mdns error: {0}")]
    Mdns(#[from] koi_mdns::MdnsError),
    /// Wraps a [`koi_dns::DnsError`].
    #[error("dns error: {0}")]
    Dns(#[from] koi_dns::DnsError),
    /// Wraps a [`koi_health::HealthError`].
    #[error("health error: {0}")]
    Health(#[from] koi_health::HealthError),
    /// Wraps a [`koi_proxy::ProxyError`].
    #[error("proxy error: {0}")]
    Proxy(#[from] koi_proxy::ProxyError),
    /// Wraps a [`koi_certmesh::CertmeshError`].
    #[error("certmesh error: {0}")]
    Certmesh(#[from] koi_certmesh::CertmeshError),
    /// Wraps a [`koi_runtime::RuntimeError`].
    #[error("runtime error: {0}")]
    Runtime(#[from] koi_runtime::RuntimeError),
    /// Wraps a [`koi_client::ClientError`].
    #[error("client error: {0}")]
    Client(#[from] koi_client::ClientError),
    /// Wraps a [`std::io::Error`]; also used in this file to carry task-join
    /// failures and certmesh initialization failures as text.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// The requested configuration was rejected as insecure; the payload is a
    /// human-readable explanation.
    #[error("insecure configuration: {0}")]
    InsecureConfig(String),
}
72
73impl From<koi_compose::cores::BuildCoresError> for KoiError {
74 fn from(e: koi_compose::cores::BuildCoresError) -> Self {
75 use koi_compose::cores::BuildCoresError as B;
76 match e {
77 B::Mdns(e) => KoiError::Mdns(e),
78 B::Dns(e) => KoiError::Dns(e),
79 B::Proxy(e) => KoiError::Proxy(e),
80 B::Health(e) => KoiError::Health(e),
81 B::CertmeshInit(s) => KoiError::Io(std::io::Error::other(s)),
82 }
83 }
84}
85
/// Fluent builder that accumulates a [`KoiConfig`] and an optional event
/// callback, then produces a [`KoiEmbedded`] through [`Builder::build`].
pub struct Builder {
    /// Configuration mutated by the setter methods.
    config: KoiConfig,
    /// Callback registered through [`Builder::events`], if any.
    event_handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
    /// Additional ports that [`Builder::ensure_firewall_rules`] appends to
    /// the ports derived from the configuration.
    extra_firewall_ports: Vec<koi_common::firewall::FirewallPort>,
}
91
92impl Builder {
93 pub fn new() -> Self {
94 Self {
95 config: KoiConfig::default(),
96 event_handler: None,
97 extra_firewall_ports: Vec::new(),
98 }
99 }
100
101 pub fn data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
102 self.config.data_dir = Some(path.into());
103 self
104 }
105
106 pub fn service_endpoint(mut self, endpoint: impl Into<String>) -> Self {
107 self.config.service_endpoint = endpoint.into();
108 self
109 }
110
111 pub fn service_token(mut self, token: impl Into<String>) -> Self {
118 self.config.service_token = Some(token.into());
119 self
120 }
121
122 pub fn service_mode(mut self, mode: ServiceMode) -> Self {
123 self.config.service_mode = mode;
124 self
125 }
126
127 pub fn http(mut self, enabled: bool) -> Self {
128 self.config.http_enabled = enabled;
129 self
130 }
131
132 pub fn mdns(mut self, enabled: bool) -> Self {
133 self.config.mdns_enabled = enabled;
134 self
135 }
136
137 pub fn dns<F>(mut self, configure: F) -> Self
138 where
139 F: FnOnce(DnsConfigBuilder) -> DnsConfigBuilder,
140 {
141 let builder = DnsConfigBuilder::new(self.config.dns_config.clone());
142 self.config.dns_config = configure(builder).build();
143 self
144 }
145
146 pub fn dns_enabled(mut self, enabled: bool) -> Self {
147 self.config.dns_enabled = enabled;
148 self
149 }
150
151 pub fn dns_auto_start(mut self, enabled: bool) -> Self {
152 self.config.dns_auto_start = enabled;
153 self
154 }
155
156 pub fn health(mut self, enabled: bool) -> Self {
157 self.config.health_enabled = enabled;
158 self
159 }
160
161 pub fn health_auto_start(mut self, enabled: bool) -> Self {
162 self.config.health_auto_start = enabled;
163 self
164 }
165
166 pub fn certmesh(mut self, enabled: bool) -> Self {
167 self.config.certmesh_enabled = enabled;
168 self
169 }
170
171 pub fn proxy(mut self, enabled: bool) -> Self {
172 self.config.proxy_enabled = enabled;
173 self
174 }
175
176 pub fn proxy_auto_start(mut self, enabled: bool) -> Self {
177 self.config.proxy_auto_start = enabled;
178 self
179 }
180
181 pub fn udp(mut self, enabled: bool) -> Self {
182 self.config.udp_enabled = enabled;
183 self
184 }
185
186 pub fn runtime(mut self, kind: koi_runtime::RuntimeBackendKind) -> Self {
191 self.config.runtime_enabled = true;
192 self.config.runtime_backend = kind;
193 self
194 }
195
196 pub fn runtime_auto(mut self) -> Self {
198 self.config.runtime_enabled = true;
199 self.config.runtime_backend = koi_runtime::RuntimeBackendKind::Auto;
200 self
201 }
202
203 pub fn orchestrator(mut self, enabled: bool) -> Self {
207 self.config.orchestrator_enabled = enabled;
208 self
209 }
210
211 pub fn certmesh_background(mut self, enabled: bool) -> Self {
216 self.config.certmesh_background_enabled = enabled;
217 self
218 }
219
220 pub fn http_port(mut self, port: u16) -> Self {
225 self.config.http_port = port;
226 self
227 }
228
229 pub fn dashboard(mut self, enabled: bool) -> Self {
230 self.config.dashboard_enabled = enabled;
231 self
232 }
233
234 pub fn api_docs(mut self, enabled: bool) -> Self {
235 self.config.api_docs_enabled = enabled;
236 self
237 }
238
239 pub fn mdns_browser(mut self, enabled: bool) -> Self {
240 self.config.mdns_browser_enabled = enabled;
241 self
242 }
243
244 pub fn announce_http(mut self, enabled: bool) -> Self {
250 self.config.announce_http = enabled;
251 self
252 }
253
254 pub fn http_token(mut self, token: impl Into<String>) -> Self {
259 self.config.http_token = Some(token.into());
260 self
261 }
262
263 pub fn events<F>(mut self, handler: F) -> Self
264 where
265 F: Fn(KoiEvent) + Send + Sync + 'static,
266 {
267 self.event_handler = Some(Arc::new(handler));
268 self
269 }
270
271 pub fn extra_firewall_ports(mut self, ports: Vec<koi_common::firewall::FirewallPort>) -> Self {
276 self.extra_firewall_ports = ports;
277 self
278 }
279
280 pub fn ensure_firewall_rules(self, prefix: &str) -> Self {
291 let mut all_ports = self.config.firewall_ports();
292 all_ports.extend(self.extra_firewall_ports.iter().cloned());
293
294 let count = koi_common::firewall::ensure_firewall_rules(prefix, &all_ports);
295 if count > 0 {
296 tracing::info!(count, "Firewall rules ensured");
297 }
298 self
299 }
300
301 pub fn build(self) -> Result<KoiEmbedded> {
302 Ok(KoiEmbedded {
303 config: self.config,
304 event_handler: self.event_handler,
305 })
306 }
307}
308
309impl Default for Builder {
310 fn default() -> Self {
311 Self::new()
312 }
313}
314
/// A configured but not yet started Koi instance, produced by
/// [`Builder::build`] and consumed by [`KoiEmbedded::start`].
pub struct KoiEmbedded {
    /// Configuration captured from the builder.
    config: KoiConfig,
    /// Optional callback invoked for every emitted [`KoiEvent`].
    event_handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
}
319
impl KoiEmbedded {
    /// Starts Koi according to the configured service mode and returns a handle.
    ///
    /// * `ClientOnly`: probes the remote service and returns a remote handle;
    ///   a failed probe is returned as an error.
    /// * `Auto`: probes the remote service and returns a remote handle when the
    ///   probe succeeds; otherwise falls through to the embedded path.
    /// * `EmbeddedOnly`: skips the probe and always runs the embedded path.
    ///
    /// The embedded path builds the enabled cores, optionally starts the HTTP
    /// adapter (with dashboard and mDNS browser state), spawns the
    /// self-announcer, and spawns one forwarding task per available core that
    /// maps its events to [`KoiEvent`]s.
    ///
    /// # Errors
    ///
    /// * The remote health probe fails in `ClientOnly` mode.
    /// * [`KoiError::InsecureConfig`] when HTTP and `announce_http` are enabled
    ///   without an HTTP token.
    /// * Core construction fails.
    pub async fn start(self) -> Result<KoiHandle> {
        let cancel = CancellationToken::new();
        let (event_tx, _) = broadcast::channel(256);
        let mut tasks: Vec<JoinHandle<()>> = Vec::new();

        // Remote path: every mode except EmbeddedOnly first tries the remote service.
        if self.config.service_mode != ServiceMode::EmbeddedOnly {
            let client = Arc::new(build_remote_client(&self.config));
            match self.config.service_mode {
                ServiceMode::ClientOnly => {
                    // The health call runs on the blocking pool; both a join
                    // failure and a client error abort start-up here.
                    tokio::task::spawn_blocking({
                        let client = Arc::clone(&client);
                        move || client.health()
                    })
                    .await
                    .map_err(map_join_error)??;
                    return Ok(KoiHandle::new_remote(client, event_tx, cancel, tasks));
                }
                ServiceMode::Auto => {
                    let health = tokio::task::spawn_blocking({
                        let client = Arc::clone(&client);
                        move || client.health()
                    })
                    .await;
                    // Any failure (join or client error) silently falls
                    // through to the embedded path below.
                    if matches!(health, Ok(Ok(()))) {
                        return Ok(KoiHandle::new_remote(client, event_tx, cancel, tasks));
                    }
                }
                ServiceMode::EmbeddedOnly => {}
            }
        }

        // announce_http makes the adapter bind 0.0.0.0 (see below), so refuse
        // to start without a token in that configuration.
        if self.config.http_enabled && self.config.announce_http && self.config.http_token.is_none()
        {
            return Err(KoiError::InsecureConfig(
                "announce_http exposes the embedded HTTP adapter on 0.0.0.0; call \
                 .http_token(..) to require x-koi-token, or drop announce_http to bind loopback"
                    .into(),
            ));
        }

        // Build the cores; the CoreSpec flags are the negation of the
        // builder's `*_enabled` switches.
        let cores = koi_compose::cores::build_cores(
            &koi_compose::cores::CoreSpec {
                no_mdns: !self.config.mdns_enabled,
                no_certmesh: !self.config.certmesh_enabled,
                no_dns: !self.config.dns_enabled,
                no_health: !self.config.health_enabled,
                no_proxy: !self.config.proxy_enabled,
                no_udp: !self.config.udp_enabled,
                no_runtime: !self.config.runtime_enabled,
                data_dir: self.config.data_dir.clone(),
                dns_config: self.config.dns_config.clone(),
                runtime: self.config.runtime_backend.to_string(),
                http_port: self.config.http_port,
                // DNS state lives at `<data_dir>/state/dns.json` when a data
                // directory is configured.
                dns_state_path: self
                    .config
                    .data_dir
                    .as_ref()
                    .map(|dir| dir.join("state").join("dns.json")),
                proxy_data_dir: self.config.data_dir.clone(),
                dns_auto_start: self.config.dns_auto_start,
                health_auto_start: self.config.health_auto_start,
                proxy_auto_start: self.config.proxy_auto_start,
                spawn_orchestrator: self.config.orchestrator_enabled,
                spawn_certmesh_loops: self.config.certmesh_background_enabled,
                fail_fast: true,
            },
            &cancel,
            &mut tasks,
        )
        .await?;
        let koi_compose::cores::Cores {
            mdns,
            certmesh,
            dns,
            health,
            proxy,
            udp,
            runtime,
            mdns_snapshot: mdns_bridge,
        } = cores;

        // Dashboard state is only built when the HTTP adapter will serve it.
        let dashboard_state = if self.config.dashboard_enabled && self.config.http_enabled {
            let started_at = std::time::Instant::now();
            let snap_mdns = mdns.clone();
            let snap_certmesh = certmesh.clone();
            let snap_dns = dns.clone();
            let snap_health = health.clone();
            let snap_proxy = proxy.clone();
            let snap_udp = udp.clone();
            let snap_runtime = runtime.clone();

            // Each invocation clones the captured core handles into a fresh
            // future, so the closure can be called repeatedly.
            let snapshot_fn: koi_dashboard::dashboard::SnapshotFn = Arc::new(move || {
                let m = snap_mdns.clone();
                let cm = snap_certmesh.clone();
                let d = snap_dns.clone();
                let h = snap_health.clone();
                let p = snap_proxy.clone();
                let u = snap_udp.clone();
                let rt = snap_runtime.clone();
                Box::pin(async move { build_embedded_snapshot(m, cm, d, h, p, u, rt).await })
            });

            let (dash_event_tx, _) = broadcast::channel(256);
            let ds = koi_dashboard::dashboard::DashboardState {
                identity: koi_dashboard::dashboard::DashboardIdentity {
                    version: env!("CARGO_PKG_VERSION").to_string(),
                    platform: std::env::consts::OS.to_string(),
                },
                mode: "embedded",
                snapshot_fn,
                event_tx: dash_event_tx.clone(),
                started_at,
            };

            // The forwarder task feeds core events into the dashboard channel.
            tasks.push(koi_dashboard::forward::spawn_event_forwarder(
                koi_dashboard::forward::ForwarderCores {
                    mdns: mdns.clone(),
                    certmesh: certmesh.clone(),
                    dns: dns.clone(),
                    health: health.clone(),
                    proxy: proxy.clone(),
                    runtime: runtime.clone(),
                },
                dash_event_tx,
                cancel.clone(),
            ));

            Some(ds)
        } else {
            None
        };

        // The mDNS browser needs both the HTTP adapter and an mDNS core.
        let browser_state = if self.config.mdns_browser_enabled && self.config.http_enabled {
            if let Some(ref mdns_core) = mdns {
                Some(koi_dashboard::browser::build_state(
                    mdns_core.clone(),
                    cancel.clone(),
                ))
            } else {
                tracing::warn!("mdns_browser enabled but mDNS is disabled — skipping browser");
                None
            }
        } else {
            None
        };

        // Stays `None` when HTTP is disabled or the adapter never signals readiness.
        let mut http_addr: Option<std::net::SocketAddr> = None;
        if self.config.http_enabled {
            let http_cancel = cancel.clone();
            let http_cores = koi_compose::cores::Cores {
                mdns: mdns.clone(),
                certmesh: certmesh.clone(),
                dns: dns.clone(),
                health: health.clone(),
                proxy: proxy.clone(),
                udp: udp.clone(),
                runtime: runtime.clone(),
                mdns_snapshot: mdns_bridge.clone(),
            };
            // Announced adapters bind all IPv4 interfaces; otherwise loopback only.
            let exposed = self.config.announce_http;
            let bind_ip = if exposed {
                std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED)
            } else {
                std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST)
            };
            let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
            let http_cfg = koi_serve::http::HttpConfig {
                bind_ip,
                port: self.config.http_port,
                started_at: std::time::Instant::now(),
                dashboard: dashboard_state,
                browser: browser_state,
                auth: self.config.http_token.clone(),
                mdns_snapshot: mdns_bridge.clone(),
                mcp_http: false,
                admin_shutdown: false,
                api_docs: self.config.api_docs_enabled,
                daemon: false,
                ready: Some(ready_tx),
            };
            tasks.push(tokio::spawn(async move {
                if let Err(e) = koi_serve::http::start(http_cores, http_cfg, http_cancel).await {
                    tracing::error!(error = %e, "embedded HTTP adapter failed");
                }
            }));
            // Wait for the readiness signal; a dropped sender yields `None`
            // rather than failing start-up.
            http_addr = ready_rx.await.ok();
        }

        let announce_cores = koi_compose::cores::Cores {
            mdns: mdns.clone(),
            certmesh: certmesh.clone(),
            dns: dns.clone(),
            health: health.clone(),
            proxy: proxy.clone(),
            udp: udp.clone(),
            runtime: runtime.clone(),
            mdns_snapshot: mdns_bridge.clone(),
        };
        // Prefer the port from the readiness address, falling back to the
        // configured port when no address was reported.
        let announce_http_port = http_addr.map(|a| a.port()).unwrap_or(self.config.http_port);
        koi_compose::self_announce::spawn(
            &announce_cores,
            koi_compose::self_announce::SelfAnnounceConfig {
                http_port: announce_http_port,
                dashboard_enabled: self.config.dashboard_enabled,
                // Announcing HTTP requires the announce flag, the adapter and mDNS.
                announce_http: self.config.announce_http
                    && self.config.http_enabled
                    && self.config.mdns_enabled,
                announce_mcp: false,
                dns_zone: self.config.dns_config.zone.clone(),
            },
            cancel.clone(),
            &mut tasks,
        );

        // One forwarding task per available core: each maps the core's own
        // events to `KoiEvent`s and emits them to the handler and channel.
        if let Some(core) = &mdns {
            spawn_event_mapper(
                core.subscribe(),
                map_mdns_event,
                event_tx.clone(),
                self.event_handler.clone(),
                cancel.clone(),
                &mut tasks,
            );
        }
        if let Some(runtime) = &health {
            spawn_event_mapper(
                runtime.core().subscribe(),
                |e| Some(map_health_event(e)),
                event_tx.clone(),
                self.event_handler.clone(),
                cancel.clone(),
                &mut tasks,
            );
        }
        if let Some(runtime) = &dns {
            spawn_event_mapper(
                runtime.core().subscribe(),
                |e| Some(map_dns_event(e)),
                event_tx.clone(),
                self.event_handler.clone(),
                cancel.clone(),
                &mut tasks,
            );
        }
        if let Some(core) = &certmesh {
            spawn_event_mapper(
                core.subscribe(),
                |e| Some(map_certmesh_event(e)),
                event_tx.clone(),
                self.event_handler.clone(),
                cancel.clone(),
                &mut tasks,
            );
            // Posture is exposed as a watch value rather than an event
            // stream, so it gets its own watcher task.
            spawn_posture_watcher(
                core.watch_posture(),
                event_tx.clone(),
                self.event_handler.clone(),
                cancel.clone(),
                &mut tasks,
            );
        }
        if let Some(runtime_proxy) = &proxy {
            spawn_event_mapper(
                runtime_proxy.core().subscribe(),
                |e| Some(map_proxy_event(e)),
                event_tx.clone(),
                self.event_handler.clone(),
                cancel.clone(),
                &mut tasks,
            );
        }
        if let Some(runtime_core) = &runtime {
            spawn_event_mapper(
                runtime_core.subscribe(),
                map_runtime_event,
                event_tx.clone(),
                self.event_handler.clone(),
                cancel.clone(),
                &mut tasks,
            );
        }

        // Only a warning: an orchestrator request without a runtime core is
        // not treated as an error.
        if self.config.orchestrator_enabled && runtime.is_none() {
            tracing::warn!(
                "orchestrator enabled but the runtime adapter is not — skipping orchestrator"
            );
        }

        if self.config.certmesh_background_enabled {
            if let Some(ref certmesh_core) = certmesh {
                // Enrollment approval is wired with the `deny_and_log_decider`.
                koi_compose::certmesh::spawn_enrollment_approval(
                    certmesh_core,
                    koi_compose::certmesh::deny_and_log_decider(),
                    &cancel,
                    &mut tasks,
                )
                .await;
            } else {
                tracing::warn!(
                    "certmesh_background enabled but certmesh is not — skipping certmesh loops"
                );
            }
        }

        Ok(KoiHandle::new_embedded(
            mdns,
            dns,
            health,
            certmesh,
            proxy,
            udp,
            runtime,
            http_addr,
            self.config.data_dir.clone(),
            event_tx,
            cancel,
            tasks,
        ))
    }
}
697
698fn build_remote_client(config: &KoiConfig) -> KoiClient {
706 if let Some(token) = &config.service_token {
707 return KoiClient::with_token(&config.service_endpoint, token);
708 }
709 if let Some(bc) = koi_config::breadcrumb::read_breadcrumb() {
710 if endpoints_match(&bc.endpoint, &config.service_endpoint) {
711 return KoiClient::with_token(&config.service_endpoint, &bc.token);
712 }
713 }
714 KoiClient::new(&config.service_endpoint)
715}
716
/// Reports whether two endpoint strings are the same after normalization.
///
/// Normalization strips trailing `/` characters, lowercases ASCII letters and
/// replaces every occurrence of `localhost` with `127.0.0.1`.
fn endpoints_match(a: &str, b: &str) -> bool {
    let canonical = |endpoint: &str| -> String {
        let trimmed = endpoint.trim_end_matches('/');
        let lowered = trimmed.to_ascii_lowercase();
        lowered.replace("localhost", "127.0.0.1")
    };

    canonical(a) == canonical(b)
}
729
730fn map_mdns_event(event: MdnsEvent) -> Option<KoiEvent> {
731 match event {
732 MdnsEvent::Found(record) => Some(KoiEvent::MdnsFound(record)),
733 MdnsEvent::Resolved(record) => Some(KoiEvent::MdnsResolved(record)),
734 MdnsEvent::Removed { name, service_type } => {
735 Some(KoiEvent::MdnsRemoved { name, service_type })
736 }
737 }
738}
739
740fn map_health_event(event: koi_health::HealthEvent) -> KoiEvent {
741 match event {
742 koi_health::HealthEvent::StatusChanged { name, status } => {
743 KoiEvent::HealthChanged { name, status }
744 }
745 }
746}
747
748fn map_dns_event(event: koi_dns::DnsEvent) -> KoiEvent {
749 match event {
750 koi_dns::DnsEvent::EntryUpdated { name, ip } => KoiEvent::DnsEntryUpdated { name, ip },
751 koi_dns::DnsEvent::EntryRemoved { name } => KoiEvent::DnsEntryRemoved { name },
752 }
753}
754
755fn map_certmesh_event(event: koi_certmesh::CertmeshEvent) -> KoiEvent {
756 match event {
757 koi_certmesh::CertmeshEvent::MemberJoined {
758 hostname,
759 fingerprint,
760 } => KoiEvent::CertmeshMemberJoined {
761 hostname,
762 fingerprint,
763 },
764 koi_certmesh::CertmeshEvent::MemberRevoked { hostname } => {
765 KoiEvent::CertmeshMemberRevoked { hostname }
766 }
767 koi_certmesh::CertmeshEvent::Destroyed => KoiEvent::CertmeshDestroyed,
768 koi_certmesh::CertmeshEvent::CertRenewed { expires_at } => {
769 KoiEvent::CertRenewed { expires_at }
770 }
771 koi_certmesh::CertmeshEvent::CertExpiringSoon { days_left } => {
772 KoiEvent::CertExpiringSoon { days_left }
773 }
774 koi_certmesh::CertmeshEvent::CertRenewalFailed {
775 reason,
776 consecutive_failures,
777 } => KoiEvent::CertRenewalFailed {
778 reason,
779 consecutive_failures,
780 },
781 koi_certmesh::CertmeshEvent::BundleUpdated { self_revoked } => {
782 KoiEvent::BundleUpdated { self_revoked }
783 }
784 }
785}
786
787fn map_proxy_event(event: koi_proxy::ProxyEvent) -> KoiEvent {
788 match event {
789 koi_proxy::ProxyEvent::EntryUpdated { entry } => KoiEvent::ProxyEntryUpdated { entry },
790 koi_proxy::ProxyEvent::EntryRemoved { name } => KoiEvent::ProxyEntryRemoved { name },
791 }
792}
793
794fn map_runtime_event(event: koi_runtime::RuntimeEvent) -> Option<KoiEvent> {
795 match event {
796 koi_runtime::RuntimeEvent::Started(instance) => Some(KoiEvent::RuntimeInstanceStarted {
797 name: instance.name,
798 backend: instance.backend,
799 }),
800 koi_runtime::RuntimeEvent::Stopped { name, .. } => {
801 Some(KoiEvent::RuntimeInstanceStopped { name })
802 }
803 _ => None,
806 }
807}
808
809fn spawn_event_mapper<E, F>(
816 mut rx: broadcast::Receiver<E>,
817 map: F,
818 tx: broadcast::Sender<KoiEvent>,
819 handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
820 cancel: CancellationToken,
821 tasks: &mut Vec<JoinHandle<()>>,
822) where
823 E: Clone + Send + 'static,
824 F: Fn(E) -> Option<KoiEvent> + Send + 'static,
825{
826 tasks.push(tokio::spawn(async move {
827 loop {
828 tokio::select! {
829 _ = cancel.cancelled() => break,
830 msg = rx.recv() => {
831 let Ok(event) = msg else { continue; };
832 if let Some(mapped) = map(event) {
833 emit_event(&tx, handler.as_ref(), mapped);
834 }
835 }
836 }
837 }
838 }));
839}
840
841fn spawn_posture_watcher(
847 mut rx: tokio::sync::watch::Receiver<koi_common::posture::Posture>,
848 tx: broadcast::Sender<KoiEvent>,
849 handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
850 cancel: CancellationToken,
851 tasks: &mut Vec<JoinHandle<()>>,
852) {
853 tasks.push(tokio::spawn(async move {
854 let mut last = *rx.borrow_and_update();
855 loop {
856 tokio::select! {
857 _ = cancel.cancelled() => break,
858 res = rx.changed() => {
859 if res.is_err() {
860 break; }
862 let to = *rx.borrow_and_update();
863 if to != last {
864 emit_event(&tx, handler.as_ref(), KoiEvent::PostureChanged { from: last, to });
865 last = to;
866 }
867 }
868 }
869 }
870 }));
871}
872
873fn emit_event(
874 tx: &broadcast::Sender<KoiEvent>,
875 handler: Option<&Arc<dyn Fn(KoiEvent) + Send + Sync>>,
876 event: KoiEvent,
877) {
878 if let Some(handler) = handler {
879 handler(event.clone());
880 }
881 let _ = tx.send(event);
882}
883
884pub(crate) fn map_join_error(err: tokio::task::JoinError) -> KoiError {
885 KoiError::Io(std::io::Error::other(err.to_string()))
886}
887
888async fn build_embedded_snapshot(
894 mdns: Option<Arc<koi_mdns::MdnsCore>>,
895 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
896 dns: Option<Arc<koi_dns::DnsRuntime>>,
897 health: Option<Arc<koi_health::HealthRuntime>>,
898 proxy: Option<Arc<koi_proxy::ProxyRuntime>>,
899 udp: Option<Arc<koi_udp::UdpRuntime>>,
900 runtime: Option<Arc<koi_runtime::RuntimeCore>>,
901) -> serde_json::Value {
902 let cores = koi_compose::cores::Cores {
903 mdns,
904 certmesh,
905 dns,
906 health,
907 proxy,
908 udp,
909 runtime,
910 mdns_snapshot: None,
911 };
912 koi_compose::snapshot::build_dashboard_snapshot(&cores).await
913}
914
/// Unit tests for the error type, the event mappers, the posture watcher,
/// endpoint matching and the builder.
#[cfg(test)]
mod tests {
    use super::*;
    use koi_common::types::ServiceRecord;
    use std::collections::HashMap;

    /// Service record shared by the mDNS mapping tests.
    fn sample_record() -> ServiceRecord {
        ServiceRecord {
            name: "Test Service".to_string(),
            service_type: "_http._tcp".to_string(),
            host: Some("host.local".to_string()),
            ip: Some("10.0.0.1".to_string()),
            port: Some(8080),
            txt: HashMap::new(),
        }
    }

    // ---- KoiError ----

    #[test]
    fn koi_error_disabled_capability_display() {
        let err = KoiError::DisabledCapability("mdns");
        assert_eq!(err.to_string(), "capability disabled: mdns");
    }

    #[test]
    fn koi_error_io_from_impl() {
        let io_err = std::io::Error::new(std::io::ErrorKind::NotFound, "file missing");
        let err: KoiError = io_err.into();
        assert!(matches!(err, KoiError::Io(_)));
        assert!(err.to_string().contains("file missing"));
    }

    #[test]
    fn koi_error_debug_does_not_panic() {
        let err = KoiError::DisabledCapability("proxy");
        let debug = format!("{err:?}");
        assert!(debug.contains("DisabledCapability"));
    }

    // ---- certmesh data directory ----

    #[tokio::test]
    async fn init_certmesh_core_honors_custom_data_dir_end_to_end() {
        let base = koi_common::test::ensure_data_dir("koi-embedded-datadir-tests");
        let data_dir = base.join("custom-data");
        let paths = koi_certmesh::CertmeshPaths::with_data_dir(data_dir.clone());

        // A core created before any CA exists must already point at the
        // injected directory.
        let fresh =
            koi_compose::cores::init_certmesh_core(Some(&data_dir)).expect("uninitialized core");
        assert_eq!(
            fresh.paths().data_dir(),
            data_dir.as_path(),
            "uninitialized core must keep the injected data_dir"
        );

        koi_certmesh::ca::create_ca("test-pass-strong", &[7u8; 32], &paths)
            .expect("create CA under injected dir");
        let roster = koi_certmesh::roster::Roster::new(false, true, Some("ops".to_string()));
        koi_certmesh::roster::save_roster(&roster, &paths.roster_path())
            .expect("save roster under injected dir");

        // Re-opening must find the CA written above and be able to unlock it.
        let reopened =
            koi_compose::cores::init_certmesh_core(Some(&data_dir)).expect("locked core");
        assert_eq!(reopened.paths().data_dir(), data_dir.as_path());
        reopened
            .unlock("test-pass-strong")
            .await
            .expect("unlock CA from the injected data_dir");
    }

    // ---- mDNS event mapping ----

    #[test]
    fn map_mdns_found() {
        let record = sample_record();
        let event = koi_mdns::MdnsEvent::Found(record.clone());
        let mapped = map_mdns_event(event);
        assert!(mapped.is_some());
        match mapped.unwrap() {
            KoiEvent::MdnsFound(r) => assert_eq!(r.name, "Test Service"),
            other => panic!("expected MdnsFound, got {other:?}"),
        }
    }

    #[test]
    fn map_mdns_resolved() {
        let record = sample_record();
        let event = koi_mdns::MdnsEvent::Resolved(record);
        let mapped = map_mdns_event(event);
        assert!(mapped.is_some());
        match mapped.unwrap() {
            KoiEvent::MdnsResolved(r) => {
                assert_eq!(r.port, Some(8080));
                assert_eq!(r.service_type, "_http._tcp");
            }
            other => panic!("expected MdnsResolved, got {other:?}"),
        }
    }

    #[test]
    fn map_mdns_removed() {
        let event = koi_mdns::MdnsEvent::Removed {
            name: "Gone Service".to_string(),
            service_type: "_http._tcp".to_string(),
        };
        let mapped = map_mdns_event(event);
        assert!(mapped.is_some());
        match mapped.unwrap() {
            KoiEvent::MdnsRemoved { name, service_type } => {
                assert_eq!(name, "Gone Service");
                assert_eq!(service_type, "_http._tcp");
            }
            other => panic!("expected MdnsRemoved, got {other:?}"),
        }
    }

    // ---- health event mapping ----

    #[test]
    fn map_health_status_changed_up() {
        let event = koi_health::HealthEvent::StatusChanged {
            name: "api".to_string(),
            status: koi_health::HealthStatus::Up,
        };
        let mapped = map_health_event(event);
        match mapped {
            KoiEvent::HealthChanged { name, status } => {
                assert_eq!(name, "api");
                assert!(matches!(status, koi_health::HealthStatus::Up));
            }
            other => panic!("expected HealthChanged, got {other:?}"),
        }
    }

    #[test]
    fn map_health_status_changed_down() {
        let event = koi_health::HealthEvent::StatusChanged {
            name: "db".to_string(),
            status: koi_health::HealthStatus::Down,
        };
        let mapped = map_health_event(event);
        match mapped {
            KoiEvent::HealthChanged { name, status } => {
                assert_eq!(name, "db");
                assert!(matches!(status, koi_health::HealthStatus::Down));
            }
            other => panic!("expected HealthChanged, got {other:?}"),
        }
    }

    // ---- DNS event mapping ----

    #[test]
    fn map_dns_entry_updated() {
        let event = koi_dns::DnsEvent::EntryUpdated {
            name: "grafana".to_string(),
            ip: "10.0.0.5".to_string(),
        };
        let mapped = map_dns_event(event);
        match mapped {
            KoiEvent::DnsEntryUpdated { name, ip } => {
                assert_eq!(name, "grafana");
                assert_eq!(ip, "10.0.0.5");
            }
            other => panic!("expected DnsEntryUpdated, got {other:?}"),
        }
    }

    #[test]
    fn map_dns_entry_removed() {
        let event = koi_dns::DnsEvent::EntryRemoved {
            name: "old-host".to_string(),
        };
        let mapped = map_dns_event(event);
        match mapped {
            KoiEvent::DnsEntryRemoved { name } => {
                assert_eq!(name, "old-host");
            }
            other => panic!("expected DnsEntryRemoved, got {other:?}"),
        }
    }

    // ---- certmesh event mapping ----

    #[test]
    fn map_certmesh_member_joined() {
        let event = koi_certmesh::CertmeshEvent::MemberJoined {
            hostname: "node-a".to_string(),
            fingerprint: "sha256:abc".to_string(),
        };
        let mapped = map_certmesh_event(event);
        match mapped {
            KoiEvent::CertmeshMemberJoined {
                hostname,
                fingerprint,
            } => {
                assert_eq!(hostname, "node-a");
                assert_eq!(fingerprint, "sha256:abc");
            }
            other => panic!("expected CertmeshMemberJoined, got {other:?}"),
        }
    }

    #[test]
    fn map_certmesh_member_revoked() {
        let event = koi_certmesh::CertmeshEvent::MemberRevoked {
            hostname: "node-b".to_string(),
        };
        let mapped = map_certmesh_event(event);
        match mapped {
            KoiEvent::CertmeshMemberRevoked { hostname } => {
                assert_eq!(hostname, "node-b");
            }
            other => panic!("expected CertmeshMemberRevoked, got {other:?}"),
        }
    }

    #[test]
    fn map_certmesh_destroyed() {
        let event = koi_certmesh::CertmeshEvent::Destroyed;
        let mapped = map_certmesh_event(event);
        assert!(matches!(mapped, KoiEvent::CertmeshDestroyed));
    }

    // ---- posture watcher ----

    #[tokio::test]
    async fn posture_watcher_emits_upgrade_and_degrade() {
        use koi_common::posture::Posture;
        let (tx_p, rx_p) = tokio::sync::watch::channel(Posture::OPEN);
        let (ev_tx, mut ev_rx) = broadcast::channel(16);
        let cancel = CancellationToken::new();
        let mut tasks = Vec::new();
        spawn_posture_watcher(rx_p, ev_tx, None, cancel.clone(), &mut tasks);
        // Give the watcher a chance to record its baseline before the first change.
        tokio::task::yield_now().await;

        tx_p.send(Posture::new(true, false)).unwrap();
        let ev = tokio::time::timeout(std::time::Duration::from_secs(1), ev_rx.recv())
            .await
            .expect("event arrives")
            .expect("recv ok");
        assert!(
            matches!(ev, KoiEvent::PostureChanged { from, to } if !from.signed && to.signed),
            "expected upgrade, got {ev:?}"
        );

        tx_p.send(Posture::OPEN).unwrap();
        let ev = tokio::time::timeout(std::time::Duration::from_secs(1), ev_rx.recv())
            .await
            .expect("event arrives")
            .expect("recv ok");
        assert!(
            matches!(ev, KoiEvent::PostureChanged { from, to } if from.signed && !to.signed),
            "expected degrade, got {ev:?}"
        );

        cancel.cancel();
        for t in tasks {
            let _ = t.await;
        }
    }

    // ---- proxy event mapping ----

    #[test]
    fn map_proxy_entry_updated() {
        let entry = koi_proxy::ProxyEntry {
            name: "web".to_string(),
            listen_port: 443,
            backend: "http://localhost:3000".to_string(),
            allow_remote: true,
        };
        let event = koi_proxy::ProxyEvent::EntryUpdated {
            entry: entry.clone(),
        };
        let mapped = map_proxy_event(event);
        match mapped {
            KoiEvent::ProxyEntryUpdated { entry } => {
                assert_eq!(entry.name, "web");
                assert_eq!(entry.listen_port, 443);
                assert!(entry.allow_remote);
            }
            other => panic!("expected ProxyEntryUpdated, got {other:?}"),
        }
    }

    #[test]
    fn map_proxy_entry_removed() {
        let event = koi_proxy::ProxyEvent::EntryRemoved {
            name: "old-proxy".to_string(),
        };
        let mapped = map_proxy_event(event);
        match mapped {
            KoiEvent::ProxyEntryRemoved { name } => {
                assert_eq!(name, "old-proxy");
            }
            other => panic!("expected ProxyEntryRemoved, got {other:?}"),
        }
    }

    // ---- join error mapping ----

    #[tokio::test]
    async fn map_join_error_produces_io_error() {
        // A `JoinError` cannot be constructed directly. Aborting a task that
        // never completes yields a real one deterministically and without
        // triggering a panic.
        let task = tokio::spawn(std::future::pending::<()>());
        task.abort();
        let join_err = task
            .await
            .expect_err("aborted task must yield a JoinError");
        assert!(join_err.is_cancelled());

        let koi_err = map_join_error(join_err);
        assert!(matches!(koi_err, KoiError::Io(_)));
        assert!(koi_err.to_string().contains("cancelled"));
    }

    // ---- builder ----

    #[test]
    fn builder_default_config() {
        let builder = Builder::new();
        let embedded = builder.build().expect("build should succeed");
        assert!(embedded.config.mdns_enabled);
        assert!(!embedded.config.http_enabled);
        assert_eq!(embedded.config.http_port, 5641);
    }

    #[test]
    fn builder_default_trait() {
        let builder = Builder::default();
        let embedded = builder.build().expect("build should succeed");
        assert_eq!(embedded.config.service_endpoint, "http://127.0.0.1:5641");
    }

    #[test]
    fn service_token_builder_sets_token() {
        let embedded = Builder::new()
            .service_token("secret-token")
            .build()
            .expect("build should succeed");
        assert_eq!(
            embedded.config.service_token.as_deref(),
            Some("secret-token")
        );
    }

    #[test]
    fn endpoints_match_treats_localhost_as_loopback() {
        assert!(endpoints_match(
            "http://localhost:5641",
            "http://127.0.0.1:5641"
        ));
        assert!(endpoints_match(
            "http://127.0.0.1:5641/",
            "http://127.0.0.1:5641"
        ));
        assert!(endpoints_match(
            "HTTP://LOCALHOST:5641",
            "http://127.0.0.1:5641"
        ));
    }

    #[test]
    fn endpoints_match_rejects_different_hosts() {
        assert!(!endpoints_match(
            "http://127.0.0.1:5641",
            "http://10.0.0.1:5641"
        ));
        assert!(!endpoints_match(
            "http://127.0.0.1:5641",
            "http://127.0.0.1:9999"
        ));
    }

    #[test]
    fn builder_fluent_overrides() {
        let embedded = Builder::new()
            .http(true)
            .mdns(false)
            .dns_enabled(false)
            .health(true)
            .certmesh(true)
            .proxy(true)
            .udp(true)
            .http_port(9000)
            .dashboard(true)
            .api_docs(true)
            .mdns_browser(true)
            .announce_http(true)
            .dns_auto_start(true)
            .health_auto_start(true)
            .proxy_auto_start(true)
            .service_endpoint("http://10.0.0.1:8080")
            .service_mode(ServiceMode::EmbeddedOnly)
            .data_dir("/tmp/koi-test")
            .build()
            .expect("build should succeed");

        assert!(embedded.config.http_enabled);
        assert!(!embedded.config.mdns_enabled);
        assert!(!embedded.config.dns_enabled);
        assert!(embedded.config.health_enabled);
        assert!(embedded.config.certmesh_enabled);
        assert!(embedded.config.proxy_enabled);
        assert!(embedded.config.udp_enabled);
        assert_eq!(embedded.config.http_port, 9000);
        assert!(embedded.config.dashboard_enabled);
        assert!(embedded.config.api_docs_enabled);
        assert!(embedded.config.mdns_browser_enabled);
        assert!(embedded.config.announce_http);
        assert!(embedded.config.dns_auto_start);
        assert!(embedded.config.health_auto_start);
        assert!(embedded.config.proxy_auto_start);
        assert_eq!(embedded.config.service_endpoint, "http://10.0.0.1:8080");
        assert_eq!(embedded.config.service_mode, ServiceMode::EmbeddedOnly);
        assert_eq!(
            embedded.config.data_dir,
            Some(std::path::PathBuf::from("/tmp/koi-test"))
        );
    }

    #[test]
    fn orchestrator_and_certmesh_background_are_opt_in() {
        let default_cfg = Builder::new().build().expect("build should succeed");
        assert!(!default_cfg.config.orchestrator_enabled);
        assert!(!default_cfg.config.certmesh_background_enabled);

        let opted = Builder::new()
            .runtime_auto()
            .orchestrator(true)
            .certmesh(true)
            .certmesh_background(true)
            .build()
            .expect("build should succeed");
        assert!(opted.config.orchestrator_enabled);
        assert!(opted.config.certmesh_background_enabled);
    }

    #[test]
    fn builder_dns_configure_closure() {
        let embedded = Builder::new()
            .dns(|b| b.port(5353).zone("home").local_ttl(120))
            .build()
            .expect("build should succeed");

        assert_eq!(embedded.config.dns_config.port, 5353);
        assert_eq!(embedded.config.dns_config.zone, "home");
        assert_eq!(embedded.config.dns_config.local_ttl, 120);
    }

    #[test]
    fn builder_event_handler() {
        use std::sync::atomic::{AtomicBool, Ordering};
        let called = Arc::new(AtomicBool::new(false));
        let called_clone = called.clone();

        let embedded = Builder::new()
            .events(move |_event| {
                called_clone.store(true, Ordering::SeqCst);
            })
            .build()
            .expect("build should succeed");

        // Only registration is checked here; the handler is never invoked.
        assert!(embedded.event_handler.is_some());
    }

    #[test]
    fn builder_extra_firewall_ports() {
        use koi_common::firewall::{FirewallPort, FirewallProtocol};
        let extra = vec![FirewallPort::new("Custom", FirewallProtocol::Tcp, 12345)];
        // Compile-and-run smoke test: the setter must accept the port list.
        let _builder = Builder::new().extra_firewall_ports(extra);
    }

    // ---- Result alias ----

    #[test]
    fn result_type_works_with_ok() {
        let result: Result<i32> = Ok(42);
        assert!(matches!(result, Ok(42)));
    }

    #[test]
    fn result_type_works_with_err() {
        let result: Result<i32> = Err(KoiError::DisabledCapability("test"));
        assert!(result.is_err());
    }
}