1use std::net::IpAddr;
2use std::sync::Arc;
3
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::BroadcastStream;
7use tokio_util::sync::CancellationToken;
8
9use koi_client::KoiClient;
10use koi_common::capability::Capability;
11use koi_common::peer::Peer;
12use koi_common::types::{EventKind, ServiceRecord};
13use koi_config::state::DnsEntry;
14use koi_dns::{DnsLookupResult, DnsRuntime};
15use koi_health::{HealthCheck, HealthRuntime};
16use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
17use koi_mdns::{BrowseSubscription as MdnsBrowseHandle, MdnsCore, MdnsEvent};
18use koi_proxy::{ProxyEntry, ProxyRuntime};
19
20use crate::{map_join_error, KoiError, KoiEvent};
21
/// Where a [`KoiHandle`] routes its calls: in-process subsystems or a client.
enum HandleBackend {
    /// Subsystems held directly by the handle. Each one is optional; the
    /// accessors on [`KoiHandle`] return `KoiError::DisabledCapability` for a
    /// subsystem that is `None`.
    Embedded {
        mdns: Option<Arc<MdnsCore>>,
        dns: Option<Arc<DnsRuntime>>,
        health: Option<Arc<HealthRuntime>>,
        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
        proxy: Option<Arc<ProxyRuntime>>,
        udp: Option<Arc<koi_udp::UdpRuntime>>,
        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
    },
    /// Calls are forwarded through a shared [`KoiClient`].
    Remote {
        client: Arc<KoiClient>,
    },
}
36
/// Top-level handle: hands out per-capability handles, exposes the event
/// channel, and owns shutdown.
pub struct KoiHandle {
    /// Embedded subsystems or a remote client.
    backend: HandleBackend,
    /// Directory passed to `Vault::open` by `vault()`; always `None` when the
    /// handle is built by `new_remote`.
    data_dir: Option<std::path::PathBuf>,
    /// Sender side of the event channel; `events()` and `subscribe()` create
    /// receivers from it.
    events: broadcast::Sender<KoiEvent>,
    /// Cancelled as the first step of `shutdown`.
    cancel: CancellationToken,
    /// Background tasks awaited by `shutdown` after cancellation.
    tasks: Vec<JoinHandle<()>>,
    /// Registration id that `shutdown` unregisters from mDNS, when present.
    http_announce_id: Option<String>,
}
45
46impl KoiHandle {
47 #[allow(clippy::too_many_arguments)]
48 pub(crate) fn new_embedded(
49 mdns: Option<Arc<MdnsCore>>,
50 dns: Option<Arc<DnsRuntime>>,
51 health: Option<Arc<HealthRuntime>>,
52 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
53 proxy: Option<Arc<ProxyRuntime>>,
54 udp: Option<Arc<koi_udp::UdpRuntime>>,
55 runtime: Option<Arc<koi_runtime::RuntimeCore>>,
56 data_dir: Option<std::path::PathBuf>,
57 events: broadcast::Sender<KoiEvent>,
58 cancel: CancellationToken,
59 tasks: Vec<JoinHandle<()>>,
60 http_announce_id: Option<String>,
61 ) -> Self {
62 Self {
63 backend: HandleBackend::Embedded {
64 mdns,
65 dns,
66 health,
67 certmesh,
68 proxy,
69 udp,
70 runtime,
71 },
72 data_dir,
73 events,
74 cancel,
75 tasks,
76 http_announce_id,
77 }
78 }
79
80 pub(crate) fn new_remote(
81 client: Arc<KoiClient>,
82 events: broadcast::Sender<KoiEvent>,
83 cancel: CancellationToken,
84 tasks: Vec<JoinHandle<()>>,
85 ) -> Self {
86 Self {
87 backend: HandleBackend::Remote { client },
88 data_dir: None,
89 events,
90 cancel,
91 tasks,
92 http_announce_id: None,
93 }
94 }
95
96 pub fn events(&self) -> BroadcastStream<KoiEvent> {
97 BroadcastStream::new(self.events.subscribe())
98 }
99
100 pub fn serve(
110 &self,
111 router: axum::Router,
112 addr: std::net::SocketAddr,
113 cancel: CancellationToken,
114 ) -> Result<JoinHandle<()>, KoiError> {
115 match &self.backend {
116 HandleBackend::Embedded { certmesh, .. } => {
117 let core = certmesh
118 .as_ref()
119 .ok_or(KoiError::DisabledCapability("certmesh"))?;
120 let core = Arc::clone(core);
121 Ok(tokio::spawn(async move {
122 if let Err(e) = crate::serve::serve_adaptive(core, router, addr, cancel).await {
123 tracing::error!(error = %e, "same-port serve failed to bind");
124 }
125 }))
126 }
127 HandleBackend::Remote { .. } => {
128 Err(KoiError::DisabledCapability("certmesh (remote mode)"))
129 }
130 }
131 }
132
133 pub async fn participate(
148 &self,
149 router: axum::Router,
150 addr: std::net::SocketAddr,
151 service_type: &str,
152 cancel: CancellationToken,
153 ) -> Result<JoinHandle<()>, KoiError> {
154 let (certmesh, mdns) = match &self.backend {
155 HandleBackend::Embedded { certmesh, mdns, .. } => (
156 certmesh
157 .as_ref()
158 .ok_or(KoiError::DisabledCapability("certmesh"))?
159 .clone(),
160 mdns.clone(),
161 ),
162 HandleBackend::Remote { .. } => {
163 return Err(KoiError::DisabledCapability("certmesh (remote mode)"))
164 }
165 };
166
167 let _ = certmesh.ensure_identity().await;
169
170 if let Some(mdns) = mdns {
173 spawn_participate_announce(
174 mdns,
175 Arc::clone(&certmesh),
176 service_type.to_string(),
177 addr.port(),
178 cancel.clone(),
179 );
180 } else {
181 tracing::debug!("participate: mDNS disabled — serving without announcing");
182 }
183
184 self.serve(router, addr, cancel)
186 }
187
188 pub fn subscribe(&self) -> broadcast::Receiver<KoiEvent> {
189 self.events.subscribe()
190 }
191
192 pub fn mdns(&self) -> Result<MdnsHandle, KoiError> {
193 match &self.backend {
194 HandleBackend::Embedded { mdns, .. } => {
195 let core = mdns.as_ref().ok_or(KoiError::DisabledCapability("mdns"))?;
196 Ok(MdnsHandle::new_embedded(
197 Arc::clone(core),
198 self.events.clone(),
199 ))
200 }
201 HandleBackend::Remote { client } => Ok(MdnsHandle::new_remote(
202 Arc::clone(client),
203 self.events.clone(),
204 )),
205 }
206 }
207
208 pub fn dns(&self) -> Result<DnsHandle, KoiError> {
209 match &self.backend {
210 HandleBackend::Embedded { dns, .. } => {
211 let runtime = dns.as_ref().ok_or(KoiError::DisabledCapability("dns"))?;
212 Ok(DnsHandle::new_embedded(Arc::clone(runtime)))
213 }
214 HandleBackend::Remote { client } => Ok(DnsHandle::new_remote(Arc::clone(client))),
215 }
216 }
217
218 pub fn health(&self) -> Result<HealthHandle, KoiError> {
219 match &self.backend {
220 HandleBackend::Embedded { health, .. } => {
221 let runtime = health
222 .as_ref()
223 .ok_or(KoiError::DisabledCapability("health"))?;
224 Ok(HealthHandle::new_embedded(Arc::clone(runtime)))
225 }
226 HandleBackend::Remote { client } => Ok(HealthHandle::new_remote(Arc::clone(client))),
227 }
228 }
229
230 pub fn certmesh(&self) -> Result<CertmeshHandle, KoiError> {
231 match &self.backend {
232 HandleBackend::Embedded { certmesh, .. } => {
233 let core = certmesh
234 .as_ref()
235 .ok_or(KoiError::DisabledCapability("certmesh"))?;
236 Ok(CertmeshHandle::new_embedded(Arc::clone(core)))
237 }
238 HandleBackend::Remote { client } => Ok(CertmeshHandle::new_remote(Arc::clone(client))),
239 }
240 }
241
242 pub fn vault(&self) -> Result<koi_crypto::vault::Vault, KoiError> {
248 let dir = self
249 .data_dir
250 .as_ref()
251 .ok_or(KoiError::DisabledCapability("vault (no data_dir)"))?;
252 koi_crypto::vault::Vault::open(dir)
253 .map_err(|e| KoiError::Io(std::io::Error::other(e.to_string())))
254 }
255
256 pub fn proxy(&self) -> Result<ProxyHandle, KoiError> {
257 match &self.backend {
258 HandleBackend::Embedded { proxy, .. } => {
259 let runtime = proxy
260 .as_ref()
261 .ok_or(KoiError::DisabledCapability("proxy"))?;
262 Ok(ProxyHandle::new_embedded(Arc::clone(runtime)))
263 }
264 HandleBackend::Remote { client } => Ok(ProxyHandle::new_remote(Arc::clone(client))),
265 }
266 }
267
268 pub fn udp(&self) -> Result<Arc<koi_udp::UdpRuntime>, KoiError> {
273 match &self.backend {
274 HandleBackend::Embedded { udp, .. } => {
275 let runtime = udp.as_ref().ok_or(KoiError::DisabledCapability("udp"))?;
276 Ok(Arc::clone(runtime))
277 }
278 HandleBackend::Remote { .. } => Err(KoiError::DisabledCapability("udp (remote mode)")),
279 }
280 }
281
282 pub fn runtime(&self) -> Result<Arc<koi_runtime::RuntimeCore>, KoiError> {
286 match &self.backend {
287 HandleBackend::Embedded { runtime, .. } => {
288 let core = runtime
289 .as_ref()
290 .ok_or(KoiError::DisabledCapability("runtime"))?;
291 Ok(Arc::clone(core))
292 }
293 HandleBackend::Remote { .. } => {
294 Err(KoiError::DisabledCapability("runtime (remote mode)"))
295 }
296 }
297 }
298
299 pub async fn shutdown(mut self) -> Result<(), KoiError> {
300 self.cancel.cancel();
301 for task in self.tasks.drain(..) {
302 let _ = task.await;
303 }
304
305 if let HandleBackend::Embedded {
306 mdns,
307 dns,
308 health,
309 proxy,
310 ..
311 } = &self.backend
312 {
313 if let Some(runtime) = proxy {
314 runtime.stop_all().await;
315 }
316 if let Some(runtime) = health {
317 let _ = runtime.stop().await;
318 }
319 if let Some(runtime) = dns {
320 let _ = runtime.stop().await;
321 }
322 if let Some(id) = &self.http_announce_id {
323 if let Some(core) = mdns {
324 if let Err(e) = core.unregister(id) {
325 tracing::warn!(error = %e, "Failed to withdraw HTTP mDNS announcement");
326 }
327 }
328 }
329 if let Some(core) = mdns {
330 core.shutdown().await?;
331 }
332 }
333
334 Ok(())
335 }
336}
337
/// A running browse; events are pulled one at a time with `recv`.
pub struct KoiBrowseHandle {
    /// Source of the events: an embedded subscription or a channel receiver.
    backend: BrowseBackend,
}
341
/// Event source behind a [`KoiBrowseHandle`].
enum BrowseBackend {
    /// Subscription handle obtained from the embedded mDNS core.
    Embedded(MdnsBrowseHandle),
    /// Receiver fed by the remote browse worker; wrapped in a mutex so that
    /// `recv` can take `&self`.
    Remote(Mutex<mpsc::Receiver<MdnsEvent>>),
}
346
347impl KoiBrowseHandle {
348 fn embedded(handle: MdnsBrowseHandle) -> Self {
349 Self {
350 backend: BrowseBackend::Embedded(handle),
351 }
352 }
353
354 fn remote(rx: mpsc::Receiver<MdnsEvent>) -> Self {
355 Self {
356 backend: BrowseBackend::Remote(Mutex::new(rx)),
357 }
358 }
359
360 pub async fn recv(&self) -> Option<MdnsEvent> {
361 match &self.backend {
362 BrowseBackend::Embedded(handle) => handle.recv().await,
363 BrowseBackend::Remote(rx) => rx.lock().await.recv().await,
364 }
365 }
366}
367
/// Collection window (2 seconds) that `MdnsHandle::discover` passes to
/// `MdnsHandle::discover_for`.
pub const DEFAULT_DISCOVER_WINDOW: std::time::Duration = std::time::Duration::from_secs(2);
372
/// mDNS operations over either an embedded core or a remote client.
pub struct MdnsHandle {
    /// Embedded core or remote client.
    backend: MdnsBackend,
    /// Event channel sender used by `emit_event`.
    events: broadcast::Sender<KoiEvent>,
}
377
/// Backend selector for [`MdnsHandle`].
enum MdnsBackend {
    /// Calls go straight to the in-process mDNS core.
    Embedded { core: Arc<MdnsCore> },
    /// Calls are forwarded through the client.
    Remote { client: Arc<KoiClient> },
}
382
383impl MdnsHandle {
384 fn new_embedded(core: Arc<MdnsCore>, events: broadcast::Sender<KoiEvent>) -> Self {
385 Self {
386 backend: MdnsBackend::Embedded { core },
387 events,
388 }
389 }
390
391 fn new_remote(client: Arc<KoiClient>, events: broadcast::Sender<KoiEvent>) -> Self {
392 Self {
393 backend: MdnsBackend::Remote { client },
394 events,
395 }
396 }
397
398 pub fn core(&self) -> Result<Arc<MdnsCore>, KoiError> {
399 match &self.backend {
400 MdnsBackend::Embedded { core } => Ok(Arc::clone(core)),
401 MdnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("mdns")),
402 }
403 }
404
405 pub async fn browse(&self, service_type: &str) -> Result<KoiBrowseHandle, KoiError> {
406 match &self.backend {
407 MdnsBackend::Embedded { core } => {
408 let handle = core.subscribe_type(service_type).await?;
409 Ok(KoiBrowseHandle::embedded(handle))
410 }
411 MdnsBackend::Remote { client } => {
412 let (tx, rx) = mpsc::channel(64);
413 let client = Arc::clone(client);
414 let service_type = service_type.to_string();
415 tokio::task::spawn_blocking(move || {
416 let stream = match client.browse_stream(&service_type) {
417 Ok(stream) => stream,
418 Err(_) => return,
419 };
420 for item in stream {
421 let Ok(json) = item else {
422 break;
423 };
424 if let Some(event) = mdns_event_from_pipeline(json) {
425 if tx.blocking_send(event).is_err() {
426 break;
427 }
428 }
429 }
430 });
431 Ok(KoiBrowseHandle::remote(rx))
432 }
433 }
434 }
435
436 pub async fn discover(&self, service_type: &str) -> Result<Vec<Peer>, KoiError> {
446 self.discover_for(service_type, DEFAULT_DISCOVER_WINDOW)
447 .await
448 }
449
450 pub async fn discover_for(
452 &self,
453 service_type: &str,
454 window: std::time::Duration,
455 ) -> Result<Vec<Peer>, KoiError> {
456 let browse = self.browse(service_type).await?;
457 let mut events = Vec::new();
458 let deadline = tokio::time::sleep(window);
459 tokio::pin!(deadline);
460 loop {
461 tokio::select! {
462 _ = &mut deadline => break,
463 ev = browse.recv() => match ev {
464 Some(e) => events.push(e),
465 None => break,
466 },
467 }
468 }
469 Ok(fold_peers(events))
470 }
471
472 pub async fn resolve(&self, name: &str) -> Result<ServiceRecord, KoiError> {
473 match &self.backend {
474 MdnsBackend::Embedded { core } => Ok(core.resolve(name).await?),
475 MdnsBackend::Remote { client } => {
476 let name = name.to_string();
477 let client = Arc::clone(client);
478 let record = tokio::task::spawn_blocking(move || client.resolve(&name))
479 .await
480 .map_err(map_join_error)??;
481 Ok(record)
482 }
483 }
484 }
485
486 pub fn register(&self, payload: RegisterPayload) -> Result<RegistrationResult, KoiError> {
487 match &self.backend {
488 MdnsBackend::Embedded { core } => Ok(core.register(payload)?),
489 MdnsBackend::Remote { client } => Ok(client.register(&payload)?),
490 }
491 }
492
493 pub fn unregister(&self, id: &str) -> Result<(), KoiError> {
494 match &self.backend {
495 MdnsBackend::Embedded { core } => Ok(core.unregister(id)?),
496 MdnsBackend::Remote { client } => Ok(client.unregister(id)?),
497 }
498 }
499
500 pub fn subscribe(&self) -> Result<broadcast::Receiver<MdnsEvent>, KoiError> {
511 match &self.backend {
512 MdnsBackend::Embedded { core } => Ok(core.subscribe()),
513 MdnsBackend::Remote { .. } => Err(KoiError::RemoteUnsupported(
514 "mdns subscribe — use mdns.browse(service_type) for a remote event stream",
515 )),
516 }
517 }
518
519 pub fn emit_event(&self, event: KoiEvent) {
520 let _ = self.events.send(event);
521 }
522}
523
/// DNS operations over either an embedded runtime or a remote client.
pub struct DnsHandle {
    /// Embedded runtime or remote client.
    backend: DnsBackend,
}
527
/// Backend selector for [`DnsHandle`].
enum DnsBackend {
    /// Calls go to the in-process DNS runtime.
    Embedded { runtime: Arc<DnsRuntime> },
    /// Calls are forwarded through the client.
    Remote { client: Arc<KoiClient> },
}
532
533impl DnsHandle {
534 fn new_embedded(runtime: Arc<DnsRuntime>) -> Self {
535 Self {
536 backend: DnsBackend::Embedded { runtime },
537 }
538 }
539
540 fn new_remote(client: Arc<KoiClient>) -> Self {
541 Self {
542 backend: DnsBackend::Remote { client },
543 }
544 }
545
546 pub fn runtime(&self) -> Result<Arc<DnsRuntime>, KoiError> {
547 match &self.backend {
548 DnsBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
549 DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
550 }
551 }
552
553 pub fn core(&self) -> Result<Arc<koi_dns::DnsCore>, KoiError> {
554 match &self.backend {
555 DnsBackend::Embedded { runtime } => Ok(runtime.core()),
556 DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
557 }
558 }
559
560 pub async fn lookup(
561 &self,
562 name: &str,
563 record_type: hickory_proto::rr::RecordType,
564 ) -> Option<DnsLookupResult> {
565 match &self.backend {
566 DnsBackend::Embedded { runtime } => runtime.core().lookup(name, record_type).await,
567 DnsBackend::Remote { client } => {
568 let name = name.to_string();
569 let client = Arc::clone(client);
570 let result =
571 tokio::task::spawn_blocking(move || client.dns_lookup(&name, record_type))
572 .await
573 .ok()
574 .and_then(|res| res.ok());
575 let json = match result {
576 Some(json) => json,
577 None => return None,
578 };
579 parse_dns_lookup(json)
580 }
581 }
582 }
583
584 pub fn list_names(&self) -> Vec<String> {
585 match &self.backend {
586 DnsBackend::Embedded { runtime } => runtime.core().list_names(),
587 DnsBackend::Remote { client } => {
588 let result = client.dns_list();
589 let Ok(json) = result else {
590 return Vec::new();
591 };
592 json.get("names")
593 .and_then(|v| v.as_array())
594 .map(|arr| {
595 arr.iter()
596 .filter_map(|name| name.as_str().map(|s| s.to_string()))
597 .collect()
598 })
599 .unwrap_or_default()
600 }
601 }
602 }
603
604 pub async fn start(&self) -> Result<bool, KoiError> {
605 match &self.backend {
606 DnsBackend::Embedded { runtime } => Ok(runtime.start().await?),
607 DnsBackend::Remote { client } => {
608 let client = Arc::clone(client);
609 let started = tokio::task::spawn_blocking(move || client.dns_start())
610 .await
611 .map_err(map_join_error)??
612 .get("started")
613 .and_then(|v| v.as_bool())
614 .unwrap_or(false);
615 Ok(started)
616 }
617 }
618 }
619
620 pub async fn stop(&self) -> bool {
621 match &self.backend {
622 DnsBackend::Embedded { runtime } => runtime.stop().await,
623 DnsBackend::Remote { client } => {
624 let client = Arc::clone(client);
625 tokio::task::spawn_blocking(move || client.dns_stop())
626 .await
627 .ok()
628 .and_then(|res| res.ok())
629 .and_then(|json| json.get("stopped").and_then(|v| v.as_bool()))
630 .unwrap_or(false)
631 }
632 }
633 }
634
635 pub fn add_entry(&self, entry: DnsEntry) -> Result<Vec<DnsEntry>, KoiError> {
636 match &self.backend {
637 DnsBackend::Embedded { runtime } => Ok(runtime.core().add_entry(entry)?),
638 DnsBackend::Remote { client } => {
639 let json = client.dns_add(&entry.name, &entry.ip, entry.ttl)?;
640 parse_dns_entries(json)
641 }
642 }
643 }
644
645 pub fn remove_entry(&self, name: &str) -> Result<Vec<DnsEntry>, KoiError> {
646 match &self.backend {
647 DnsBackend::Embedded { runtime } => {
648 Ok(runtime.core().remove_entry(name)?.unwrap_or_default())
649 }
650 DnsBackend::Remote { client } => {
651 let json = client.dns_remove(name)?;
652 parse_dns_entries(json)
653 }
654 }
655 }
656}
657
/// Health-check operations over either an embedded runtime or a remote client.
pub struct HealthHandle {
    /// Embedded runtime or remote client.
    backend: HealthBackend,
}
661
/// Backend selector for [`HealthHandle`].
enum HealthBackend {
    /// Calls go to the in-process health runtime.
    Embedded { runtime: Arc<HealthRuntime> },
    /// Calls are forwarded through the client.
    Remote { client: Arc<KoiClient> },
}
666
667impl HealthHandle {
668 fn new_embedded(runtime: Arc<HealthRuntime>) -> Self {
669 Self {
670 backend: HealthBackend::Embedded { runtime },
671 }
672 }
673
674 fn new_remote(client: Arc<KoiClient>) -> Self {
675 Self {
676 backend: HealthBackend::Remote { client },
677 }
678 }
679
680 pub fn core(&self) -> Result<Arc<koi_health::HealthCore>, KoiError> {
681 match &self.backend {
682 HealthBackend::Embedded { runtime } => Ok(runtime.core()),
683 HealthBackend::Remote { .. } => Err(KoiError::DisabledCapability("health")),
684 }
685 }
686
687 pub async fn status(&self) -> koi_health::HealthSnapshot {
688 match &self.backend {
689 HealthBackend::Embedded { runtime } => runtime.core().snapshot().await,
690 HealthBackend::Remote { client } => {
691 let client = Arc::clone(client);
692 let json = tokio::task::spawn_blocking(move || client.health_status())
693 .await
694 .ok()
695 .and_then(|res| res.ok());
696 json.and_then(|json| serde_json::from_value(json).ok())
697 .unwrap_or_else(|| koi_health::HealthSnapshot {
698 machines: Vec::new(),
699 services: Vec::new(),
700 })
701 }
702 }
703 }
704
705 pub async fn add_check(&self, check: HealthCheck) -> Result<(), KoiError> {
706 match &self.backend {
707 HealthBackend::Embedded { runtime } => Ok(runtime.core().add_check(check).await?),
708 HealthBackend::Remote { client } => {
709 let client = Arc::clone(client);
710 let check = check.clone();
711 tokio::task::spawn_blocking(move || {
712 client.health_add_check(
713 &check.name,
714 check.kind,
715 &check.target,
716 check.interval_secs,
717 check.timeout_secs,
718 )
719 })
720 .await
721 .map_err(map_join_error)??;
722 Ok(())
723 }
724 }
725 }
726
727 pub async fn remove_check(&self, name: &str) -> Result<(), KoiError> {
728 match &self.backend {
729 HealthBackend::Embedded { runtime } => Ok(runtime.core().remove_check(name).await?),
730 HealthBackend::Remote { client } => {
731 let client = Arc::clone(client);
732 let name = name.to_string();
733 tokio::task::spawn_blocking(move || client.health_remove_check(&name))
734 .await
735 .map_err(map_join_error)??;
736 Ok(())
737 }
738 }
739 }
740
741 pub async fn start(&self) -> Result<bool, KoiError> {
742 match &self.backend {
743 HealthBackend::Embedded { runtime } => Ok(runtime.start().await?),
744 HealthBackend::Remote { .. } => Ok(false),
745 }
746 }
747
748 pub async fn stop(&self) -> bool {
749 match &self.backend {
750 HealthBackend::Embedded { runtime } => runtime.stop().await,
751 HealthBackend::Remote { .. } => false,
752 }
753 }
754}
755
/// Certmesh operations; only `status` has a remote implementation, every
/// other method requires the embedded core.
pub struct CertmeshHandle {
    /// Embedded core or remote client.
    backend: CertmeshBackend,
}
759
/// Backend selector for [`CertmeshHandle`].
enum CertmeshBackend {
    /// Calls go to the in-process certmesh core.
    Embedded {
        core: Arc<koi_certmesh::CertmeshCore>,
    },
    /// Calls are forwarded through the client.
    Remote {
        client: Arc<KoiClient>,
    },
}
768
769impl CertmeshHandle {
770 fn new_embedded(core: Arc<koi_certmesh::CertmeshCore>) -> Self {
771 Self {
772 backend: CertmeshBackend::Embedded { core },
773 }
774 }
775
776 fn new_remote(client: Arc<KoiClient>) -> Self {
777 Self {
778 backend: CertmeshBackend::Remote { client },
779 }
780 }
781
782 pub fn core(&self) -> Result<Arc<koi_certmesh::CertmeshCore>, KoiError> {
783 match &self.backend {
784 CertmeshBackend::Embedded { core } => Ok(Arc::clone(core)),
785 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
786 }
787 }
788
789 pub async fn status(&self) -> koi_common::capability::CapabilityStatus {
790 match &self.backend {
791 CertmeshBackend::Embedded { core } => core.status().await,
792 CertmeshBackend::Remote { client } => {
793 let client = Arc::clone(client);
794 let json = tokio::task::spawn_blocking(move || client.unified_status())
795 .await
796 .ok()
797 .and_then(|res| res.ok());
798 json.and_then(extract_capability_status)
799 .unwrap_or_else(default_capability_status)
800 }
801 }
802 }
803
804 pub fn posture(&self) -> Result<koi_common::posture::Posture, KoiError> {
810 match &self.backend {
811 CertmeshBackend::Embedded { core } => Ok(core.posture()),
812 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
813 }
814 }
815
816 pub async fn local_identity(&self) -> Result<Option<koi_certmesh::Identity>, KoiError> {
819 match &self.backend {
820 CertmeshBackend::Embedded { core } => Ok(core.local_identity().await),
821 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
822 }
823 }
824
825 pub async fn ensure_identity(&self) -> Result<Option<koi_certmesh::Identity>, KoiError> {
828 match &self.backend {
829 CertmeshBackend::Embedded { core } => Ok(core.ensure_identity().await),
830 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
831 }
832 }
833
834 pub async fn sign(&self, bytes: &[u8]) -> Result<koi_common::envelope::Envelope, KoiError> {
838 match &self.backend {
839 CertmeshBackend::Embedded { core } => Ok(core.sign(bytes).await),
840 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
841 }
842 }
843
844 pub async fn verify(
847 &self,
848 env: &koi_common::envelope::Envelope,
849 ) -> Result<koi_common::envelope::Assurance, KoiError> {
850 match &self.backend {
851 CertmeshBackend::Embedded { core } => Ok(core.verify(env).await),
852 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
853 }
854 }
855
856 pub async fn seal(&self, bytes: &[u8]) -> Result<koi_common::sealed::Sealed, KoiError> {
860 match &self.backend {
861 CertmeshBackend::Embedded { core } => Ok(core.seal(bytes).await),
862 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
863 }
864 }
865
866 pub async fn open(
869 &self,
870 sealed: &koi_common::sealed::Sealed,
871 ) -> Result<koi_common::sealed::Opened, KoiError> {
872 match &self.backend {
873 CertmeshBackend::Embedded { core } => Ok(core.open(sealed).await?),
874 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
875 }
876 }
877
878 pub async fn diagnose(&self) -> Result<koi_common::diagnosis::TrustDiagnosis, KoiError> {
883 match &self.backend {
884 CertmeshBackend::Embedded { core } => Ok(core.diagnose().await),
885 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
886 }
887 }
888
889 pub async fn client_for(&self, peer: &Peer) -> Result<koi_certmesh::PeerClient, KoiError> {
897 match &self.backend {
898 CertmeshBackend::Embedded { core } => Ok(core.client_for(peer).await?),
899 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
900 }
901 }
902}
903
904async fn announce_once(
908 mdns: &Arc<MdnsCore>,
909 certmesh: &Arc<koi_certmesh::CertmeshCore>,
910 hostname: &str,
911 service_type: &str,
912 port: u16,
913) -> Option<String> {
914 let id = certmesh.local_identity().await;
915 let mut txt = std::collections::HashMap::new();
916 koi_common::peer::stamp(
917 &mut txt,
918 certmesh.posture(),
919 id.as_ref().map(|i| i.ca_fingerprint.as_str()),
920 id.as_ref().map(|i| i.renewal.expires_at),
921 );
922 let payload = RegisterPayload {
923 name: hostname.to_string(),
924 service_type: service_type.to_string(),
925 port,
926 ip: None,
927 lease_secs: None,
928 txt,
929 };
930 match mdns.register(payload) {
931 Ok(result) => Some(result.id),
932 Err(e) => {
933 tracing::warn!(error = %e, "participate: mDNS announce failed");
934 None
935 }
936 }
937}
938
939fn spawn_participate_announce(
944 mdns: Arc<MdnsCore>,
945 certmesh: Arc<koi_certmesh::CertmeshCore>,
946 service_type: String,
947 port: u16,
948 cancel: CancellationToken,
949) {
950 tokio::spawn(async move {
951 let hostname = hostname::get()
952 .ok()
953 .and_then(|os| os.into_string().ok())
954 .unwrap_or_else(|| "unknown".to_string());
955 let mut posture_rx = certmesh.watch_posture();
956 let mut current_id = announce_once(&mdns, &certmesh, &hostname, &service_type, port).await;
957 loop {
958 tokio::select! {
959 _ = cancel.cancelled() => break,
960 changed = posture_rx.changed() => {
961 if changed.is_err() {
962 break; }
964 if let Some(old) = current_id.take() {
966 let _ = mdns.unregister(&old);
967 }
968 current_id =
969 announce_once(&mdns, &certmesh, &hostname, &service_type, port).await;
970 }
971 }
972 }
973 if let Some(id) = current_id {
974 let _ = mdns.unregister(&id);
975 }
976 });
977}
978
979fn fold_peers(events: impl IntoIterator<Item = MdnsEvent>) -> Vec<Peer> {
984 use std::collections::BTreeMap;
985 let mut by_name: BTreeMap<String, ServiceRecord> = BTreeMap::new();
986 for ev in events {
987 match ev {
988 MdnsEvent::Found(rec) => {
989 by_name.entry(rec.name.clone()).or_insert(rec);
990 }
991 MdnsEvent::Resolved(rec) => {
992 by_name.insert(rec.name.clone(), rec);
993 }
994 MdnsEvent::Removed { name, .. } => {
995 by_name.remove(&name);
996 }
997 }
998 }
999 by_name.into_values().map(Peer::from_record).collect()
1000}
1001
/// Proxy operations over either an embedded runtime or a remote client.
pub struct ProxyHandle {
    /// Embedded runtime or remote client.
    backend: ProxyBackend,
}
1005
/// Backend selector for [`ProxyHandle`].
enum ProxyBackend {
    /// Calls go to the in-process proxy runtime.
    Embedded { runtime: Arc<ProxyRuntime> },
    /// Calls are forwarded through the client.
    Remote { client: Arc<KoiClient> },
}
1010
1011impl ProxyHandle {
1012 fn new_embedded(runtime: Arc<ProxyRuntime>) -> Self {
1013 Self {
1014 backend: ProxyBackend::Embedded { runtime },
1015 }
1016 }
1017
1018 fn new_remote(client: Arc<KoiClient>) -> Self {
1019 Self {
1020 backend: ProxyBackend::Remote { client },
1021 }
1022 }
1023
1024 pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError> {
1025 match &self.backend {
1026 ProxyBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
1027 ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
1028 }
1029 }
1030
1031 pub fn core(&self) -> Result<Arc<koi_proxy::ProxyCore>, KoiError> {
1032 match &self.backend {
1033 ProxyBackend::Embedded { runtime } => Ok(runtime.core()),
1034 ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
1035 }
1036 }
1037
1038 pub async fn entries(&self) -> Vec<ProxyEntry> {
1039 match &self.backend {
1040 ProxyBackend::Embedded { runtime } => runtime.core().entries().await,
1041 ProxyBackend::Remote { client } => {
1042 let client = Arc::clone(client);
1043 tokio::task::spawn_blocking(move || client.proxy_list())
1044 .await
1045 .ok()
1046 .and_then(|res| res.ok())
1047 .and_then(|json| parse_proxy_entries(json).ok())
1048 .unwrap_or_default()
1049 }
1050 }
1051 }
1052
1053 pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError> {
1054 match &self.backend {
1055 ProxyBackend::Embedded { runtime } => Ok(runtime.core().upsert(entry).await?),
1056 ProxyBackend::Remote { client } => {
1057 let client = Arc::clone(client);
1058 let entry = entry.clone();
1059 let add_client = Arc::clone(&client);
1060 tokio::task::spawn_blocking(move || {
1061 add_client.proxy_add(
1062 &entry.name,
1063 entry.listen_port,
1064 &entry.backend,
1065 entry.allow_remote,
1066 )
1067 })
1068 .await
1069 .map_err(map_join_error)??;
1070 let list = tokio::task::spawn_blocking(move || client.proxy_list())
1071 .await
1072 .map_err(map_join_error)??;
1073 parse_proxy_entries(list)
1074 }
1075 }
1076 }
1077
1078 pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError> {
1079 match &self.backend {
1080 ProxyBackend::Embedded { runtime } => Ok(runtime.core().remove(name).await?),
1081 ProxyBackend::Remote { client } => {
1082 let client = Arc::clone(client);
1083 let name = name.to_string();
1084 let remove_client = Arc::clone(&client);
1085 tokio::task::spawn_blocking(move || remove_client.proxy_remove(&name))
1086 .await
1087 .map_err(map_join_error)??;
1088 let list = tokio::task::spawn_blocking(move || client.proxy_list())
1089 .await
1090 .map_err(map_join_error)??;
1091 parse_proxy_entries(list)
1092 }
1093 }
1094 }
1095
1096 pub async fn start_all(&self) -> Result<(), KoiError> {
1097 match &self.backend {
1098 ProxyBackend::Embedded { runtime } => Ok(runtime.start_all().await?),
1099 ProxyBackend::Remote { .. } => Ok(()),
1100 }
1101 }
1102
1103 pub async fn stop_all(&self) {
1104 if let ProxyBackend::Embedded { runtime } = &self.backend {
1105 runtime.stop_all().await;
1106 }
1107 }
1108}
1109
1110fn parse_dns_lookup(json: serde_json::Value) -> Option<DnsLookupResult> {
1111 let name = json.get("name").and_then(|v| v.as_str())?.to_string();
1112 let source = json
1113 .get("source")
1114 .and_then(|v| v.as_str())
1115 .unwrap_or("unknown")
1116 .to_string();
1117 let ips = json.get("ips").and_then(|v| v.as_array()).map(|arr| {
1118 arr.iter()
1119 .filter_map(|ip| ip.as_str())
1120 .filter_map(|ip| ip.parse::<IpAddr>().ok())
1121 .collect::<Vec<_>>()
1122 })?;
1123 Some(DnsLookupResult { name, ips, source })
1124}
1125
1126fn parse_dns_entries(json: serde_json::Value) -> Result<Vec<DnsEntry>, KoiError> {
1127 let entries = json.get("entries").ok_or_else(|| {
1128 KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::other(
1129 "missing entries",
1130 )))
1131 })?;
1132 let entries = serde_json::from_value(entries.clone()).map_err(|e| {
1133 KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::new(
1134 std::io::ErrorKind::InvalidData,
1135 e.to_string(),
1136 )))
1137 })?;
1138 Ok(entries)
1139}
1140
1141fn parse_proxy_entries(json: serde_json::Value) -> Result<Vec<ProxyEntry>, KoiError> {
1142 let entries = json
1143 .get("entries")
1144 .ok_or_else(|| KoiError::Proxy(koi_proxy::ProxyError::Io("missing entries".to_string())))?
1145 .clone();
1146 serde_json::from_value(entries)
1147 .map_err(|e| KoiError::Proxy(koi_proxy::ProxyError::Io(e.to_string())))
1148}
1149
1150fn extract_capability_status(
1151 json: serde_json::Value,
1152) -> Option<koi_common::capability::CapabilityStatus> {
1153 let caps = json.get("capabilities")?.as_array()?;
1154 for cap in caps {
1155 if cap.get("name")?.as_str()? == "certmesh" {
1156 let name = cap.get("name")?.as_str()?.to_string();
1157 let summary = cap
1158 .get("summary")
1159 .and_then(|v| v.as_str())
1160 .unwrap_or("unknown")
1161 .to_string();
1162 let healthy = cap
1163 .get("healthy")
1164 .and_then(|v| v.as_bool())
1165 .unwrap_or(false);
1166 return Some(koi_common::capability::CapabilityStatus {
1167 name,
1168 summary,
1169 healthy,
1170 });
1171 }
1172 }
1173 None
1174}
1175
1176fn default_capability_status() -> koi_common::capability::CapabilityStatus {
1177 koi_common::capability::CapabilityStatus {
1178 name: "certmesh".to_string(),
1179 summary: "unknown".to_string(),
1180 healthy: false,
1181 }
1182}
1183
1184fn mdns_event_from_pipeline(json: serde_json::Value) -> Option<MdnsEvent> {
1185 if let Some(found) = json.get("found") {
1186 let record: ServiceRecord = serde_json::from_value(found.clone()).ok()?;
1187 return Some(MdnsEvent::Found(record));
1188 }
1189 if let Some(resolved) = json.get("resolved") {
1190 let record: ServiceRecord = serde_json::from_value(resolved.clone()).ok()?;
1191 return Some(MdnsEvent::Resolved(record));
1192 }
1193 if let Some(event) = json.get("event") {
1194 let kind: EventKind = serde_json::from_value(event.clone()).ok()?;
1195 let service = json
1196 .get("service")
1197 .cloned()
1198 .unwrap_or(serde_json::Value::Null);
1199 let record: ServiceRecord = serde_json::from_value(service).ok()?;
1200 return match kind {
1201 EventKind::Found => Some(MdnsEvent::Found(record)),
1202 EventKind::Resolved => Some(MdnsEvent::Resolved(record)),
1203 EventKind::Removed => Some(MdnsEvent::Removed {
1204 name: record.name,
1205 service_type: record.service_type,
1206 }),
1207 };
1208 }
1209 None
1210}
1211
#[cfg(test)]
mod tests {
    use super::*;
    use koi_common::posture::PostureLevel;
    use std::collections::HashMap;

