1use std::net::IpAddr;
2use std::sync::Arc;
3
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::BroadcastStream;
7use tokio_util::sync::CancellationToken;
8
9use koi_client::KoiClient;
10use koi_common::capability::Capability;
11use koi_common::types::{EventKind, ServiceRecord};
12use koi_config::state::DnsEntry;
13use koi_dns::{DnsLookupResult, DnsRuntime};
14use koi_health::{HealthCheck, HealthRuntime};
15use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
16use koi_mdns::{BrowseSubscription as MdnsBrowseHandle, MdnsCore, MdnsEvent};
17use koi_proxy::{ProxyEntry, ProxyRuntime};
18
19use crate::{map_join_error, KoiError, KoiEvent};
20
/// Source of the capabilities exposed by a [`KoiHandle`].
enum HandleBackend {
    /// Capabilities held directly by this handle.
    ///
    /// Each capability is optional; the matching `KoiHandle` accessor returns
    /// `KoiError::DisabledCapability` when its slot is `None`.
    Embedded {
        mdns: Option<Arc<MdnsCore>>,
        dns: Option<Arc<DnsRuntime>>,
        health: Option<Arc<HealthRuntime>>,
        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
        proxy: Option<Arc<ProxyRuntime>>,
        udp: Option<Arc<koi_udp::UdpRuntime>>,
        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
    },
    /// Capabilities reached through a shared [`KoiClient`].
    Remote {
        client: Arc<KoiClient>,
    },
}
35
/// Top-level handle giving access to the individual capability handles.
pub struct KoiHandle {
    /// Embedded capabilities or a remote client.
    backend: HandleBackend,
    /// Directory passed to `Vault::open` by [`KoiHandle::vault`]; `None` disables the vault.
    data_dir: Option<std::path::PathBuf>,
    /// Broadcast sender from which event subscriptions are created.
    events: broadcast::Sender<KoiEvent>,
    /// Token cancelled at the start of [`KoiHandle::shutdown`].
    cancel: CancellationToken,
    /// Tasks awaited during shutdown, after cancellation.
    tasks: Vec<JoinHandle<()>>,
    /// mDNS registration id unregistered during embedded shutdown, if any.
    http_announce_id: Option<String>,
}
44
45impl KoiHandle {
46 #[allow(clippy::too_many_arguments)]
47 pub(crate) fn new_embedded(
48 mdns: Option<Arc<MdnsCore>>,
49 dns: Option<Arc<DnsRuntime>>,
50 health: Option<Arc<HealthRuntime>>,
51 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
52 proxy: Option<Arc<ProxyRuntime>>,
53 udp: Option<Arc<koi_udp::UdpRuntime>>,
54 runtime: Option<Arc<koi_runtime::RuntimeCore>>,
55 data_dir: Option<std::path::PathBuf>,
56 events: broadcast::Sender<KoiEvent>,
57 cancel: CancellationToken,
58 tasks: Vec<JoinHandle<()>>,
59 http_announce_id: Option<String>,
60 ) -> Self {
61 Self {
62 backend: HandleBackend::Embedded {
63 mdns,
64 dns,
65 health,
66 certmesh,
67 proxy,
68 udp,
69 runtime,
70 },
71 data_dir,
72 events,
73 cancel,
74 tasks,
75 http_announce_id,
76 }
77 }
78
79 pub(crate) fn new_remote(
80 client: Arc<KoiClient>,
81 events: broadcast::Sender<KoiEvent>,
82 cancel: CancellationToken,
83 tasks: Vec<JoinHandle<()>>,
84 ) -> Self {
85 Self {
86 backend: HandleBackend::Remote { client },
87 data_dir: None,
88 events,
89 cancel,
90 tasks,
91 http_announce_id: None,
92 }
93 }
94
95 pub fn events(&self) -> BroadcastStream<KoiEvent> {
96 BroadcastStream::new(self.events.subscribe())
97 }
98
99 pub fn subscribe(&self) -> broadcast::Receiver<KoiEvent> {
100 self.events.subscribe()
101 }
102
103 pub fn mdns(&self) -> Result<MdnsHandle, KoiError> {
104 match &self.backend {
105 HandleBackend::Embedded { mdns, .. } => {
106 let core = mdns.as_ref().ok_or(KoiError::DisabledCapability("mdns"))?;
107 Ok(MdnsHandle::new_embedded(
108 Arc::clone(core),
109 self.events.clone(),
110 ))
111 }
112 HandleBackend::Remote { client } => Ok(MdnsHandle::new_remote(
113 Arc::clone(client),
114 self.events.clone(),
115 )),
116 }
117 }
118
119 pub fn dns(&self) -> Result<DnsHandle, KoiError> {
120 match &self.backend {
121 HandleBackend::Embedded { dns, .. } => {
122 let runtime = dns.as_ref().ok_or(KoiError::DisabledCapability("dns"))?;
123 Ok(DnsHandle::new_embedded(Arc::clone(runtime)))
124 }
125 HandleBackend::Remote { client } => Ok(DnsHandle::new_remote(Arc::clone(client))),
126 }
127 }
128
129 pub fn health(&self) -> Result<HealthHandle, KoiError> {
130 match &self.backend {
131 HandleBackend::Embedded { health, .. } => {
132 let runtime = health
133 .as_ref()
134 .ok_or(KoiError::DisabledCapability("health"))?;
135 Ok(HealthHandle::new_embedded(Arc::clone(runtime)))
136 }
137 HandleBackend::Remote { client } => Ok(HealthHandle::new_remote(Arc::clone(client))),
138 }
139 }
140
141 pub fn certmesh(&self) -> Result<CertmeshHandle, KoiError> {
142 match &self.backend {
143 HandleBackend::Embedded { certmesh, .. } => {
144 let core = certmesh
145 .as_ref()
146 .ok_or(KoiError::DisabledCapability("certmesh"))?;
147 Ok(CertmeshHandle::new_embedded(Arc::clone(core)))
148 }
149 HandleBackend::Remote { client } => Ok(CertmeshHandle::new_remote(Arc::clone(client))),
150 }
151 }
152
153 pub fn vault(&self) -> Result<koi_crypto::vault::Vault, KoiError> {
159 let dir = self
160 .data_dir
161 .as_ref()
162 .ok_or(KoiError::DisabledCapability("vault (no data_dir)"))?;
163 koi_crypto::vault::Vault::open(dir)
164 .map_err(|e| KoiError::Io(std::io::Error::other(e.to_string())))
165 }
166
167 pub fn proxy(&self) -> Result<ProxyHandle, KoiError> {
168 match &self.backend {
169 HandleBackend::Embedded { proxy, .. } => {
170 let runtime = proxy
171 .as_ref()
172 .ok_or(KoiError::DisabledCapability("proxy"))?;
173 Ok(ProxyHandle::new_embedded(Arc::clone(runtime)))
174 }
175 HandleBackend::Remote { client } => Ok(ProxyHandle::new_remote(Arc::clone(client))),
176 }
177 }
178
179 pub fn udp(&self) -> Result<Arc<koi_udp::UdpRuntime>, KoiError> {
184 match &self.backend {
185 HandleBackend::Embedded { udp, .. } => {
186 let runtime = udp.as_ref().ok_or(KoiError::DisabledCapability("udp"))?;
187 Ok(Arc::clone(runtime))
188 }
189 HandleBackend::Remote { .. } => Err(KoiError::DisabledCapability("udp (remote mode)")),
190 }
191 }
192
193 pub fn runtime(&self) -> Result<Arc<koi_runtime::RuntimeCore>, KoiError> {
197 match &self.backend {
198 HandleBackend::Embedded { runtime, .. } => {
199 let core = runtime
200 .as_ref()
201 .ok_or(KoiError::DisabledCapability("runtime"))?;
202 Ok(Arc::clone(core))
203 }
204 HandleBackend::Remote { .. } => {
205 Err(KoiError::DisabledCapability("runtime (remote mode)"))
206 }
207 }
208 }
209
210 pub async fn shutdown(mut self) -> Result<(), KoiError> {
211 self.cancel.cancel();
212 for task in self.tasks.drain(..) {
213 let _ = task.await;
214 }
215
216 if let HandleBackend::Embedded {
217 mdns,
218 dns,
219 health,
220 proxy,
221 ..
222 } = &self.backend
223 {
224 if let Some(runtime) = proxy {
225 runtime.stop_all().await;
226 }
227 if let Some(runtime) = health {
228 let _ = runtime.stop().await;
229 }
230 if let Some(runtime) = dns {
231 let _ = runtime.stop().await;
232 }
233 if let Some(id) = &self.http_announce_id {
234 if let Some(core) = mdns {
235 if let Err(e) = core.unregister(id) {
236 tracing::warn!(error = %e, "Failed to withdraw HTTP mDNS announcement");
237 }
238 }
239 }
240 if let Some(core) = mdns {
241 core.shutdown().await?;
242 }
243 }
244
245 Ok(())
246 }
247}
248
/// Handle from which browse results ([`MdnsEvent`]s) are received.
pub struct KoiBrowseHandle {
    /// Embedded subscription or remote channel that supplies the events.
    backend: BrowseBackend,
}
252
/// Event source behind a [`KoiBrowseHandle`].
enum BrowseBackend {
    /// Subscription obtained from the embedded mDNS core.
    Embedded(MdnsBrowseHandle),
    /// Receiver fed by a remote browse stream; the mutex lets `recv` take `&self`.
    Remote(Mutex<mpsc::Receiver<MdnsEvent>>),
}
257
258impl KoiBrowseHandle {
259 fn embedded(handle: MdnsBrowseHandle) -> Self {
260 Self {
261 backend: BrowseBackend::Embedded(handle),
262 }
263 }
264
265 fn remote(rx: mpsc::Receiver<MdnsEvent>) -> Self {
266 Self {
267 backend: BrowseBackend::Remote(Mutex::new(rx)),
268 }
269 }
270
271 pub async fn recv(&self) -> Option<MdnsEvent> {
272 match &self.backend {
273 BrowseBackend::Embedded(handle) => handle.recv().await,
274 BrowseBackend::Remote(rx) => rx.lock().await.recv().await,
275 }
276 }
277}
278
/// Handle for mDNS operations, embedded or remote.
pub struct MdnsHandle {
    /// Embedded core or remote client that services the calls.
    backend: MdnsBackend,
    /// Broadcast sender used by [`MdnsHandle::emit_event`].
    events: broadcast::Sender<KoiEvent>,
}
283
/// Backend servicing an [`MdnsHandle`].
enum MdnsBackend {
    /// Calls go straight to the shared mDNS core.
    Embedded { core: Arc<MdnsCore> },
    /// Calls are forwarded through the shared client.
    Remote { client: Arc<KoiClient> },
}
288
289impl MdnsHandle {
290 fn new_embedded(core: Arc<MdnsCore>, events: broadcast::Sender<KoiEvent>) -> Self {
291 Self {
292 backend: MdnsBackend::Embedded { core },
293 events,
294 }
295 }
296
297 fn new_remote(client: Arc<KoiClient>, events: broadcast::Sender<KoiEvent>) -> Self {
298 Self {
299 backend: MdnsBackend::Remote { client },
300 events,
301 }
302 }
303
304 pub fn core(&self) -> Result<Arc<MdnsCore>, KoiError> {
305 match &self.backend {
306 MdnsBackend::Embedded { core } => Ok(Arc::clone(core)),
307 MdnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("mdns")),
308 }
309 }
310
311 pub async fn browse(&self, service_type: &str) -> Result<KoiBrowseHandle, KoiError> {
312 match &self.backend {
313 MdnsBackend::Embedded { core } => {
314 let handle = core.subscribe_type(service_type).await?;
315 Ok(KoiBrowseHandle::embedded(handle))
316 }
317 MdnsBackend::Remote { client } => {
318 let (tx, rx) = mpsc::channel(64);
319 let client = Arc::clone(client);
320 let service_type = service_type.to_string();
321 tokio::task::spawn_blocking(move || {
322 let stream = match client.browse_stream(&service_type) {
323 Ok(stream) => stream,
324 Err(_) => return,
325 };
326 for item in stream {
327 let Ok(json) = item else {
328 break;
329 };
330 if let Some(event) = mdns_event_from_pipeline(json) {
331 if tx.blocking_send(event).is_err() {
332 break;
333 }
334 }
335 }
336 });
337 Ok(KoiBrowseHandle::remote(rx))
338 }
339 }
340 }
341
342 pub async fn resolve(&self, name: &str) -> Result<ServiceRecord, KoiError> {
343 match &self.backend {
344 MdnsBackend::Embedded { core } => Ok(core.resolve(name).await?),
345 MdnsBackend::Remote { client } => {
346 let name = name.to_string();
347 let client = Arc::clone(client);
348 let record = tokio::task::spawn_blocking(move || client.resolve(&name))
349 .await
350 .map_err(map_join_error)??;
351 Ok(record)
352 }
353 }
354 }
355
356 pub fn register(&self, payload: RegisterPayload) -> Result<RegistrationResult, KoiError> {
357 match &self.backend {
358 MdnsBackend::Embedded { core } => Ok(core.register(payload)?),
359 MdnsBackend::Remote { client } => Ok(client.register(&payload)?),
360 }
361 }
362
363 pub fn unregister(&self, id: &str) -> Result<(), KoiError> {
364 match &self.backend {
365 MdnsBackend::Embedded { core } => Ok(core.unregister(id)?),
366 MdnsBackend::Remote { client } => Ok(client.unregister(id)?),
367 }
368 }
369
370 pub fn subscribe(&self) -> Result<broadcast::Receiver<MdnsEvent>, KoiError> {
381 match &self.backend {
382 MdnsBackend::Embedded { core } => Ok(core.subscribe()),
383 MdnsBackend::Remote { .. } => Err(KoiError::RemoteUnsupported(
384 "mdns subscribe — use mdns.browse(service_type) for a remote event stream",
385 )),
386 }
387 }
388
389 pub fn emit_event(&self, event: KoiEvent) {
390 let _ = self.events.send(event);
391 }
392}
393
/// Handle for DNS operations, embedded or remote.
pub struct DnsHandle {
    /// Embedded runtime or remote client that services the calls.
    backend: DnsBackend,
}
397
/// Backend servicing a [`DnsHandle`].
enum DnsBackend {
    /// Calls go straight to the shared DNS runtime.
    Embedded { runtime: Arc<DnsRuntime> },
    /// Calls are forwarded through the shared client.
    Remote { client: Arc<KoiClient> },
}
402
403impl DnsHandle {
404 fn new_embedded(runtime: Arc<DnsRuntime>) -> Self {
405 Self {
406 backend: DnsBackend::Embedded { runtime },
407 }
408 }
409
410 fn new_remote(client: Arc<KoiClient>) -> Self {
411 Self {
412 backend: DnsBackend::Remote { client },
413 }
414 }
415
416 pub fn runtime(&self) -> Result<Arc<DnsRuntime>, KoiError> {
417 match &self.backend {
418 DnsBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
419 DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
420 }
421 }
422
423 pub fn core(&self) -> Result<Arc<koi_dns::DnsCore>, KoiError> {
424 match &self.backend {
425 DnsBackend::Embedded { runtime } => Ok(runtime.core()),
426 DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
427 }
428 }
429
430 pub async fn lookup(
431 &self,
432 name: &str,
433 record_type: hickory_proto::rr::RecordType,
434 ) -> Option<DnsLookupResult> {
435 match &self.backend {
436 DnsBackend::Embedded { runtime } => runtime.core().lookup(name, record_type).await,
437 DnsBackend::Remote { client } => {
438 let name = name.to_string();
439 let client = Arc::clone(client);
440 let result =
441 tokio::task::spawn_blocking(move || client.dns_lookup(&name, record_type))
442 .await
443 .ok()
444 .and_then(|res| res.ok());
445 let json = match result {
446 Some(json) => json,
447 None => return None,
448 };
449 parse_dns_lookup(json)
450 }
451 }
452 }
453
454 pub fn list_names(&self) -> Vec<String> {
455 match &self.backend {
456 DnsBackend::Embedded { runtime } => runtime.core().list_names(),
457 DnsBackend::Remote { client } => {
458 let result = client.dns_list();
459 let Ok(json) = result else {
460 return Vec::new();
461 };
462 json.get("names")
463 .and_then(|v| v.as_array())
464 .map(|arr| {
465 arr.iter()
466 .filter_map(|name| name.as_str().map(|s| s.to_string()))
467 .collect()
468 })
469 .unwrap_or_default()
470 }
471 }
472 }
473
474 pub async fn start(&self) -> Result<bool, KoiError> {
475 match &self.backend {
476 DnsBackend::Embedded { runtime } => Ok(runtime.start().await?),
477 DnsBackend::Remote { client } => {
478 let client = Arc::clone(client);
479 let started = tokio::task::spawn_blocking(move || client.dns_start())
480 .await
481 .map_err(map_join_error)??
482 .get("started")
483 .and_then(|v| v.as_bool())
484 .unwrap_or(false);
485 Ok(started)
486 }
487 }
488 }
489
490 pub async fn stop(&self) -> bool {
491 match &self.backend {
492 DnsBackend::Embedded { runtime } => runtime.stop().await,
493 DnsBackend::Remote { client } => {
494 let client = Arc::clone(client);
495 tokio::task::spawn_blocking(move || client.dns_stop())
496 .await
497 .ok()
498 .and_then(|res| res.ok())
499 .and_then(|json| json.get("stopped").and_then(|v| v.as_bool()))
500 .unwrap_or(false)
501 }
502 }
503 }
504
505 pub fn add_entry(&self, entry: DnsEntry) -> Result<Vec<DnsEntry>, KoiError> {
506 match &self.backend {
507 DnsBackend::Embedded { runtime } => Ok(runtime.core().add_entry(entry)?),
508 DnsBackend::Remote { client } => {
509 let json = client.dns_add(&entry.name, &entry.ip, entry.ttl)?;
510 parse_dns_entries(json)
511 }
512 }
513 }
514
515 pub fn remove_entry(&self, name: &str) -> Result<Vec<DnsEntry>, KoiError> {
516 match &self.backend {
517 DnsBackend::Embedded { runtime } => {
518 Ok(runtime.core().remove_entry(name)?.unwrap_or_default())
519 }
520 DnsBackend::Remote { client } => {
521 let json = client.dns_remove(name)?;
522 parse_dns_entries(json)
523 }
524 }
525 }
526}
527
/// Handle for health-check operations, embedded or remote.
pub struct HealthHandle {
    /// Embedded runtime or remote client that services the calls.
    backend: HealthBackend,
}
531
/// Backend servicing a [`HealthHandle`].
enum HealthBackend {
    /// Calls go straight to the shared health runtime.
    Embedded { runtime: Arc<HealthRuntime> },
    /// Calls are forwarded through the shared client.
    Remote { client: Arc<KoiClient> },
}
536
537impl HealthHandle {
538 fn new_embedded(runtime: Arc<HealthRuntime>) -> Self {
539 Self {
540 backend: HealthBackend::Embedded { runtime },
541 }
542 }
543
544 fn new_remote(client: Arc<KoiClient>) -> Self {
545 Self {
546 backend: HealthBackend::Remote { client },
547 }
548 }
549
550 pub fn core(&self) -> Result<Arc<koi_health::HealthCore>, KoiError> {
551 match &self.backend {
552 HealthBackend::Embedded { runtime } => Ok(runtime.core()),
553 HealthBackend::Remote { .. } => Err(KoiError::DisabledCapability("health")),
554 }
555 }
556
557 pub async fn status(&self) -> koi_health::HealthSnapshot {
558 match &self.backend {
559 HealthBackend::Embedded { runtime } => runtime.core().snapshot().await,
560 HealthBackend::Remote { client } => {
561 let client = Arc::clone(client);
562 let json = tokio::task::spawn_blocking(move || client.health_status())
563 .await
564 .ok()
565 .and_then(|res| res.ok());
566 json.and_then(|json| serde_json::from_value(json).ok())
567 .unwrap_or_else(|| koi_health::HealthSnapshot {
568 machines: Vec::new(),
569 services: Vec::new(),
570 })
571 }
572 }
573 }
574
575 pub async fn add_check(&self, check: HealthCheck) -> Result<(), KoiError> {
576 match &self.backend {
577 HealthBackend::Embedded { runtime } => Ok(runtime.core().add_check(check).await?),
578 HealthBackend::Remote { client } => {
579 let client = Arc::clone(client);
580 let check = check.clone();
581 tokio::task::spawn_blocking(move || {
582 client.health_add_check(
583 &check.name,
584 check.kind,
585 &check.target,
586 check.interval_secs,
587 check.timeout_secs,
588 )
589 })
590 .await
591 .map_err(map_join_error)??;
592 Ok(())
593 }
594 }
595 }
596
597 pub async fn remove_check(&self, name: &str) -> Result<(), KoiError> {
598 match &self.backend {
599 HealthBackend::Embedded { runtime } => Ok(runtime.core().remove_check(name).await?),
600 HealthBackend::Remote { client } => {
601 let client = Arc::clone(client);
602 let name = name.to_string();
603 tokio::task::spawn_blocking(move || client.health_remove_check(&name))
604 .await
605 .map_err(map_join_error)??;
606 Ok(())
607 }
608 }
609 }
610
611 pub async fn start(&self) -> Result<bool, KoiError> {
612 match &self.backend {
613 HealthBackend::Embedded { runtime } => Ok(runtime.start().await?),
614 HealthBackend::Remote { .. } => Ok(false),
615 }
616 }
617
618 pub async fn stop(&self) -> bool {
619 match &self.backend {
620 HealthBackend::Embedded { runtime } => runtime.stop().await,
621 HealthBackend::Remote { .. } => false,
622 }
623 }
624}
625
/// Handle for certmesh operations, embedded or remote.
pub struct CertmeshHandle {
    /// Embedded core or remote client that services the calls.
    backend: CertmeshBackend,
}
629
/// Backend servicing a [`CertmeshHandle`].
enum CertmeshBackend {
    /// Calls go straight to the shared certmesh core.
    Embedded {
        core: Arc<koi_certmesh::CertmeshCore>,
    },
    /// Calls are forwarded through the shared client.
    Remote {
        client: Arc<KoiClient>,
    },
}
638
639impl CertmeshHandle {
640 fn new_embedded(core: Arc<koi_certmesh::CertmeshCore>) -> Self {
641 Self {
642 backend: CertmeshBackend::Embedded { core },
643 }
644 }
645
646 fn new_remote(client: Arc<KoiClient>) -> Self {
647 Self {
648 backend: CertmeshBackend::Remote { client },
649 }
650 }
651
652 pub fn core(&self) -> Result<Arc<koi_certmesh::CertmeshCore>, KoiError> {
653 match &self.backend {
654 CertmeshBackend::Embedded { core } => Ok(Arc::clone(core)),
655 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
656 }
657 }
658
659 pub async fn status(&self) -> koi_common::capability::CapabilityStatus {
660 match &self.backend {
661 CertmeshBackend::Embedded { core } => core.status(),
662 CertmeshBackend::Remote { client } => {
663 let client = Arc::clone(client);
664 let json = tokio::task::spawn_blocking(move || client.unified_status())
665 .await
666 .ok()
667 .and_then(|res| res.ok());
668 json.and_then(extract_capability_status)
669 .unwrap_or_else(default_capability_status)
670 }
671 }
672 }
673}
674
/// Handle for proxy operations, embedded or remote.
pub struct ProxyHandle {
    /// Embedded runtime or remote client that services the calls.
    backend: ProxyBackend,
}
678
/// Backend servicing a [`ProxyHandle`].
enum ProxyBackend {
    /// Calls go straight to the shared proxy runtime.
    Embedded { runtime: Arc<ProxyRuntime> },
    /// Calls are forwarded through the shared client.
    Remote { client: Arc<KoiClient> },
}
683
684impl ProxyHandle {
685 fn new_embedded(runtime: Arc<ProxyRuntime>) -> Self {
686 Self {
687 backend: ProxyBackend::Embedded { runtime },
688 }
689 }
690
691 fn new_remote(client: Arc<KoiClient>) -> Self {
692 Self {
693 backend: ProxyBackend::Remote { client },
694 }
695 }
696
697 pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError> {
698 match &self.backend {
699 ProxyBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
700 ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
701 }
702 }
703
704 pub fn core(&self) -> Result<Arc<koi_proxy::ProxyCore>, KoiError> {
705 match &self.backend {
706 ProxyBackend::Embedded { runtime } => Ok(runtime.core()),
707 ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
708 }
709 }
710
711 pub async fn entries(&self) -> Vec<ProxyEntry> {
712 match &self.backend {
713 ProxyBackend::Embedded { runtime } => runtime.core().entries().await,
714 ProxyBackend::Remote { client } => {
715 let client = Arc::clone(client);
716 tokio::task::spawn_blocking(move || client.proxy_list())
717 .await
718 .ok()
719 .and_then(|res| res.ok())
720 .and_then(|json| parse_proxy_entries(json).ok())
721 .unwrap_or_default()
722 }
723 }
724 }
725
726 pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError> {
727 match &self.backend {
728 ProxyBackend::Embedded { runtime } => Ok(runtime.core().upsert(entry).await?),
729 ProxyBackend::Remote { client } => {
730 let client = Arc::clone(client);
731 let entry = entry.clone();
732 let add_client = Arc::clone(&client);
733 tokio::task::spawn_blocking(move || {
734 add_client.proxy_add(
735 &entry.name,
736 entry.listen_port,
737 &entry.backend,
738 entry.allow_remote,
739 )
740 })
741 .await
742 .map_err(map_join_error)??;
743 let list = tokio::task::spawn_blocking(move || client.proxy_list())
744 .await
745 .map_err(map_join_error)??;
746 parse_proxy_entries(list)
747 }
748 }
749 }
750
751 pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError> {
752 match &self.backend {
753 ProxyBackend::Embedded { runtime } => Ok(runtime.core().remove(name).await?),
754 ProxyBackend::Remote { client } => {
755 let client = Arc::clone(client);
756 let name = name.to_string();
757 let remove_client = Arc::clone(&client);
758 tokio::task::spawn_blocking(move || remove_client.proxy_remove(&name))
759 .await
760 .map_err(map_join_error)??;
761 let list = tokio::task::spawn_blocking(move || client.proxy_list())
762 .await
763 .map_err(map_join_error)??;
764 parse_proxy_entries(list)
765 }
766 }
767 }
768
769 pub async fn start_all(&self) -> Result<(), KoiError> {
770 match &self.backend {
771 ProxyBackend::Embedded { runtime } => Ok(runtime.start_all().await?),
772 ProxyBackend::Remote { .. } => Ok(()),
773 }
774 }
775
776 pub async fn stop_all(&self) {
777 if let ProxyBackend::Embedded { runtime } = &self.backend {
778 runtime.stop_all().await;
779 }
780 }
781}
782
783fn parse_dns_lookup(json: serde_json::Value) -> Option<DnsLookupResult> {
784 let name = json.get("name").and_then(|v| v.as_str())?.to_string();
785 let source = json
786 .get("source")
787 .and_then(|v| v.as_str())
788 .unwrap_or("unknown")
789 .to_string();
790 let ips = json.get("ips").and_then(|v| v.as_array()).map(|arr| {
791 arr.iter()
792 .filter_map(|ip| ip.as_str())
793 .filter_map(|ip| ip.parse::<IpAddr>().ok())
794 .collect::<Vec<_>>()
795 })?;
796 Some(DnsLookupResult { name, ips, source })
797}
798
799fn parse_dns_entries(json: serde_json::Value) -> Result<Vec<DnsEntry>, KoiError> {
800 let entries = json.get("entries").ok_or_else(|| {
801 KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::other(
802 "missing entries",
803 )))
804 })?;
805 let entries = serde_json::from_value(entries.clone()).map_err(|e| {
806 KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::new(
807 std::io::ErrorKind::InvalidData,
808 e.to_string(),
809 )))
810 })?;
811 Ok(entries)
812}
813
814fn parse_proxy_entries(json: serde_json::Value) -> Result<Vec<ProxyEntry>, KoiError> {
815 let entries = json
816 .get("entries")
817 .ok_or_else(|| KoiError::Proxy(koi_proxy::ProxyError::Io("missing entries".to_string())))?
818 .clone();
819 serde_json::from_value(entries)
820 .map_err(|e| KoiError::Proxy(koi_proxy::ProxyError::Io(e.to_string())))
821}
822
823fn extract_capability_status(
824 json: serde_json::Value,
825) -> Option<koi_common::capability::CapabilityStatus> {
826 let caps = json.get("capabilities")?.as_array()?;
827 for cap in caps {
828 if cap.get("name")?.as_str()? == "certmesh" {
829 let name = cap.get("name")?.as_str()?.to_string();
830 let summary = cap
831 .get("summary")
832 .and_then(|v| v.as_str())
833 .unwrap_or("unknown")
834 .to_string();
835 let healthy = cap
836 .get("healthy")
837 .and_then(|v| v.as_bool())
838 .unwrap_or(false);
839 return Some(koi_common::capability::CapabilityStatus {
840 name,
841 summary,
842 healthy,
843 });
844 }
845 }
846 None
847}
848
849fn default_capability_status() -> koi_common::capability::CapabilityStatus {
850 koi_common::capability::CapabilityStatus {
851 name: "certmesh".to_string(),
852 summary: "unknown".to_string(),
853 healthy: false,
854 }
855}
856
857fn mdns_event_from_pipeline(json: serde_json::Value) -> Option<MdnsEvent> {
858 if let Some(found) = json.get("found") {
859 let record: ServiceRecord = serde_json::from_value(found.clone()).ok()?;
860 return Some(MdnsEvent::Found(record));
861 }
862 if let Some(resolved) = json.get("resolved") {
863 let record: ServiceRecord = serde_json::from_value(resolved.clone()).ok()?;
864 return Some(MdnsEvent::Resolved(record));
865 }
866 if let Some(event) = json.get("event") {
867 let kind: EventKind = serde_json::from_value(event.clone()).ok()?;
868 let service = json
869 .get("service")
870 .cloned()
871 .unwrap_or(serde_json::Value::Null);
872 let record: ServiceRecord = serde_json::from_value(service).ok()?;
873 return match kind {
874 EventKind::Found => Some(MdnsEvent::Found(record)),
875 EventKind::Resolved => Some(MdnsEvent::Resolved(record)),
876 EventKind::Removed => Some(MdnsEvent::Removed {
877 name: record.name,
878 service_type: record.service_type,
879 }),
880 };
881 }
882 None
883}