1use std::net::IpAddr;
2use std::sync::Arc;
3
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::BroadcastStream;
7use tokio_util::sync::CancellationToken;
8
9use koi_client::KoiClient;
10use koi_common::capability::Capability;
11use koi_common::peer::Peer;
12use koi_common::types::{EventKind, ServiceRecord};
13use koi_config::state::DnsEntry;
14use koi_dns::{DnsLookupResult, DnsRuntime};
15use koi_health::{HealthCheck, HealthRuntime};
16use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
17use koi_mdns::{BrowseSubscription as MdnsBrowseHandle, MdnsCore, MdnsEvent};
18use koi_proxy::{ProxyEntry, ProxyRuntime};
19
20use crate::{map_join_error, KoiError, KoiEvent};
21
22const SHUTDOWN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20);
25const SHUTDOWN_DRAIN: std::time::Duration = std::time::Duration::from_millis(500);
27
28enum HandleBackend {
29 Embedded {
30 mdns: Option<Arc<MdnsCore>>,
31 dns: Option<Arc<DnsRuntime>>,
32 health: Option<Arc<HealthRuntime>>,
33 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
34 proxy: Option<Arc<ProxyRuntime>>,
35 udp: Option<Arc<koi_udp::UdpRuntime>>,
36 runtime: Option<Arc<koi_runtime::RuntimeCore>>,
37 },
38 Remote {
39 client: Arc<KoiClient>,
40 },
41}
42
43pub struct KoiHandle {
44 backend: HandleBackend,
45 http_addr: Option<std::net::SocketAddr>,
49 data_dir: Option<std::path::PathBuf>,
50 events: broadcast::Sender<KoiEvent>,
51 cancel: CancellationToken,
52 tasks: Vec<JoinHandle<()>>,
53}
54
55impl KoiHandle {
56 #[allow(clippy::too_many_arguments)]
57 pub(crate) fn new_embedded(
58 mdns: Option<Arc<MdnsCore>>,
59 dns: Option<Arc<DnsRuntime>>,
60 health: Option<Arc<HealthRuntime>>,
61 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
62 proxy: Option<Arc<ProxyRuntime>>,
63 udp: Option<Arc<koi_udp::UdpRuntime>>,
64 runtime: Option<Arc<koi_runtime::RuntimeCore>>,
65 http_addr: Option<std::net::SocketAddr>,
66 data_dir: Option<std::path::PathBuf>,
67 events: broadcast::Sender<KoiEvent>,
68 cancel: CancellationToken,
69 tasks: Vec<JoinHandle<()>>,
70 ) -> Self {
71 Self {
72 backend: HandleBackend::Embedded {
73 mdns,
74 dns,
75 health,
76 certmesh,
77 proxy,
78 udp,
79 runtime,
80 },
81 http_addr,
82 data_dir,
83 events,
84 cancel,
85 tasks,
86 }
87 }
88
89 pub(crate) fn new_remote(
90 client: Arc<KoiClient>,
91 events: broadcast::Sender<KoiEvent>,
92 cancel: CancellationToken,
93 tasks: Vec<JoinHandle<()>>,
94 ) -> Self {
95 Self {
96 backend: HandleBackend::Remote { client },
97 http_addr: None,
98 data_dir: None,
99 events,
100 cancel,
101 tasks,
102 }
103 }
104
105 pub fn events(&self) -> BroadcastStream<KoiEvent> {
106 BroadcastStream::new(self.events.subscribe())
107 }
108
109 pub fn http_addr(&self) -> Option<std::net::SocketAddr> {
114 self.http_addr
115 }
116
117 pub fn bound_http_port(&self) -> Option<u16> {
120 self.http_addr.map(|addr| addr.port())
121 }
122
123 pub fn serve(
133 &self,
134 router: axum::Router,
135 addr: std::net::SocketAddr,
136 cancel: CancellationToken,
137 ) -> Result<JoinHandle<()>, KoiError> {
138 match &self.backend {
139 HandleBackend::Embedded { certmesh, .. } => {
140 let core = certmesh
141 .as_ref()
142 .ok_or(KoiError::DisabledCapability("certmesh"))?;
143 let core = Arc::clone(core);
144 Ok(tokio::spawn(async move {
145 if let Err(e) = crate::serve::serve_adaptive(core, router, addr, cancel).await {
146 tracing::error!(error = %e, "same-port serve failed to bind");
147 }
148 }))
149 }
150 HandleBackend::Remote { .. } => {
151 Err(KoiError::DisabledCapability("certmesh (remote mode)"))
152 }
153 }
154 }
155
156 pub fn try_serve(
163 &self,
164 router: axum::Router,
165 addr: std::net::SocketAddr,
166 cancel: CancellationToken,
167 ) -> Result<JoinHandle<Result<(), std::io::Error>>, KoiError> {
168 match &self.backend {
169 HandleBackend::Embedded { certmesh, .. } => {
170 let core = certmesh
171 .as_ref()
172 .ok_or(KoiError::DisabledCapability("certmesh"))?;
173 let core = Arc::clone(core);
174 Ok(tokio::spawn(async move {
175 crate::serve::serve_adaptive(core, router, addr, cancel).await
176 }))
177 }
178 HandleBackend::Remote { .. } => {
179 Err(KoiError::DisabledCapability("certmesh (remote mode)"))
180 }
181 }
182 }
183
184 pub async fn participate(
199 &self,
200 router: axum::Router,
201 addr: std::net::SocketAddr,
202 service_type: &str,
203 cancel: CancellationToken,
204 ) -> Result<JoinHandle<()>, KoiError> {
205 let (certmesh, mdns) = match &self.backend {
206 HandleBackend::Embedded { certmesh, mdns, .. } => (
207 certmesh
208 .as_ref()
209 .ok_or(KoiError::DisabledCapability("certmesh"))?
210 .clone(),
211 mdns.clone(),
212 ),
213 HandleBackend::Remote { .. } => {
214 return Err(KoiError::DisabledCapability("certmesh (remote mode)"))
215 }
216 };
217
218 let _ = certmesh.ensure_identity().await;
220
221 if let Some(mdns) = mdns {
224 spawn_participate_announce(
225 mdns,
226 Arc::clone(&certmesh),
227 service_type.to_string(),
228 addr.port(),
229 cancel.clone(),
230 );
231 } else {
232 tracing::debug!("participate: mDNS disabled — serving without announcing");
233 }
234
235 self.serve(router, addr, cancel)
237 }
238
239 pub fn subscribe(&self) -> broadcast::Receiver<KoiEvent> {
240 self.events.subscribe()
241 }
242
243 pub fn mdns(&self) -> Result<MdnsHandle, KoiError> {
244 match &self.backend {
245 HandleBackend::Embedded { mdns, .. } => {
246 let core = mdns.as_ref().ok_or(KoiError::DisabledCapability("mdns"))?;
247 Ok(MdnsHandle::new_embedded(
248 Arc::clone(core),
249 self.events.clone(),
250 ))
251 }
252 HandleBackend::Remote { client } => Ok(MdnsHandle::new_remote(
253 Arc::clone(client),
254 self.events.clone(),
255 )),
256 }
257 }
258
259 pub fn dns(&self) -> Result<DnsHandle, KoiError> {
260 match &self.backend {
261 HandleBackend::Embedded { dns, .. } => {
262 let runtime = dns.as_ref().ok_or(KoiError::DisabledCapability("dns"))?;
263 Ok(DnsHandle::new_embedded(Arc::clone(runtime)))
264 }
265 HandleBackend::Remote { client } => Ok(DnsHandle::new_remote(Arc::clone(client))),
266 }
267 }
268
269 pub fn health(&self) -> Result<HealthHandle, KoiError> {
270 match &self.backend {
271 HandleBackend::Embedded { health, .. } => {
272 let runtime = health
273 .as_ref()
274 .ok_or(KoiError::DisabledCapability("health"))?;
275 Ok(HealthHandle::new_embedded(Arc::clone(runtime)))
276 }
277 HandleBackend::Remote { client } => Ok(HealthHandle::new_remote(Arc::clone(client))),
278 }
279 }
280
281 pub fn certmesh(&self) -> Result<CertmeshHandle, KoiError> {
282 match &self.backend {
283 HandleBackend::Embedded { certmesh, .. } => {
284 let core = certmesh
285 .as_ref()
286 .ok_or(KoiError::DisabledCapability("certmesh"))?;
287 Ok(CertmeshHandle::new_embedded(Arc::clone(core)))
288 }
289 HandleBackend::Remote { client } => Ok(CertmeshHandle::new_remote(Arc::clone(client))),
290 }
291 }
292
293 pub fn vault(&self) -> Result<koi_crypto::vault::Vault, KoiError> {
299 let dir = self
300 .data_dir
301 .as_ref()
302 .ok_or(KoiError::DisabledCapability("vault (no data_dir)"))?;
303 koi_crypto::vault::Vault::open(dir)
304 .map_err(|e| KoiError::Io(std::io::Error::other(e.to_string())))
305 }
306
307 pub fn proxy(&self) -> Result<ProxyHandle, KoiError> {
308 match &self.backend {
309 HandleBackend::Embedded { proxy, .. } => {
310 let runtime = proxy
311 .as_ref()
312 .ok_or(KoiError::DisabledCapability("proxy"))?;
313 Ok(ProxyHandle::new_embedded(Arc::clone(runtime)))
314 }
315 HandleBackend::Remote { client } => Ok(ProxyHandle::new_remote(Arc::clone(client))),
316 }
317 }
318
319 pub fn udp(&self) -> Result<Arc<koi_udp::UdpRuntime>, KoiError> {
324 match &self.backend {
325 HandleBackend::Embedded { udp, .. } => {
326 let runtime = udp.as_ref().ok_or(KoiError::DisabledCapability("udp"))?;
327 Ok(Arc::clone(runtime))
328 }
329 HandleBackend::Remote { .. } => Err(KoiError::DisabledCapability("udp (remote mode)")),
330 }
331 }
332
333 pub fn runtime(&self) -> Result<Arc<koi_runtime::RuntimeCore>, KoiError> {
337 match &self.backend {
338 HandleBackend::Embedded { runtime, .. } => {
339 let core = runtime
340 .as_ref()
341 .ok_or(KoiError::DisabledCapability("runtime"))?;
342 Ok(Arc::clone(core))
343 }
344 HandleBackend::Remote { .. } => {
345 Err(KoiError::DisabledCapability("runtime (remote mode)"))
346 }
347 }
348 }
349
350 pub async fn shutdown(mut self) -> Result<(), KoiError> {
351 let tasks = std::mem::take(&mut self.tasks);
352
353 if let HandleBackend::Embedded {
354 mdns,
355 dns,
356 health,
357 certmesh,
358 proxy,
359 udp,
360 runtime,
361 } = &self.backend
362 {
363 let cores = koi_compose::cores::Cores {
367 mdns: mdns.clone(),
368 certmesh: certmesh.clone(),
369 dns: dns.clone(),
370 health: health.clone(),
371 proxy: proxy.clone(),
372 udp: udp.clone(),
373 runtime: runtime.clone(),
374 mdns_snapshot: None,
375 };
376 koi_compose::cores::ordered_shutdown(
377 &self.cancel,
378 tasks,
379 &cores,
380 SHUTDOWN_TIMEOUT,
381 SHUTDOWN_DRAIN,
382 )
383 .await;
384 } else {
385 self.cancel.cancel();
387 for task in tasks {
388 let _ = task.await;
389 }
390 }
391
392 Ok(())
393 }
394}
395
396pub struct KoiBrowseHandle {
397 backend: BrowseBackend,
398}
399
400enum BrowseBackend {
401 Embedded(MdnsBrowseHandle),
402 Remote(Mutex<mpsc::Receiver<MdnsEvent>>),
403}
404
405impl KoiBrowseHandle {
406 fn embedded(handle: MdnsBrowseHandle) -> Self {
407 Self {
408 backend: BrowseBackend::Embedded(handle),
409 }
410 }
411
412 fn remote(rx: mpsc::Receiver<MdnsEvent>) -> Self {
413 Self {
414 backend: BrowseBackend::Remote(Mutex::new(rx)),
415 }
416 }
417
418 pub async fn recv(&self) -> Option<MdnsEvent> {
419 match &self.backend {
420 BrowseBackend::Embedded(handle) => handle.recv().await,
421 BrowseBackend::Remote(rx) => rx.lock().await.recv().await,
422 }
423 }
424}
425
426pub const DEFAULT_DISCOVER_WINDOW: std::time::Duration = std::time::Duration::from_secs(2);
430
431pub struct MdnsHandle {
432 backend: MdnsBackend,
433 events: broadcast::Sender<KoiEvent>,
434}
435
436enum MdnsBackend {
437 Embedded { core: Arc<MdnsCore> },
438 Remote { client: Arc<KoiClient> },
439}
440
441impl MdnsHandle {
442 fn new_embedded(core: Arc<MdnsCore>, events: broadcast::Sender<KoiEvent>) -> Self {
443 Self {
444 backend: MdnsBackend::Embedded { core },
445 events,
446 }
447 }
448
449 fn new_remote(client: Arc<KoiClient>, events: broadcast::Sender<KoiEvent>) -> Self {
450 Self {
451 backend: MdnsBackend::Remote { client },
452 events,
453 }
454 }
455
456 pub fn core(&self) -> Result<Arc<MdnsCore>, KoiError> {
457 match &self.backend {
458 MdnsBackend::Embedded { core } => Ok(Arc::clone(core)),
459 MdnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("mdns")),
460 }
461 }
462
463 pub async fn browse(&self, service_type: &str) -> Result<KoiBrowseHandle, KoiError> {
464 match &self.backend {
465 MdnsBackend::Embedded { core } => {
466 let handle = core.subscribe_type(service_type).await?;
467 Ok(KoiBrowseHandle::embedded(handle))
468 }
469 MdnsBackend::Remote { client } => {
470 let (tx, rx) = mpsc::channel(64);
471 let client = Arc::clone(client);
472 let service_type = service_type.to_string();
473 tokio::task::spawn_blocking(move || {
474 let stream = match client.browse_stream(&service_type) {
475 Ok(stream) => stream,
476 Err(_) => return,
477 };
478 for item in stream {
479 let Ok(json) = item else {
480 break;
481 };
482 if let Some(event) = mdns_event_from_pipeline(json) {
483 if tx.blocking_send(event).is_err() {
484 break;
485 }
486 }
487 }
488 });
489 Ok(KoiBrowseHandle::remote(rx))
490 }
491 }
492 }
493
494 pub async fn discover(&self, service_type: &str) -> Result<Vec<Peer>, KoiError> {
504 self.discover_for(service_type, DEFAULT_DISCOVER_WINDOW)
505 .await
506 }
507
508 pub async fn discover_for(
510 &self,
511 service_type: &str,
512 window: std::time::Duration,
513 ) -> Result<Vec<Peer>, KoiError> {
514 let browse = self.browse(service_type).await?;
515 let mut events = Vec::new();
516 let deadline = tokio::time::sleep(window);
517 tokio::pin!(deadline);
518 loop {
519 tokio::select! {
520 _ = &mut deadline => break,
521 ev = browse.recv() => match ev {
522 Some(e) => events.push(e),
523 None => break,
524 },
525 }
526 }
527 Ok(fold_peers(events))
528 }
529
530 pub async fn resolve(&self, name: &str) -> Result<ServiceRecord, KoiError> {
531 match &self.backend {
532 MdnsBackend::Embedded { core } => Ok(core.resolve(name).await?),
533 MdnsBackend::Remote { client } => {
534 let name = name.to_string();
535 let client = Arc::clone(client);
536 let record = tokio::task::spawn_blocking(move || client.resolve(&name))
537 .await
538 .map_err(map_join_error)??;
539 Ok(record)
540 }
541 }
542 }
543
544 pub fn register(&self, payload: RegisterPayload) -> Result<RegistrationResult, KoiError> {
545 match &self.backend {
546 MdnsBackend::Embedded { core } => Ok(core.register(payload)?),
547 MdnsBackend::Remote { client } => Ok(client.register(&payload)?),
548 }
549 }
550
551 pub fn unregister(&self, id: &str) -> Result<(), KoiError> {
552 match &self.backend {
553 MdnsBackend::Embedded { core } => Ok(core.unregister(id)?),
554 MdnsBackend::Remote { client } => Ok(client.unregister(id)?),
555 }
556 }
557
558 pub fn subscribe(&self) -> Result<broadcast::Receiver<MdnsEvent>, KoiError> {
569 match &self.backend {
570 MdnsBackend::Embedded { core } => Ok(core.subscribe()),
571 MdnsBackend::Remote { .. } => Err(KoiError::RemoteUnsupported(
572 "mdns subscribe — use mdns.browse(service_type) for a remote event stream",
573 )),
574 }
575 }
576
577 pub fn emit_event(&self, event: KoiEvent) {
578 let _ = self.events.send(event);
579 }
580}
581
582pub struct DnsHandle {
583 backend: DnsBackend,
584}
585
586enum DnsBackend {
587 Embedded { runtime: Arc<DnsRuntime> },
588 Remote { client: Arc<KoiClient> },
589}
590
591impl DnsHandle {
592 fn new_embedded(runtime: Arc<DnsRuntime>) -> Self {
593 Self {
594 backend: DnsBackend::Embedded { runtime },
595 }
596 }
597
598 fn new_remote(client: Arc<KoiClient>) -> Self {
599 Self {
600 backend: DnsBackend::Remote { client },
601 }
602 }
603
604 pub fn runtime(&self) -> Result<Arc<DnsRuntime>, KoiError> {
605 match &self.backend {
606 DnsBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
607 DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
608 }
609 }
610
611 pub fn core(&self) -> Result<Arc<koi_dns::DnsCore>, KoiError> {
612 match &self.backend {
613 DnsBackend::Embedded { runtime } => Ok(runtime.core()),
614 DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
615 }
616 }
617
618 pub async fn lookup(
619 &self,
620 name: &str,
621 record_type: hickory_proto::rr::RecordType,
622 ) -> Option<DnsLookupResult> {
623 match &self.backend {
624 DnsBackend::Embedded { runtime } => runtime.core().lookup(name, record_type).await,
625 DnsBackend::Remote { client } => {
626 let name = name.to_string();
627 let client = Arc::clone(client);
628 let result =
629 tokio::task::spawn_blocking(move || client.dns_lookup(&name, record_type))
630 .await
631 .ok()
632 .and_then(|res| res.ok());
633 let json = match result {
634 Some(json) => json,
635 None => return None,
636 };
637 parse_dns_lookup(json)
638 }
639 }
640 }
641
642 pub fn list_names(&self) -> Vec<String> {
643 match &self.backend {
644 DnsBackend::Embedded { runtime } => runtime.core().list_names(),
645 DnsBackend::Remote { client } => {
646 let result = client.dns_list();
647 let Ok(json) = result else {
648 return Vec::new();
649 };
650 json.get("names")
651 .and_then(|v| v.as_array())
652 .map(|arr| {
653 arr.iter()
654 .filter_map(|name| name.as_str().map(|s| s.to_string()))
655 .collect()
656 })
657 .unwrap_or_default()
658 }
659 }
660 }
661
662 pub async fn start(&self) -> Result<bool, KoiError> {
663 match &self.backend {
664 DnsBackend::Embedded { runtime } => Ok(runtime.start().await?),
665 DnsBackend::Remote { client } => {
666 let client = Arc::clone(client);
667 let started = tokio::task::spawn_blocking(move || client.dns_start())
668 .await
669 .map_err(map_join_error)??
670 .get("started")
671 .and_then(|v| v.as_bool())
672 .unwrap_or(false);
673 Ok(started)
674 }
675 }
676 }
677
678 pub async fn stop(&self) -> bool {
679 match &self.backend {
680 DnsBackend::Embedded { runtime } => runtime.stop().await,
681 DnsBackend::Remote { client } => {
682 let client = Arc::clone(client);
683 tokio::task::spawn_blocking(move || client.dns_stop())
684 .await
685 .ok()
686 .and_then(|res| res.ok())
687 .and_then(|json| json.get("stopped").and_then(|v| v.as_bool()))
688 .unwrap_or(false)
689 }
690 }
691 }
692
693 pub fn add_entry(&self, entry: DnsEntry) -> Result<Vec<DnsEntry>, KoiError> {
694 match &self.backend {
695 DnsBackend::Embedded { runtime } => Ok(runtime.core().add_entry(entry)?),
696 DnsBackend::Remote { client } => {
697 let json = client.dns_add(&entry.name, &entry.ip, entry.ttl)?;
698 parse_dns_entries(json)
699 }
700 }
701 }
702
703 pub fn remove_entry(&self, name: &str) -> Result<Vec<DnsEntry>, KoiError> {
704 match &self.backend {
705 DnsBackend::Embedded { runtime } => {
706 Ok(runtime.core().remove_entry(name)?.unwrap_or_default())
707 }
708 DnsBackend::Remote { client } => {
709 let json = client.dns_remove(name)?;
710 parse_dns_entries(json)
711 }
712 }
713 }
714}
715
716pub struct HealthHandle {
717 backend: HealthBackend,
718}
719
720enum HealthBackend {
721 Embedded { runtime: Arc<HealthRuntime> },
722 Remote { client: Arc<KoiClient> },
723}
724
725impl HealthHandle {
726 fn new_embedded(runtime: Arc<HealthRuntime>) -> Self {
727 Self {
728 backend: HealthBackend::Embedded { runtime },
729 }
730 }
731
732 fn new_remote(client: Arc<KoiClient>) -> Self {
733 Self {
734 backend: HealthBackend::Remote { client },
735 }
736 }
737
738 pub fn core(&self) -> Result<Arc<koi_health::HealthCore>, KoiError> {
739 match &self.backend {
740 HealthBackend::Embedded { runtime } => Ok(runtime.core()),
741 HealthBackend::Remote { .. } => Err(KoiError::DisabledCapability("health")),
742 }
743 }
744
745 pub async fn status(&self) -> koi_health::HealthSnapshot {
746 match &self.backend {
747 HealthBackend::Embedded { runtime } => runtime.core().snapshot().await,
748 HealthBackend::Remote { client } => {
749 let client = Arc::clone(client);
750 let json = tokio::task::spawn_blocking(move || client.health_status())
751 .await
752 .ok()
753 .and_then(|res| res.ok());
754 json.and_then(|json| serde_json::from_value(json).ok())
755 .unwrap_or_else(|| koi_health::HealthSnapshot {
756 machines: Vec::new(),
757 services: Vec::new(),
758 })
759 }
760 }
761 }
762
763 pub async fn add_check(&self, check: HealthCheck) -> Result<(), KoiError> {
764 match &self.backend {
765 HealthBackend::Embedded { runtime } => Ok(runtime.core().add_check(check).await?),
766 HealthBackend::Remote { client } => {
767 let client = Arc::clone(client);
768 let check = check.clone();
769 tokio::task::spawn_blocking(move || {
770 client.health_add_check(
771 &check.name,
772 check.kind,
773 &check.target,
774 check.interval_secs,
775 check.timeout_secs,
776 )
777 })
778 .await
779 .map_err(map_join_error)??;
780 Ok(())
781 }
782 }
783 }
784
785 pub async fn remove_check(&self, name: &str) -> Result<(), KoiError> {
786 match &self.backend {
787 HealthBackend::Embedded { runtime } => Ok(runtime.core().remove_check(name).await?),
788 HealthBackend::Remote { client } => {
789 let client = Arc::clone(client);
790 let name = name.to_string();
791 tokio::task::spawn_blocking(move || client.health_remove_check(&name))
792 .await
793 .map_err(map_join_error)??;
794 Ok(())
795 }
796 }
797 }
798
799 pub async fn start(&self) -> Result<bool, KoiError> {
800 match &self.backend {
801 HealthBackend::Embedded { runtime } => Ok(runtime.start().await?),
802 HealthBackend::Remote { .. } => Ok(false),
803 }
804 }
805
806 pub async fn stop(&self) -> bool {
807 match &self.backend {
808 HealthBackend::Embedded { runtime } => runtime.stop().await,
809 HealthBackend::Remote { .. } => false,
810 }
811 }
812}
813
814pub struct CertmeshHandle {
815 backend: CertmeshBackend,
816}
817
818enum CertmeshBackend {
819 Embedded {
820 core: Arc<koi_certmesh::CertmeshCore>,
821 },
822 Remote {
823 client: Arc<KoiClient>,
824 },
825}
826
827impl CertmeshHandle {
828 fn new_embedded(core: Arc<koi_certmesh::CertmeshCore>) -> Self {
829 Self {
830 backend: CertmeshBackend::Embedded { core },
831 }
832 }
833
834 fn new_remote(client: Arc<KoiClient>) -> Self {
835 Self {
836 backend: CertmeshBackend::Remote { client },
837 }
838 }
839
840 pub fn core(&self) -> Result<Arc<koi_certmesh::CertmeshCore>, KoiError> {
841 match &self.backend {
842 CertmeshBackend::Embedded { core } => Ok(Arc::clone(core)),
843 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
844 }
845 }
846
847 pub async fn status(&self) -> koi_common::capability::CapabilityStatus {
848 match &self.backend {
849 CertmeshBackend::Embedded { core } => core.status().await,
850 CertmeshBackend::Remote { client } => {
851 let client = Arc::clone(client);
852 let json = tokio::task::spawn_blocking(move || client.unified_status())
853 .await
854 .ok()
855 .and_then(|res| res.ok());
856 json.and_then(extract_capability_status)
857 .unwrap_or_else(default_capability_status)
858 }
859 }
860 }
861
862 pub async fn posture(&self) -> Result<koi_common::posture::Posture, KoiError> {
869 match &self.backend {
870 CertmeshBackend::Embedded { core } => Ok(core.posture()),
871 CertmeshBackend::Remote { client } => {
872 let client = Arc::clone(client);
873 let json = tokio::task::spawn_blocking(move || {
874 client.get_json(koi_certmesh::http::paths::POSTURE)
875 })
876 .await
877 .map_err(map_join_error)??;
878 let signed = json
879 .get("signed")
880 .and_then(|v| v.as_bool())
881 .unwrap_or(false);
882 let encrypted = json
883 .get("encrypted")
884 .and_then(|v| v.as_bool())
885 .unwrap_or(false);
886 Ok(koi_common::posture::Posture::new(signed, encrypted))
887 }
888 }
889 }
890
891 pub fn on_posture(
897 &self,
898 ) -> Result<tokio::sync::watch::Receiver<koi_common::posture::Posture>, KoiError> {
899 match &self.backend {
900 CertmeshBackend::Embedded { core } => Ok(core.watch_posture()),
901 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
902 }
903 }
904
905 pub async fn local_identity(&self) -> Result<Option<koi_certmesh::Identity>, KoiError> {
908 match &self.backend {
909 CertmeshBackend::Embedded { core } => Ok(core.local_identity().await),
910 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
911 }
912 }
913
914 pub async fn ensure_identity(&self) -> Result<Option<koi_certmesh::Identity>, KoiError> {
917 match &self.backend {
918 CertmeshBackend::Embedded { core } => Ok(core.ensure_identity().await),
919 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
920 }
921 }
922
923 pub async fn sign(&self, bytes: &[u8]) -> Result<koi_common::envelope::Envelope, KoiError> {
927 match &self.backend {
928 CertmeshBackend::Embedded { core } => Ok(core.sign(bytes).await),
929 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
930 }
931 }
932
933 pub async fn verify(
936 &self,
937 env: &koi_common::envelope::Envelope,
938 ) -> Result<koi_common::envelope::Assurance, KoiError> {
939 match &self.backend {
940 CertmeshBackend::Embedded { core } => Ok(core.verify(env).await),
941 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
942 }
943 }
944
945 pub async fn seal(&self, bytes: &[u8]) -> Result<koi_common::sealed::Sealed, KoiError> {
949 match &self.backend {
950 CertmeshBackend::Embedded { core } => Ok(core.seal(bytes).await),
951 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
952 }
953 }
954
955 pub async fn open(
958 &self,
959 sealed: &koi_common::sealed::Sealed,
960 ) -> Result<koi_common::sealed::Opened, KoiError> {
961 match &self.backend {
962 CertmeshBackend::Embedded { core } => Ok(core.open(sealed).await?),
963 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
964 }
965 }
966
967 pub async fn diagnose(&self) -> Result<koi_common::diagnosis::TrustDiagnosis, KoiError> {
972 match &self.backend {
973 CertmeshBackend::Embedded { core } => Ok(core.diagnose().await),
974 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
975 }
976 }
977
978 pub async fn client_for(&self, peer: &Peer) -> Result<koi_certmesh::PeerClient, KoiError> {
986 match &self.backend {
987 CertmeshBackend::Embedded { core } => Ok(core.client_for(peer).await?),
988 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
989 }
990 }
991
992 pub async fn reqwest_client_for(&self, peer: &Peer) -> Result<reqwest::Client, KoiError> {
1010 let core = match &self.backend {
1011 CertmeshBackend::Embedded { core } => core,
1012 CertmeshBackend::Remote { .. } => return Err(KoiError::DisabledCapability("certmesh")),
1013 };
1014 let builder = match core.tls_client_config_for(peer).await? {
1015 Some(config) => reqwest::Client::builder().use_preconfigured_tls(config),
1018 None => reqwest::Client::builder(),
1020 };
1021 builder
1022 .build()
1023 .map_err(|e| KoiError::Certmesh(koi_certmesh::CertmeshError::Internal(e.to_string())))
1024 }
1025}
1026
1027async fn announce_once(
1031 mdns: &Arc<MdnsCore>,
1032 certmesh: &Arc<koi_certmesh::CertmeshCore>,
1033 hostname: &str,
1034 service_type: &str,
1035 port: u16,
1036) -> Option<String> {
1037 let id = certmesh.local_identity().await;
1038 let mut txt = std::collections::HashMap::new();
1039 koi_common::peer::stamp(
1040 &mut txt,
1041 certmesh.posture(),
1042 id.as_ref().map(|i| i.ca_fingerprint.as_str()),
1043 id.as_ref().map(|i| i.renewal.expires_at),
1044 );
1045 let payload = RegisterPayload {
1046 name: hostname.to_string(),
1047 service_type: service_type.to_string(),
1048 port,
1049 ip: None,
1050 lease_secs: None,
1051 txt,
1052 };
1053 match mdns.register(payload) {
1054 Ok(result) => Some(result.id),
1055 Err(e) => {
1056 tracing::warn!(error = %e, "participate: mDNS announce failed");
1057 None
1058 }
1059 }
1060}
1061
1062fn spawn_participate_announce(
1067 mdns: Arc<MdnsCore>,
1068 certmesh: Arc<koi_certmesh::CertmeshCore>,
1069 service_type: String,
1070 port: u16,
1071 cancel: CancellationToken,
1072) {
1073 tokio::spawn(async move {
1074 let hostname = hostname::get()
1075 .ok()
1076 .and_then(|os| os.into_string().ok())
1077 .unwrap_or_else(|| "unknown".to_string());
1078 let mut posture_rx = certmesh.watch_posture();
1079 let mut current_id = announce_once(&mdns, &certmesh, &hostname, &service_type, port).await;
1080 loop {
1081 tokio::select! {
1082 _ = cancel.cancelled() => break,
1083 changed = posture_rx.changed() => {
1084 if changed.is_err() {
1085 break; }
1087 if let Some(old) = current_id.take() {
1089 let _ = mdns.unregister(&old);
1090 }
1091 current_id =
1092 announce_once(&mdns, &certmesh, &hostname, &service_type, port).await;
1093 }
1094 }
1095 }
1096 if let Some(id) = current_id {
1097 let _ = mdns.unregister(&id);
1098 }
1099 });
1100}
1101
1102fn fold_peers(events: impl IntoIterator<Item = MdnsEvent>) -> Vec<Peer> {
1107 use std::collections::BTreeMap;
1108 let mut by_name: BTreeMap<String, ServiceRecord> = BTreeMap::new();
1109 for ev in events {
1110 match ev {
1111 MdnsEvent::Found(rec) => {
1112 by_name.entry(rec.name.clone()).or_insert(rec);
1113 }
1114 MdnsEvent::Resolved(rec) => {
1115 by_name.insert(rec.name.clone(), rec);
1116 }
1117 MdnsEvent::Removed { name, .. } => {
1118 by_name.remove(&name);
1119 }
1120 }
1121 }
1122 by_name.into_values().map(Peer::from_record).collect()
1123}
1124
1125pub struct ProxyHandle {
1126 backend: ProxyBackend,
1127}
1128
1129enum ProxyBackend {
1130 Embedded { runtime: Arc<ProxyRuntime> },
1131 Remote { client: Arc<KoiClient> },
1132}
1133
1134impl ProxyHandle {
1135 fn new_embedded(runtime: Arc<ProxyRuntime>) -> Self {
1136 Self {
1137 backend: ProxyBackend::Embedded { runtime },
1138 }
1139 }
1140
1141 fn new_remote(client: Arc<KoiClient>) -> Self {
1142 Self {
1143 backend: ProxyBackend::Remote { client },
1144 }
1145 }
1146
1147 pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError> {
1148 match &self.backend {
1149 ProxyBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
1150 ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
1151 }
1152 }
1153
1154 pub fn core(&self) -> Result<Arc<koi_proxy::ProxyCore>, KoiError> {
1155 match &self.backend {
1156 ProxyBackend::Embedded { runtime } => Ok(runtime.core()),
1157 ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
1158 }
1159 }
1160
1161 pub async fn entries(&self) -> Vec<ProxyEntry> {
1162 match &self.backend {
1163 ProxyBackend::Embedded { runtime } => runtime.core().entries().await,
1164 ProxyBackend::Remote { client } => {
1165 let client = Arc::clone(client);
1166 tokio::task::spawn_blocking(move || client.proxy_list())
1167 .await
1168 .ok()
1169 .and_then(|res| res.ok())
1170 .and_then(|json| parse_proxy_entries(json).ok())
1171 .unwrap_or_default()
1172 }
1173 }
1174 }
1175
1176 pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError> {
1177 match &self.backend {
1178 ProxyBackend::Embedded { runtime } => Ok(runtime.core().upsert(entry).await?),
1179 ProxyBackend::Remote { client } => {
1180 let client = Arc::clone(client);
1181 let entry = entry.clone();
1182 let add_client = Arc::clone(&client);
1183 tokio::task::spawn_blocking(move || {
1184 add_client.proxy_add(
1185 &entry.name,
1186 entry.listen_port,
1187 &entry.backend,
1188 entry.allow_remote,
1189 )
1190 })
1191 .await
1192 .map_err(map_join_error)??;
1193 let list = tokio::task::spawn_blocking(move || client.proxy_list())
1194 .await
1195 .map_err(map_join_error)??;
1196 parse_proxy_entries(list)
1197 }
1198 }
1199 }
1200
1201 pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError> {
1202 match &self.backend {
1203 ProxyBackend::Embedded { runtime } => Ok(runtime.core().remove(name).await?),
1204 ProxyBackend::Remote { client } => {
1205 let client = Arc::clone(client);
1206 let name = name.to_string();
1207 let remove_client = Arc::clone(&client);
1208 tokio::task::spawn_blocking(move || remove_client.proxy_remove(&name))
1209 .await
1210 .map_err(map_join_error)??;
1211 let list = tokio::task::spawn_blocking(move || client.proxy_list())
1212 .await
1213 .map_err(map_join_error)??;
1214 parse_proxy_entries(list)
1215 }
1216 }
1217 }
1218
1219 pub async fn start_all(&self) -> Result<(), KoiError> {
1220 match &self.backend {
1221 ProxyBackend::Embedded { runtime } => Ok(runtime.start_all().await?),
1222 ProxyBackend::Remote { .. } => Ok(()),
1223 }
1224 }
1225
1226 pub async fn stop_all(&self) {
1227 if let ProxyBackend::Embedded { runtime } = &self.backend {
1228 runtime.stop_all().await;
1229 }
1230 }
1231}
1232
1233fn parse_dns_lookup(json: serde_json::Value) -> Option<DnsLookupResult> {
1234 let name = json.get("name").and_then(|v| v.as_str())?.to_string();
1235 let source = json
1236 .get("source")
1237 .and_then(|v| v.as_str())
1238 .unwrap_or("unknown")
1239 .to_string();
1240 let ips = json.get("ips").and_then(|v| v.as_array()).map(|arr| {
1241 arr.iter()
1242 .filter_map(|ip| ip.as_str())
1243 .filter_map(|ip| ip.parse::<IpAddr>().ok())
1244 .collect::<Vec<_>>()
1245 })?;
1246 Some(DnsLookupResult { name, ips, source })
1247}
1248
1249fn parse_dns_entries(json: serde_json::Value) -> Result<Vec<DnsEntry>, KoiError> {
1250 let entries = json.get("entries").ok_or_else(|| {
1251 KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::other(
1252 "missing entries",
1253 )))
1254 })?;
1255 let entries = serde_json::from_value(entries.clone()).map_err(|e| {
1256 KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::new(
1257 std::io::ErrorKind::InvalidData,
1258 e.to_string(),
1259 )))
1260 })?;
1261 Ok(entries)
1262}
1263
1264fn parse_proxy_entries(json: serde_json::Value) -> Result<Vec<ProxyEntry>, KoiError> {
1265 let entries = json
1266 .get("entries")
1267 .ok_or_else(|| KoiError::Proxy(koi_proxy::ProxyError::Io("missing entries".to_string())))?
1268 .clone();
1269 serde_json::from_value(entries)
1270 .map_err(|e| KoiError::Proxy(koi_proxy::ProxyError::Io(e.to_string())))
1271}
1272
1273fn extract_capability_status(
1274 json: serde_json::Value,
1275) -> Option<koi_common::capability::CapabilityStatus> {
1276 let caps = json.get("capabilities")?.as_array()?;
1277 for cap in caps {
1278 if cap.get("name")?.as_str()? == "certmesh" {
1279 let name = cap.get("name")?.as_str()?.to_string();
1280 let summary = cap
1281 .get("summary")
1282 .and_then(|v| v.as_str())
1283 .unwrap_or("unknown")
1284 .to_string();
1285 let healthy = cap
1286 .get("healthy")
1287 .and_then(|v| v.as_bool())
1288 .unwrap_or(false);
1289 return Some(koi_common::capability::CapabilityStatus {
1290 name,
1291 summary,
1292 healthy,
1293 });
1294 }
1295 }
1296 None
1297}
1298
1299fn default_capability_status() -> koi_common::capability::CapabilityStatus {
1300 koi_common::capability::CapabilityStatus {
1301 name: "certmesh".to_string(),
1302 summary: "unknown".to_string(),
1303 healthy: false,
1304 }
1305}
1306
1307fn mdns_event_from_pipeline(json: serde_json::Value) -> Option<MdnsEvent> {
1308 if let Some(found) = json.get("found") {
1309 let record: ServiceRecord = serde_json::from_value(found.clone()).ok()?;
1310 return Some(MdnsEvent::Found(record));
1311 }
1312 if let Some(resolved) = json.get("resolved") {
1313 let record: ServiceRecord = serde_json::from_value(resolved.clone()).ok()?;
1314 return Some(MdnsEvent::Resolved(record));
1315 }
1316 if let Some(event) = json.get("event") {
1317 let kind: EventKind = serde_json::from_value(event.clone()).ok()?;
1318 let service = json
1319 .get("service")
1320 .cloned()
1321 .unwrap_or(serde_json::Value::Null);
1322 let record: ServiceRecord = serde_json::from_value(service).ok()?;
1323 return match kind {
1324 EventKind::Found => Some(MdnsEvent::Found(record)),
1325 EventKind::Resolved => Some(MdnsEvent::Resolved(record)),
1326 EventKind::Removed => Some(MdnsEvent::Removed {
1327 name: record.name,
1328 service_type: record.service_type,
1329 }),
1330 };
1331 }
1332 None
1333}
1334
1335#[cfg(test)]
1336mod tests {
1337 use super::*;
1338 use koi_common::posture::PostureLevel;
1339 use std::collections::HashMap;
1340
1341 fn rec(name: &str, txt: &[(&str, &str)]) -> ServiceRecord {
1342 ServiceRecord {
1343 name: name.to_string(),
1344 service_type: "_http._tcp".to_string(),
1345 host: Some(format!("{name}.local")),
1346 ip: Some("10.0.0.9".to_string()),
1347 port: Some(8443),
1348 txt: txt
1349 .iter()
1350 .map(|(k, v)| (k.to_string(), v.to_string()))
1351 .collect::<HashMap<_, _>>(),
1352 }
1353 }
1354
1355 #[test]
1356 fn fold_resolved_overwrites_found_for_txt_enrichment() {
1357 let peers = fold_peers([
1359 MdnsEvent::Found(rec("a", &[])),
1360 MdnsEvent::Resolved(rec("a", &[("fp", "CAFP"), ("posture", "authenticated")])),
1361 ]);
1362 assert_eq!(peers.len(), 1, "the two events collapse to one peer");
1363 assert_eq!(peers[0].level(), PostureLevel::Authenticated);
1364 assert_eq!(peers[0].fp.as_deref(), Some("CAFP"));
1365 }
1366
1367 #[test]
1368 fn fold_removed_drops_the_peer() {
1369 let peers = fold_peers([
1370 MdnsEvent::Found(rec("b", &[])),
1371 MdnsEvent::Removed {
1372 name: "b".to_string(),
1373 service_type: "_http._tcp".to_string(),
1374 },
1375 ]);
1376 assert!(peers.is_empty(), "a removed peer is not in the snapshot");
1377 }
1378
1379 #[test]
1380 fn fold_orders_peers_by_name() {
1381 let peers = fold_peers([
1382 MdnsEvent::Resolved(rec("z", &[])),
1383 MdnsEvent::Resolved(rec("a", &[])),
1384 MdnsEvent::Resolved(rec("m", &[])),
1385 ]);
1386 let names: Vec<_> = peers.iter().map(|p| p.record.name.clone()).collect();
1387 assert_eq!(names, vec!["a", "m", "z"]);
1388 }
1389
1390 #[test]
1391 fn fold_open_peer_has_open_posture() {
1392 let peers = fold_peers([MdnsEvent::Resolved(rec("plain", &[]))]);
1393 assert_eq!(peers.len(), 1);
1394 assert_eq!(peers[0].level(), PostureLevel::Open);
1395 assert!(!peers[0].is_secure());
1396 }
1397
1398 #[tokio::test]
1401 async fn participate_remote_handle_is_disabled() {
1402 let client = Arc::new(KoiClient::new("http://127.0.0.1:1"));
1403 let (tx, _) = broadcast::channel(8);
1404 let handle = KoiHandle::new_remote(client, tx, CancellationToken::new(), Vec::new());
1405 let router = axum::Router::new();
1406 let addr: std::net::SocketAddr = ([127, 0, 0, 1], 0).into();
1407 let err = handle
1408 .participate(router, addr, "_x._tcp", CancellationToken::new())
1409 .await
1410 .unwrap_err();
1411 assert!(matches!(err, KoiError::DisabledCapability(_)));
1412 }
1413
1414 #[tokio::test]
1415 async fn participate_open_node_serves_plaintext() {
1416 let dir = std::env::temp_dir().join(format!("koi-emb-participate-{}", std::process::id()));
1420 let _ = std::fs::remove_dir_all(&dir);
1421 let koi = crate::Builder::new()
1422 .data_dir(&dir)
1423 .service_mode(crate::ServiceMode::EmbeddedOnly)
1424 .mdns(false)
1425 .dns_enabled(false)
1426 .health(false)
1427 .certmesh(true)
1428 .proxy(false)
1429 .build()
1430 .expect("build");
1431 let handle = koi.start().await.expect("start");
1432
1433 let addr = {
1434 let l = tokio::net::TcpListener::bind(("127.0.0.1", 0))
1435 .await
1436 .unwrap();
1437 l.local_addr().unwrap()
1438 };
1439 let router = axum::Router::new().route("/ping", axum::routing::get(|| async { "pong" }));
1440 let cancel = CancellationToken::new();
1441 let _server = handle
1442 .participate(router, addr, "_koi-test._tcp", cancel.clone())
1443 .await
1444 .expect("participate");
1445 tokio::time::sleep(std::time::Duration::from_millis(75)).await;
1446
1447 let (status, body) = koi_certmesh::mtls::get(&addr.ip().to_string(), addr.port(), "/ping")
1448 .await
1449 .expect("plain GET to an Open participating node");
1450 assert_eq!(status, 200);
1451 assert_eq!(body, "pong");
1452
1453 cancel.cancel();
1454 handle.shutdown().await.expect("shutdown");
1455 }
1456
1457 #[tokio::test]
1460 async fn seal_open_round_trip_on_open_node() {
1461 use koi_common::sealed::Confidentiality;
1462 let dir = std::env::temp_dir().join(format!("koi-emb-seal-{}", std::process::id()));
1463 let _ = std::fs::remove_dir_all(&dir);
1464 let koi = crate::Builder::new()
1465 .data_dir(&dir)
1466 .service_mode(crate::ServiceMode::EmbeddedOnly)
1467 .mdns(false)
1468 .dns_enabled(false)
1469 .health(false)
1470 .certmesh(true)
1471 .proxy(false)
1472 .build()
1473 .expect("build");
1474 let handle = koi.start().await.expect("start");
1475 let cm = handle.certmesh().expect("certmesh handle");
1476
1477 let sealed = cm.seal(b"hello seal").await.expect("seal");
1480 assert_eq!(sealed.confidentiality(), Confidentiality::None);
1481 let opened = cm.open(&sealed).await.expect("open");
1482 assert_eq!(opened.payload, b"hello seal");
1483 assert_eq!(opened.confidentiality, Confidentiality::None);
1484 assert!(
1485 opened.assurance.identity().is_none(),
1486 "an Open node's seal is anonymous, not a trusted identity"
1487 );
1488
1489 handle.shutdown().await.expect("shutdown");
1490 }
1491}