1use std::net::IpAddr;
2use std::sync::Arc;
3
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::BroadcastStream;
7use tokio_util::sync::CancellationToken;
8
9use koi_client::KoiClient;
10use koi_common::capability::Capability;
11use koi_common::peer::Peer;
12use koi_common::types::{EventKind, ServiceRecord};
13use koi_config::state::DnsEntry;
14use koi_dns::{DnsLookupResult, DnsRuntime};
15use koi_health::{HealthCheck, HealthRuntime};
16use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
17use koi_mdns::{BrowseSubscription as MdnsBrowseHandle, MdnsCore, MdnsEvent};
18use koi_proxy::{ProxyEntry, ProxyRuntime};
19
20use crate::{map_join_error, KoiError, KoiEvent};
21
22const SHUTDOWN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20);
25const SHUTDOWN_DRAIN: std::time::Duration = std::time::Duration::from_millis(500);
27
28enum HandleBackend {
29 Embedded {
30 mdns: Option<Arc<MdnsCore>>,
31 dns: Option<Arc<DnsRuntime>>,
32 health: Option<Arc<HealthRuntime>>,
33 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
34 proxy: Option<Arc<ProxyRuntime>>,
35 udp: Option<Arc<koi_udp::UdpRuntime>>,
36 runtime: Option<Arc<koi_runtime::RuntimeCore>>,
37 },
38 Remote {
39 client: Arc<KoiClient>,
40 },
41}
42
43pub struct KoiHandle {
44 backend: HandleBackend,
45 http_addr: Option<std::net::SocketAddr>,
49 data_dir: Option<std::path::PathBuf>,
50 events: broadcast::Sender<KoiEvent>,
51 cancel: CancellationToken,
52 tasks: Vec<JoinHandle<()>>,
53}
54
55impl KoiHandle {
56 #[allow(clippy::too_many_arguments)]
57 pub(crate) fn new_embedded(
58 mdns: Option<Arc<MdnsCore>>,
59 dns: Option<Arc<DnsRuntime>>,
60 health: Option<Arc<HealthRuntime>>,
61 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
62 proxy: Option<Arc<ProxyRuntime>>,
63 udp: Option<Arc<koi_udp::UdpRuntime>>,
64 runtime: Option<Arc<koi_runtime::RuntimeCore>>,
65 http_addr: Option<std::net::SocketAddr>,
66 data_dir: Option<std::path::PathBuf>,
67 events: broadcast::Sender<KoiEvent>,
68 cancel: CancellationToken,
69 tasks: Vec<JoinHandle<()>>,
70 ) -> Self {
71 Self {
72 backend: HandleBackend::Embedded {
73 mdns,
74 dns,
75 health,
76 certmesh,
77 proxy,
78 udp,
79 runtime,
80 },
81 http_addr,
82 data_dir,
83 events,
84 cancel,
85 tasks,
86 }
87 }
88
89 pub(crate) fn new_remote(
90 client: Arc<KoiClient>,
91 events: broadcast::Sender<KoiEvent>,
92 cancel: CancellationToken,
93 tasks: Vec<JoinHandle<()>>,
94 ) -> Self {
95 Self {
96 backend: HandleBackend::Remote { client },
97 http_addr: None,
98 data_dir: None,
99 events,
100 cancel,
101 tasks,
102 }
103 }
104
105 pub fn events(&self) -> BroadcastStream<KoiEvent> {
106 BroadcastStream::new(self.events.subscribe())
107 }
108
109 pub fn http_addr(&self) -> Option<std::net::SocketAddr> {
114 self.http_addr
115 }
116
117 pub fn bound_http_port(&self) -> Option<u16> {
120 self.http_addr.map(|addr| addr.port())
121 }
122
123 pub fn serve(
133 &self,
134 router: axum::Router,
135 addr: std::net::SocketAddr,
136 cancel: CancellationToken,
137 ) -> Result<JoinHandle<()>, KoiError> {
138 match &self.backend {
139 HandleBackend::Embedded { certmesh, .. } => {
140 let core = certmesh
141 .as_ref()
142 .ok_or(KoiError::DisabledCapability("certmesh"))?;
143 let core = Arc::clone(core);
144 Ok(tokio::spawn(async move {
145 if let Err(e) = crate::serve::serve_adaptive(core, router, addr, cancel).await {
146 tracing::error!(error = %e, "same-port serve failed to bind");
147 }
148 }))
149 }
150 HandleBackend::Remote { .. } => {
151 Err(KoiError::DisabledCapability("certmesh (remote mode)"))
152 }
153 }
154 }
155
156 pub async fn participate(
171 &self,
172 router: axum::Router,
173 addr: std::net::SocketAddr,
174 service_type: &str,
175 cancel: CancellationToken,
176 ) -> Result<JoinHandle<()>, KoiError> {
177 let (certmesh, mdns) = match &self.backend {
178 HandleBackend::Embedded { certmesh, mdns, .. } => (
179 certmesh
180 .as_ref()
181 .ok_or(KoiError::DisabledCapability("certmesh"))?
182 .clone(),
183 mdns.clone(),
184 ),
185 HandleBackend::Remote { .. } => {
186 return Err(KoiError::DisabledCapability("certmesh (remote mode)"))
187 }
188 };
189
190 let _ = certmesh.ensure_identity().await;
192
193 if let Some(mdns) = mdns {
196 spawn_participate_announce(
197 mdns,
198 Arc::clone(&certmesh),
199 service_type.to_string(),
200 addr.port(),
201 cancel.clone(),
202 );
203 } else {
204 tracing::debug!("participate: mDNS disabled — serving without announcing");
205 }
206
207 self.serve(router, addr, cancel)
209 }
210
211 pub fn subscribe(&self) -> broadcast::Receiver<KoiEvent> {
212 self.events.subscribe()
213 }
214
215 pub fn mdns(&self) -> Result<MdnsHandle, KoiError> {
216 match &self.backend {
217 HandleBackend::Embedded { mdns, .. } => {
218 let core = mdns.as_ref().ok_or(KoiError::DisabledCapability("mdns"))?;
219 Ok(MdnsHandle::new_embedded(
220 Arc::clone(core),
221 self.events.clone(),
222 ))
223 }
224 HandleBackend::Remote { client } => Ok(MdnsHandle::new_remote(
225 Arc::clone(client),
226 self.events.clone(),
227 )),
228 }
229 }
230
231 pub fn dns(&self) -> Result<DnsHandle, KoiError> {
232 match &self.backend {
233 HandleBackend::Embedded { dns, .. } => {
234 let runtime = dns.as_ref().ok_or(KoiError::DisabledCapability("dns"))?;
235 Ok(DnsHandle::new_embedded(Arc::clone(runtime)))
236 }
237 HandleBackend::Remote { client } => Ok(DnsHandle::new_remote(Arc::clone(client))),
238 }
239 }
240
241 pub fn health(&self) -> Result<HealthHandle, KoiError> {
242 match &self.backend {
243 HandleBackend::Embedded { health, .. } => {
244 let runtime = health
245 .as_ref()
246 .ok_or(KoiError::DisabledCapability("health"))?;
247 Ok(HealthHandle::new_embedded(Arc::clone(runtime)))
248 }
249 HandleBackend::Remote { client } => Ok(HealthHandle::new_remote(Arc::clone(client))),
250 }
251 }
252
253 pub fn certmesh(&self) -> Result<CertmeshHandle, KoiError> {
254 match &self.backend {
255 HandleBackend::Embedded { certmesh, .. } => {
256 let core = certmesh
257 .as_ref()
258 .ok_or(KoiError::DisabledCapability("certmesh"))?;
259 Ok(CertmeshHandle::new_embedded(Arc::clone(core)))
260 }
261 HandleBackend::Remote { client } => Ok(CertmeshHandle::new_remote(Arc::clone(client))),
262 }
263 }
264
265 pub fn vault(&self) -> Result<koi_crypto::vault::Vault, KoiError> {
271 let dir = self
272 .data_dir
273 .as_ref()
274 .ok_or(KoiError::DisabledCapability("vault (no data_dir)"))?;
275 koi_crypto::vault::Vault::open(dir)
276 .map_err(|e| KoiError::Io(std::io::Error::other(e.to_string())))
277 }
278
279 pub fn proxy(&self) -> Result<ProxyHandle, KoiError> {
280 match &self.backend {
281 HandleBackend::Embedded { proxy, .. } => {
282 let runtime = proxy
283 .as_ref()
284 .ok_or(KoiError::DisabledCapability("proxy"))?;
285 Ok(ProxyHandle::new_embedded(Arc::clone(runtime)))
286 }
287 HandleBackend::Remote { client } => Ok(ProxyHandle::new_remote(Arc::clone(client))),
288 }
289 }
290
291 pub fn udp(&self) -> Result<Arc<koi_udp::UdpRuntime>, KoiError> {
296 match &self.backend {
297 HandleBackend::Embedded { udp, .. } => {
298 let runtime = udp.as_ref().ok_or(KoiError::DisabledCapability("udp"))?;
299 Ok(Arc::clone(runtime))
300 }
301 HandleBackend::Remote { .. } => Err(KoiError::DisabledCapability("udp (remote mode)")),
302 }
303 }
304
305 pub fn runtime(&self) -> Result<Arc<koi_runtime::RuntimeCore>, KoiError> {
309 match &self.backend {
310 HandleBackend::Embedded { runtime, .. } => {
311 let core = runtime
312 .as_ref()
313 .ok_or(KoiError::DisabledCapability("runtime"))?;
314 Ok(Arc::clone(core))
315 }
316 HandleBackend::Remote { .. } => {
317 Err(KoiError::DisabledCapability("runtime (remote mode)"))
318 }
319 }
320 }
321
322 pub async fn shutdown(mut self) -> Result<(), KoiError> {
323 let tasks = std::mem::take(&mut self.tasks);
324
325 if let HandleBackend::Embedded {
326 mdns,
327 dns,
328 health,
329 certmesh,
330 proxy,
331 udp,
332 runtime,
333 } = &self.backend
334 {
335 let cores = koi_compose::cores::Cores {
339 mdns: mdns.clone(),
340 certmesh: certmesh.clone(),
341 dns: dns.clone(),
342 health: health.clone(),
343 proxy: proxy.clone(),
344 udp: udp.clone(),
345 runtime: runtime.clone(),
346 mdns_snapshot: None,
347 };
348 koi_compose::cores::ordered_shutdown(
349 &self.cancel,
350 tasks,
351 &cores,
352 SHUTDOWN_TIMEOUT,
353 SHUTDOWN_DRAIN,
354 )
355 .await;
356 } else {
357 self.cancel.cancel();
359 for task in tasks {
360 let _ = task.await;
361 }
362 }
363
364 Ok(())
365 }
366}
367
368pub struct KoiBrowseHandle {
369 backend: BrowseBackend,
370}
371
372enum BrowseBackend {
373 Embedded(MdnsBrowseHandle),
374 Remote(Mutex<mpsc::Receiver<MdnsEvent>>),
375}
376
377impl KoiBrowseHandle {
378 fn embedded(handle: MdnsBrowseHandle) -> Self {
379 Self {
380 backend: BrowseBackend::Embedded(handle),
381 }
382 }
383
384 fn remote(rx: mpsc::Receiver<MdnsEvent>) -> Self {
385 Self {
386 backend: BrowseBackend::Remote(Mutex::new(rx)),
387 }
388 }
389
390 pub async fn recv(&self) -> Option<MdnsEvent> {
391 match &self.backend {
392 BrowseBackend::Embedded(handle) => handle.recv().await,
393 BrowseBackend::Remote(rx) => rx.lock().await.recv().await,
394 }
395 }
396}
397
398pub const DEFAULT_DISCOVER_WINDOW: std::time::Duration = std::time::Duration::from_secs(2);
402
403pub struct MdnsHandle {
404 backend: MdnsBackend,
405 events: broadcast::Sender<KoiEvent>,
406}
407
408enum MdnsBackend {
409 Embedded { core: Arc<MdnsCore> },
410 Remote { client: Arc<KoiClient> },
411}
412
413impl MdnsHandle {
414 fn new_embedded(core: Arc<MdnsCore>, events: broadcast::Sender<KoiEvent>) -> Self {
415 Self {
416 backend: MdnsBackend::Embedded { core },
417 events,
418 }
419 }
420
421 fn new_remote(client: Arc<KoiClient>, events: broadcast::Sender<KoiEvent>) -> Self {
422 Self {
423 backend: MdnsBackend::Remote { client },
424 events,
425 }
426 }
427
428 pub fn core(&self) -> Result<Arc<MdnsCore>, KoiError> {
429 match &self.backend {
430 MdnsBackend::Embedded { core } => Ok(Arc::clone(core)),
431 MdnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("mdns")),
432 }
433 }
434
435 pub async fn browse(&self, service_type: &str) -> Result<KoiBrowseHandle, KoiError> {
436 match &self.backend {
437 MdnsBackend::Embedded { core } => {
438 let handle = core.subscribe_type(service_type).await?;
439 Ok(KoiBrowseHandle::embedded(handle))
440 }
441 MdnsBackend::Remote { client } => {
442 let (tx, rx) = mpsc::channel(64);
443 let client = Arc::clone(client);
444 let service_type = service_type.to_string();
445 tokio::task::spawn_blocking(move || {
446 let stream = match client.browse_stream(&service_type) {
447 Ok(stream) => stream,
448 Err(_) => return,
449 };
450 for item in stream {
451 let Ok(json) = item else {
452 break;
453 };
454 if let Some(event) = mdns_event_from_pipeline(json) {
455 if tx.blocking_send(event).is_err() {
456 break;
457 }
458 }
459 }
460 });
461 Ok(KoiBrowseHandle::remote(rx))
462 }
463 }
464 }
465
466 pub async fn discover(&self, service_type: &str) -> Result<Vec<Peer>, KoiError> {
476 self.discover_for(service_type, DEFAULT_DISCOVER_WINDOW)
477 .await
478 }
479
480 pub async fn discover_for(
482 &self,
483 service_type: &str,
484 window: std::time::Duration,
485 ) -> Result<Vec<Peer>, KoiError> {
486 let browse = self.browse(service_type).await?;
487 let mut events = Vec::new();
488 let deadline = tokio::time::sleep(window);
489 tokio::pin!(deadline);
490 loop {
491 tokio::select! {
492 _ = &mut deadline => break,
493 ev = browse.recv() => match ev {
494 Some(e) => events.push(e),
495 None => break,
496 },
497 }
498 }
499 Ok(fold_peers(events))
500 }
501
502 pub async fn resolve(&self, name: &str) -> Result<ServiceRecord, KoiError> {
503 match &self.backend {
504 MdnsBackend::Embedded { core } => Ok(core.resolve(name).await?),
505 MdnsBackend::Remote { client } => {
506 let name = name.to_string();
507 let client = Arc::clone(client);
508 let record = tokio::task::spawn_blocking(move || client.resolve(&name))
509 .await
510 .map_err(map_join_error)??;
511 Ok(record)
512 }
513 }
514 }
515
516 pub fn register(&self, payload: RegisterPayload) -> Result<RegistrationResult, KoiError> {
517 match &self.backend {
518 MdnsBackend::Embedded { core } => Ok(core.register(payload)?),
519 MdnsBackend::Remote { client } => Ok(client.register(&payload)?),
520 }
521 }
522
523 pub fn unregister(&self, id: &str) -> Result<(), KoiError> {
524 match &self.backend {
525 MdnsBackend::Embedded { core } => Ok(core.unregister(id)?),
526 MdnsBackend::Remote { client } => Ok(client.unregister(id)?),
527 }
528 }
529
530 pub fn subscribe(&self) -> Result<broadcast::Receiver<MdnsEvent>, KoiError> {
541 match &self.backend {
542 MdnsBackend::Embedded { core } => Ok(core.subscribe()),
543 MdnsBackend::Remote { .. } => Err(KoiError::RemoteUnsupported(
544 "mdns subscribe — use mdns.browse(service_type) for a remote event stream",
545 )),
546 }
547 }
548
549 pub fn emit_event(&self, event: KoiEvent) {
550 let _ = self.events.send(event);
551 }
552}
553
554pub struct DnsHandle {
555 backend: DnsBackend,
556}
557
558enum DnsBackend {
559 Embedded { runtime: Arc<DnsRuntime> },
560 Remote { client: Arc<KoiClient> },
561}
562
563impl DnsHandle {
564 fn new_embedded(runtime: Arc<DnsRuntime>) -> Self {
565 Self {
566 backend: DnsBackend::Embedded { runtime },
567 }
568 }
569
570 fn new_remote(client: Arc<KoiClient>) -> Self {
571 Self {
572 backend: DnsBackend::Remote { client },
573 }
574 }
575
576 pub fn runtime(&self) -> Result<Arc<DnsRuntime>, KoiError> {
577 match &self.backend {
578 DnsBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
579 DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
580 }
581 }
582
583 pub fn core(&self) -> Result<Arc<koi_dns::DnsCore>, KoiError> {
584 match &self.backend {
585 DnsBackend::Embedded { runtime } => Ok(runtime.core()),
586 DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
587 }
588 }
589
590 pub async fn lookup(
591 &self,
592 name: &str,
593 record_type: hickory_proto::rr::RecordType,
594 ) -> Option<DnsLookupResult> {
595 match &self.backend {
596 DnsBackend::Embedded { runtime } => runtime.core().lookup(name, record_type).await,
597 DnsBackend::Remote { client } => {
598 let name = name.to_string();
599 let client = Arc::clone(client);
600 let result =
601 tokio::task::spawn_blocking(move || client.dns_lookup(&name, record_type))
602 .await
603 .ok()
604 .and_then(|res| res.ok());
605 let json = match result {
606 Some(json) => json,
607 None => return None,
608 };
609 parse_dns_lookup(json)
610 }
611 }
612 }
613
614 pub fn list_names(&self) -> Vec<String> {
615 match &self.backend {
616 DnsBackend::Embedded { runtime } => runtime.core().list_names(),
617 DnsBackend::Remote { client } => {
618 let result = client.dns_list();
619 let Ok(json) = result else {
620 return Vec::new();
621 };
622 json.get("names")
623 .and_then(|v| v.as_array())
624 .map(|arr| {
625 arr.iter()
626 .filter_map(|name| name.as_str().map(|s| s.to_string()))
627 .collect()
628 })
629 .unwrap_or_default()
630 }
631 }
632 }
633
634 pub async fn start(&self) -> Result<bool, KoiError> {
635 match &self.backend {
636 DnsBackend::Embedded { runtime } => Ok(runtime.start().await?),
637 DnsBackend::Remote { client } => {
638 let client = Arc::clone(client);
639 let started = tokio::task::spawn_blocking(move || client.dns_start())
640 .await
641 .map_err(map_join_error)??
642 .get("started")
643 .and_then(|v| v.as_bool())
644 .unwrap_or(false);
645 Ok(started)
646 }
647 }
648 }
649
650 pub async fn stop(&self) -> bool {
651 match &self.backend {
652 DnsBackend::Embedded { runtime } => runtime.stop().await,
653 DnsBackend::Remote { client } => {
654 let client = Arc::clone(client);
655 tokio::task::spawn_blocking(move || client.dns_stop())
656 .await
657 .ok()
658 .and_then(|res| res.ok())
659 .and_then(|json| json.get("stopped").and_then(|v| v.as_bool()))
660 .unwrap_or(false)
661 }
662 }
663 }
664
665 pub fn add_entry(&self, entry: DnsEntry) -> Result<Vec<DnsEntry>, KoiError> {
666 match &self.backend {
667 DnsBackend::Embedded { runtime } => Ok(runtime.core().add_entry(entry)?),
668 DnsBackend::Remote { client } => {
669 let json = client.dns_add(&entry.name, &entry.ip, entry.ttl)?;
670 parse_dns_entries(json)
671 }
672 }
673 }
674
675 pub fn remove_entry(&self, name: &str) -> Result<Vec<DnsEntry>, KoiError> {
676 match &self.backend {
677 DnsBackend::Embedded { runtime } => {
678 Ok(runtime.core().remove_entry(name)?.unwrap_or_default())
679 }
680 DnsBackend::Remote { client } => {
681 let json = client.dns_remove(name)?;
682 parse_dns_entries(json)
683 }
684 }
685 }
686}
687
688pub struct HealthHandle {
689 backend: HealthBackend,
690}
691
692enum HealthBackend {
693 Embedded { runtime: Arc<HealthRuntime> },
694 Remote { client: Arc<KoiClient> },
695}
696
697impl HealthHandle {
698 fn new_embedded(runtime: Arc<HealthRuntime>) -> Self {
699 Self {
700 backend: HealthBackend::Embedded { runtime },
701 }
702 }
703
704 fn new_remote(client: Arc<KoiClient>) -> Self {
705 Self {
706 backend: HealthBackend::Remote { client },
707 }
708 }
709
710 pub fn core(&self) -> Result<Arc<koi_health::HealthCore>, KoiError> {
711 match &self.backend {
712 HealthBackend::Embedded { runtime } => Ok(runtime.core()),
713 HealthBackend::Remote { .. } => Err(KoiError::DisabledCapability("health")),
714 }
715 }
716
717 pub async fn status(&self) -> koi_health::HealthSnapshot {
718 match &self.backend {
719 HealthBackend::Embedded { runtime } => runtime.core().snapshot().await,
720 HealthBackend::Remote { client } => {
721 let client = Arc::clone(client);
722 let json = tokio::task::spawn_blocking(move || client.health_status())
723 .await
724 .ok()
725 .and_then(|res| res.ok());
726 json.and_then(|json| serde_json::from_value(json).ok())
727 .unwrap_or_else(|| koi_health::HealthSnapshot {
728 machines: Vec::new(),
729 services: Vec::new(),
730 })
731 }
732 }
733 }
734
735 pub async fn add_check(&self, check: HealthCheck) -> Result<(), KoiError> {
736 match &self.backend {
737 HealthBackend::Embedded { runtime } => Ok(runtime.core().add_check(check).await?),
738 HealthBackend::Remote { client } => {
739 let client = Arc::clone(client);
740 let check = check.clone();
741 tokio::task::spawn_blocking(move || {
742 client.health_add_check(
743 &check.name,
744 check.kind,
745 &check.target,
746 check.interval_secs,
747 check.timeout_secs,
748 )
749 })
750 .await
751 .map_err(map_join_error)??;
752 Ok(())
753 }
754 }
755 }
756
757 pub async fn remove_check(&self, name: &str) -> Result<(), KoiError> {
758 match &self.backend {
759 HealthBackend::Embedded { runtime } => Ok(runtime.core().remove_check(name).await?),
760 HealthBackend::Remote { client } => {
761 let client = Arc::clone(client);
762 let name = name.to_string();
763 tokio::task::spawn_blocking(move || client.health_remove_check(&name))
764 .await
765 .map_err(map_join_error)??;
766 Ok(())
767 }
768 }
769 }
770
771 pub async fn start(&self) -> Result<bool, KoiError> {
772 match &self.backend {
773 HealthBackend::Embedded { runtime } => Ok(runtime.start().await?),
774 HealthBackend::Remote { .. } => Ok(false),
775 }
776 }
777
778 pub async fn stop(&self) -> bool {
779 match &self.backend {
780 HealthBackend::Embedded { runtime } => runtime.stop().await,
781 HealthBackend::Remote { .. } => false,
782 }
783 }
784}
785
786pub struct CertmeshHandle {
787 backend: CertmeshBackend,
788}
789
790enum CertmeshBackend {
791 Embedded {
792 core: Arc<koi_certmesh::CertmeshCore>,
793 },
794 Remote {
795 client: Arc<KoiClient>,
796 },
797}
798
799impl CertmeshHandle {
800 fn new_embedded(core: Arc<koi_certmesh::CertmeshCore>) -> Self {
801 Self {
802 backend: CertmeshBackend::Embedded { core },
803 }
804 }
805
806 fn new_remote(client: Arc<KoiClient>) -> Self {
807 Self {
808 backend: CertmeshBackend::Remote { client },
809 }
810 }
811
812 pub fn core(&self) -> Result<Arc<koi_certmesh::CertmeshCore>, KoiError> {
813 match &self.backend {
814 CertmeshBackend::Embedded { core } => Ok(Arc::clone(core)),
815 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
816 }
817 }
818
819 pub async fn status(&self) -> koi_common::capability::CapabilityStatus {
820 match &self.backend {
821 CertmeshBackend::Embedded { core } => core.status().await,
822 CertmeshBackend::Remote { client } => {
823 let client = Arc::clone(client);
824 let json = tokio::task::spawn_blocking(move || client.unified_status())
825 .await
826 .ok()
827 .and_then(|res| res.ok());
828 json.and_then(extract_capability_status)
829 .unwrap_or_else(default_capability_status)
830 }
831 }
832 }
833
834 pub fn posture(&self) -> Result<koi_common::posture::Posture, KoiError> {
840 match &self.backend {
841 CertmeshBackend::Embedded { core } => Ok(core.posture()),
842 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
843 }
844 }
845
846 pub async fn local_identity(&self) -> Result<Option<koi_certmesh::Identity>, KoiError> {
849 match &self.backend {
850 CertmeshBackend::Embedded { core } => Ok(core.local_identity().await),
851 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
852 }
853 }
854
855 pub async fn ensure_identity(&self) -> Result<Option<koi_certmesh::Identity>, KoiError> {
858 match &self.backend {
859 CertmeshBackend::Embedded { core } => Ok(core.ensure_identity().await),
860 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
861 }
862 }
863
864 pub async fn sign(&self, bytes: &[u8]) -> Result<koi_common::envelope::Envelope, KoiError> {
868 match &self.backend {
869 CertmeshBackend::Embedded { core } => Ok(core.sign(bytes).await),
870 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
871 }
872 }
873
874 pub async fn verify(
877 &self,
878 env: &koi_common::envelope::Envelope,
879 ) -> Result<koi_common::envelope::Assurance, KoiError> {
880 match &self.backend {
881 CertmeshBackend::Embedded { core } => Ok(core.verify(env).await),
882 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
883 }
884 }
885
886 pub async fn seal(&self, bytes: &[u8]) -> Result<koi_common::sealed::Sealed, KoiError> {
890 match &self.backend {
891 CertmeshBackend::Embedded { core } => Ok(core.seal(bytes).await),
892 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
893 }
894 }
895
896 pub async fn open(
899 &self,
900 sealed: &koi_common::sealed::Sealed,
901 ) -> Result<koi_common::sealed::Opened, KoiError> {
902 match &self.backend {
903 CertmeshBackend::Embedded { core } => Ok(core.open(sealed).await?),
904 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
905 }
906 }
907
908 pub async fn diagnose(&self) -> Result<koi_common::diagnosis::TrustDiagnosis, KoiError> {
913 match &self.backend {
914 CertmeshBackend::Embedded { core } => Ok(core.diagnose().await),
915 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
916 }
917 }
918
919 pub async fn client_for(&self, peer: &Peer) -> Result<koi_certmesh::PeerClient, KoiError> {
927 match &self.backend {
928 CertmeshBackend::Embedded { core } => Ok(core.client_for(peer).await?),
929 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
930 }
931 }
932}
933
934async fn announce_once(
938 mdns: &Arc<MdnsCore>,
939 certmesh: &Arc<koi_certmesh::CertmeshCore>,
940 hostname: &str,
941 service_type: &str,
942 port: u16,
943) -> Option<String> {
944 let id = certmesh.local_identity().await;
945 let mut txt = std::collections::HashMap::new();
946 koi_common::peer::stamp(
947 &mut txt,
948 certmesh.posture(),
949 id.as_ref().map(|i| i.ca_fingerprint.as_str()),
950 id.as_ref().map(|i| i.renewal.expires_at),
951 );
952 let payload = RegisterPayload {
953 name: hostname.to_string(),
954 service_type: service_type.to_string(),
955 port,
956 ip: None,
957 lease_secs: None,
958 txt,
959 };
960 match mdns.register(payload) {
961 Ok(result) => Some(result.id),
962 Err(e) => {
963 tracing::warn!(error = %e, "participate: mDNS announce failed");
964 None
965 }
966 }
967}
968
969fn spawn_participate_announce(
974 mdns: Arc<MdnsCore>,
975 certmesh: Arc<koi_certmesh::CertmeshCore>,
976 service_type: String,
977 port: u16,
978 cancel: CancellationToken,
979) {
980 tokio::spawn(async move {
981 let hostname = hostname::get()
982 .ok()
983 .and_then(|os| os.into_string().ok())
984 .unwrap_or_else(|| "unknown".to_string());
985 let mut posture_rx = certmesh.watch_posture();
986 let mut current_id = announce_once(&mdns, &certmesh, &hostname, &service_type, port).await;
987 loop {
988 tokio::select! {
989 _ = cancel.cancelled() => break,
990 changed = posture_rx.changed() => {
991 if changed.is_err() {
992 break; }
994 if let Some(old) = current_id.take() {
996 let _ = mdns.unregister(&old);
997 }
998 current_id =
999 announce_once(&mdns, &certmesh, &hostname, &service_type, port).await;
1000 }
1001 }
1002 }
1003 if let Some(id) = current_id {
1004 let _ = mdns.unregister(&id);
1005 }
1006 });
1007}
1008
1009fn fold_peers(events: impl IntoIterator<Item = MdnsEvent>) -> Vec<Peer> {
1014 use std::collections::BTreeMap;
1015 let mut by_name: BTreeMap<String, ServiceRecord> = BTreeMap::new();
1016 for ev in events {
1017 match ev {
1018 MdnsEvent::Found(rec) => {
1019 by_name.entry(rec.name.clone()).or_insert(rec);
1020 }
1021 MdnsEvent::Resolved(rec) => {
1022 by_name.insert(rec.name.clone(), rec);
1023 }
1024 MdnsEvent::Removed { name, .. } => {
1025 by_name.remove(&name);
1026 }
1027 }
1028 }
1029 by_name.into_values().map(Peer::from_record).collect()
1030}
1031
1032pub struct ProxyHandle {
1033 backend: ProxyBackend,
1034}
1035
1036enum ProxyBackend {
1037 Embedded { runtime: Arc<ProxyRuntime> },
1038 Remote { client: Arc<KoiClient> },
1039}
1040
1041impl ProxyHandle {
1042 fn new_embedded(runtime: Arc<ProxyRuntime>) -> Self {
1043 Self {
1044 backend: ProxyBackend::Embedded { runtime },
1045 }
1046 }
1047
1048 fn new_remote(client: Arc<KoiClient>) -> Self {
1049 Self {
1050 backend: ProxyBackend::Remote { client },
1051 }
1052 }
1053
1054 pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError> {
1055 match &self.backend {
1056 ProxyBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
1057 ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
1058 }
1059 }
1060
1061 pub fn core(&self) -> Result<Arc<koi_proxy::ProxyCore>, KoiError> {
1062 match &self.backend {
1063 ProxyBackend::Embedded { runtime } => Ok(runtime.core()),
1064 ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
1065 }
1066 }
1067
1068 pub async fn entries(&self) -> Vec<ProxyEntry> {
1069 match &self.backend {
1070 ProxyBackend::Embedded { runtime } => runtime.core().entries().await,
1071 ProxyBackend::Remote { client } => {
1072 let client = Arc::clone(client);
1073 tokio::task::spawn_blocking(move || client.proxy_list())
1074 .await
1075 .ok()
1076 .and_then(|res| res.ok())
1077 .and_then(|json| parse_proxy_entries(json).ok())
1078 .unwrap_or_default()
1079 }
1080 }
1081 }
1082
1083 pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError> {
1084 match &self.backend {
1085 ProxyBackend::Embedded { runtime } => Ok(runtime.core().upsert(entry).await?),
1086 ProxyBackend::Remote { client } => {
1087 let client = Arc::clone(client);
1088 let entry = entry.clone();
1089 let add_client = Arc::clone(&client);
1090 tokio::task::spawn_blocking(move || {
1091 add_client.proxy_add(
1092 &entry.name,
1093 entry.listen_port,
1094 &entry.backend,
1095 entry.allow_remote,
1096 )
1097 })
1098 .await
1099 .map_err(map_join_error)??;
1100 let list = tokio::task::spawn_blocking(move || client.proxy_list())
1101 .await
1102 .map_err(map_join_error)??;
1103 parse_proxy_entries(list)
1104 }
1105 }
1106 }
1107
1108 pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError> {
1109 match &self.backend {
1110 ProxyBackend::Embedded { runtime } => Ok(runtime.core().remove(name).await?),
1111 ProxyBackend::Remote { client } => {
1112 let client = Arc::clone(client);
1113 let name = name.to_string();
1114 let remove_client = Arc::clone(&client);
1115 tokio::task::spawn_blocking(move || remove_client.proxy_remove(&name))
1116 .await
1117 .map_err(map_join_error)??;
1118 let list = tokio::task::spawn_blocking(move || client.proxy_list())
1119 .await
1120 .map_err(map_join_error)??;
1121 parse_proxy_entries(list)
1122 }
1123 }
1124 }
1125
1126 pub async fn start_all(&self) -> Result<(), KoiError> {
1127 match &self.backend {
1128 ProxyBackend::Embedded { runtime } => Ok(runtime.start_all().await?),
1129 ProxyBackend::Remote { .. } => Ok(()),
1130 }
1131 }
1132
1133 pub async fn stop_all(&self) {
1134 if let ProxyBackend::Embedded { runtime } = &self.backend {
1135 runtime.stop_all().await;
1136 }
1137 }
1138}
1139
1140fn parse_dns_lookup(json: serde_json::Value) -> Option<DnsLookupResult> {
1141 let name = json.get("name").and_then(|v| v.as_str())?.to_string();
1142 let source = json
1143 .get("source")
1144 .and_then(|v| v.as_str())
1145 .unwrap_or("unknown")
1146 .to_string();
1147 let ips = json.get("ips").and_then(|v| v.as_array()).map(|arr| {
1148 arr.iter()
1149 .filter_map(|ip| ip.as_str())
1150 .filter_map(|ip| ip.parse::<IpAddr>().ok())
1151 .collect::<Vec<_>>()
1152 })?;
1153 Some(DnsLookupResult { name, ips, source })
1154}
1155
1156fn parse_dns_entries(json: serde_json::Value) -> Result<Vec<DnsEntry>, KoiError> {
1157 let entries = json.get("entries").ok_or_else(|| {
1158 KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::other(
1159 "missing entries",
1160 )))
1161 })?;
1162 let entries = serde_json::from_value(entries.clone()).map_err(|e| {
1163 KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::new(
1164 std::io::ErrorKind::InvalidData,
1165 e.to_string(),
1166 )))
1167 })?;
1168 Ok(entries)
1169}
1170
1171fn parse_proxy_entries(json: serde_json::Value) -> Result<Vec<ProxyEntry>, KoiError> {
1172 let entries = json
1173 .get("entries")
1174 .ok_or_else(|| KoiError::Proxy(koi_proxy::ProxyError::Io("missing entries".to_string())))?
1175 .clone();
1176 serde_json::from_value(entries)
1177 .map_err(|e| KoiError::Proxy(koi_proxy::ProxyError::Io(e.to_string())))
1178}
1179
1180fn extract_capability_status(
1181 json: serde_json::Value,
1182) -> Option<koi_common::capability::CapabilityStatus> {
1183 let caps = json.get("capabilities")?.as_array()?;
1184 for cap in caps {
1185 if cap.get("name")?.as_str()? == "certmesh" {
1186 let name = cap.get("name")?.as_str()?.to_string();
1187 let summary = cap
1188 .get("summary")
1189 .and_then(|v| v.as_str())
1190 .unwrap_or("unknown")
1191 .to_string();
1192 let healthy = cap
1193 .get("healthy")
1194 .and_then(|v| v.as_bool())
1195 .unwrap_or(false);
1196 return Some(koi_common::capability::CapabilityStatus {
1197 name,
1198 summary,
1199 healthy,
1200 });
1201 }
1202 }
1203 None
1204}
1205
1206fn default_capability_status() -> koi_common::capability::CapabilityStatus {
1207 koi_common::capability::CapabilityStatus {
1208 name: "certmesh".to_string(),
1209 summary: "unknown".to_string(),
1210 healthy: false,
1211 }
1212}
1213
1214fn mdns_event_from_pipeline(json: serde_json::Value) -> Option<MdnsEvent> {
1215 if let Some(found) = json.get("found") {
1216 let record: ServiceRecord = serde_json::from_value(found.clone()).ok()?;
1217 return Some(MdnsEvent::Found(record));
1218 }
1219 if let Some(resolved) = json.get("resolved") {
1220 let record: ServiceRecord = serde_json::from_value(resolved.clone()).ok()?;
1221 return Some(MdnsEvent::Resolved(record));
1222 }
1223 if let Some(event) = json.get("event") {
1224 let kind: EventKind = serde_json::from_value(event.clone()).ok()?;
1225 let service = json
1226 .get("service")
1227 .cloned()
1228 .unwrap_or(serde_json::Value::Null);
1229 let record: ServiceRecord = serde_json::from_value(service).ok()?;
1230 return match kind {
1231 EventKind::Found => Some(MdnsEvent::Found(record)),
1232 EventKind::Resolved => Some(MdnsEvent::Resolved(record)),
1233 EventKind::Removed => Some(MdnsEvent::Removed {
1234 name: record.name,
1235 service_type: record.service_type,
1236 }),
1237 };
1238 }
1239 None
1240}
1241
1242#[cfg(test)]
1243mod tests {
1244 use super::*;
1245 use koi_common::posture::PostureLevel;
1246 use std::collections::HashMap;
1247
1248 fn rec(name: &str, txt: &[(&str, &str)]) -> ServiceRecord {
1249 ServiceRecord {
1250 name: name.to_string(),
1251 service_type: "_http._tcp".to_string(),
1252 host: Some(format!("{name}.local")),
1253 ip: Some("10.0.0.9".to_string()),
1254 port: Some(8443),
1255 txt: txt
1256 .iter()
1257 .map(|(k, v)| (k.to_string(), v.to_string()))
1258 .collect::<HashMap<_, _>>(),
1259 }
1260 }
1261
1262 #[test]
1263 fn fold_resolved_overwrites_found_for_txt_enrichment() {
1264 let peers = fold_peers([
1266 MdnsEvent::Found(rec("a", &[])),
1267 MdnsEvent::Resolved(rec("a", &[("fp", "CAFP"), ("posture", "authenticated")])),
1268 ]);
1269 assert_eq!(peers.len(), 1, "the two events collapse to one peer");
1270 assert_eq!(peers[0].level(), PostureLevel::Authenticated);
1271 assert_eq!(peers[0].fp.as_deref(), Some("CAFP"));
1272 }
1273
1274 #[test]
1275 fn fold_removed_drops_the_peer() {
1276 let peers = fold_peers([
1277 MdnsEvent::Found(rec("b", &[])),
1278 MdnsEvent::Removed {
1279 name: "b".to_string(),
1280 service_type: "_http._tcp".to_string(),
1281 },
1282 ]);
1283 assert!(peers.is_empty(), "a removed peer is not in the snapshot");
1284 }
1285
1286 #[test]
1287 fn fold_orders_peers_by_name() {
1288 let peers = fold_peers([
1289 MdnsEvent::Resolved(rec("z", &[])),
1290 MdnsEvent::Resolved(rec("a", &[])),
1291 MdnsEvent::Resolved(rec("m", &[])),
1292 ]);
1293 let names: Vec<_> = peers.iter().map(|p| p.record.name.clone()).collect();
1294 assert_eq!(names, vec!["a", "m", "z"]);
1295 }
1296
1297 #[test]
1298 fn fold_open_peer_has_open_posture() {
1299 let peers = fold_peers([MdnsEvent::Resolved(rec("plain", &[]))]);
1300 assert_eq!(peers.len(), 1);
1301 assert_eq!(peers[0].level(), PostureLevel::Open);
1302 assert!(!peers[0].is_secure());
1303 }
1304
1305 #[tokio::test]
1308 async fn participate_remote_handle_is_disabled() {
1309 let client = Arc::new(KoiClient::new("http://127.0.0.1:1"));
1310 let (tx, _) = broadcast::channel(8);
1311 let handle = KoiHandle::new_remote(client, tx, CancellationToken::new(), Vec::new());
1312 let router = axum::Router::new();
1313 let addr: std::net::SocketAddr = ([127, 0, 0, 1], 0).into();
1314 let err = handle
1315 .participate(router, addr, "_x._tcp", CancellationToken::new())
1316 .await
1317 .unwrap_err();
1318 assert!(matches!(err, KoiError::DisabledCapability(_)));
1319 }
1320
1321 #[tokio::test]
1322 async fn participate_open_node_serves_plaintext() {
1323 let dir = std::env::temp_dir().join(format!("koi-emb-participate-{}", std::process::id()));
1327 let _ = std::fs::remove_dir_all(&dir);
1328 let koi = crate::Builder::new()
1329 .data_dir(&dir)
1330 .service_mode(crate::ServiceMode::EmbeddedOnly)
1331 .mdns(false)
1332 .dns_enabled(false)
1333 .health(false)
1334 .certmesh(true)
1335 .proxy(false)
1336 .build()
1337 .expect("build");
1338 let handle = koi.start().await.expect("start");
1339
1340 let addr = {
1341 let l = tokio::net::TcpListener::bind(("127.0.0.1", 0))
1342 .await
1343 .unwrap();
1344 l.local_addr().unwrap()
1345 };
1346 let router = axum::Router::new().route("/ping", axum::routing::get(|| async { "pong" }));
1347 let cancel = CancellationToken::new();
1348 let _server = handle
1349 .participate(router, addr, "_koi-test._tcp", cancel.clone())
1350 .await
1351 .expect("participate");
1352 tokio::time::sleep(std::time::Duration::from_millis(75)).await;
1353
1354 let (status, body) = koi_certmesh::mtls::get(&addr.ip().to_string(), addr.port(), "/ping")
1355 .await
1356 .expect("plain GET to an Open participating node");
1357 assert_eq!(status, 200);
1358 assert_eq!(body, "pong");
1359
1360 cancel.cancel();
1361 handle.shutdown().await.expect("shutdown");
1362 }
1363
1364 #[tokio::test]
1367 async fn seal_open_round_trip_on_open_node() {
1368 use koi_common::sealed::Confidentiality;
1369 let dir = std::env::temp_dir().join(format!("koi-emb-seal-{}", std::process::id()));
1370 let _ = std::fs::remove_dir_all(&dir);
1371 let koi = crate::Builder::new()
1372 .data_dir(&dir)
1373 .service_mode(crate::ServiceMode::EmbeddedOnly)
1374 .mdns(false)
1375 .dns_enabled(false)
1376 .health(false)
1377 .certmesh(true)
1378 .proxy(false)
1379 .build()
1380 .expect("build");
1381 let handle = koi.start().await.expect("start");
1382 let cm = handle.certmesh().expect("certmesh handle");
1383
1384 let sealed = cm.seal(b"hello seal").await.expect("seal");
1387 assert_eq!(sealed.confidentiality(), Confidentiality::None);
1388 let opened = cm.open(&sealed).await.expect("open");
1389 assert_eq!(opened.payload, b"hello seal");
1390 assert_eq!(opened.confidentiality, Confidentiality::None);
1391 assert!(
1392 opened.assurance.identity().is_none(),
1393 "an Open node's seal is anonymous, not a trusted identity"
1394 );
1395
1396 handle.shutdown().await.expect("shutdown");
1397 }
1398}