1mod config;
2mod events;
3mod handle;
4pub(crate) mod http;
5
6use std::sync::Arc;
7
8use tokio::sync::broadcast;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11
12use koi_client::KoiClient;
13
14pub use config::{DnsConfigBuilder, KoiConfig, ServiceMode};
15pub use events::KoiEvent;
16pub use handle::{CertmeshHandle, DnsHandle, HealthHandle, KoiHandle, MdnsHandle, ProxyHandle};
17
18pub use koi_common::firewall::{FirewallPort, FirewallProtocol};
20pub use koi_common::types::ServiceRecord;
21pub use koi_config::state::DnsEntry;
22pub use koi_health::{HealthCheck, HealthSnapshot, ServiceCheckKind};
23pub use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
24pub use koi_mdns::MdnsEvent;
25pub use koi_proxy::ProxyEntry;
26
27pub use koi_crypto::vault::{Vault, VaultError};
29
30pub use koi_runtime::{RuntimeBackendKind, RuntimeConfig};
32
33pub type Result<T> = std::result::Result<T, KoiError>;
34
35#[derive(Debug, thiserror::Error)]
36pub enum KoiError {
37 #[error("capability disabled: {0}")]
38 DisabledCapability(&'static str),
39 #[error("mdns error: {0}")]
40 Mdns(#[from] koi_mdns::MdnsError),
41 #[error("dns error: {0}")]
42 Dns(#[from] koi_dns::DnsError),
43 #[error("health error: {0}")]
44 Health(#[from] koi_health::HealthError),
45 #[error("proxy error: {0}")]
46 Proxy(#[from] koi_proxy::ProxyError),
47 #[error("certmesh error: {0}")]
48 Certmesh(#[from] koi_certmesh::CertmeshError),
49 #[error("runtime error: {0}")]
50 Runtime(#[from] koi_runtime::RuntimeError),
51 #[error("client error: {0}")]
52 Client(#[from] koi_client::ClientError),
53 #[error("io error: {0}")]
54 Io(#[from] std::io::Error),
55}
56
57pub struct Builder {
58 config: KoiConfig,
59 event_handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
60 extra_firewall_ports: Vec<koi_common::firewall::FirewallPort>,
61}
62
63impl Builder {
64 pub fn new() -> Self {
65 Self {
66 config: KoiConfig::default(),
67 event_handler: None,
68 extra_firewall_ports: Vec::new(),
69 }
70 }
71
72 pub fn data_dir(mut self, path: impl Into<std::path::PathBuf>) -> Self {
73 self.config.data_dir = Some(path.into());
74 self
75 }
76
77 pub fn service_endpoint(mut self, endpoint: impl Into<String>) -> Self {
78 self.config.service_endpoint = endpoint.into();
79 self
80 }
81
82 pub fn service_mode(mut self, mode: ServiceMode) -> Self {
83 self.config.service_mode = mode;
84 self
85 }
86
87 pub fn http(mut self, enabled: bool) -> Self {
88 self.config.http_enabled = enabled;
89 self
90 }
91
92 pub fn mdns(mut self, enabled: bool) -> Self {
93 self.config.mdns_enabled = enabled;
94 self
95 }
96
97 pub fn dns<F>(mut self, configure: F) -> Self
98 where
99 F: FnOnce(DnsConfigBuilder) -> DnsConfigBuilder,
100 {
101 let builder = DnsConfigBuilder::new(self.config.dns_config.clone());
102 self.config.dns_config = configure(builder).build();
103 self
104 }
105
106 pub fn dns_enabled(mut self, enabled: bool) -> Self {
107 self.config.dns_enabled = enabled;
108 self
109 }
110
111 pub fn dns_auto_start(mut self, enabled: bool) -> Self {
112 self.config.dns_auto_start = enabled;
113 self
114 }
115
116 pub fn health(mut self, enabled: bool) -> Self {
117 self.config.health_enabled = enabled;
118 self
119 }
120
121 pub fn health_auto_start(mut self, enabled: bool) -> Self {
122 self.config.health_auto_start = enabled;
123 self
124 }
125
126 pub fn certmesh(mut self, enabled: bool) -> Self {
127 self.config.certmesh_enabled = enabled;
128 self
129 }
130
131 pub fn proxy(mut self, enabled: bool) -> Self {
132 self.config.proxy_enabled = enabled;
133 self
134 }
135
136 pub fn proxy_auto_start(mut self, enabled: bool) -> Self {
137 self.config.proxy_auto_start = enabled;
138 self
139 }
140
141 pub fn udp(mut self, enabled: bool) -> Self {
142 self.config.udp_enabled = enabled;
143 self
144 }
145
146 pub fn runtime(mut self, kind: koi_runtime::RuntimeBackendKind) -> Self {
151 self.config.runtime_enabled = true;
152 self.config.runtime_backend = kind;
153 self
154 }
155
156 pub fn runtime_auto(mut self) -> Self {
158 self.config.runtime_enabled = true;
159 self.config.runtime_backend = koi_runtime::RuntimeBackendKind::Auto;
160 self
161 }
162
163 pub fn http_port(mut self, port: u16) -> Self {
164 self.config.http_port = port;
165 self
166 }
167
168 pub fn dashboard(mut self, enabled: bool) -> Self {
169 self.config.dashboard_enabled = enabled;
170 self
171 }
172
173 pub fn api_docs(mut self, enabled: bool) -> Self {
174 self.config.api_docs_enabled = enabled;
175 self
176 }
177
178 pub fn mdns_browser(mut self, enabled: bool) -> Self {
179 self.config.mdns_browser_enabled = enabled;
180 self
181 }
182
183 pub fn announce_http(mut self, enabled: bool) -> Self {
184 self.config.announce_http = enabled;
185 self
186 }
187
188 pub fn events<F>(mut self, handler: F) -> Self
189 where
190 F: Fn(KoiEvent) + Send + Sync + 'static,
191 {
192 self.event_handler = Some(Arc::new(handler));
193 self
194 }
195
196 pub fn extra_firewall_ports(mut self, ports: Vec<koi_common::firewall::FirewallPort>) -> Self {
201 self.extra_firewall_ports = ports;
202 self
203 }
204
205 pub fn ensure_firewall_rules(self, prefix: &str) -> Self {
216 let mut all_ports = self.config.firewall_ports();
217 all_ports.extend(self.extra_firewall_ports.iter().cloned());
218
219 let count = koi_common::firewall::ensure_firewall_rules(prefix, &all_ports);
220 if count > 0 {
221 tracing::info!(count, "Firewall rules ensured");
222 }
223 self
224 }
225
226 pub fn build(self) -> Result<KoiEmbedded> {
227 Ok(KoiEmbedded {
228 config: self.config,
229 event_handler: self.event_handler,
230 })
231 }
232}
233
234impl Default for Builder {
235 fn default() -> Self {
236 Self::new()
237 }
238}
239
240pub struct KoiEmbedded {
241 config: KoiConfig,
242 event_handler: Option<Arc<dyn Fn(KoiEvent) + Send + Sync>>,
243}
244
245impl KoiEmbedded {
246 pub async fn start(self) -> Result<KoiHandle> {
247 let cancel = CancellationToken::new();
248 let (event_tx, _) = broadcast::channel(256);
249 let mut tasks: Vec<JoinHandle<()>> = Vec::new();
250
251 if self.config.service_mode != ServiceMode::EmbeddedOnly {
252 let client = Arc::new(KoiClient::new(&self.config.service_endpoint));
253 match self.config.service_mode {
254 ServiceMode::ClientOnly => {
255 tokio::task::spawn_blocking({
256 let client = Arc::clone(&client);
257 move || client.health()
258 })
259 .await
260 .map_err(map_join_error)??;
261 return Ok(KoiHandle::new_remote(client, event_tx, cancel, tasks));
262 }
263 ServiceMode::Auto => {
264 let health = tokio::task::spawn_blocking({
265 let client = Arc::clone(&client);
266 move || client.health()
267 })
268 .await;
269 if matches!(health, Ok(Ok(()))) {
270 return Ok(KoiHandle::new_remote(client, event_tx, cancel, tasks));
271 }
272 }
273 ServiceMode::EmbeddedOnly => {}
274 }
275 }
276
277 let mdns = if self.config.mdns_enabled {
278 Some(Arc::new(koi_mdns::MdnsCore::with_cancel(cancel.clone())?))
279 } else {
280 None
281 };
282
283 let certmesh = if self.config.certmesh_enabled {
284 let data_dir = self.config.data_dir.clone();
285 tokio::task::spawn_blocking(move || init_certmesh_core(data_dir.as_deref()))
286 .await
287 .map_err(|e| std::io::Error::other(format!("certmesh init: {e}")))?
288 } else {
289 None
290 };
291
292 let mdns_bridge: Option<Arc<dyn koi_common::integration::MdnsSnapshot>> =
294 if let Some(ref core) = mdns {
295 Some(MdnsBridgeEmbedded::spawn(core.clone()).await)
296 } else {
297 None
298 };
299
300 let certmesh_bridge: Option<Arc<dyn koi_common::integration::CertmeshSnapshot>> =
301 certmesh.as_ref().map(|core| {
302 CertmeshBridgeEmbedded::new(core.clone())
303 as Arc<dyn koi_common::integration::CertmeshSnapshot>
304 });
305
306 let alias_feedback: Option<Arc<dyn koi_common::integration::AliasFeedback>> =
307 certmesh.as_ref().map(|core| {
308 AliasFeedbackBridgeEmbedded::new(core.clone())
309 as Arc<dyn koi_common::integration::AliasFeedback>
310 });
311
312 let dns = if self.config.dns_enabled {
313 let mut dns_config = self.config.dns_config.clone();
314 if let Some(dir) = &self.config.data_dir {
317 dns_config.state_path = Some(dir.join("state").join("dns.json"));
318 }
319 let core = koi_dns::DnsCore::new(
320 dns_config,
321 mdns_bridge.clone(),
322 certmesh_bridge.clone(),
323 alias_feedback,
324 )
325 .await?;
326 Some(Arc::new(koi_dns::DnsRuntime::new(core)))
327 } else {
328 None
329 };
330
331 let proxy = if self.config.proxy_enabled {
332 let core = if let Some(dir) = &self.config.data_dir {
333 Arc::new(koi_proxy::ProxyCore::with_data_dir(dir)?)
334 } else {
335 Arc::new(koi_proxy::ProxyCore::new()?)
336 };
337 Some(Arc::new(koi_proxy::ProxyRuntime::new(core)))
338 } else {
339 None
340 };
341
342 let dns_bridge: Option<Arc<dyn koi_common::integration::DnsProbe>> =
343 dns.as_ref().map(|rt| {
344 DnsBridgeEmbedded::new(rt.clone()) as Arc<dyn koi_common::integration::DnsProbe>
345 });
346
347 let proxy_bridge: Option<Arc<dyn koi_common::integration::ProxySnapshot>> =
348 proxy.as_ref().map(|rt| {
349 ProxyBridgeEmbedded::new(rt.core())
350 as Arc<dyn koi_common::integration::ProxySnapshot>
351 });
352
353 let health = if self.config.health_enabled {
354 let core = koi_health::HealthCore::new(
355 mdns_bridge.clone(),
356 dns_bridge,
357 certmesh_bridge,
358 proxy_bridge,
359 )
360 .await;
361 Some(Arc::new(koi_health::HealthRuntime::new(Arc::new(core))))
362 } else {
363 None
364 };
365
366 if let Some(runtime) = &dns {
367 if self.config.dns_auto_start {
368 let _ = runtime.start().await?;
369 }
370 }
371
372 if let Some(runtime) = &health {
373 if self.config.health_auto_start {
374 let _ = runtime.start().await?;
375 }
376 }
377
378 if let Some(runtime) = &proxy {
379 if self.config.proxy_auto_start {
380 runtime.start_all().await?;
381 }
382 }
383
384 let udp = if self.config.udp_enabled {
385 Some(Arc::new(koi_udp::UdpRuntime::new(cancel.clone())))
386 } else {
387 None
388 };
389
390 let runtime = if self.config.runtime_enabled {
391 let config = koi_runtime::RuntimeConfig {
392 backend_kind: self.config.runtime_backend,
393 socket_path: None,
394 };
395 let core = Arc::new(koi_runtime::RuntimeCore::new(config));
396 match core.start_watching(cancel.clone()).await {
397 Ok(()) => {
398 tracing::info!("Runtime adapter started");
399 Some(core)
400 }
401 Err(e) => {
402 tracing::warn!(error = %e, "Runtime backend unavailable — continuing without runtime adapter");
403 None
404 }
405 }
406 } else {
407 None
408 };
409
410 let dashboard_state = if self.config.dashboard_enabled && self.config.http_enabled {
412 let started_at = std::time::Instant::now();
413 let snap_mdns = mdns.clone();
414 let snap_certmesh = certmesh.clone();
415 let snap_dns = dns.clone();
416 let snap_health = health.clone();
417 let snap_proxy = proxy.clone();
418 let snap_udp = udp.clone();
419 let snap_runtime = runtime.clone();
420
421 let snapshot_fn: koi_dashboard::dashboard::SnapshotFn = Arc::new(move || {
422 let m = snap_mdns.clone();
423 let cm = snap_certmesh.clone();
424 let d = snap_dns.clone();
425 let h = snap_health.clone();
426 let p = snap_proxy.clone();
427 let u = snap_udp.clone();
428 let rt = snap_runtime.clone();
429 Box::pin(async move { build_embedded_snapshot(m, cm, d, h, p, u, rt).await })
430 });
431
432 let (dash_event_tx, _) = broadcast::channel(256);
433 let ds = koi_dashboard::dashboard::DashboardState {
434 identity: koi_dashboard::dashboard::DashboardIdentity {
435 version: env!("CARGO_PKG_VERSION").to_string(),
436 platform: std::env::consts::OS.to_string(),
437 },
438 mode: "embedded",
439 snapshot_fn,
440 event_tx: dash_event_tx.clone(),
441 started_at,
442 };
443
444 tasks.push(koi_dashboard::forward::spawn_event_forwarder(
447 koi_dashboard::forward::ForwarderCores {
448 mdns: mdns.clone(),
449 certmesh: certmesh.clone(),
450 dns: dns.clone(),
451 health: health.clone(),
452 proxy: proxy.clone(),
453 runtime: runtime.clone(),
454 },
455 dash_event_tx,
456 cancel.clone(),
457 ));
458
459 Some(ds)
460 } else {
461 None
462 };
463
464 let browser_state = if self.config.mdns_browser_enabled && self.config.http_enabled {
467 if let Some(ref mdns_core) = mdns {
468 Some(koi_dashboard::browser::build_state(
469 mdns_core.clone(),
470 cancel.clone(),
471 ))
472 } else {
473 tracing::warn!("mdns_browser enabled but mDNS is disabled — skipping browser");
474 None
475 }
476 } else {
477 None
478 };
479
480 if self.config.http_enabled {
482 let http_port = self.config.http_port;
483 let http_cancel = cancel.clone();
484 let http_mdns = mdns.clone();
485 let http_dns = dns.clone();
486 let http_health = health.clone();
487 let http_certmesh = certmesh.clone();
488 let http_proxy = proxy.clone();
489 let http_udp = udp.clone();
490 let http_runtime = runtime.clone();
491 let http_api_docs = self.config.api_docs_enabled;
492 tasks.push(tokio::spawn(async move {
493 http::serve(
494 http_port,
495 http_mdns,
496 http_dns,
497 http_health,
498 http_certmesh,
499 http_proxy,
500 http_udp,
501 http_runtime,
502 dashboard_state,
503 browser_state,
504 http_api_docs,
505 http_cancel,
506 )
507 .await;
508 }));
509 }
510
511 let http_announce_id =
513 if self.config.announce_http && self.config.http_enabled && self.config.mdns_enabled {
514 if let Some(ref mdns_core) = mdns {
515 let hostname = hostname::get()
516 .ok()
517 .and_then(|os| os.into_string().ok())
518 .unwrap_or_else(|| "unknown".to_string());
519
520 let mut txt = std::collections::HashMap::new();
521 txt.insert("path".to_string(), "/".to_string());
522 txt.insert("version".to_string(), env!("CARGO_PKG_VERSION").to_string());
523 txt.insert("api".to_string(), "v1".to_string());
524 txt.insert(
525 "dashboard".to_string(),
526 self.config.dashboard_enabled.to_string(),
527 );
528
529 let payload = koi_mdns::protocol::RegisterPayload {
530 name: format!("Koi ({hostname})"),
531 service_type: "_http._tcp".to_string(),
532 port: self.config.http_port,
533 ip: None,
534 lease_secs: None,
535 txt,
536 };
537 match mdns_core.register(payload) {
538 Ok(result) => {
539 tracing::info!(
540 id = %result.id,
541 port = self.config.http_port,
542 "HTTP server announced via mDNS"
543 );
544 Some(result.id)
545 }
546 Err(e) => {
547 tracing::warn!(error = %e, "Failed to announce HTTP server via mDNS");
548 None
549 }
550 }
551 } else {
552 None
553 }
554 } else {
555 None
556 };
557
558 if let Some(core) = &mdns {
559 let mut rx = core.subscribe();
560 let tx = event_tx.clone();
561 let token = cancel.clone();
562 let handler = self.event_handler.clone();
563 tasks.push(tokio::spawn(async move {
564 loop {
565 tokio::select! {
566 _ = token.cancelled() => break,
567 msg = rx.recv() => {
568 let Ok(event) = msg else { continue; };
569 let mapped = map_mdns_event(event);
570 if let Some(mapped) = mapped {
571 emit_event(&tx, handler.as_ref(), mapped);
572 }
573 }
574 }
575 }
576 }));
577 }
578
579 if self.config.health_enabled {
580 if let Some(runtime) = &health {
581 let mut rx = runtime.core().subscribe();
582 let tx = event_tx.clone();
583 let token = cancel.clone();
584 let handler = self.event_handler.clone();
585 tasks.push(tokio::spawn(async move {
586 loop {
587 tokio::select! {
588 _ = token.cancelled() => break,
589 msg = rx.recv() => {
590 let Ok(event) = msg else { continue; };
591 let mapped = map_health_event(event);
592 emit_event(&tx, handler.as_ref(), mapped);
593 }
594 }
595 }
596 }));
597 }
598 }
599
600 if self.config.dns_enabled {
601 if let Some(runtime) = &dns {
602 let mut rx = runtime.core().subscribe();
603 let tx = event_tx.clone();
604 let token = cancel.clone();
605 let handler = self.event_handler.clone();
606 tasks.push(tokio::spawn(async move {
607 loop {
608 tokio::select! {
609 _ = token.cancelled() => break,
610 msg = rx.recv() => {
611 let Ok(event) = msg else { continue; };
612 let mapped = map_dns_event(event);
613 emit_event(&tx, handler.as_ref(), mapped);
614 }
615 }
616 }
617 }));
618 }
619 }
620
621 if self.config.certmesh_enabled {
622 if let Some(core) = &certmesh {
623 let mut rx = core.subscribe();
624 let tx = event_tx.clone();
625 let token = cancel.clone();
626 let handler = self.event_handler.clone();
627 tasks.push(tokio::spawn(async move {
628 loop {
629 tokio::select! {
630 _ = token.cancelled() => break,
631 msg = rx.recv() => {
632 let Ok(event) = msg else { continue; };
633 let mapped = map_certmesh_event(event);
634 emit_event(&tx, handler.as_ref(), mapped);
635 }
636 }
637 }
638 }));
639 }
640 }
641
642 if self.config.proxy_enabled {
643 if let Some(runtime_proxy) = &proxy {
644 let mut rx = runtime_proxy.core().subscribe();
645 let tx = event_tx.clone();
646 let token = cancel.clone();
647 let handler = self.event_handler.clone();
648 tasks.push(tokio::spawn(async move {
649 loop {
650 tokio::select! {
651 _ = token.cancelled() => break,
652 msg = rx.recv() => {
653 let Ok(event) = msg else { continue; };
654 let mapped = map_proxy_event(event);
655 emit_event(&tx, handler.as_ref(), mapped);
656 }
657 }
658 }
659 }));
660 }
661 }
662
663 if let Some(ref runtime_core) = runtime {
664 let mut rx = runtime_core.subscribe();
665 let tx = event_tx.clone();
666 let token = cancel.clone();
667 let handler = self.event_handler.clone();
668 tasks.push(tokio::spawn(async move {
669 loop {
670 tokio::select! {
671 _ = token.cancelled() => break,
672 msg = rx.recv() => {
673 let Ok(event) = msg else { continue; };
674 if let Some(mapped) = map_runtime_event(event) {
675 emit_event(&tx, handler.as_ref(), mapped);
676 }
677 }
678 }
679 }
680 }));
681 }
682
683 Ok(KoiHandle::new_embedded(
684 mdns,
685 dns,
686 health,
687 certmesh,
688 proxy,
689 udp,
690 runtime,
691 self.config.data_dir.clone(),
692 event_tx,
693 cancel,
694 tasks,
695 http_announce_id,
696 ))
697 }
698}
699
700fn init_certmesh_core(
701 data_dir: Option<&std::path::Path>,
702) -> Option<Arc<koi_certmesh::CertmeshCore>> {
703 let paths = match data_dir {
704 Some(dir) => koi_certmesh::CertmeshPaths::with_data_dir(dir.to_path_buf()),
705 None => koi_certmesh::CertmeshPaths::default(),
706 };
707 if !paths.is_ca_initialized() {
708 return Some(Arc::new(koi_certmesh::CertmeshCore::uninitialized()));
709 }
710
711 let roster_path = paths.roster_path();
712 let roster = match koi_certmesh::roster::load_roster(&roster_path) {
713 Ok(r) => r,
714 Err(_) => {
715 return Some(Arc::new(koi_certmesh::CertmeshCore::uninitialized()));
716 }
717 };
718
719 let profile = roster.metadata.trust_profile;
720
721 let resolved_data_dir = koi_common::paths::koi_data_dir_with_override(data_dir);
726 let auto_key_path = resolved_data_dir.join("auto-unlock-key");
727 if let Ok(pp) = std::fs::read_to_string(&auto_key_path) {
728 if !pp.is_empty() {
729 match koi_certmesh::ca::load_ca(&pp, &paths) {
730 Ok(ca_state) => {
731 if let Ok(fresh_roster) = koi_certmesh::roster::load_roster(&roster_path) {
733 let auth_path = paths.auth_path();
734 let auth = if auth_path.exists() {
735 std::fs::read_to_string(&auth_path)
736 .ok()
737 .and_then(|json| {
738 serde_json::from_str::<koi_crypto::auth::StoredAuth>(&json).ok()
739 })
740 .and_then(|stored| stored.unlock(&pp).ok())
741 } else {
742 None
743 };
744
745 tracing::info!("Certmesh CA auto-unlocked at init");
746 return Some(Arc::new(koi_certmesh::CertmeshCore::new(
747 ca_state,
748 fresh_roster,
749 auth,
750 profile,
751 )));
752 }
753 }
754 Err(e) => {
755 tracing::warn!(
756 error = %e,
757 "Auto-unlock key exists but decryption failed"
758 );
759 }
760 }
761 }
762 }
763
764 let core = koi_certmesh::CertmeshCore::locked(roster, profile);
766 Some(Arc::new(core))
767}
768
769fn map_mdns_event(event: MdnsEvent) -> Option<KoiEvent> {
770 match event {
771 MdnsEvent::Found(record) => Some(KoiEvent::MdnsFound(record)),
772 MdnsEvent::Resolved(record) => Some(KoiEvent::MdnsResolved(record)),
773 MdnsEvent::Removed { name, service_type } => {
774 Some(KoiEvent::MdnsRemoved { name, service_type })
775 }
776 }
777}
778
779fn map_health_event(event: koi_health::HealthEvent) -> KoiEvent {
780 match event {
781 koi_health::HealthEvent::StatusChanged { name, status } => {
782 KoiEvent::HealthChanged { name, status }
783 }
784 }
785}
786
787fn map_dns_event(event: koi_dns::DnsEvent) -> KoiEvent {
788 match event {
789 koi_dns::DnsEvent::EntryUpdated { name, ip } => KoiEvent::DnsEntryUpdated { name, ip },
790 koi_dns::DnsEvent::EntryRemoved { name } => KoiEvent::DnsEntryRemoved { name },
791 }
792}
793
794fn map_certmesh_event(event: koi_certmesh::CertmeshEvent) -> KoiEvent {
795 match event {
796 koi_certmesh::CertmeshEvent::MemberJoined {
797 hostname,
798 fingerprint,
799 } => KoiEvent::CertmeshMemberJoined {
800 hostname,
801 fingerprint,
802 },
803 koi_certmesh::CertmeshEvent::MemberRevoked { hostname } => {
804 KoiEvent::CertmeshMemberRevoked { hostname }
805 }
806 koi_certmesh::CertmeshEvent::Destroyed => KoiEvent::CertmeshDestroyed,
807 }
808}
809
810fn map_proxy_event(event: koi_proxy::ProxyEvent) -> KoiEvent {
811 match event {
812 koi_proxy::ProxyEvent::EntryUpdated { entry } => KoiEvent::ProxyEntryUpdated { entry },
813 koi_proxy::ProxyEvent::EntryRemoved { name } => KoiEvent::ProxyEntryRemoved { name },
814 }
815}
816
817fn map_runtime_event(event: koi_runtime::RuntimeEvent) -> Option<KoiEvent> {
818 match event {
819 koi_runtime::RuntimeEvent::Started(instance) => Some(KoiEvent::RuntimeInstanceStarted {
820 name: instance.name,
821 backend: instance.backend,
822 }),
823 koi_runtime::RuntimeEvent::Stopped { name, .. } => {
824 Some(KoiEvent::RuntimeInstanceStopped { name })
825 }
826 _ => None,
829 }
830}
831
832fn emit_event(
833 tx: &broadcast::Sender<KoiEvent>,
834 handler: Option<&Arc<dyn Fn(KoiEvent) + Send + Sync>>,
835 event: KoiEvent,
836) {
837 if let Some(handler) = handler {
838 handler(event.clone());
839 }
840 let _ = tx.send(event);
841}
842
843pub(crate) fn map_join_error(err: tokio::task::JoinError) -> KoiError {
844 KoiError::Io(std::io::Error::other(err.to_string()))
845}
846
847async fn build_embedded_snapshot(
849 mdns: Option<Arc<koi_mdns::MdnsCore>>,
850 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
851 dns: Option<Arc<koi_dns::DnsRuntime>>,
852 health: Option<Arc<koi_health::HealthRuntime>>,
853 proxy: Option<Arc<koi_proxy::ProxyRuntime>>,
854 udp: Option<Arc<koi_udp::UdpRuntime>>,
855 runtime: Option<Arc<koi_runtime::RuntimeCore>>,
856) -> serde_json::Value {
857 use koi_common::capability::Capability;
858
859 let mut capabilities = Vec::new();
860
861 if let Some(ref core) = mdns {
862 let s = core.status();
863 capabilities.push(serde_json::json!({
864 "name": s.name, "enabled": true, "healthy": s.healthy, "summary": s.summary,
865 }));
866 } else {
867 capabilities.push(serde_json::json!({
868 "name": "mdns", "enabled": false, "healthy": false, "summary": "disabled",
869 }));
870 }
871
872 if let Some(ref core) = certmesh {
873 let s = core.status();
874 capabilities.push(serde_json::json!({
875 "name": s.name, "enabled": true, "healthy": s.healthy, "summary": s.summary,
876 }));
877 } else {
878 capabilities.push(serde_json::json!({
879 "name": "certmesh", "enabled": false, "healthy": false, "summary": "disabled",
880 }));
881 }
882
883 if let Some(ref runtime) = dns {
884 let running = runtime.status().await.running;
885 if running {
886 let s = runtime.core().status();
887 capabilities.push(serde_json::json!({
888 "name": s.name, "enabled": true, "healthy": s.healthy, "summary": s.summary,
889 }));
890 } else {
891 capabilities.push(serde_json::json!({
892 "name": "dns", "enabled": true, "healthy": false, "summary": "stopped",
893 }));
894 }
895 } else {
896 capabilities.push(serde_json::json!({
897 "name": "dns", "enabled": false, "healthy": false, "summary": "disabled",
898 }));
899 }
900
901 if let Some(ref runtime) = health {
902 let running = runtime.status().await.running;
903 if running {
904 let s = runtime.core().status();
905 capabilities.push(serde_json::json!({
906 "name": s.name, "enabled": true, "healthy": s.healthy, "summary": s.summary,
907 }));
908 } else {
909 capabilities.push(serde_json::json!({
910 "name": "health", "enabled": true, "healthy": false, "summary": "stopped",
911 }));
912 }
913 } else {
914 capabilities.push(serde_json::json!({
915 "name": "health", "enabled": false, "healthy": false, "summary": "disabled",
916 }));
917 }
918
919 if let Some(ref runtime) = proxy {
920 let status = runtime.status().await;
921 capabilities.push(serde_json::json!({
922 "name": "proxy", "enabled": true, "healthy": true,
923 "summary": if status.is_empty() { "no listeners".to_string() } else { format!("{} listeners", status.len()) },
924 }));
925 } else {
926 capabilities.push(serde_json::json!({
927 "name": "proxy", "enabled": false, "healthy": false, "summary": "disabled",
928 }));
929 }
930
931 if let Some(ref runtime) = udp {
932 let s = Capability::status(runtime.as_ref());
933 capabilities.push(serde_json::json!({
934 "name": s.name, "enabled": true, "healthy": s.healthy, "summary": s.summary,
935 }));
936 } else {
937 capabilities.push(serde_json::json!({
938 "name": "udp", "enabled": false, "healthy": false, "summary": "disabled",
939 }));
940 }
941
942 if let Some(ref rt) = runtime {
943 let s = rt.capability_status().await;
944 capabilities.push(serde_json::json!({
945 "name": s.name, "enabled": true, "healthy": s.healthy, "summary": s.summary,
946 }));
947 } else {
948 capabilities.push(serde_json::json!({
949 "name": "runtime", "enabled": false, "healthy": false, "summary": "disabled",
950 }));
951 }
952
953 serde_json::json!({ "capabilities": capabilities })
954}
955
956struct CertmeshBridgeEmbedded(#[allow(dead_code)] Arc<koi_certmesh::CertmeshCore>);
961
962impl CertmeshBridgeEmbedded {
963 fn new(core: Arc<koi_certmesh::CertmeshCore>) -> Arc<Self> {
964 Arc::new(Self(core))
965 }
966}
967
968impl koi_common::integration::CertmeshSnapshot for CertmeshBridgeEmbedded {
969 fn active_members(&self) -> Vec<koi_common::integration::MemberSummary> {
970 let roster_path = koi_certmesh::CertmeshPaths::default().roster_path();
971 let Ok(roster) = koi_certmesh::roster::load_roster(&roster_path) else {
972 return Vec::new();
973 };
974 roster
975 .members
976 .into_iter()
977 .filter(|m| m.status == koi_certmesh::roster::MemberStatus::Active)
978 .map(|m| koi_common::integration::MemberSummary {
979 hostname: m.hostname,
980 sans: m.cert_sans,
981 cert_expires: Some(m.cert_expires),
982 last_seen: m.last_seen,
983 status: "active".to_string(),
984 proxy_entries: m
985 .proxy_entries
986 .into_iter()
987 .map(|p| koi_common::integration::ProxyConfigSummary {
988 name: p.name,
989 listen_port: p.listen_port,
990 backend: p.backend,
991 allow_remote: p.allow_remote,
992 })
993 .collect(),
994 })
995 .collect()
996 }
997}
998
999struct MdnsBridgeEmbedded {
1000 records: Arc<
1001 std::sync::RwLock<
1002 std::collections::HashMap<String, std::collections::HashMap<String, ServiceRecord>>,
1003 >,
1004 >,
1005 cancel: CancellationToken,
1006}
1007
1008impl MdnsBridgeEmbedded {
1009 async fn spawn(core: Arc<koi_mdns::MdnsCore>) -> Arc<Self> {
1010 use koi_common::types::META_QUERY;
1011 let records = Arc::new(std::sync::RwLock::new(std::collections::HashMap::new()));
1012 let cancel = CancellationToken::new();
1013
1014 let meta_core = Arc::clone(&core);
1015 let meta_records = Arc::clone(&records);
1016 let meta_cancel = cancel.clone();
1017 tokio::spawn(async move {
1018 if let Ok(handle) = meta_core.subscribe_type(META_QUERY).await {
1019 run_meta_browse_embedded(meta_core, handle, meta_records, meta_cancel).await;
1020 }
1021 });
1022
1023 Arc::new(Self { records, cancel })
1024 }
1025}
1026
1027impl Drop for MdnsBridgeEmbedded {
1028 fn drop(&mut self) {
1029 self.cancel.cancel();
1030 }
1031}
1032
1033impl koi_common::integration::MdnsSnapshot for MdnsBridgeEmbedded {
1034 fn host_ips(&self) -> std::collections::HashMap<String, std::net::IpAddr> {
1035 let guard = self.records.read().unwrap_or_else(|e| e.into_inner());
1036 let mut map = std::collections::HashMap::new();
1037 for type_map in guard.values() {
1038 for record in type_map.values() {
1039 let Some(host) = record.host.as_deref() else {
1040 continue;
1041 };
1042 let Some(ip) = record.ip.as_deref().and_then(|ip| ip.parse().ok()) else {
1043 continue;
1044 };
1045 let hostname = host.trim_end_matches('.').trim_end_matches(".local");
1046 if !hostname.is_empty() {
1047 map.insert(hostname.to_string(), ip);
1048 }
1049 }
1050 }
1051 map
1052 }
1053
1054 fn cached_records(&self) -> Vec<ServiceRecord> {
1055 let guard = self.records.read().unwrap_or_else(|e| e.into_inner());
1056 guard.values().flat_map(|m| m.values().cloned()).collect()
1057 }
1058}
1059
1060struct DnsBridgeEmbedded(Arc<koi_dns::DnsRuntime>);
1061
1062impl DnsBridgeEmbedded {
1063 fn new(runtime: Arc<koi_dns::DnsRuntime>) -> Arc<Self> {
1064 Arc::new(Self(runtime))
1065 }
1066}
1067
1068impl koi_common::integration::DnsProbe for DnsBridgeEmbedded {
1069 fn resolve_local(&self, name: &str) -> Option<Vec<std::net::IpAddr>> {
1070 use hickory_proto::rr::RecordType;
1071 let core = self.0.core();
1072 let result = core
1073 .resolve_local(name, RecordType::A)
1074 .or_else(|| core.resolve_local(name, RecordType::AAAA));
1075 result.map(|r| r.ips)
1076 }
1077}
1078
1079struct ProxyBridgeEmbedded(#[allow(dead_code)] Arc<koi_proxy::ProxyCore>);
1080
1081impl ProxyBridgeEmbedded {
1082 fn new(core: Arc<koi_proxy::ProxyCore>) -> Arc<Self> {
1083 Arc::new(Self(core))
1084 }
1085}
1086
1087impl koi_common::integration::ProxySnapshot for ProxyBridgeEmbedded {
1088 fn entries(&self) -> Vec<koi_common::integration::ProxyEntrySummary> {
1089 let Ok(entries) = koi_proxy::config::load_entries() else {
1090 return Vec::new();
1091 };
1092 entries
1093 .into_iter()
1094 .map(|e| koi_common::integration::ProxyEntrySummary {
1095 name: e.name,
1096 listen_port: e.listen_port,
1097 backend: e.backend,
1098 })
1099 .collect()
1100 }
1101}
1102
1103struct AliasFeedbackBridgeEmbedded(Arc<koi_certmesh::CertmeshCore>);
1104
1105impl AliasFeedbackBridgeEmbedded {
1106 fn new(core: Arc<koi_certmesh::CertmeshCore>) -> Arc<Self> {
1107 Arc::new(Self(core))
1108 }
1109}
1110
1111impl koi_common::integration::AliasFeedback for AliasFeedbackBridgeEmbedded {
1112 fn record_alias(&self, hostname: &str, alias: &str) {
1113 let core = Arc::clone(&self.0);
1114 let hostname = hostname.to_string();
1115 let alias = alias.to_string();
1116 tokio::spawn(async move {
1117 let _ = core.add_alias_sans(&hostname, &[alias]).await;
1118 });
1119 }
1120}
1121
1122async fn run_meta_browse_embedded(
1123 core: Arc<koi_mdns::MdnsCore>,
1124 handle: koi_mdns::BrowseSubscription,
1125 records: Arc<
1126 std::sync::RwLock<
1127 std::collections::HashMap<String, std::collections::HashMap<String, ServiceRecord>>,
1128 >,
1129 >,
1130 cancel: CancellationToken,
1131) {
1132 let mut seen = std::collections::HashSet::<String>::new();
1135 loop {
1136 tokio::select! {
1137 _ = cancel.cancelled() => break,
1138 event = handle.recv() => {
1139 let Some(event) = event else { break; };
1140 if let koi_mdns::events::MdnsEvent::Found(record) = event {
1141 let service_type = record.name;
1142 if seen.insert(service_type.clone()) {
1143 let c = Arc::clone(&core);
1144 let r = Arc::clone(&records);
1145 let t = service_type.clone();
1146 let cancel_child = cancel.clone();
1147 tokio::spawn(async move {
1148 if let Ok(handle) = c.subscribe_type(&t).await {
1149 run_type_browse_embedded(handle, r, cancel_child).await;
1150 }
1151 });
1152 }
1153 }
1154 }
1155 }
1156 }
1157}
1158
#[cfg(test)]
mod tests {
    use super::*;
    use koi_common::types::ServiceRecord;
    use std::collections::HashMap;

    /// Builds the fully populated record shared by the mDNS mapping tests.
    fn sample_record() -> ServiceRecord {
        ServiceRecord {
            name: String::from("Test Service"),
            service_type: String::from("_http._tcp"),
            host: Some(String::from("host.local")),
            ip: Some(String::from("10.0.0.1")),
            port: Some(8080),
            txt: HashMap::new(),
        }
    }

    // --- KoiError ---------------------------------------------------------

    #[test]
    fn koi_error_disabled_capability_display() {
        let rendered = KoiError::DisabledCapability("mdns").to_string();
        assert_eq!(rendered, "capability disabled: mdns");
    }

    #[test]
    fn koi_error_io_from_impl() {
        let source = std::io::Error::new(std::io::ErrorKind::NotFound, "file missing");
        let converted = KoiError::from(source);
        assert!(matches!(converted, KoiError::Io(_)));
        assert!(converted.to_string().contains("file missing"));
    }

    #[test]
    fn koi_error_debug_does_not_panic() {
        let rendered = format!("{:?}", KoiError::DisabledCapability("proxy"));
        assert!(rendered.contains("DisabledCapability"));
    }

    // --- mDNS event mapping -----------------------------------------------

    #[test]
    fn map_mdns_found() {
        let mapped = map_mdns_event(koi_mdns::MdnsEvent::Found(sample_record()));
        assert!(mapped.is_some());
        match mapped.unwrap() {
            KoiEvent::MdnsFound(found) => assert_eq!(found.name, "Test Service"),
            other => panic!("expected MdnsFound, got {other:?}"),
        }
    }

    #[test]
    fn map_mdns_resolved() {
        let mapped = map_mdns_event(koi_mdns::MdnsEvent::Resolved(sample_record()));
        assert!(mapped.is_some());
        match mapped.unwrap() {
            KoiEvent::MdnsResolved(resolved) => {
                assert_eq!(resolved.port, Some(8080));
                assert_eq!(resolved.service_type, "_http._tcp");
            }
            other => panic!("expected MdnsResolved, got {other:?}"),
        }
    }

    #[test]
    fn map_mdns_removed() {
        let mapped = map_mdns_event(koi_mdns::MdnsEvent::Removed {
            name: String::from("Gone Service"),
            service_type: String::from("_http._tcp"),
        });
        assert!(mapped.is_some());
        match mapped.unwrap() {
            KoiEvent::MdnsRemoved { name, service_type } => {
                assert_eq!(name, "Gone Service");
                assert_eq!(service_type, "_http._tcp");
            }
            other => panic!("expected MdnsRemoved, got {other:?}"),
        }
    }

    // --- Health event mapping ---------------------------------------------

    #[test]
    fn map_health_status_changed_up() {
        let source = koi_health::HealthEvent::StatusChanged {
            name: String::from("api"),
            status: koi_health::HealthStatus::Up,
        };
        match map_health_event(source) {
            KoiEvent::HealthChanged { name, status } => {
                assert_eq!(name, "api");
                assert!(matches!(status, koi_health::HealthStatus::Up));
            }
            other => panic!("expected HealthChanged, got {other:?}"),
        }
    }

    #[test]
    fn map_health_status_changed_down() {
        let source = koi_health::HealthEvent::StatusChanged {
            name: String::from("db"),
            status: koi_health::HealthStatus::Down,
        };
        match map_health_event(source) {
            KoiEvent::HealthChanged { name, status } => {
                assert_eq!(name, "db");
                assert!(matches!(status, koi_health::HealthStatus::Down));
            }
            other => panic!("expected HealthChanged, got {other:?}"),
        }
    }

    // --- DNS event mapping ------------------------------------------------

    #[test]
    fn map_dns_entry_updated() {
        let source = koi_dns::DnsEvent::EntryUpdated {
            name: String::from("grafana"),
            ip: String::from("10.0.0.5"),
        };
        match map_dns_event(source) {
            KoiEvent::DnsEntryUpdated { name, ip } => {
                assert_eq!(name, "grafana");
                assert_eq!(ip, "10.0.0.5");
            }
            other => panic!("expected DnsEntryUpdated, got {other:?}"),
        }
    }

    #[test]
    fn map_dns_entry_removed() {
        let source = koi_dns::DnsEvent::EntryRemoved {
            name: String::from("old-host"),
        };
        match map_dns_event(source) {
            KoiEvent::DnsEntryRemoved { name } => assert_eq!(name, "old-host"),
            other => panic!("expected DnsEntryRemoved, got {other:?}"),
        }
    }

    // --- Certmesh event mapping -------------------------------------------

    #[test]
    fn map_certmesh_member_joined() {
        let source = koi_certmesh::CertmeshEvent::MemberJoined {
            hostname: String::from("node-a"),
            fingerprint: String::from("sha256:abc"),
        };
        match map_certmesh_event(source) {
            KoiEvent::CertmeshMemberJoined {
                hostname,
                fingerprint,
            } => {
                assert_eq!(hostname, "node-a");
                assert_eq!(fingerprint, "sha256:abc");
            }
            other => panic!("expected CertmeshMemberJoined, got {other:?}"),
        }
    }

    #[test]
    fn map_certmesh_member_revoked() {
        let source = koi_certmesh::CertmeshEvent::MemberRevoked {
            hostname: String::from("node-b"),
        };
        match map_certmesh_event(source) {
            KoiEvent::CertmeshMemberRevoked { hostname } => assert_eq!(hostname, "node-b"),
            other => panic!("expected CertmeshMemberRevoked, got {other:?}"),
        }
    }

    #[test]
    fn map_certmesh_destroyed() {
        let mapped = map_certmesh_event(koi_certmesh::CertmeshEvent::Destroyed);
        assert!(matches!(mapped, KoiEvent::CertmeshDestroyed));
    }

    // --- Proxy event mapping ----------------------------------------------

    #[test]
    fn map_proxy_entry_updated() {
        let source = koi_proxy::ProxyEvent::EntryUpdated {
            entry: koi_proxy::ProxyEntry {
                name: String::from("web"),
                listen_port: 443,
                backend: String::from("http://localhost:3000"),
                allow_remote: true,
            },
        };
        match map_proxy_event(source) {
            KoiEvent::ProxyEntryUpdated { entry } => {
                assert_eq!(entry.name, "web");
                assert_eq!(entry.listen_port, 443);
                assert!(entry.allow_remote);
            }
            other => panic!("expected ProxyEntryUpdated, got {other:?}"),
        }
    }

    #[test]
    fn map_proxy_entry_removed() {
        let source = koi_proxy::ProxyEvent::EntryRemoved {
            name: String::from("old-proxy"),
        };
        match map_proxy_event(source) {
            KoiEvent::ProxyEntryRemoved { name } => assert_eq!(name, "old-proxy"),
            other => panic!("expected ProxyEntryRemoved, got {other:?}"),
        }
    }

    // --- Join errors ------------------------------------------------------

    #[test]
    fn map_join_error_produces_io_error() {
        // Only checks that an I/O error wrapped in `KoiError::Io` keeps its message.
        let koi_err = KoiError::Io(std::io::Error::other("simulated join error"));
        assert!(koi_err.to_string().contains("simulated join error"));
    }

    // --- Builder ----------------------------------------------------------

    #[test]
    fn builder_default_config() {
        let embedded = Builder::new().build().expect("build should succeed");
        let cfg = &embedded.config;
        assert!(cfg.mdns_enabled);
        assert!(!cfg.http_enabled);
        assert_eq!(cfg.http_port, 5641);
    }

    #[test]
    fn builder_default_trait() {
        let embedded = Builder::default()
            .build()
            .expect("build should succeed");
        assert_eq!(embedded.config.service_endpoint, "http://127.0.0.1:5641");
    }

    #[test]
    fn builder_fluent_overrides() {
        let configured = Builder::new()
            .http(true)
            .mdns(false)
            .dns_enabled(false)
            .health(true)
            .certmesh(true)
            .proxy(true)
            .udp(true)
            .http_port(9000)
            .dashboard(true)
            .api_docs(true)
            .mdns_browser(true)
            .announce_http(true)
            .dns_auto_start(true)
            .health_auto_start(true)
            .proxy_auto_start(true)
            .service_endpoint("http://10.0.0.1:8080")
            .service_mode(ServiceMode::EmbeddedOnly)
            .data_dir("/tmp/koi-test");
        let embedded = configured.build().expect("build should succeed");
        let cfg = &embedded.config;

        // Capability toggles.
        assert!(cfg.http_enabled);
        assert!(!cfg.mdns_enabled);
        assert!(!cfg.dns_enabled);
        assert!(cfg.health_enabled);
        assert!(cfg.certmesh_enabled);
        assert!(cfg.proxy_enabled);
        assert!(cfg.udp_enabled);
        assert_eq!(cfg.http_port, 9000);

        // HTTP surface and auto-start flags.
        assert!(cfg.dashboard_enabled);
        assert!(cfg.api_docs_enabled);
        assert!(cfg.mdns_browser_enabled);
        assert!(cfg.announce_http);
        assert!(cfg.dns_auto_start);
        assert!(cfg.health_auto_start);
        assert!(cfg.proxy_auto_start);

        // Service wiring.
        assert_eq!(cfg.service_endpoint, "http://10.0.0.1:8080");
        assert_eq!(cfg.service_mode, ServiceMode::EmbeddedOnly);
        assert_eq!(
            cfg.data_dir,
            Some(std::path::PathBuf::from("/tmp/koi-test"))
        );
    }

    #[test]
    fn builder_dns_configure_closure() {
        let embedded = Builder::new()
            .dns(|dns| dns.port(5353).zone("home").local_ttl(120))
            .build()
            .expect("build should succeed");
        let dns_cfg = &embedded.config.dns_config;

        assert_eq!(dns_cfg.port, 5353);
        assert_eq!(dns_cfg.zone, "home");
        assert_eq!(dns_cfg.local_ttl, 120);
    }

    #[test]
    fn builder_event_handler() {
        use std::sync::atomic::{AtomicBool, Ordering};
        let called = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&called);

        // Only registration is checked; the handler itself is never invoked here.
        let embedded = Builder::new()
            .events(move |_event| flag.store(true, Ordering::SeqCst))
            .build()
            .expect("build should succeed");

        assert!(embedded.event_handler.is_some());
    }

    #[test]
    fn builder_extra_firewall_ports() {
        use koi_common::firewall::{FirewallPort, FirewallProtocol};
        // No assertions: this only exercises the setter.
        let custom = FirewallPort::new("Custom", FirewallProtocol::Tcp, 12345);
        let _builder = Builder::new().extra_firewall_ports(vec![custom]);
    }

    // --- Result alias -----------------------------------------------------

    #[test]
    fn result_type_works_with_ok() {
        let outcome: Result<i32> = Ok(42);
        assert_eq!(outcome.unwrap(), 42);
    }

    #[test]
    fn result_type_works_with_err() {
        let outcome: Result<i32> = Err(KoiError::DisabledCapability("test"));
        assert!(outcome.is_err());
    }
}
1517
1518async fn run_type_browse_embedded(
1519 handle: koi_mdns::BrowseSubscription,
1520 records: Arc<
1521 std::sync::RwLock<
1522 std::collections::HashMap<String, std::collections::HashMap<String, ServiceRecord>>,
1523 >,
1524 >,
1525 cancel: CancellationToken,
1526) {
1527 loop {
1528 tokio::select! {
1529 _ = cancel.cancelled() => break,
1530 event = handle.recv() => {
1531 let Some(event) = event else { break; };
1532 match event {
1533 koi_mdns::events::MdnsEvent::Resolved(record) => {
1534 let mut guard = records.write().unwrap_or_else(|e| e.into_inner());
1535 let entry = guard.entry(record.service_type.clone()).or_default();
1536 entry.insert(record.name.clone(), record);
1537 }
1538 koi_mdns::events::MdnsEvent::Removed { name, service_type } => {
1541 if service_type.is_empty() {
1542 continue;
1543 }
1544 let mut guard = records.write().unwrap_or_else(|e| e.into_inner());
1545 if let Some(map) = guard.get_mut(&service_type) {
1546 map.remove(&name);
1547 }
1548 }
1549 _ => {}
1550 }
1551 }
1552 }
1553 }
1554}