1use std::net::IpAddr;
2use std::sync::Arc;
3
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::BroadcastStream;
7use tokio_util::sync::CancellationToken;
8
9use koi_client::KoiClient;
10use koi_common::capability::Capability;
11use koi_common::types::{EventKind, ServiceRecord};
12use koi_config::state::DnsEntry;
13use koi_dns::{DnsLookupResult, DnsRuntime};
14use koi_health::{HealthCheck, HealthRuntime};
15use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
16use koi_mdns::{BrowseHandle as MdnsBrowseHandle, MdnsCore, MdnsEvent};
17use koi_proxy::{ProxyEntry, ProxyRuntime};
18
19use crate::{map_join_error, KoiError, KoiEvent};
20
/// Source of the capabilities exposed by a [`KoiHandle`].
enum HandleBackend {
    /// Capability objects held directly by the handle. A field that is `None`
    /// is reported by the matching accessor as a disabled capability.
    Embedded {
        mdns: Option<Arc<MdnsCore>>,
        dns: Option<Arc<DnsRuntime>>,
        health: Option<Arc<HealthRuntime>>,
        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
        proxy: Option<Arc<ProxyRuntime>>,
        udp: Option<Arc<koi_udp::UdpRuntime>>,
        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
    },
    /// Every operation is forwarded through a shared [`KoiClient`].
    Remote {
        client: Arc<KoiClient>,
    },
}
35
/// Top-level handle that hands out per-capability sub-handles and owns the
/// background tasks that are joined in [`KoiHandle::shutdown`].
pub struct KoiHandle {
    // Embedded capability objects or a remote client.
    backend: HandleBackend,
    // Directory passed to `Vault::open`; `None` makes `vault()` fail.
    data_dir: Option<std::path::PathBuf>,
    // Broadcast sender from which `events()` / `subscribe()` create receivers.
    events: broadcast::Sender<KoiEvent>,
    // Cancelled as the first step of `shutdown`.
    cancel: CancellationToken,
    // Task handles awaited (results ignored) during `shutdown`.
    tasks: Vec<JoinHandle<()>>,
    // Registration id unregistered from mDNS during `shutdown`, if present.
    http_announce_id: Option<String>,
}
44
45impl KoiHandle {
46 #[allow(clippy::too_many_arguments)]
47 pub(crate) fn new_embedded(
48 mdns: Option<Arc<MdnsCore>>,
49 dns: Option<Arc<DnsRuntime>>,
50 health: Option<Arc<HealthRuntime>>,
51 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
52 proxy: Option<Arc<ProxyRuntime>>,
53 udp: Option<Arc<koi_udp::UdpRuntime>>,
54 runtime: Option<Arc<koi_runtime::RuntimeCore>>,
55 data_dir: Option<std::path::PathBuf>,
56 events: broadcast::Sender<KoiEvent>,
57 cancel: CancellationToken,
58 tasks: Vec<JoinHandle<()>>,
59 http_announce_id: Option<String>,
60 ) -> Self {
61 Self {
62 backend: HandleBackend::Embedded {
63 mdns,
64 dns,
65 health,
66 certmesh,
67 proxy,
68 udp,
69 runtime,
70 },
71 data_dir,
72 events,
73 cancel,
74 tasks,
75 http_announce_id,
76 }
77 }
78
79 pub(crate) fn new_remote(
80 client: Arc<KoiClient>,
81 events: broadcast::Sender<KoiEvent>,
82 cancel: CancellationToken,
83 tasks: Vec<JoinHandle<()>>,
84 ) -> Self {
85 Self {
86 backend: HandleBackend::Remote { client },
87 data_dir: None,
88 events,
89 cancel,
90 tasks,
91 http_announce_id: None,
92 }
93 }
94
95 pub fn events(&self) -> BroadcastStream<KoiEvent> {
96 BroadcastStream::new(self.events.subscribe())
97 }
98
99 pub fn subscribe(&self) -> broadcast::Receiver<KoiEvent> {
100 self.events.subscribe()
101 }
102
103 pub fn mdns(&self) -> Result<MdnsHandle, KoiError> {
104 match &self.backend {
105 HandleBackend::Embedded { mdns, .. } => {
106 let core = mdns.as_ref().ok_or(KoiError::DisabledCapability("mdns"))?;
107 Ok(MdnsHandle::new_embedded(
108 Arc::clone(core),
109 self.events.clone(),
110 ))
111 }
112 HandleBackend::Remote { client } => Ok(MdnsHandle::new_remote(
113 Arc::clone(client),
114 self.events.clone(),
115 )),
116 }
117 }
118
119 pub fn dns(&self) -> Result<DnsHandle, KoiError> {
120 match &self.backend {
121 HandleBackend::Embedded { dns, .. } => {
122 let runtime = dns.as_ref().ok_or(KoiError::DisabledCapability("dns"))?;
123 Ok(DnsHandle::new_embedded(Arc::clone(runtime)))
124 }
125 HandleBackend::Remote { client } => Ok(DnsHandle::new_remote(Arc::clone(client))),
126 }
127 }
128
129 pub fn health(&self) -> Result<HealthHandle, KoiError> {
130 match &self.backend {
131 HandleBackend::Embedded { health, .. } => {
132 let runtime = health
133 .as_ref()
134 .ok_or(KoiError::DisabledCapability("health"))?;
135 Ok(HealthHandle::new_embedded(Arc::clone(runtime)))
136 }
137 HandleBackend::Remote { client } => Ok(HealthHandle::new_remote(Arc::clone(client))),
138 }
139 }
140
141 pub fn certmesh(&self) -> Result<CertmeshHandle, KoiError> {
142 match &self.backend {
143 HandleBackend::Embedded { certmesh, .. } => {
144 let core = certmesh
145 .as_ref()
146 .ok_or(KoiError::DisabledCapability("certmesh"))?;
147 Ok(CertmeshHandle::new_embedded(Arc::clone(core)))
148 }
149 HandleBackend::Remote { client } => Ok(CertmeshHandle::new_remote(Arc::clone(client))),
150 }
151 }
152
153 pub fn vault(&self) -> Result<koi_crypto::vault::Vault, KoiError> {
159 let dir = self
160 .data_dir
161 .as_ref()
162 .ok_or(KoiError::DisabledCapability("vault (no data_dir)"))?;
163 koi_crypto::vault::Vault::open(dir)
164 .map_err(|e| KoiError::Io(std::io::Error::other(e.to_string())))
165 }
166
167 pub fn proxy(&self) -> Result<ProxyHandle, KoiError> {
168 match &self.backend {
169 HandleBackend::Embedded { proxy, .. } => {
170 let runtime = proxy
171 .as_ref()
172 .ok_or(KoiError::DisabledCapability("proxy"))?;
173 Ok(ProxyHandle::new_embedded(Arc::clone(runtime)))
174 }
175 HandleBackend::Remote { client } => Ok(ProxyHandle::new_remote(Arc::clone(client))),
176 }
177 }
178
179 pub fn udp(&self) -> Result<Arc<koi_udp::UdpRuntime>, KoiError> {
184 match &self.backend {
185 HandleBackend::Embedded { udp, .. } => {
186 let runtime = udp.as_ref().ok_or(KoiError::DisabledCapability("udp"))?;
187 Ok(Arc::clone(runtime))
188 }
189 HandleBackend::Remote { .. } => Err(KoiError::DisabledCapability("udp (remote mode)")),
190 }
191 }
192
193 pub fn runtime(&self) -> Result<Arc<koi_runtime::RuntimeCore>, KoiError> {
197 match &self.backend {
198 HandleBackend::Embedded { runtime, .. } => {
199 let core = runtime
200 .as_ref()
201 .ok_or(KoiError::DisabledCapability("runtime"))?;
202 Ok(Arc::clone(core))
203 }
204 HandleBackend::Remote { .. } => {
205 Err(KoiError::DisabledCapability("runtime (remote mode)"))
206 }
207 }
208 }
209
210 pub async fn shutdown(mut self) -> Result<(), KoiError> {
211 self.cancel.cancel();
212 for task in self.tasks.drain(..) {
213 let _ = task.await;
214 }
215
216 if let HandleBackend::Embedded {
217 mdns,
218 dns,
219 health,
220 proxy,
221 ..
222 } = &self.backend
223 {
224 if let Some(runtime) = proxy {
225 runtime.stop_all().await;
226 }
227 if let Some(runtime) = health {
228 let _ = runtime.stop().await;
229 }
230 if let Some(runtime) = dns {
231 let _ = runtime.stop().await;
232 }
233 if let Some(id) = &self.http_announce_id {
234 if let Some(core) = mdns {
235 if let Err(e) = core.unregister(id) {
236 tracing::warn!(error = %e, "Failed to withdraw HTTP mDNS announcement");
237 }
238 }
239 }
240 if let Some(core) = mdns {
241 core.shutdown().await?;
242 }
243 }
244
245 Ok(())
246 }
247}
248
/// Handle yielding mDNS browse events, from either an embedded browse handle
/// or a channel receiver.
pub struct KoiBrowseHandle {
    backend: BrowseBackend,
}
252
/// Event source behind a [`KoiBrowseHandle`].
enum BrowseBackend {
    /// Events come straight from the embedded mDNS browse handle.
    Embedded(MdnsBrowseHandle),
    /// Events come from an mpsc receiver; it sits behind an async mutex,
    /// which is what lets `recv` take `&self`.
    Remote(Mutex<mpsc::Receiver<MdnsEvent>>),
}
257
258impl KoiBrowseHandle {
259 fn embedded(handle: MdnsBrowseHandle) -> Self {
260 Self {
261 backend: BrowseBackend::Embedded(handle),
262 }
263 }
264
265 fn remote(rx: mpsc::Receiver<MdnsEvent>) -> Self {
266 Self {
267 backend: BrowseBackend::Remote(Mutex::new(rx)),
268 }
269 }
270
271 pub async fn recv(&self) -> Option<MdnsEvent> {
272 match &self.backend {
273 BrowseBackend::Embedded(handle) => handle.recv().await,
274 BrowseBackend::Remote(rx) => rx.lock().await.recv().await,
275 }
276 }
277}
278
/// mDNS operations, served by an embedded core or forwarded to a remote client.
pub struct MdnsHandle {
    backend: MdnsBackend,
    // Sender used by `emit_event`.
    events: broadcast::Sender<KoiEvent>,
}
283
/// Implementation behind an [`MdnsHandle`].
enum MdnsBackend {
    /// Calls go to the in-process mDNS core.
    Embedded { core: Arc<MdnsCore> },
    /// Calls go through the shared client.
    Remote { client: Arc<KoiClient> },
}
288
289impl MdnsHandle {
290 fn new_embedded(core: Arc<MdnsCore>, events: broadcast::Sender<KoiEvent>) -> Self {
291 Self {
292 backend: MdnsBackend::Embedded { core },
293 events,
294 }
295 }
296
297 fn new_remote(client: Arc<KoiClient>, events: broadcast::Sender<KoiEvent>) -> Self {
298 Self {
299 backend: MdnsBackend::Remote { client },
300 events,
301 }
302 }
303
304 pub fn core(&self) -> Result<Arc<MdnsCore>, KoiError> {
305 match &self.backend {
306 MdnsBackend::Embedded { core } => Ok(Arc::clone(core)),
307 MdnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("mdns")),
308 }
309 }
310
311 pub async fn browse(&self, service_type: &str) -> Result<KoiBrowseHandle, KoiError> {
312 match &self.backend {
313 MdnsBackend::Embedded { core } => {
314 let handle = core.browse(service_type).await?;
315 Ok(KoiBrowseHandle::embedded(handle))
316 }
317 MdnsBackend::Remote { client } => {
318 let (tx, rx) = mpsc::channel(64);
319 let client = Arc::clone(client);
320 let service_type = service_type.to_string();
321 tokio::task::spawn_blocking(move || {
322 let stream = match client.browse_stream(&service_type) {
323 Ok(stream) => stream,
324 Err(_) => return,
325 };
326 for item in stream {
327 let Ok(json) = item else {
328 break;
329 };
330 if let Some(event) = mdns_event_from_pipeline(json) {
331 if tx.blocking_send(event).is_err() {
332 break;
333 }
334 }
335 }
336 });
337 Ok(KoiBrowseHandle::remote(rx))
338 }
339 }
340 }
341
342 pub async fn resolve(&self, name: &str) -> Result<ServiceRecord, KoiError> {
343 match &self.backend {
344 MdnsBackend::Embedded { core } => Ok(core.resolve(name).await?),
345 MdnsBackend::Remote { client } => {
346 let name = name.to_string();
347 let client = Arc::clone(client);
348 let record = tokio::task::spawn_blocking(move || client.resolve(&name))
349 .await
350 .map_err(map_join_error)??;
351 Ok(record)
352 }
353 }
354 }
355
356 pub fn register(&self, payload: RegisterPayload) -> Result<RegistrationResult, KoiError> {
357 match &self.backend {
358 MdnsBackend::Embedded { core } => Ok(core.register(payload)?),
359 MdnsBackend::Remote { client } => Ok(client.register(&payload)?),
360 }
361 }
362
363 pub fn unregister(&self, id: &str) -> Result<(), KoiError> {
364 match &self.backend {
365 MdnsBackend::Embedded { core } => Ok(core.unregister(id)?),
366 MdnsBackend::Remote { client } => Ok(client.unregister(id)?),
367 }
368 }
369
370 pub fn subscribe(&self) -> broadcast::Receiver<MdnsEvent> {
371 match &self.backend {
372 MdnsBackend::Embedded { core } => core.subscribe(),
373 MdnsBackend::Remote { .. } => {
374 let (_tx, rx) = broadcast::channel(1);
375 rx
376 }
377 }
378 }
379
380 pub fn emit_event(&self, event: KoiEvent) {
381 let _ = self.events.send(event);
382 }
383}
384
/// DNS operations, served by an embedded runtime or forwarded to a remote client.
pub struct DnsHandle {
    backend: DnsBackend,
}
388
/// Implementation behind a [`DnsHandle`].
enum DnsBackend {
    /// Calls go to the in-process DNS runtime and its core.
    Embedded { runtime: Arc<DnsRuntime> },
    /// Calls go through the shared client.
    Remote { client: Arc<KoiClient> },
}
393
394impl DnsHandle {
395 fn new_embedded(runtime: Arc<DnsRuntime>) -> Self {
396 Self {
397 backend: DnsBackend::Embedded { runtime },
398 }
399 }
400
401 fn new_remote(client: Arc<KoiClient>) -> Self {
402 Self {
403 backend: DnsBackend::Remote { client },
404 }
405 }
406
407 pub fn runtime(&self) -> Result<Arc<DnsRuntime>, KoiError> {
408 match &self.backend {
409 DnsBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
410 DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
411 }
412 }
413
414 pub fn core(&self) -> Result<Arc<koi_dns::DnsCore>, KoiError> {
415 match &self.backend {
416 DnsBackend::Embedded { runtime } => Ok(runtime.core()),
417 DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
418 }
419 }
420
421 pub async fn lookup(
422 &self,
423 name: &str,
424 record_type: hickory_proto::rr::RecordType,
425 ) -> Option<DnsLookupResult> {
426 match &self.backend {
427 DnsBackend::Embedded { runtime } => runtime.core().lookup(name, record_type).await,
428 DnsBackend::Remote { client } => {
429 let name = name.to_string();
430 let client = Arc::clone(client);
431 let result =
432 tokio::task::spawn_blocking(move || client.dns_lookup(&name, record_type))
433 .await
434 .ok()
435 .and_then(|res| res.ok());
436 let json = match result {
437 Some(json) => json,
438 None => return None,
439 };
440 parse_dns_lookup(json)
441 }
442 }
443 }
444
445 pub fn list_names(&self) -> Vec<String> {
446 match &self.backend {
447 DnsBackend::Embedded { runtime } => runtime.core().list_names(),
448 DnsBackend::Remote { client } => {
449 let result = client.dns_list();
450 let Ok(json) = result else {
451 return Vec::new();
452 };
453 json.get("names")
454 .and_then(|v| v.as_array())
455 .map(|arr| {
456 arr.iter()
457 .filter_map(|name| name.as_str().map(|s| s.to_string()))
458 .collect()
459 })
460 .unwrap_or_default()
461 }
462 }
463 }
464
465 pub async fn start(&self) -> Result<bool, KoiError> {
466 match &self.backend {
467 DnsBackend::Embedded { runtime } => Ok(runtime.start().await?),
468 DnsBackend::Remote { client } => {
469 let client = Arc::clone(client);
470 let started = tokio::task::spawn_blocking(move || client.dns_start())
471 .await
472 .map_err(map_join_error)??
473 .get("started")
474 .and_then(|v| v.as_bool())
475 .unwrap_or(false);
476 Ok(started)
477 }
478 }
479 }
480
481 pub async fn stop(&self) -> bool {
482 match &self.backend {
483 DnsBackend::Embedded { runtime } => runtime.stop().await,
484 DnsBackend::Remote { client } => {
485 let client = Arc::clone(client);
486 tokio::task::spawn_blocking(move || client.dns_stop())
487 .await
488 .ok()
489 .and_then(|res| res.ok())
490 .and_then(|json| json.get("stopped").and_then(|v| v.as_bool()))
491 .unwrap_or(false)
492 }
493 }
494 }
495
496 pub fn add_entry(&self, entry: DnsEntry) -> Result<Vec<DnsEntry>, KoiError> {
497 match &self.backend {
498 DnsBackend::Embedded { runtime } => Ok(runtime.core().add_entry(entry)?),
499 DnsBackend::Remote { client } => {
500 let json = client.dns_add(&entry.name, &entry.ip, entry.ttl)?;
501 parse_dns_entries(json)
502 }
503 }
504 }
505
506 pub fn remove_entry(&self, name: &str) -> Result<Vec<DnsEntry>, KoiError> {
507 match &self.backend {
508 DnsBackend::Embedded { runtime } => {
509 Ok(runtime.core().remove_entry(name)?.unwrap_or_default())
510 }
511 DnsBackend::Remote { client } => {
512 let json = client.dns_remove(name)?;
513 parse_dns_entries(json)
514 }
515 }
516 }
517}
518
/// Health-check operations, served by an embedded runtime or forwarded to a
/// remote client.
pub struct HealthHandle {
    backend: HealthBackend,
}
522
/// Implementation behind a [`HealthHandle`].
enum HealthBackend {
    /// Calls go to the in-process health runtime and its core.
    Embedded { runtime: Arc<HealthRuntime> },
    /// Calls go through the shared client.
    Remote { client: Arc<KoiClient> },
}
527
528impl HealthHandle {
529 fn new_embedded(runtime: Arc<HealthRuntime>) -> Self {
530 Self {
531 backend: HealthBackend::Embedded { runtime },
532 }
533 }
534
535 fn new_remote(client: Arc<KoiClient>) -> Self {
536 Self {
537 backend: HealthBackend::Remote { client },
538 }
539 }
540
541 pub fn core(&self) -> Result<Arc<koi_health::HealthCore>, KoiError> {
542 match &self.backend {
543 HealthBackend::Embedded { runtime } => Ok(runtime.core()),
544 HealthBackend::Remote { .. } => Err(KoiError::DisabledCapability("health")),
545 }
546 }
547
548 pub async fn status(&self) -> koi_health::HealthSnapshot {
549 match &self.backend {
550 HealthBackend::Embedded { runtime } => runtime.core().snapshot().await,
551 HealthBackend::Remote { client } => {
552 let client = Arc::clone(client);
553 let json = tokio::task::spawn_blocking(move || client.health_status())
554 .await
555 .ok()
556 .and_then(|res| res.ok());
557 json.and_then(|json| serde_json::from_value(json).ok())
558 .unwrap_or_else(|| koi_health::HealthSnapshot {
559 machines: Vec::new(),
560 services: Vec::new(),
561 })
562 }
563 }
564 }
565
566 pub async fn add_check(&self, check: HealthCheck) -> Result<(), KoiError> {
567 match &self.backend {
568 HealthBackend::Embedded { runtime } => Ok(runtime.core().add_check(check).await?),
569 HealthBackend::Remote { client } => {
570 let client = Arc::clone(client);
571 let check = check.clone();
572 tokio::task::spawn_blocking(move || {
573 client.health_add_check(
574 &check.name,
575 check.kind,
576 &check.target,
577 check.interval_secs,
578 check.timeout_secs,
579 )
580 })
581 .await
582 .map_err(map_join_error)??;
583 Ok(())
584 }
585 }
586 }
587
588 pub async fn remove_check(&self, name: &str) -> Result<(), KoiError> {
589 match &self.backend {
590 HealthBackend::Embedded { runtime } => Ok(runtime.core().remove_check(name).await?),
591 HealthBackend::Remote { client } => {
592 let client = Arc::clone(client);
593 let name = name.to_string();
594 tokio::task::spawn_blocking(move || client.health_remove_check(&name))
595 .await
596 .map_err(map_join_error)??;
597 Ok(())
598 }
599 }
600 }
601
602 pub async fn start(&self) -> Result<bool, KoiError> {
603 match &self.backend {
604 HealthBackend::Embedded { runtime } => Ok(runtime.start().await?),
605 HealthBackend::Remote { .. } => Ok(false),
606 }
607 }
608
609 pub async fn stop(&self) -> bool {
610 match &self.backend {
611 HealthBackend::Embedded { runtime } => runtime.stop().await,
612 HealthBackend::Remote { .. } => false,
613 }
614 }
615}
616
/// Certmesh operations, served by an embedded core or forwarded to a remote
/// client.
pub struct CertmeshHandle {
    backend: CertmeshBackend,
}
620
/// Implementation behind a [`CertmeshHandle`].
enum CertmeshBackend {
    /// Calls go to the in-process certmesh core.
    Embedded {
        core: Arc<koi_certmesh::CertmeshCore>,
    },
    /// Calls go through the shared client.
    Remote {
        client: Arc<KoiClient>,
    },
}
629
630impl CertmeshHandle {
631 fn new_embedded(core: Arc<koi_certmesh::CertmeshCore>) -> Self {
632 Self {
633 backend: CertmeshBackend::Embedded { core },
634 }
635 }
636
637 fn new_remote(client: Arc<KoiClient>) -> Self {
638 Self {
639 backend: CertmeshBackend::Remote { client },
640 }
641 }
642
643 pub fn core(&self) -> Result<Arc<koi_certmesh::CertmeshCore>, KoiError> {
644 match &self.backend {
645 CertmeshBackend::Embedded { core } => Ok(Arc::clone(core)),
646 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
647 }
648 }
649
650 pub async fn status(&self) -> koi_common::capability::CapabilityStatus {
651 match &self.backend {
652 CertmeshBackend::Embedded { core } => core.status(),
653 CertmeshBackend::Remote { client } => {
654 let client = Arc::clone(client);
655 let json = tokio::task::spawn_blocking(move || client.unified_status())
656 .await
657 .ok()
658 .and_then(|res| res.ok());
659 json.and_then(extract_capability_status)
660 .unwrap_or_else(default_capability_status)
661 }
662 }
663 }
664}
665
/// Proxy operations, served by an embedded runtime or forwarded to a remote
/// client.
pub struct ProxyHandle {
    backend: ProxyBackend,
}
669
/// Implementation behind a [`ProxyHandle`].
enum ProxyBackend {
    /// Calls go to the in-process proxy runtime and its core.
    Embedded { runtime: Arc<ProxyRuntime> },
    /// Calls go through the shared client.
    Remote { client: Arc<KoiClient> },
}
674
675impl ProxyHandle {
676 fn new_embedded(runtime: Arc<ProxyRuntime>) -> Self {
677 Self {
678 backend: ProxyBackend::Embedded { runtime },
679 }
680 }
681
682 fn new_remote(client: Arc<KoiClient>) -> Self {
683 Self {
684 backend: ProxyBackend::Remote { client },
685 }
686 }
687
688 pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError> {
689 match &self.backend {
690 ProxyBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
691 ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
692 }
693 }
694
695 pub fn core(&self) -> Result<Arc<koi_proxy::ProxyCore>, KoiError> {
696 match &self.backend {
697 ProxyBackend::Embedded { runtime } => Ok(runtime.core()),
698 ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
699 }
700 }
701
702 pub async fn entries(&self) -> Vec<ProxyEntry> {
703 match &self.backend {
704 ProxyBackend::Embedded { runtime } => runtime.core().entries().await,
705 ProxyBackend::Remote { client } => {
706 let client = Arc::clone(client);
707 tokio::task::spawn_blocking(move || client.proxy_list())
708 .await
709 .ok()
710 .and_then(|res| res.ok())
711 .and_then(|json| parse_proxy_entries(json).ok())
712 .unwrap_or_default()
713 }
714 }
715 }
716
717 pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError> {
718 match &self.backend {
719 ProxyBackend::Embedded { runtime } => Ok(runtime.core().upsert(entry).await?),
720 ProxyBackend::Remote { client } => {
721 let client = Arc::clone(client);
722 let entry = entry.clone();
723 let add_client = Arc::clone(&client);
724 tokio::task::spawn_blocking(move || {
725 add_client.proxy_add(
726 &entry.name,
727 entry.listen_port,
728 &entry.backend,
729 entry.allow_remote,
730 )
731 })
732 .await
733 .map_err(map_join_error)??;
734 let list = tokio::task::spawn_blocking(move || client.proxy_list())
735 .await
736 .map_err(map_join_error)??;
737 parse_proxy_entries(list)
738 }
739 }
740 }
741
742 pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError> {
743 match &self.backend {
744 ProxyBackend::Embedded { runtime } => Ok(runtime.core().remove(name).await?),
745 ProxyBackend::Remote { client } => {
746 let client = Arc::clone(client);
747 let name = name.to_string();
748 let remove_client = Arc::clone(&client);
749 tokio::task::spawn_blocking(move || remove_client.proxy_remove(&name))
750 .await
751 .map_err(map_join_error)??;
752 let list = tokio::task::spawn_blocking(move || client.proxy_list())
753 .await
754 .map_err(map_join_error)??;
755 parse_proxy_entries(list)
756 }
757 }
758 }
759
760 pub async fn start_all(&self) -> Result<(), KoiError> {
761 match &self.backend {
762 ProxyBackend::Embedded { runtime } => Ok(runtime.start_all().await?),
763 ProxyBackend::Remote { .. } => Ok(()),
764 }
765 }
766
767 pub async fn stop_all(&self) {
768 if let ProxyBackend::Embedded { runtime } = &self.backend {
769 runtime.stop_all().await;
770 }
771 }
772}
773
774fn parse_dns_lookup(json: serde_json::Value) -> Option<DnsLookupResult> {
775 let name = json.get("name").and_then(|v| v.as_str())?.to_string();
776 let source = json
777 .get("source")
778 .and_then(|v| v.as_str())
779 .unwrap_or("unknown")
780 .to_string();
781 let ips = json.get("ips").and_then(|v| v.as_array()).map(|arr| {
782 arr.iter()
783 .filter_map(|ip| ip.as_str())
784 .filter_map(|ip| ip.parse::<IpAddr>().ok())
785 .collect::<Vec<_>>()
786 })?;
787 Some(DnsLookupResult { name, ips, source })
788}
789
790fn parse_dns_entries(json: serde_json::Value) -> Result<Vec<DnsEntry>, KoiError> {
791 let entries = json.get("entries").ok_or_else(|| {
792 KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::other(
793 "missing entries",
794 )))
795 })?;
796 let entries = serde_json::from_value(entries.clone()).map_err(|e| {
797 KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::new(
798 std::io::ErrorKind::InvalidData,
799 e.to_string(),
800 )))
801 })?;
802 Ok(entries)
803}
804
805fn parse_proxy_entries(json: serde_json::Value) -> Result<Vec<ProxyEntry>, KoiError> {
806 let entries = json
807 .get("entries")
808 .ok_or_else(|| KoiError::Proxy(koi_proxy::ProxyError::Io("missing entries".to_string())))?
809 .clone();
810 serde_json::from_value(entries)
811 .map_err(|e| KoiError::Proxy(koi_proxy::ProxyError::Io(e.to_string())))
812}
813
814fn extract_capability_status(
815 json: serde_json::Value,
816) -> Option<koi_common::capability::CapabilityStatus> {
817 let caps = json.get("capabilities")?.as_array()?;
818 for cap in caps {
819 if cap.get("name")?.as_str()? == "certmesh" {
820 let name = cap.get("name")?.as_str()?.to_string();
821 let summary = cap
822 .get("summary")
823 .and_then(|v| v.as_str())
824 .unwrap_or("unknown")
825 .to_string();
826 let healthy = cap
827 .get("healthy")
828 .and_then(|v| v.as_bool())
829 .unwrap_or(false);
830 return Some(koi_common::capability::CapabilityStatus {
831 name,
832 summary,
833 healthy,
834 });
835 }
836 }
837 None
838}
839
840fn default_capability_status() -> koi_common::capability::CapabilityStatus {
841 koi_common::capability::CapabilityStatus {
842 name: "certmesh".to_string(),
843 summary: "unknown".to_string(),
844 healthy: false,
845 }
846}
847
848fn mdns_event_from_pipeline(json: serde_json::Value) -> Option<MdnsEvent> {
849 if let Some(found) = json.get("found") {
850 let record: ServiceRecord = serde_json::from_value(found.clone()).ok()?;
851 return Some(MdnsEvent::Found(record));
852 }
853 if let Some(resolved) = json.get("resolved") {
854 let record: ServiceRecord = serde_json::from_value(resolved.clone()).ok()?;
855 return Some(MdnsEvent::Resolved(record));
856 }
857 if let Some(event) = json.get("event") {
858 let kind: EventKind = serde_json::from_value(event.clone()).ok()?;
859 let service = json
860 .get("service")
861 .cloned()
862 .unwrap_or(serde_json::Value::Null);
863 let record: ServiceRecord = serde_json::from_value(service).ok()?;
864 return match kind {
865 EventKind::Found => Some(MdnsEvent::Found(record)),
866 EventKind::Resolved => Some(MdnsEvent::Resolved(record)),
867 EventKind::Removed => Some(MdnsEvent::Removed {
868 name: record.name,
869 service_type: record.service_type,
870 }),
871 };
872 }
873 None
874}