    /// Per-process scratch directory under the system temp dir.
    ///
    /// The directory is wiped when the guard is created (to clear leftovers
    /// from an earlier run) and again when the guard is dropped, so a test
    /// does not leave its data directory behind even if it panics.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        /// Create a guard for `<temp>/<prefix>-<pid>`; the directory itself
        /// is not created here.
        fn new(prefix: &str) -> Self {
            let path = std::env::temp_dir().join(format!("{prefix}-{}", std::process::id()));
            let _ = std::fs::remove_dir_all(&path);
            Self(path)
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not mask the test result.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Build an `_http._tcp` service record named `name` with the given TXT
    /// key/value pairs and fixed host, IP and port values.
    fn rec(name: &str, txt: &[(&str, &str)]) -> ServiceRecord {
        ServiceRecord {
            name: name.to_string(),
            service_type: "_http._tcp".to_string(),
            host: Some(format!("{name}.local")),
            ip: Some("10.0.0.9".to_string()),
            port: Some(8443),
            txt: txt
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect::<HashMap<_, _>>(),
        }
    }

    /// A `Resolved` event for the same name replaces the earlier `Found`
    /// one, so the TXT-derived fields come from the resolved record.
    #[test]
    fn fold_resolved_overwrites_found_for_txt_enrichment() {
        let peers = fold_peers([
            MdnsEvent::Found(rec("a", &[])),
            MdnsEvent::Resolved(rec("a", &[("fp", "CAFP"), ("posture", "authenticated")])),
        ]);
        assert_eq!(peers.len(), 1, "the two events collapse to one peer");
        assert_eq!(peers[0].level(), PostureLevel::Authenticated);
        assert_eq!(peers[0].fp.as_deref(), Some("CAFP"));
    }

    /// A `Removed` event erases a peer that was previously found.
    #[test]
    fn fold_removed_drops_the_peer() {
        let peers = fold_peers([
            MdnsEvent::Found(rec("b", &[])),
            MdnsEvent::Removed {
                name: "b".to_string(),
                service_type: "_http._tcp".to_string(),
            },
        ]);
        assert!(peers.is_empty(), "a removed peer is not in the snapshot");
    }

    /// The folded snapshot is sorted by record name regardless of the
    /// order in which events arrived.
    #[test]
    fn fold_orders_peers_by_name() {
        let peers = fold_peers([
            MdnsEvent::Resolved(rec("z", &[])),
            MdnsEvent::Resolved(rec("a", &[])),
            MdnsEvent::Resolved(rec("m", &[])),
        ]);
        let names: Vec<_> = peers.iter().map(|p| p.record.name.clone()).collect();
        assert_eq!(names, vec!["a", "m", "z"]);
    }

    /// A record without any TXT entries yields an `Open`, non-secure peer.
    #[test]
    fn fold_open_peer_has_open_posture() {
        let peers = fold_peers([MdnsEvent::Resolved(rec("plain", &[]))]);
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].level(), PostureLevel::Open);
        assert!(!peers[0].is_secure());
    }

    /// `participate` on a remote-backed handle fails with
    /// `KoiError::DisabledCapability`.
    #[tokio::test]
    async fn participate_remote_handle_is_disabled() {
        let client = Arc::new(KoiClient::new("http://127.0.0.1:1"));
        let (tx, _) = broadcast::channel(8);
        let handle = KoiHandle::new_remote(client, tx, CancellationToken::new(), Vec::new());
        let router = axum::Router::new();
        let addr: std::net::SocketAddr = ([127, 0, 0, 1], 0).into();
        let err = handle
            .participate(router, addr, "_x._tcp", CancellationToken::new())
            .await
            .unwrap_err();
        assert!(matches!(err, KoiError::DisabledCapability(_)));
    }

    /// An embedded node with only certmesh enabled serves the supplied
    /// router over plain HTTP when it participates.
    #[tokio::test]
    async fn participate_open_node_serves_plaintext() {
        // Declared first so it is dropped (and the directory removed) last.
        let dir = ScratchDir::new("koi-emb-participate");
        let koi = crate::Builder::new()
            .data_dir(&dir.0)
            .service_mode(crate::ServiceMode::EmbeddedOnly)
            .mdns(false)
            .dns_enabled(false)
            .health(false)
            .certmesh(true)
            .proxy(false)
            .build()
            .expect("build");
        let handle = koi.start().await.expect("start");

        // Pick a free loopback port by binding to port 0 and releasing the
        // listener straight away.
        // NOTE(review): another process could grab the port before
        // `participate` uses it; accepted here as a test-only race.
        let addr = {
            let l = tokio::net::TcpListener::bind(("127.0.0.1", 0))
                .await
                .unwrap();
            l.local_addr().unwrap()
        };
        let router = axum::Router::new().route("/ping", axum::routing::get(|| async { "pong" }));
        let cancel = CancellationToken::new();
        let _server = handle
            .participate(router, addr, "_koi-test._tcp", cancel.clone())
            .await
            .expect("participate");
        // NOTE(review): fixed delay, presumably to let the server start
        // accepting connections before the request below - confirm.
        tokio::time::sleep(std::time::Duration::from_millis(75)).await;

        let (status, body) = koi_certmesh::mtls::get(&addr.ip().to_string(), addr.port(), "/ping")
            .await
            .expect("plain GET to an Open participating node");
        assert_eq!(status, 200);
        assert_eq!(body, "pong");

        cancel.cancel();
        handle.shutdown().await.expect("shutdown");
    }

    /// Sealing and then opening a payload on the same embedded node
    /// returns the original bytes, with no confidentiality and no
    /// identity attached.
    #[tokio::test]
    async fn seal_open_round_trip_on_open_node() {
        use koi_common::sealed::Confidentiality;
        // Declared first so it is dropped (and the directory removed) last.
        let dir = ScratchDir::new("koi-emb-seal");
        let koi = crate::Builder::new()
            .data_dir(&dir.0)
            .service_mode(crate::ServiceMode::EmbeddedOnly)
            .mdns(false)
            .dns_enabled(false)
            .health(false)
            .certmesh(true)
            .proxy(false)
            .build()
            .expect("build");
        let handle = koi.start().await.expect("start");
        let cm = handle.certmesh().expect("certmesh handle");

        let sealed = cm.seal(b"hello seal").await.expect("seal");
        assert_eq!(sealed.confidentiality(), Confidentiality::None);
        let opened = cm.open(&sealed).await.expect("open");
        assert_eq!(opened.payload, b"hello seal");
        assert_eq!(opened.confidentiality, Confidentiality::None);
        assert!(
            opened.assurance.identity().is_none(),
            "an Open node's seal is anonymous, not a trusted identity"
        );

        handle.shutdown().await.expect("shutdown");
    }
}