1mod config;
2mod events;
3mod handle;
4pub(crate) mod http;
5mod mdns_browse_adapter;
6
7use std::sync::Arc;
8
9use tokio::sync::broadcast;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12
13use koi_client::KoiClient;
14
15pub use config::{DnsConfigBuilder, KoiConfig, ServiceMode};
16pub use events::KoiEvent;
17pub use handle::{CertmeshHandle, DnsHandle, HealthHandle, KoiHandle, MdnsHandle, ProxyHandle};
18
19pub use koi_common::firewall::{FirewallPort, FirewallProtocol};
21pub use koi_common::types::ServiceRecord;
22pub use koi_config::state::DnsEntry;
23pub use koi_health::{HealthCheck, HealthSnapshot, ServiceCheckKind};
24pub use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
25pub use koi_mdns::MdnsEvent;
26pub use koi_proxy::ProxyEntry;
27
28pub use koi_crypto::vault::{Vault, VaultError};
30
31pub use koi_runtime::{RuntimeBackendKind, RuntimeConfig};
33
/// Convenience alias for results whose error type is [`KoiError`].
pub type Result<T> = std::result::Result<T, KoiError>;
35
/// Errors returned by this crate's public API.
///
/// Apart from [`KoiError::DisabledCapability`], every variant wraps the error
/// type of one underlying component and is convertible via `From`, so `?` can
/// be applied directly to that component's results.
#[derive(Debug, thiserror::Error)]
pub enum KoiError {
    /// A capability is disabled; the payload is rendered into the message
    /// (presumably the capability's name — confirm against call sites).
    #[error("capability disabled: {0}")]
    DisabledCapability(&'static str),
    /// Error reported by `koi_mdns`.
    #[error("mdns error: {0}")]
    Mdns(#[from] koi_mdns::MdnsError),
    /// Error reported by `koi_dns`.
    #[error("dns error: {0}")]
    Dns(#[from] koi_dns::DnsError),
    /// Error reported by `koi_health`.
    #[error("health error: {0}")]
    Health(#[from] koi_health::HealthError),
    /// Error reported by `koi_proxy`.
    #[error("proxy error: {0}")]
    Proxy(#[from] koi_proxy::ProxyError),
    /// Error reported by `koi_certmesh`.
    #[error("certmesh error: {0}")]
    Certmesh(#[from] koi_certmesh::CertmeshError),
    /// Error reported by `koi_runtime`.
    #[error("runtime error: {0}")]
    Runtime(#[from] koi_runtime::RuntimeError),
    /// Error reported by `koi_client`.
    #[error("client error: {0}")]
    Client(#[from] koi_client::ClientError),
    /// I/O failure; also used by this file to carry task-join failures.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
}
57
/// Fluent builder that collects configuration for a [`KoiEmbedded`] instance.
pub struct Builder {
    /// Configuration being assembled; handed to [`KoiEmbedded`] by `build`.
    config: KoiConfig,
    /// Optional callback invoked for every [`KoiEvent`] that gets emitted.
    event_handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
    /// Additional ports appended to the configuration-derived ports in
    /// `ensure_firewall_rules`.
    extra_firewall_ports: Vec<koi_common::firewall::FirewallPort>,
}
63
64impl Builder {
65 pub fn new() -> Self {
66 Self {
67 config: KoiConfig::default(),
68 event_handler: None,
69 extra_firewall_ports: Vec::new(),
70 }
71 }
72
73 pub fn data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
74 self.config.data_dir = Some(path.into());
75 self
76 }
77
78 pub fn service_endpoint(mut self, endpoint: impl Into<String>) -> Self {
79 self.config.service_endpoint = endpoint.into();
80 self
81 }
82
83 pub fn service_mode(mut self, mode: ServiceMode) -> Self {
84 self.config.service_mode = mode;
85 self
86 }
87
88 pub fn http(mut self, enabled: bool) -> Self {
89 self.config.http_enabled = enabled;
90 self
91 }
92
93 pub fn mdns(mut self, enabled: bool) -> Self {
94 self.config.mdns_enabled = enabled;
95 self
96 }
97
98 pub fn dns<F>(mut self, configure: F) -> Self
99 where
100 F: FnOnce(DnsConfigBuilder) -> DnsConfigBuilder,
101 {
102 let builder = DnsConfigBuilder::new(self.config.dns_config.clone());
103 self.config.dns_config = configure(builder).build();
104 self
105 }
106
107 pub fn dns_enabled(mut self, enabled: bool) -> Self {
108 self.config.dns_enabled = enabled;
109 self
110 }
111
112 pub fn dns_auto_start(mut self, enabled: bool) -> Self {
113 self.config.dns_auto_start = enabled;
114 self
115 }
116
117 pub fn health(mut self, enabled: bool) -> Self {
118 self.config.health_enabled = enabled;
119 self
120 }
121
122 pub fn health_auto_start(mut self, enabled: bool) -> Self {
123 self.config.health_auto_start = enabled;
124 self
125 }
126
127 pub fn certmesh(mut self, enabled: bool) -> Self {
128 self.config.certmesh_enabled = enabled;
129 self
130 }
131
132 pub fn proxy(mut self, enabled: bool) -> Self {
133 self.config.proxy_enabled = enabled;
134 self
135 }
136
137 pub fn proxy_auto_start(mut self, enabled: bool) -> Self {
138 self.config.proxy_auto_start = enabled;
139 self
140 }
141
142 pub fn udp(mut self, enabled: bool) -> Self {
143 self.config.udp_enabled = enabled;
144 self
145 }
146
147 pub fn runtime(mut self, kind: koi_runtime::RuntimeBackendKind) -> Self {
152 self.config.runtime_enabled = true;
153 self.config.runtime_backend = kind;
154 self
155 }
156
157 pub fn runtime_auto(mut self) -> Self {
159 self.config.runtime_enabled = true;
160 self.config.runtime_backend = koi_runtime::RuntimeBackendKind::Auto;
161 self
162 }
163
164 pub fn http_port(mut self, port: u16) -> Self {
165 self.config.http_port = port;
166 self
167 }
168
169 pub fn dashboard(mut self, enabled: bool) -> Self {
170 self.config.dashboard_enabled = enabled;
171 self
172 }
173
174 pub fn api_docs(mut self, enabled: bool) -> Self {
175 self.config.api_docs_enabled = enabled;
176 self
177 }
178
179 pub fn mdns_browser(mut self, enabled: bool) -> Self {
180 self.config.mdns_browser_enabled = enabled;
181 self
182 }
183
184 pub fn announce_http(mut self, enabled: bool) -> Self {
185 self.config.announce_http = enabled;
186 self
187 }
188
189 pub fn events<F>(mut self, handler: F) -> Self
190 where
191 F: Fn(KoiEvent) + Send + Sync + 'static,
192 {
193 self.event_handler = Some(Arc::new(handler));
194 self
195 }
196
197 pub fn extra_firewall_ports(mut self, ports: Vec<koi_common::firewall::FirewallPort>) -> Self {
202 self.extra_firewall_ports = ports;
203 self
204 }
205
206 pub fn ensure_firewall_rules(self, prefix: &str) -> Self {
217 let mut all_ports = self.config.firewall_ports();
218 all_ports.extend(self.extra_firewall_ports.iter().cloned());
219
220 let count = koi_common::firewall::ensure_firewall_rules(prefix, &all_ports);
221 if count > 0 {
222 tracing::info!(count, "Firewall rules ensured");
223 }
224 self
225 }
226
227 pub fn build(self) -> Result<KoiEmbedded> {
228 Ok(KoiEmbedded {
229 config: self.config,
230 event_handler: self.event_handler,
231 })
232 }
233}
234
235impl Default for Builder {
236 fn default() -> Self {
237 Self::new()
238 }
239}
240
/// A configured but not yet started Koi instance; `start` consumes it.
pub struct KoiEmbedded {
    /// Configuration that decides which capabilities `start` brings up.
    config: KoiConfig,
    /// Optional callback invoked for every emitted [`KoiEvent`].
    event_handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
}
245
246impl KoiEmbedded {
247 pub async fn start(self) -> Result<KoiHandle> {
248 let cancel = CancellationToken::new();
249 let (event_tx, _) = broadcast::channel(256);
250 let mut tasks: Vec<JoinHandle<()>> = Vec::new();
251
252 if self.config.service_mode != ServiceMode::EmbeddedOnly {
253 let client = Arc::new(KoiClient::new(&self.config.service_endpoint));
254 match self.config.service_mode {
255 ServiceMode::ClientOnly => {
256 tokio::task::spawn_blocking({
257 let client = Arc::clone(&client);
258 move || client.health()
259 })
260 .await
261 .map_err(map_join_error)??;
262 return Ok(KoiHandle::new_remote(client, event_tx, cancel, tasks));
263 }
264 ServiceMode::Auto => {
265 let health = tokio::task::spawn_blocking({
266 let client = Arc::clone(&client);
267 move || client.health()
268 })
269 .await;
270 if matches!(health, Ok(Ok(()))) {
271 return Ok(KoiHandle::new_remote(client, event_tx, cancel, tasks));
272 }
273 }
274 ServiceMode::EmbeddedOnly => {}
275 }
276 }
277
278 let mdns = if self.config.mdns_enabled {
279 Some(Arc::new(koi_mdns::MdnsCore::with_cancel(cancel.clone())?))
280 } else {
281 None
282 };
283
284 let certmesh = if self.config.certmesh_enabled {
285 let data_dir = self.config.data_dir.clone();
286 tokio::task::spawn_blocking(move || init_certmesh_core(data_dir.as_deref()))
287 .await
288 .map_err(|e| std::io::Error::other(format!("certmesh init: {e}")))?
289 } else {
290 None
291 };
292
293 let mdns_bridge: Option<Arc<dyn koi_common::integration::MdnsSnapshot>> =
295 if let Some(ref core) = mdns {
296 Some(MdnsBridgeEmbedded::spawn(core.clone()).await)
297 } else {
298 None
299 };
300
301 let certmesh_bridge: Option<Arc<dyn koi_common::integration::CertmeshSnapshot>> =
302 certmesh.as_ref().map(|core| {
303 CertmeshBridgeEmbedded::new(core.clone())
304 as Arc<dyn koi_common::integration::CertmeshSnapshot>
305 });
306
307 let alias_feedback: Option<Arc<dyn koi_common::integration::AliasFeedback>> =
308 certmesh.as_ref().map(|core| {
309 AliasFeedbackBridgeEmbedded::new(core.clone())
310 as Arc<dyn koi_common::integration::AliasFeedback>
311 });
312
313 let dns = if self.config.dns_enabled {
314 let mut dns_config = self.config.dns_config.clone();
315 if let Some(dir) = &self.config.data_dir {
318 dns_config.state_path = Some(dir.join("state").join("dns.json"));
319 }
320 let core = koi_dns::DnsCore::new(
321 dns_config,
322 mdns_bridge.clone(),
323 certmesh_bridge.clone(),
324 alias_feedback,
325 )
326 .await?;
327 Some(Arc::new(koi_dns::DnsRuntime::new(core)))
328 } else {
329 None
330 };
331
332 let proxy = if self.config.proxy_enabled {
333 let core = if let Some(dir) = &self.config.data_dir {
334 Arc::new(koi_proxy::ProxyCore::with_data_dir(dir)?)
335 } else {
336 Arc::new(koi_proxy::ProxyCore::new()?)
337 };
338 Some(Arc::new(koi_proxy::ProxyRuntime::new(core)))
339 } else {
340 None
341 };
342
343 let dns_bridge: Option<Arc<dyn koi_common::integration::DnsProbe>> =
344 dns.as_ref().map(|rt| {
345 DnsBridgeEmbedded::new(rt.clone()) as Arc<dyn koi_common::integration::DnsProbe>
346 });
347
348 let proxy_bridge: Option<Arc<dyn koi_common::integration::ProxySnapshot>> =
349 proxy.as_ref().map(|rt| {
350 ProxyBridgeEmbedded::new(rt.core())
351 as Arc<dyn koi_common::integration::ProxySnapshot>
352 });
353
354 let health = if self.config.health_enabled {
355 let core = koi_health::HealthCore::new(
356 mdns_bridge.clone(),
357 dns_bridge,
358 certmesh_bridge,
359 proxy_bridge,
360 )
361 .await;
362 Some(Arc::new(koi_health::HealthRuntime::new(Arc::new(core))))
363 } else {
364 None
365 };
366
367 if let Some(runtime) = &dns {
368 if self.config.dns_auto_start {
369 let _ = runtime.start().await?;
370 }
371 }
372
373 if let Some(runtime) = &health {
374 if self.config.health_auto_start {
375 let _ = runtime.start().await?;
376 }
377 }
378
379 if let Some(runtime) = &proxy {
380 if self.config.proxy_auto_start {
381 runtime.start_all().await?;
382 }
383 }
384
385 let udp = if self.config.udp_enabled {
386 Some(Arc::new(koi_udp::UdpRuntime::new(cancel.clone())))
387 } else {
388 None
389 };
390
391 let runtime = if self.config.runtime_enabled {
392 let config = koi_runtime::RuntimeConfig {
393 backend_kind: self.config.runtime_backend,
394 socket_path: None,
395 };
396 let core = Arc::new(koi_runtime::RuntimeCore::new(config));
397 match core.start_watching(cancel.clone()).await {
398 Ok(()) => {
399 tracing::info!("Runtime adapter started");
400 Some(core)
401 }
402 Err(e) => {
403 tracing::warn!(error = %e, "Runtime backend unavailable — continuing without runtime adapter");
404 None
405 }
406 }
407 } else {
408 None
409 };
410
411 let dashboard_state = if self.config.dashboard_enabled && self.config.http_enabled {
413 let started_at = std::time::Instant::now();
414 let snap_mdns = mdns.clone();
415 let snap_certmesh = certmesh.clone();
416 let snap_dns = dns.clone();
417 let snap_health = health.clone();
418 let snap_proxy = proxy.clone();
419 let snap_udp = udp.clone();
420 let snap_runtime = runtime.clone();
421
422 let snapshot_fn: koi_common::dashboard::SnapshotFn = Arc::new(move || {
423 let m = snap_mdns.clone();
424 let cm = snap_certmesh.clone();
425 let d = snap_dns.clone();
426 let h = snap_health.clone();
427 let p = snap_proxy.clone();
428 let u = snap_udp.clone();
429 let rt = snap_runtime.clone();
430 Box::pin(async move { build_embedded_snapshot(m, cm, d, h, p, u, rt).await })
431 });
432
433 let (dash_event_tx, _) = broadcast::channel(256);
434 let ds = koi_common::dashboard::DashboardState {
435 identity: koi_common::dashboard::DashboardIdentity {
436 version: env!("CARGO_PKG_VERSION").to_string(),
437 platform: std::env::consts::OS.to_string(),
438 },
439 mode: "embedded",
440 snapshot_fn,
441 event_tx: dash_event_tx.clone(),
442 started_at,
443 };
444
445 {
447 let mut mdns_rx = mdns.as_ref().map(|c| c.subscribe());
448 let mut health_rx = health.as_ref().map(|r| r.core().subscribe());
449 let mut dns_rx = dns.as_ref().map(|r| r.core().subscribe());
450 let mut certmesh_rx = certmesh.as_ref().map(|c| c.subscribe());
451 let mut proxy_rx = proxy.as_ref().map(|r| r.core().subscribe());
452 let mut runtime_rx = runtime.as_ref().map(|r| r.subscribe());
453 let tx = dash_event_tx;
454 let token = cancel.clone();
455 tasks.push(tokio::spawn(async move {
456 loop {
457 let sse_event: Option<koi_common::dashboard::DashboardSseEvent> = tokio::select! {
458 _ = token.cancelled() => break,
459 Some(Ok(ev)) = async { match mdns_rx.as_mut() { Some(rx) => Some(rx.recv().await), None => None } } => {
460 let id = uuid::Uuid::now_v7().to_string();
461 match ev {
462 koi_mdns::MdnsEvent::Found(record) => Some(koi_common::dashboard::DashboardSseEvent {
463 event_type: "mdns.found".to_string(), id,
464 data: serde_json::to_value(record).unwrap_or_default(),
465 }),
466 koi_mdns::MdnsEvent::Resolved(record) => Some(koi_common::dashboard::DashboardSseEvent {
467 event_type: "mdns.resolved".to_string(), id,
468 data: serde_json::to_value(record).unwrap_or_default(),
469 }),
470 koi_mdns::MdnsEvent::Removed { name, service_type } => Some(koi_common::dashboard::DashboardSseEvent {
471 event_type: "mdns.removed".to_string(), id,
472 data: serde_json::json!({ "name": name, "service_type": service_type }),
473 }),
474 }
475 },
476 Some(Ok(ev)) = async { match health_rx.as_mut() { Some(rx) => Some(rx.recv().await), None => None } } => {
477 let id = uuid::Uuid::now_v7().to_string();
478 match ev {
479 koi_health::HealthEvent::StatusChanged { name, status } => Some(koi_common::dashboard::DashboardSseEvent {
480 event_type: "health.changed".to_string(), id,
481 data: serde_json::json!({ "name": name, "status": status }),
482 }),
483 }
484 },
485 Some(Ok(ev)) = async { match dns_rx.as_mut() { Some(rx) => Some(rx.recv().await), None => None } } => {
486 let id = uuid::Uuid::now_v7().to_string();
487 match ev {
488 koi_dns::DnsEvent::EntryUpdated { name, ip } => Some(koi_common::dashboard::DashboardSseEvent {
489 event_type: "dns.updated".to_string(), id,
490 data: serde_json::json!({ "name": name, "ip": ip }),
491 }),
492 koi_dns::DnsEvent::EntryRemoved { name } => Some(koi_common::dashboard::DashboardSseEvent {
493 event_type: "dns.removed".to_string(), id,
494 data: serde_json::json!({ "name": name }),
495 }),
496 }
497 },
498 Some(Ok(ev)) = async { match certmesh_rx.as_mut() { Some(rx) => Some(rx.recv().await), None => None } } => {
499 let id = uuid::Uuid::now_v7().to_string();
500 match ev {
501 koi_certmesh::CertmeshEvent::MemberJoined { hostname, fingerprint } => Some(koi_common::dashboard::DashboardSseEvent {
502 event_type: "certmesh.joined".to_string(), id,
503 data: serde_json::json!({ "hostname": hostname, "fingerprint": fingerprint }),
504 }),
505 koi_certmesh::CertmeshEvent::MemberRevoked { hostname } => Some(koi_common::dashboard::DashboardSseEvent {
506 event_type: "certmesh.revoked".to_string(), id,
507 data: serde_json::json!({ "hostname": hostname }),
508 }),
509 koi_certmesh::CertmeshEvent::Destroyed => Some(koi_common::dashboard::DashboardSseEvent {
510 event_type: "certmesh.destroyed".to_string(), id,
511 data: serde_json::json!({}),
512 }),
513 }
514 },
515 Some(Ok(ev)) = async { match proxy_rx.as_mut() { Some(rx) => Some(rx.recv().await), None => None } } => {
516 let id = uuid::Uuid::now_v7().to_string();
517 match ev {
518 koi_proxy::ProxyEvent::EntryUpdated { entry } => Some(koi_common::dashboard::DashboardSseEvent {
519 event_type: "proxy.updated".to_string(), id,
520 data: serde_json::to_value(entry).unwrap_or_default(),
521 }),
522 koi_proxy::ProxyEvent::EntryRemoved { name } => Some(koi_common::dashboard::DashboardSseEvent {
523 event_type: "proxy.removed".to_string(), id,
524 data: serde_json::json!({ "name": name }),
525 }),
526 }
527 },
528 Some(Ok(ev)) = async { match runtime_rx.as_mut() { Some(rx) => Some(rx.recv().await), None => None } } => {
529 let id = uuid::Uuid::now_v7().to_string();
530 match ev {
531 koi_runtime::RuntimeEvent::Started(instance) => Some(koi_common::dashboard::DashboardSseEvent {
532 event_type: "runtime.started".to_string(), id,
533 data: serde_json::to_value(instance).unwrap_or_default(),
534 }),
535 koi_runtime::RuntimeEvent::Stopped { id: inst_id, name } => Some(koi_common::dashboard::DashboardSseEvent {
536 event_type: "runtime.stopped".to_string(), id,
537 data: serde_json::json!({ "id": inst_id, "name": name }),
538 }),
539 koi_runtime::RuntimeEvent::Updated(instance) => Some(koi_common::dashboard::DashboardSseEvent {
540 event_type: "runtime.updated".to_string(), id,
541 data: serde_json::to_value(instance).unwrap_or_default(),
542 }),
543 koi_runtime::RuntimeEvent::BackendDisconnected { backend, reason } => Some(koi_common::dashboard::DashboardSseEvent {
544 event_type: "runtime.disconnected".to_string(), id,
545 data: serde_json::json!({ "backend": backend, "reason": reason }),
546 }),
547 koi_runtime::RuntimeEvent::BackendReconnected { backend } => Some(koi_common::dashboard::DashboardSseEvent {
548 event_type: "runtime.reconnected".to_string(), id,
549 data: serde_json::json!({ "backend": backend }),
550 }),
551 }
552 },
553 };
554 if let Some(ev) = sse_event {
555 let _ = tx.send(ev);
556 }
557 }
558 }));
559 }
560
561 Some(ds)
562 } else {
563 None
564 };
565
566 let browser_state = if self.config.mdns_browser_enabled && self.config.http_enabled {
568 if let Some(ref mdns_core) = mdns {
569 let adapter =
570 mdns_browse_adapter::MdnsBrowseAdapter::new(mdns_core.clone(), cancel.clone());
571 let cache = koi_common::browser::BrowserCache::new();
572 let source = adapter.clone() as Arc<dyn koi_common::browser::BrowseSource>;
573 let bc = cache.clone();
574 let token = cancel.clone();
575 tasks.push(tokio::spawn(async move {
576 koi_common::browser::worker(source, bc, token).await;
577 }));
578 Some(koi_common::browser::BrowserState {
579 source: adapter,
580 cache,
581 })
582 } else {
583 tracing::warn!("mdns_browser enabled but mDNS is disabled — skipping browser");
584 None
585 }
586 } else {
587 None
588 };
589
590 if self.config.http_enabled {
592 let http_port = self.config.http_port;
593 let http_cancel = cancel.clone();
594 let http_mdns = mdns.clone();
595 let http_dns = dns.clone();
596 let http_health = health.clone();
597 let http_certmesh = certmesh.clone();
598 let http_proxy = proxy.clone();
599 let http_udp = udp.clone();
600 let http_runtime = runtime.clone();
601 let http_api_docs = self.config.api_docs_enabled;
602 tasks.push(tokio::spawn(async move {
603 http::serve(
604 http_port,
605 http_mdns,
606 http_dns,
607 http_health,
608 http_certmesh,
609 http_proxy,
610 http_udp,
611 http_runtime,
612 dashboard_state,
613 browser_state,
614 http_api_docs,
615 http_cancel,
616 )
617 .await;
618 }));
619 }
620
621 let http_announce_id =
623 if self.config.announce_http && self.config.http_enabled && self.config.mdns_enabled {
624 if let Some(ref mdns_core) = mdns {
625 let hostname = hostname::get()
626 .ok()
627 .and_then(|os| os.into_string().ok())
628 .unwrap_or_else(|| "unknown".to_string());
629
630 let mut txt = std::collections::HashMap::new();
631 txt.insert("path".to_string(), "/".to_string());
632 txt.insert("version".to_string(), env!("CARGO_PKG_VERSION").to_string());
633 txt.insert("api".to_string(), "v1".to_string());
634 txt.insert(
635 "dashboard".to_string(),
636 self.config.dashboard_enabled.to_string(),
637 );
638
639 let payload = koi_mdns::protocol::RegisterPayload {
640 name: format!("Koi ({hostname})"),
641 service_type: "_http._tcp".to_string(),
642 port: self.config.http_port,
643 ip: None,
644 lease_secs: None,
645 txt,
646 };
647 match mdns_core.register(payload) {
648 Ok(result) => {
649 tracing::info!(
650 id = %result.id,
651 port = self.config.http_port,
652 "HTTP server announced via mDNS"
653 );
654 Some(result.id)
655 }
656 Err(e) => {
657 tracing::warn!(error = %e, "Failed to announce HTTP server via mDNS");
658 None
659 }
660 }
661 } else {
662 None
663 }
664 } else {
665 None
666 };
667
668 if let Some(core) = &mdns {
669 let mut rx = core.subscribe();
670 let tx = event_tx.clone();
671 let token = cancel.clone();
672 let handler = self.event_handler.clone();
673 tasks.push(tokio::spawn(async move {
674 loop {
675 tokio::select! {
676 _ = token.cancelled() => break,
677 msg = rx.recv() => {
678 let Ok(event) = msg else { continue; };
679 let mapped = map_mdns_event(event);
680 if let Some(mapped) = mapped {
681 emit_event(&tx, handler.as_ref(), mapped);
682 }
683 }
684 }
685 }
686 }));
687 }
688
689 if self.config.health_enabled {
690 if let Some(runtime) = &health {
691 let mut rx = runtime.core().subscribe();
692 let tx = event_tx.clone();
693 let token = cancel.clone();
694 let handler = self.event_handler.clone();
695 tasks.push(tokio::spawn(async move {
696 loop {
697 tokio::select! {
698 _ = token.cancelled() => break,
699 msg = rx.recv() => {
700 let Ok(event) = msg else { continue; };
701 let mapped = map_health_event(event);
702 emit_event(&tx, handler.as_ref(), mapped);
703 }
704 }
705 }
706 }));
707 }
708 }
709
710 if self.config.dns_enabled {
711 if let Some(runtime) = &dns {
712 let mut rx = runtime.core().subscribe();
713 let tx = event_tx.clone();
714 let token = cancel.clone();
715 let handler = self.event_handler.clone();
716 tasks.push(tokio::spawn(async move {
717 loop {
718 tokio::select! {
719 _ = token.cancelled() => break,
720 msg = rx.recv() => {
721 let Ok(event) = msg else { continue; };
722 let mapped = map_dns_event(event);
723 emit_event(&tx, handler.as_ref(), mapped);
724 }
725 }
726 }
727 }));
728 }
729 }
730
731 if self.config.certmesh_enabled {
732 if let Some(core) = &certmesh {
733 let mut rx = core.subscribe();
734 let tx = event_tx.clone();
735 let token = cancel.clone();
736 let handler = self.event_handler.clone();
737 tasks.push(tokio::spawn(async move {
738 loop {
739 tokio::select! {
740 _ = token.cancelled() => break,
741 msg = rx.recv() => {
742 let Ok(event) = msg else { continue; };
743 let mapped = map_certmesh_event(event);
744 emit_event(&tx, handler.as_ref(), mapped);
745 }
746 }
747 }
748 }));
749 }
750 }
751
752 if self.config.proxy_enabled {
753 if let Some(runtime_proxy) = &proxy {
754 let mut rx = runtime_proxy.core().subscribe();
755 let tx = event_tx.clone();
756 let token = cancel.clone();
757 let handler = self.event_handler.clone();
758 tasks.push(tokio::spawn(async move {
759 loop {
760 tokio::select! {
761 _ = token.cancelled() => break,
762 msg = rx.recv() => {
763 let Ok(event) = msg else { continue; };
764 let mapped = map_proxy_event(event);
765 emit_event(&tx, handler.as_ref(), mapped);
766 }
767 }
768 }
769 }));
770 }
771 }
772
773 if let Some(ref runtime_core) = runtime {
774 let mut rx = runtime_core.subscribe();
775 let tx = event_tx.clone();
776 let token = cancel.clone();
777 let handler = self.event_handler.clone();
778 tasks.push(tokio::spawn(async move {
779 loop {
780 tokio::select! {
781 _ = token.cancelled() => break,
782 msg = rx.recv() => {
783 let Ok(event) = msg else { continue; };
784 if let Some(mapped) = map_runtime_event(event) {
785 emit_event(&tx, handler.as_ref(), mapped);
786 }
787 }
788 }
789 }
790 }));
791 }
792
793 Ok(KoiHandle::new_embedded(
794 mdns,
795 dns,
796 health,
797 certmesh,
798 proxy,
799 udp,
800 runtime,
801 self.config.data_dir.clone(),
802 event_tx,
803 cancel,
804 tasks,
805 http_announce_id,
806 ))
807 }
808}
809
810fn init_certmesh_core(
811 data_dir: Option<&std::path::Path>,
812) -> Option<Arc<koi_certmesh::CertmeshCore>> {
813 let paths = match data_dir {
814 Some(dir) => koi_certmesh::CertmeshPaths::with_data_dir(dir.to_path_buf()),
815 None => koi_certmesh::CertmeshPaths::default(),
816 };
817 if !paths.is_ca_initialized() {
818 return Some(Arc::new(koi_certmesh::CertmeshCore::uninitialized()));
819 }
820
821 let roster_path = paths.roster_path();
822 let roster = match koi_certmesh::roster::load_roster(&roster_path) {
823 Ok(r) => r,
824 Err(_) => {
825 return Some(Arc::new(koi_certmesh::CertmeshCore::uninitialized()));
826 }
827 };
828
829 let profile = roster.metadata.trust_profile;
830
831 let resolved_data_dir = koi_common::paths::koi_data_dir_with_override(data_dir);
836 let auto_key_path = resolved_data_dir.join("auto-unlock-key");
837 if let Ok(pp) = std::fs::read_to_string(&auto_key_path) {
838 if !pp.is_empty() {
839 match koi_certmesh::ca::load_ca(&pp, &paths) {
840 Ok(ca_state) => {
841 if let Ok(fresh_roster) = koi_certmesh::roster::load_roster(&roster_path) {
843 let auth_path = paths.auth_path();
844 let auth = if auth_path.exists() {
845 std::fs::read_to_string(&auth_path)
846 .ok()
847 .and_then(|json| {
848 serde_json::from_str::<koi_crypto::auth::StoredAuth>(&json).ok()
849 })
850 .and_then(|stored| stored.unlock(&pp).ok())
851 } else {
852 None
853 };
854
855 tracing::info!("Certmesh CA auto-unlocked at init");
856 return Some(Arc::new(koi_certmesh::CertmeshCore::new(
857 ca_state,
858 fresh_roster,
859 auth,
860 profile,
861 )));
862 }
863 }
864 Err(e) => {
865 tracing::warn!(
866 error = %e,
867 "Auto-unlock key exists but decryption failed"
868 );
869 }
870 }
871 }
872 }
873
874 let core = koi_certmesh::CertmeshCore::locked(roster, profile);
876 Some(Arc::new(core))
877}
878
879fn map_mdns_event(event: MdnsEvent) -> Option<KoiEvent> {
880 match event {
881 MdnsEvent::Found(record) => Some(KoiEvent::MdnsFound(record)),
882 MdnsEvent::Resolved(record) => Some(KoiEvent::MdnsResolved(record)),
883 MdnsEvent::Removed { name, service_type } => {
884 Some(KoiEvent::MdnsRemoved { name, service_type })
885 }
886 }
887}
888
889fn map_health_event(event: koi_health::HealthEvent) -> KoiEvent {
890 match event {
891 koi_health::HealthEvent::StatusChanged { name, status } => {
892 KoiEvent::HealthChanged { name, status }
893 }
894 }
895}
896
897fn map_dns_event(event: koi_dns::DnsEvent) -> KoiEvent {
898 match event {
899 koi_dns::DnsEvent::EntryUpdated { name, ip } => KoiEvent::DnsEntryUpdated { name, ip },
900 koi_dns::DnsEvent::EntryRemoved { name } => KoiEvent::DnsEntryRemoved { name },
901 }
902}
903
904fn map_certmesh_event(event: koi_certmesh::CertmeshEvent) -> KoiEvent {
905 match event {
906 koi_certmesh::CertmeshEvent::MemberJoined {
907 hostname,
908 fingerprint,
909 } => KoiEvent::CertmeshMemberJoined {
910 hostname,
911 fingerprint,
912 },
913 koi_certmesh::CertmeshEvent::MemberRevoked { hostname } => {
914 KoiEvent::CertmeshMemberRevoked { hostname }
915 }
916 koi_certmesh::CertmeshEvent::Destroyed => KoiEvent::CertmeshDestroyed,
917 }
918}
919
920fn map_proxy_event(event: koi_proxy::ProxyEvent) -> KoiEvent {
921 match event {
922 koi_proxy::ProxyEvent::EntryUpdated { entry } => KoiEvent::ProxyEntryUpdated { entry },
923 koi_proxy::ProxyEvent::EntryRemoved { name } => KoiEvent::ProxyEntryRemoved { name },
924 }
925}
926
927fn map_runtime_event(event: koi_runtime::RuntimeEvent) -> Option<KoiEvent> {
928 match event {
929 koi_runtime::RuntimeEvent::Started(instance) => Some(KoiEvent::RuntimeInstanceStarted {
930 name: instance.name,
931 backend: instance.backend,
932 }),
933 koi_runtime::RuntimeEvent::Stopped { name, .. } => {
934 Some(KoiEvent::RuntimeInstanceStopped { name })
935 }
936 _ => None,
939 }
940}
941
942fn emit_event(
943 tx: &broadcast::Sender<KoiEvent>,
944 handler: Option<&Arc<dyn Fn(KoiEvent) + Send + Sync>>,
945 event: KoiEvent,
946) {
947 if let Some(handler) = handler {
948 handler(event.clone());
949 }
950 let _ = tx.send(event);
951}
952
953pub(crate) fn map_join_error(err: tokio::task::JoinError) -> KoiError {
954 KoiError::Io(std::io::Error::other(err.to_string()))
955}
956
957async fn build_embedded_snapshot(
959 mdns: Option<Arc<koi_mdns::MdnsCore>>,
960 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
961 dns: Option<Arc<koi_dns::DnsRuntime>>,
962 health: Option<Arc<koi_health::HealthRuntime>>,
963 proxy: Option<Arc<koi_proxy::ProxyRuntime>>,
964 udp: Option<Arc<koi_udp::UdpRuntime>>,
965 runtime: Option<Arc<koi_runtime::RuntimeCore>>,
966) -> serde_json::Value {
967 use koi_common::capability::Capability;
968
969 let mut capabilities = Vec::new();
970
971 if let Some(ref core) = mdns {
972 let s = core.status();
973 capabilities.push(serde_json::json!({
974 "name": s.name, "enabled": true, "healthy": s.healthy, "summary": s.summary,
975 }));
976 } else {
977 capabilities.push(serde_json::json!({
978 "name": "mdns", "enabled": false, "healthy": false, "summary": "disabled",
979 }));
980 }
981
982 if let Some(ref core) = certmesh {
983 let s = core.status();
984 capabilities.push(serde_json::json!({
985 "name": s.name, "enabled": true, "healthy": s.healthy, "summary": s.summary,
986 }));
987 } else {
988 capabilities.push(serde_json::json!({
989 "name": "certmesh", "enabled": false, "healthy": false, "summary": "disabled",
990 }));
991 }
992
993 if let Some(ref runtime) = dns {
994 let running = runtime.status().await.running;
995 if running {
996 let s = runtime.core().status();
997 capabilities.push(serde_json::json!({
998 "name": s.name, "enabled": true, "healthy": s.healthy, "summary": s.summary,
999 }));
1000 } else {
1001 capabilities.push(serde_json::json!({
1002 "name": "dns", "enabled": true, "healthy": false, "summary": "stopped",
1003 }));
1004 }
1005 } else {
1006 capabilities.push(serde_json::json!({
1007 "name": "dns", "enabled": false, "healthy": false, "summary": "disabled",
1008 }));
1009 }
1010
1011 if let Some(ref runtime) = health {
1012 let running = runtime.status().await.running;
1013 if running {
1014 let s = runtime.core().status();
1015 capabilities.push(serde_json::json!({
1016 "name": s.name, "enabled": true, "healthy": s.healthy, "summary": s.summary,
1017 }));
1018 } else {
1019 capabilities.push(serde_json::json!({
1020 "name": "health", "enabled": true, "healthy": false, "summary": "stopped",
1021 }));
1022 }
1023 } else {
1024 capabilities.push(serde_json::json!({
1025 "name": "health", "enabled": false, "healthy": false, "summary": "disabled",
1026 }));
1027 }
1028
1029 if let Some(ref runtime) = proxy {
1030 let status = runtime.status().await;
1031 capabilities.push(serde_json::json!({
1032 "name": "proxy", "enabled": true, "healthy": true,
1033 "summary": if status.is_empty() { "no listeners".to_string() } else { format!("{} listeners", status.len()) },
1034 }));
1035 } else {
1036 capabilities.push(serde_json::json!({
1037 "name": "proxy", "enabled": false, "healthy": false, "summary": "disabled",
1038 }));
1039 }
1040
1041 if let Some(ref runtime) = udp {
1042 let s = Capability::status(runtime.as_ref());
1043 capabilities.push(serde_json::json!({
1044 "name": s.name, "enabled": true, "healthy": s.healthy, "summary": s.summary,
1045 }));
1046 } else {
1047 capabilities.push(serde_json::json!({
1048 "name": "udp", "enabled": false, "healthy": false, "summary": "disabled",
1049 }));
1050 }
1051
1052 if let Some(ref rt) = runtime {
1053 let s = rt.capability_status().await;
1054 capabilities.push(serde_json::json!({
1055 "name": s.name, "enabled": true, "healthy": s.healthy, "summary": s.summary,
1056 }));
1057 } else {
1058 capabilities.push(serde_json::json!({
1059 "name": "runtime", "enabled": false, "healthy": false, "summary": "disabled",
1060 }));
1061 }
1062
1063 serde_json::json!({ "capabilities": capabilities })
1064}
1065
/// Adapter exposing certmesh state through
/// `koi_common::integration::CertmeshSnapshot`.
///
/// The wrapped core is stored but never read by the snapshot implementation
/// in this file (which loads the roster from disk), hence the `dead_code`
/// allowance.
struct CertmeshBridgeEmbedded(#[allow(dead_code)] Arc<koi_certmesh::CertmeshCore>);
1071
1072impl CertmeshBridgeEmbedded {
1073 fn new(core: Arc<koi_certmesh::CertmeshCore>) -> Arc<Self> {
1074 Arc::new(Self(core))
1075 }
1076}
1077
1078impl koi_common::integration::CertmeshSnapshot for CertmeshBridgeEmbedded {
1079 fn active_members(&self) -> Vec<koi_common::integration::MemberSummary> {
1080 let roster_path = koi_certmesh::CertmeshPaths::default().roster_path();
1081 let Ok(roster) = koi_certmesh::roster::load_roster(&roster_path) else {
1082 return Vec::new();
1083 };
1084 roster
1085 .members
1086 .into_iter()
1087 .filter(|m| m.status == koi_certmesh::roster::MemberStatus::Active)
1088 .map(|m| koi_common::integration::MemberSummary {
1089 hostname: m.hostname,
1090 sans: m.cert_sans,
1091 cert_expires: Some(m.cert_expires),
1092 last_seen: m.last_seen,
1093 status: "active".to_string(),
1094 proxy_entries: m
1095 .proxy_entries
1096 .into_iter()
1097 .map(|p| koi_common::integration::ProxyConfigSummary {
1098 name: p.name,
1099 listen_port: p.listen_port,
1100 backend: p.backend,
1101 allow_remote: p.allow_remote,
1102 })
1103 .collect(),
1104 })
1105 .collect()
1106 }
1107}
1108
1109struct MdnsBridgeEmbedded {
1110 records: Arc<
1111 std::sync::RwLock<
1112 std::collections::HashMap<String, std::collections::HashMap<String, ServiceRecord>>,
1113 >,
1114 >,
1115 cancel: CancellationToken,
1116}
1117
1118impl MdnsBridgeEmbedded {
1119 async fn spawn(core: Arc<koi_mdns::MdnsCore>) -> Arc<Self> {
1120 use koi_common::types::META_QUERY;
1121 let records = Arc::new(std::sync::RwLock::new(std::collections::HashMap::new()));
1122 let cancel = CancellationToken::new();
1123
1124 let meta_core = Arc::clone(&core);
1125 let meta_records = Arc::clone(&records);
1126 let meta_cancel = cancel.clone();
1127 tokio::spawn(async move {
1128 if let Ok(handle) = meta_core.browse(META_QUERY).await {
1129 run_meta_browse_embedded(meta_core, handle, meta_records, meta_cancel).await;
1130 }
1131 });
1132
1133 Arc::new(Self { records, cancel })
1134 }
1135}
1136
1137impl Drop for MdnsBridgeEmbedded {
1138 fn drop(&mut self) {
1139 self.cancel.cancel();
1140 }
1141}
1142
1143impl koi_common::integration::MdnsSnapshot for MdnsBridgeEmbedded {
1144 fn host_ips(&self) -> std::collections::HashMap<String, std::net::IpAddr> {
1145 let guard = self.records.read().unwrap_or_else(|e| e.into_inner());
1146 let mut map = std::collections::HashMap::new();
1147 for type_map in guard.values() {
1148 for record in type_map.values() {
1149 let Some(host) = record.host.as_deref() else {
1150 continue;
1151 };
1152 let Some(ip) = record.ip.as_deref().and_then(|ip| ip.parse().ok()) else {
1153 continue;
1154 };
1155 let hostname = host.trim_end_matches('.').trim_end_matches(".local");
1156 if !hostname.is_empty() {
1157 map.insert(hostname.to_string(), ip);
1158 }
1159 }
1160 }
1161 map
1162 }
1163
1164 fn cached_records(&self) -> Vec<ServiceRecord> {
1165 let guard = self.records.read().unwrap_or_else(|e| e.into_inner());
1166 guard.values().flat_map(|m| m.values().cloned()).collect()
1167 }
1168}
1169
1170struct DnsBridgeEmbedded(Arc<koi_dns::DnsRuntime>);
1171
1172impl DnsBridgeEmbedded {
1173 fn new(runtime: Arc<koi_dns::DnsRuntime>) -> Arc<Self> {
1174 Arc::new(Self(runtime))
1175 }
1176}
1177
1178impl koi_common::integration::DnsProbe for DnsBridgeEmbedded {
1179 fn resolve_local(&self, name: &str) -> Option<Vec<std::net::IpAddr>> {
1180 use hickory_proto::rr::RecordType;
1181 let core = self.0.core();
1182 let result = core
1183 .resolve_local(name, RecordType::A)
1184 .or_else(|| core.resolve_local(name, RecordType::AAAA));
1185 result.map(|r| r.ips)
1186 }
1187}
1188
1189struct ProxyBridgeEmbedded(#[allow(dead_code)] Arc<koi_proxy::ProxyCore>);
1190
1191impl ProxyBridgeEmbedded {
1192 fn new(core: Arc<koi_proxy::ProxyCore>) -> Arc<Self> {
1193 Arc::new(Self(core))
1194 }
1195}
1196
1197impl koi_common::integration::ProxySnapshot for ProxyBridgeEmbedded {
1198 fn entries(&self) -> Vec<koi_common::integration::ProxyEntrySummary> {
1199 let Ok(entries) = koi_proxy::config::load_entries() else {
1200 return Vec::new();
1201 };
1202 entries
1203 .into_iter()
1204 .map(|e| koi_common::integration::ProxyEntrySummary {
1205 name: e.name,
1206 listen_port: e.listen_port,
1207 backend: e.backend,
1208 })
1209 .collect()
1210 }
1211}
1212
1213struct AliasFeedbackBridgeEmbedded(Arc<koi_certmesh::CertmeshCore>);
1214
1215impl AliasFeedbackBridgeEmbedded {
1216 fn new(core: Arc<koi_certmesh::CertmeshCore>) -> Arc<Self> {
1217 Arc::new(Self(core))
1218 }
1219}
1220
1221impl koi_common::integration::AliasFeedback for AliasFeedbackBridgeEmbedded {
1222 fn record_alias(&self, hostname: &str, alias: &str) {
1223 let core = Arc::clone(&self.0);
1224 let hostname = hostname.to_string();
1225 let alias = alias.to_string();
1226 tokio::spawn(async move {
1227 let _ = core.add_alias_sans(&hostname, &[alias]).await;
1228 });
1229 }
1230}
1231
1232async fn run_meta_browse_embedded(
1233 core: Arc<koi_mdns::MdnsCore>,
1234 handle: koi_mdns::BrowseHandle,
1235 records: Arc<
1236 std::sync::RwLock<
1237 std::collections::HashMap<String, std::collections::HashMap<String, ServiceRecord>>,
1238 >,
1239 >,
1240 cancel: CancellationToken,
1241) {
1242 let active = Arc::new(tokio::sync::Mutex::new(
1243 std::collections::HashSet::<String>::new(),
1244 ));
1245 loop {
1246 tokio::select! {
1247 _ = cancel.cancelled() => break,
1248 event = handle.recv() => {
1249 let Some(event) = event else { break; };
1250 if let koi_mdns::events::MdnsEvent::Found(record) = event {
1251 let service_type = record.name;
1252 let mut guard = active.lock().await;
1253 if guard.insert(service_type.clone()) {
1254 let c = Arc::clone(&core);
1255 let r = Arc::clone(&records);
1256 let t = service_type.clone();
1257 let cancel_child = cancel.clone();
1258 tokio::spawn(async move {
1259 if let Ok(handle) = c.browse(&t).await {
1260 run_type_browse_embedded(handle, r, cancel_child).await;
1261 }
1262 });
1263 }
1264 }
1265 }
1266 }
1267 }
1268}
1269
#[cfg(test)]
mod tests {
    use super::*;
    use koi_common::types::ServiceRecord;
    use std::collections::HashMap;

    /// Fully populated record shared by the mDNS mapping tests.
    fn sample_record() -> ServiceRecord {
        ServiceRecord {
            name: String::from("Test Service"),
            service_type: String::from("_http._tcp"),
            host: Some(String::from("host.local")),
            ip: Some(String::from("10.0.0.1")),
            port: Some(8080),
            txt: HashMap::new(),
        }
    }

    // ── KoiError ────────────────────────────────────────────────────────

    #[test]
    fn koi_error_disabled_capability_display() {
        let rendered = KoiError::DisabledCapability("mdns").to_string();
        assert_eq!(rendered, "capability disabled: mdns");
    }

    #[test]
    fn koi_error_io_from_impl() {
        let source = std::io::Error::new(std::io::ErrorKind::NotFound, "file missing");
        let converted = KoiError::from(source);
        assert!(matches!(converted, KoiError::Io(_)));
        assert!(converted.to_string().contains("file missing"));
    }

    #[test]
    fn koi_error_debug_does_not_panic() {
        let rendered = format!("{:?}", KoiError::DisabledCapability("proxy"));
        assert!(rendered.contains("DisabledCapability"));
    }

    // ── mDNS event mapping ──────────────────────────────────────────────

    #[test]
    fn map_mdns_found() {
        let mapped = map_mdns_event(koi_mdns::MdnsEvent::Found(sample_record()));
        assert!(mapped.is_some());
        match mapped.unwrap() {
            KoiEvent::MdnsFound(found) => assert_eq!(found.name, "Test Service"),
            other => panic!("expected MdnsFound, got {other:?}"),
        }
    }

    #[test]
    fn map_mdns_resolved() {
        let mapped = map_mdns_event(koi_mdns::MdnsEvent::Resolved(sample_record()));
        assert!(mapped.is_some());
        match mapped.unwrap() {
            KoiEvent::MdnsResolved(resolved) => {
                assert_eq!(resolved.port, Some(8080));
                assert_eq!(resolved.service_type, "_http._tcp");
            }
            other => panic!("expected MdnsResolved, got {other:?}"),
        }
    }

    #[test]
    fn map_mdns_removed() {
        let mapped = map_mdns_event(koi_mdns::MdnsEvent::Removed {
            name: String::from("Gone Service"),
            service_type: String::from("_http._tcp"),
        });
        assert!(mapped.is_some());
        match mapped.unwrap() {
            KoiEvent::MdnsRemoved { name, service_type } => {
                assert_eq!(name, "Gone Service");
                assert_eq!(service_type, "_http._tcp");
            }
            other => panic!("expected MdnsRemoved, got {other:?}"),
        }
    }

    // ── Health event mapping ────────────────────────────────────────────

    #[test]
    fn map_health_status_changed_up() {
        let mapped = map_health_event(koi_health::HealthEvent::StatusChanged {
            name: String::from("api"),
            status: koi_health::HealthStatus::Up,
        });
        match mapped {
            KoiEvent::HealthChanged { name, status } => {
                assert_eq!(name, "api");
                assert!(matches!(status, koi_health::HealthStatus::Up));
            }
            other => panic!("expected HealthChanged, got {other:?}"),
        }
    }

    #[test]
    fn map_health_status_changed_down() {
        let mapped = map_health_event(koi_health::HealthEvent::StatusChanged {
            name: String::from("db"),
            status: koi_health::HealthStatus::Down,
        });
        match mapped {
            KoiEvent::HealthChanged { name, status } => {
                assert_eq!(name, "db");
                assert!(matches!(status, koi_health::HealthStatus::Down));
            }
            other => panic!("expected HealthChanged, got {other:?}"),
        }
    }

    // ── DNS event mapping ───────────────────────────────────────────────

    #[test]
    fn map_dns_entry_updated() {
        let mapped = map_dns_event(koi_dns::DnsEvent::EntryUpdated {
            name: String::from("grafana"),
            ip: String::from("10.0.0.5"),
        });
        match mapped {
            KoiEvent::DnsEntryUpdated { name, ip } => {
                assert_eq!(name, "grafana");
                assert_eq!(ip, "10.0.0.5");
            }
            other => panic!("expected DnsEntryUpdated, got {other:?}"),
        }
    }

    #[test]
    fn map_dns_entry_removed() {
        let mapped = map_dns_event(koi_dns::DnsEvent::EntryRemoved {
            name: String::from("old-host"),
        });
        match mapped {
            KoiEvent::DnsEntryRemoved { name } => assert_eq!(name, "old-host"),
            other => panic!("expected DnsEntryRemoved, got {other:?}"),
        }
    }

    // ── Certmesh event mapping ──────────────────────────────────────────

    #[test]
    fn map_certmesh_member_joined() {
        let mapped = map_certmesh_event(koi_certmesh::CertmeshEvent::MemberJoined {
            hostname: String::from("node-a"),
            fingerprint: String::from("sha256:abc"),
        });
        match mapped {
            KoiEvent::CertmeshMemberJoined {
                hostname,
                fingerprint,
            } => {
                assert_eq!(hostname, "node-a");
                assert_eq!(fingerprint, "sha256:abc");
            }
            other => panic!("expected CertmeshMemberJoined, got {other:?}"),
        }
    }

    #[test]
    fn map_certmesh_member_revoked() {
        let mapped = map_certmesh_event(koi_certmesh::CertmeshEvent::MemberRevoked {
            hostname: String::from("node-b"),
        });
        match mapped {
            KoiEvent::CertmeshMemberRevoked { hostname } => assert_eq!(hostname, "node-b"),
            other => panic!("expected CertmeshMemberRevoked, got {other:?}"),
        }
    }

    #[test]
    fn map_certmesh_destroyed() {
        let mapped = map_certmesh_event(koi_certmesh::CertmeshEvent::Destroyed);
        assert!(matches!(mapped, KoiEvent::CertmeshDestroyed));
    }

    // ── Proxy event mapping ─────────────────────────────────────────────

    #[test]
    fn map_proxy_entry_updated() {
        let entry = koi_proxy::ProxyEntry {
            name: String::from("web"),
            listen_port: 443,
            backend: String::from("http://localhost:3000"),
            allow_remote: true,
        };
        let mapped = map_proxy_event(koi_proxy::ProxyEvent::EntryUpdated { entry });
        match mapped {
            KoiEvent::ProxyEntryUpdated { entry: updated } => {
                assert_eq!(updated.name, "web");
                assert_eq!(updated.listen_port, 443);
                assert!(updated.allow_remote);
            }
            other => panic!("expected ProxyEntryUpdated, got {other:?}"),
        }
    }

    #[test]
    fn map_proxy_entry_removed() {
        let mapped = map_proxy_event(koi_proxy::ProxyEvent::EntryRemoved {
            name: String::from("old-proxy"),
        });
        match mapped {
            KoiEvent::ProxyEntryRemoved { name } => assert_eq!(name, "old-proxy"),
            other => panic!("expected ProxyEntryRemoved, got {other:?}"),
        }
    }

    // ── Join error wrapping ─────────────────────────────────────────────

    #[test]
    fn map_join_error_produces_io_error() {
        let wrapped = KoiError::Io(std::io::Error::other("simulated join error"));
        assert!(wrapped.to_string().contains("simulated join error"));
    }

    // ── Builder ─────────────────────────────────────────────────────────

    #[test]
    fn builder_default_config() {
        let embedded = Builder::new().build().expect("build should succeed");
        let cfg = &embedded.config;
        assert!(cfg.mdns_enabled);
        assert!(!cfg.http_enabled);
        assert_eq!(cfg.http_port, 5641);
    }

    #[test]
    fn builder_default_trait() {
        let embedded = Builder::default()
            .build()
            .expect("build should succeed");
        assert_eq!(embedded.config.service_endpoint, "http://127.0.0.1:5641");
    }

    #[test]
    fn builder_fluent_overrides() {
        let embedded = Builder::new()
            .http(true)
            .mdns(false)
            .dns_enabled(false)
            .health(true)
            .certmesh(true)
            .proxy(true)
            .udp(true)
            .http_port(9000)
            .dashboard(true)
            .api_docs(true)
            .mdns_browser(true)
            .announce_http(true)
            .dns_auto_start(true)
            .health_auto_start(true)
            .proxy_auto_start(true)
            .service_endpoint("http://10.0.0.1:8080")
            .service_mode(ServiceMode::EmbeddedOnly)
            .data_dir("/tmp/koi-test")
            .build()
            .expect("build should succeed");

        let cfg = &embedded.config;

        // Capability switches.
        assert!(cfg.http_enabled);
        assert!(!cfg.mdns_enabled);
        assert!(!cfg.dns_enabled);
        assert!(cfg.health_enabled);
        assert!(cfg.certmesh_enabled);
        assert!(cfg.proxy_enabled);
        assert!(cfg.udp_enabled);

        // HTTP surface.
        assert_eq!(cfg.http_port, 9000);
        assert!(cfg.dashboard_enabled);
        assert!(cfg.api_docs_enabled);
        assert!(cfg.mdns_browser_enabled);
        assert!(cfg.announce_http);

        // Auto-start flags.
        assert!(cfg.dns_auto_start);
        assert!(cfg.health_auto_start);
        assert!(cfg.proxy_auto_start);

        // Service wiring.
        assert_eq!(cfg.service_endpoint, "http://10.0.0.1:8080");
        assert_eq!(cfg.service_mode, ServiceMode::EmbeddedOnly);
        assert_eq!(
            cfg.data_dir,
            Some(std::path::PathBuf::from("/tmp/koi-test"))
        );
    }

    #[test]
    fn builder_dns_configure_closure() {
        let embedded = Builder::new()
            .dns(|dns| dns.port(5353).zone("home").local_ttl(120))
            .build()
            .expect("build should succeed");

        let dns_cfg = &embedded.config.dns_config;
        assert_eq!(dns_cfg.port, 5353);
        assert_eq!(dns_cfg.zone, "home");
        assert_eq!(dns_cfg.local_ttl, 120);
    }

    #[test]
    fn builder_event_handler() {
        use std::sync::atomic::{AtomicBool, Ordering};

        let flag = Arc::new(AtomicBool::new(false));
        let handler_flag = Arc::clone(&flag);
        let on_event = move |_event| {
            handler_flag.store(true, Ordering::SeqCst);
        };

        let embedded = Builder::new()
            .events(on_event)
            .build()
            .expect("build should succeed");

        // Only registration is checked here; the handler is never invoked.
        assert!(embedded.event_handler.is_some());
    }

    #[test]
    fn builder_extra_firewall_ports() {
        use koi_common::firewall::{FirewallPort, FirewallProtocol};

        // Smoke test: the call only has to compile and run without panicking.
        let custom = FirewallPort::new("Custom", FirewallProtocol::Tcp, 12345);
        let _builder = Builder::new().extra_firewall_ports(vec![custom]);
    }

    // ── Result alias ────────────────────────────────────────────────────

    #[test]
    fn result_type_works_with_ok() {
        let outcome: Result<i32> = Ok(42);
        assert_eq!(outcome.unwrap(), 42);
    }

    #[test]
    fn result_type_works_with_err() {
        let outcome: Result<i32> = Err(KoiError::DisabledCapability("test"));
        assert!(outcome.is_err());
    }
}
1628
1629async fn run_type_browse_embedded(
1630 handle: koi_mdns::BrowseHandle,
1631 records: Arc<
1632 std::sync::RwLock<
1633 std::collections::HashMap<String, std::collections::HashMap<String, ServiceRecord>>,
1634 >,
1635 >,
1636 cancel: CancellationToken,
1637) {
1638 loop {
1639 tokio::select! {
1640 _ = cancel.cancelled() => break,
1641 event = handle.recv() => {
1642 let Some(event) = event else { break; };
1643 match event {
1644 koi_mdns::events::MdnsEvent::Resolved(record) => {
1645 let mut guard = records.write().unwrap_or_else(|e| e.into_inner());
1646 let entry = guard.entry(record.service_type.clone()).or_default();
1647 entry.insert(record.name.clone(), record);
1648 }
1649 koi_mdns::events::MdnsEvent::Removed { name, service_type } => {
1650 let mut guard = records.write().unwrap_or_else(|e| e.into_inner());
1651 let st = if service_type.is_empty() {
1652 name.find("._").map(|idx| {
1653 let rest = &name[idx + 1..];
1654 rest.trim_end_matches('.').trim_end_matches(".local").to_string()
1655 })
1656 } else {
1657 Some(service_type)
1658 };
1659 if let Some(st) = st {
1660 if let Some(map) = guard.get_mut(&st) {
1661 let instance = name.find("._").map(|idx| name[..idx].to_string());
1662 if let Some(instance) = instance {
1663 map.remove(&instance);
1664 }
1665 }
1666 }
1667 }
1668 _ => {}
1669 }
1670 }
1671 }
1672 }
1673}