1use std::net::IpAddr;
2use std::sync::Arc;
3
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::BroadcastStream;
7use tokio_util::sync::CancellationToken;
8
9use koi_client::KoiClient;
10use koi_common::capability::Capability;
11use koi_common::types::{EventKind, ServiceRecord};
12use koi_config::state::{load_dns_state, save_dns_state, DnsEntry};
13use koi_dns::{DnsLookupResult, DnsRuntime};
14use koi_health::{HealthCheck, HealthRuntime};
15use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
16use koi_mdns::{BrowseHandle as MdnsBrowseHandle, MdnsCore, MdnsEvent};
17use koi_proxy::{ProxyEntry, ProxyRuntime};
18
19use crate::{map_join_error, KoiError, KoiEvent};
20
21enum HandleBackend {
22 Embedded {
23 mdns: Option<Arc<MdnsCore>>,
24 dns: Option<Arc<DnsRuntime>>,
25 health: Option<Arc<HealthRuntime>>,
26 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
27 proxy: Option<Arc<ProxyRuntime>>,
28 },
29 Remote {
30 client: Arc<KoiClient>,
31 },
32}
33
34pub struct KoiHandle {
35 backend: HandleBackend,
36 events: broadcast::Sender<KoiEvent>,
37 cancel: CancellationToken,
38 tasks: Vec<JoinHandle<()>>,
39}
40
41impl KoiHandle {
42 #[allow(clippy::too_many_arguments)]
43 pub(crate) fn new_embedded(
44 mdns: Option<Arc<MdnsCore>>,
45 dns: Option<Arc<DnsRuntime>>,
46 health: Option<Arc<HealthRuntime>>,
47 certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
48 proxy: Option<Arc<ProxyRuntime>>,
49 events: broadcast::Sender<KoiEvent>,
50 cancel: CancellationToken,
51 tasks: Vec<JoinHandle<()>>,
52 ) -> Self {
53 Self {
54 backend: HandleBackend::Embedded {
55 mdns,
56 dns,
57 health,
58 certmesh,
59 proxy,
60 },
61 events,
62 cancel,
63 tasks,
64 }
65 }
66
67 pub(crate) fn new_remote(
68 client: Arc<KoiClient>,
69 events: broadcast::Sender<KoiEvent>,
70 cancel: CancellationToken,
71 tasks: Vec<JoinHandle<()>>,
72 ) -> Self {
73 Self {
74 backend: HandleBackend::Remote { client },
75 events,
76 cancel,
77 tasks,
78 }
79 }
80
81 pub fn events(&self) -> BroadcastStream<KoiEvent> {
82 BroadcastStream::new(self.events.subscribe())
83 }
84
85 pub fn subscribe(&self) -> broadcast::Receiver<KoiEvent> {
86 self.events.subscribe()
87 }
88
89 pub fn mdns(&self) -> Result<MdnsHandle, KoiError> {
90 match &self.backend {
91 HandleBackend::Embedded { mdns, .. } => {
92 let core = mdns.as_ref().ok_or(KoiError::DisabledCapability("mdns"))?;
93 Ok(MdnsHandle::new_embedded(
94 Arc::clone(core),
95 self.events.clone(),
96 ))
97 }
98 HandleBackend::Remote { client } => Ok(MdnsHandle::new_remote(
99 Arc::clone(client),
100 self.events.clone(),
101 )),
102 }
103 }
104
105 pub fn dns(&self) -> Result<DnsHandle, KoiError> {
106 match &self.backend {
107 HandleBackend::Embedded { dns, .. } => {
108 let runtime = dns.as_ref().ok_or(KoiError::DisabledCapability("dns"))?;
109 Ok(DnsHandle::new_embedded(Arc::clone(runtime)))
110 }
111 HandleBackend::Remote { client } => Ok(DnsHandle::new_remote(Arc::clone(client))),
112 }
113 }
114
115 pub fn health(&self) -> Result<HealthHandle, KoiError> {
116 match &self.backend {
117 HandleBackend::Embedded { health, .. } => {
118 let runtime = health
119 .as_ref()
120 .ok_or(KoiError::DisabledCapability("health"))?;
121 Ok(HealthHandle::new_embedded(Arc::clone(runtime)))
122 }
123 HandleBackend::Remote { client } => Ok(HealthHandle::new_remote(Arc::clone(client))),
124 }
125 }
126
127 pub fn certmesh(&self) -> Result<CertmeshHandle, KoiError> {
128 match &self.backend {
129 HandleBackend::Embedded { certmesh, .. } => {
130 let core = certmesh
131 .as_ref()
132 .ok_or(KoiError::DisabledCapability("certmesh"))?;
133 Ok(CertmeshHandle::new_embedded(Arc::clone(core)))
134 }
135 HandleBackend::Remote { client } => Ok(CertmeshHandle::new_remote(Arc::clone(client))),
136 }
137 }
138
139 pub fn proxy(&self) -> Result<ProxyHandle, KoiError> {
140 match &self.backend {
141 HandleBackend::Embedded { proxy, .. } => {
142 let runtime = proxy
143 .as_ref()
144 .ok_or(KoiError::DisabledCapability("proxy"))?;
145 Ok(ProxyHandle::new_embedded(Arc::clone(runtime)))
146 }
147 HandleBackend::Remote { client } => Ok(ProxyHandle::new_remote(Arc::clone(client))),
148 }
149 }
150
151 pub async fn shutdown(mut self) -> Result<(), KoiError> {
152 self.cancel.cancel();
153 for task in self.tasks.drain(..) {
154 let _ = task.await;
155 }
156
157 if let HandleBackend::Embedded {
158 mdns,
159 dns,
160 health,
161 proxy,
162 ..
163 } = &self.backend
164 {
165 if let Some(runtime) = proxy {
166 runtime.stop_all().await;
167 }
168 if let Some(runtime) = health {
169 let _ = runtime.stop().await;
170 }
171 if let Some(runtime) = dns {
172 let _ = runtime.stop().await;
173 }
174 if let Some(core) = mdns {
175 core.shutdown().await?;
176 }
177 }
178
179 Ok(())
180 }
181}
182
183pub struct KoiBrowseHandle {
184 backend: BrowseBackend,
185}
186
187enum BrowseBackend {
188 Embedded(MdnsBrowseHandle),
189 Remote(Mutex<mpsc::Receiver<MdnsEvent>>),
190}
191
192impl KoiBrowseHandle {
193 fn embedded(handle: MdnsBrowseHandle) -> Self {
194 Self {
195 backend: BrowseBackend::Embedded(handle),
196 }
197 }
198
199 fn remote(rx: mpsc::Receiver<MdnsEvent>) -> Self {
200 Self {
201 backend: BrowseBackend::Remote(Mutex::new(rx)),
202 }
203 }
204
205 pub async fn recv(&self) -> Option<MdnsEvent> {
206 match &self.backend {
207 BrowseBackend::Embedded(handle) => handle.recv().await,
208 BrowseBackend::Remote(rx) => rx.lock().await.recv().await,
209 }
210 }
211}
212
213pub struct MdnsHandle {
214 backend: MdnsBackend,
215 events: broadcast::Sender<KoiEvent>,
216}
217
218enum MdnsBackend {
219 Embedded { core: Arc<MdnsCore> },
220 Remote { client: Arc<KoiClient> },
221}
222
223impl MdnsHandle {
224 fn new_embedded(core: Arc<MdnsCore>, events: broadcast::Sender<KoiEvent>) -> Self {
225 Self {
226 backend: MdnsBackend::Embedded { core },
227 events,
228 }
229 }
230
231 fn new_remote(client: Arc<KoiClient>, events: broadcast::Sender<KoiEvent>) -> Self {
232 Self {
233 backend: MdnsBackend::Remote { client },
234 events,
235 }
236 }
237
238 pub fn core(&self) -> Result<Arc<MdnsCore>, KoiError> {
239 match &self.backend {
240 MdnsBackend::Embedded { core } => Ok(Arc::clone(core)),
241 MdnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("mdns")),
242 }
243 }
244
245 pub async fn browse(&self, service_type: &str) -> Result<KoiBrowseHandle, KoiError> {
246 match &self.backend {
247 MdnsBackend::Embedded { core } => {
248 let handle = core.browse(service_type).await?;
249 Ok(KoiBrowseHandle::embedded(handle))
250 }
251 MdnsBackend::Remote { client } => {
252 let (tx, rx) = mpsc::channel(64);
253 let client = Arc::clone(client);
254 let service_type = service_type.to_string();
255 tokio::task::spawn_blocking(move || {
256 let stream = match client.browse_stream(&service_type) {
257 Ok(stream) => stream,
258 Err(_) => return,
259 };
260 for item in stream {
261 let Ok(json) = item else {
262 break;
263 };
264 if let Some(event) = mdns_event_from_pipeline(json) {
265 if tx.blocking_send(event).is_err() {
266 break;
267 }
268 }
269 }
270 });
271 Ok(KoiBrowseHandle::remote(rx))
272 }
273 }
274 }
275
276 pub async fn resolve(&self, name: &str) -> Result<ServiceRecord, KoiError> {
277 match &self.backend {
278 MdnsBackend::Embedded { core } => Ok(core.resolve(name).await?),
279 MdnsBackend::Remote { client } => {
280 let name = name.to_string();
281 let client = Arc::clone(client);
282 let record = tokio::task::spawn_blocking(move || client.resolve(&name))
283 .await
284 .map_err(map_join_error)??;
285 Ok(record)
286 }
287 }
288 }
289
290 pub fn register(&self, payload: RegisterPayload) -> Result<RegistrationResult, KoiError> {
291 match &self.backend {
292 MdnsBackend::Embedded { core } => Ok(core.register(payload)?),
293 MdnsBackend::Remote { client } => Ok(client.register(&payload)?),
294 }
295 }
296
297 pub fn unregister(&self, id: &str) -> Result<(), KoiError> {
298 match &self.backend {
299 MdnsBackend::Embedded { core } => Ok(core.unregister(id)?),
300 MdnsBackend::Remote { client } => Ok(client.unregister(id)?),
301 }
302 }
303
304 pub fn subscribe(&self) -> broadcast::Receiver<MdnsEvent> {
305 match &self.backend {
306 MdnsBackend::Embedded { core } => core.subscribe(),
307 MdnsBackend::Remote { .. } => {
308 let (_tx, rx) = broadcast::channel(1);
309 rx
310 }
311 }
312 }
313
314 pub fn emit_event(&self, event: KoiEvent) {
315 let _ = self.events.send(event);
316 }
317}
318
319pub struct DnsHandle {
320 backend: DnsBackend,
321}
322
323enum DnsBackend {
324 Embedded { runtime: Arc<DnsRuntime> },
325 Remote { client: Arc<KoiClient> },
326}
327
328impl DnsHandle {
329 fn new_embedded(runtime: Arc<DnsRuntime>) -> Self {
330 Self {
331 backend: DnsBackend::Embedded { runtime },
332 }
333 }
334
335 fn new_remote(client: Arc<KoiClient>) -> Self {
336 Self {
337 backend: DnsBackend::Remote { client },
338 }
339 }
340
341 pub fn runtime(&self) -> Result<Arc<DnsRuntime>, KoiError> {
342 match &self.backend {
343 DnsBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
344 DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
345 }
346 }
347
348 pub fn core(&self) -> Result<Arc<koi_dns::DnsCore>, KoiError> {
349 match &self.backend {
350 DnsBackend::Embedded { runtime } => Ok(runtime.core()),
351 DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
352 }
353 }
354
355 pub async fn lookup(
356 &self,
357 name: &str,
358 record_type: hickory_proto::rr::RecordType,
359 ) -> Option<DnsLookupResult> {
360 match &self.backend {
361 DnsBackend::Embedded { runtime } => runtime.core().lookup(name, record_type).await,
362 DnsBackend::Remote { client } => {
363 let name = name.to_string();
364 let client = Arc::clone(client);
365 let result =
366 tokio::task::spawn_blocking(move || client.dns_lookup(&name, record_type))
367 .await
368 .ok()
369 .and_then(|res| res.ok());
370 let json = match result {
371 Some(json) => json,
372 None => return None,
373 };
374 parse_dns_lookup(json)
375 }
376 }
377 }
378
379 pub fn list_names(&self) -> Vec<String> {
380 match &self.backend {
381 DnsBackend::Embedded { runtime } => runtime.core().list_names(),
382 DnsBackend::Remote { client } => {
383 let result = client.dns_list();
384 let Ok(json) = result else {
385 return Vec::new();
386 };
387 json.get("names")
388 .and_then(|v| v.as_array())
389 .map(|arr| {
390 arr.iter()
391 .filter_map(|name| name.as_str().map(|s| s.to_string()))
392 .collect()
393 })
394 .unwrap_or_default()
395 }
396 }
397 }
398
399 pub async fn start(&self) -> Result<bool, KoiError> {
400 match &self.backend {
401 DnsBackend::Embedded { runtime } => Ok(runtime.start().await?),
402 DnsBackend::Remote { client } => {
403 let client = Arc::clone(client);
404 let started = tokio::task::spawn_blocking(move || client.dns_start())
405 .await
406 .map_err(map_join_error)??
407 .get("started")
408 .and_then(|v| v.as_bool())
409 .unwrap_or(false);
410 Ok(started)
411 }
412 }
413 }
414
415 pub async fn stop(&self) -> bool {
416 match &self.backend {
417 DnsBackend::Embedded { runtime } => runtime.stop().await,
418 DnsBackend::Remote { client } => {
419 let client = Arc::clone(client);
420 tokio::task::spawn_blocking(move || client.dns_stop())
421 .await
422 .ok()
423 .and_then(|res| res.ok())
424 .and_then(|json| json.get("stopped").and_then(|v| v.as_bool()))
425 .unwrap_or(false)
426 }
427 }
428 }
429
430 pub fn add_entry(&self, entry: DnsEntry) -> Result<Vec<DnsEntry>, KoiError> {
431 match &self.backend {
432 DnsBackend::Embedded { runtime } => {
433 let mut state = load_dns_state().unwrap_or_default();
434 if let Some(existing) = state.entries.iter_mut().find(|e| e.name == entry.name) {
435 *existing = entry.clone();
436 } else {
437 state.entries.push(entry.clone());
438 }
439 save_dns_state(&state)?;
440 runtime.core().emit(koi_dns::DnsEvent::EntryUpdated {
441 name: entry.name,
442 ip: entry.ip,
443 });
444 Ok(state.entries)
445 }
446 DnsBackend::Remote { client } => {
447 let json = client.dns_add(&entry.name, &entry.ip, entry.ttl)?;
448 parse_dns_entries(json)
449 }
450 }
451 }
452
453 pub fn remove_entry(&self, name: &str) -> Result<Vec<DnsEntry>, KoiError> {
454 match &self.backend {
455 DnsBackend::Embedded { runtime } => {
456 let mut state = load_dns_state().unwrap_or_default();
457 state.entries.retain(|entry| entry.name != name);
458 save_dns_state(&state)?;
459 runtime.core().emit(koi_dns::DnsEvent::EntryRemoved {
460 name: name.to_string(),
461 });
462 Ok(state.entries)
463 }
464 DnsBackend::Remote { client } => {
465 let json = client.dns_remove(name)?;
466 parse_dns_entries(json)
467 }
468 }
469 }
470}
471
472pub struct HealthHandle {
473 backend: HealthBackend,
474}
475
476enum HealthBackend {
477 Embedded { runtime: Arc<HealthRuntime> },
478 Remote { client: Arc<KoiClient> },
479}
480
481impl HealthHandle {
482 fn new_embedded(runtime: Arc<HealthRuntime>) -> Self {
483 Self {
484 backend: HealthBackend::Embedded { runtime },
485 }
486 }
487
488 fn new_remote(client: Arc<KoiClient>) -> Self {
489 Self {
490 backend: HealthBackend::Remote { client },
491 }
492 }
493
494 pub fn core(&self) -> Result<Arc<koi_health::HealthCore>, KoiError> {
495 match &self.backend {
496 HealthBackend::Embedded { runtime } => Ok(runtime.core()),
497 HealthBackend::Remote { .. } => Err(KoiError::DisabledCapability("health")),
498 }
499 }
500
501 pub async fn status(&self) -> koi_health::HealthSnapshot {
502 match &self.backend {
503 HealthBackend::Embedded { runtime } => runtime.core().snapshot().await,
504 HealthBackend::Remote { client } => {
505 let client = Arc::clone(client);
506 let json = tokio::task::spawn_blocking(move || client.health_status())
507 .await
508 .ok()
509 .and_then(|res| res.ok());
510 json.and_then(|json| serde_json::from_value(json).ok())
511 .unwrap_or_else(|| koi_health::HealthSnapshot {
512 machines: Vec::new(),
513 services: Vec::new(),
514 })
515 }
516 }
517 }
518
519 pub async fn add_check(&self, check: HealthCheck) -> Result<(), KoiError> {
520 match &self.backend {
521 HealthBackend::Embedded { runtime } => Ok(runtime.core().add_check(check).await?),
522 HealthBackend::Remote { client } => {
523 let client = Arc::clone(client);
524 let check = check.clone();
525 tokio::task::spawn_blocking(move || {
526 client.health_add_check(
527 &check.name,
528 check.kind,
529 &check.target,
530 check.interval_secs,
531 check.timeout_secs,
532 )
533 })
534 .await
535 .map_err(map_join_error)??;
536 Ok(())
537 }
538 }
539 }
540
541 pub async fn remove_check(&self, name: &str) -> Result<(), KoiError> {
542 match &self.backend {
543 HealthBackend::Embedded { runtime } => Ok(runtime.core().remove_check(name).await?),
544 HealthBackend::Remote { client } => {
545 let client = Arc::clone(client);
546 let name = name.to_string();
547 tokio::task::spawn_blocking(move || client.health_remove_check(&name))
548 .await
549 .map_err(map_join_error)??;
550 Ok(())
551 }
552 }
553 }
554
555 pub async fn start(&self) -> Result<bool, KoiError> {
556 match &self.backend {
557 HealthBackend::Embedded { runtime } => Ok(runtime.start().await?),
558 HealthBackend::Remote { .. } => Ok(false),
559 }
560 }
561
562 pub async fn stop(&self) -> bool {
563 match &self.backend {
564 HealthBackend::Embedded { runtime } => runtime.stop().await,
565 HealthBackend::Remote { .. } => false,
566 }
567 }
568}
569
570pub struct CertmeshHandle {
571 backend: CertmeshBackend,
572}
573
574enum CertmeshBackend {
575 Embedded {
576 core: Arc<koi_certmesh::CertmeshCore>,
577 },
578 Remote {
579 client: Arc<KoiClient>,
580 },
581}
582
583impl CertmeshHandle {
584 fn new_embedded(core: Arc<koi_certmesh::CertmeshCore>) -> Self {
585 Self {
586 backend: CertmeshBackend::Embedded { core },
587 }
588 }
589
590 fn new_remote(client: Arc<KoiClient>) -> Self {
591 Self {
592 backend: CertmeshBackend::Remote { client },
593 }
594 }
595
596 pub fn core(&self) -> Result<Arc<koi_certmesh::CertmeshCore>, KoiError> {
597 match &self.backend {
598 CertmeshBackend::Embedded { core } => Ok(Arc::clone(core)),
599 CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
600 }
601 }
602
603 pub async fn status(&self) -> koi_common::capability::CapabilityStatus {
604 match &self.backend {
605 CertmeshBackend::Embedded { core } => core.status(),
606 CertmeshBackend::Remote { client } => {
607 let client = Arc::clone(client);
608 let json = tokio::task::spawn_blocking(move || client.unified_status())
609 .await
610 .ok()
611 .and_then(|res| res.ok());
612 json.and_then(extract_capability_status)
613 .unwrap_or_else(default_capability_status)
614 }
615 }
616 }
617}
618
619pub struct ProxyHandle {
620 backend: ProxyBackend,
621}
622
623enum ProxyBackend {
624 Embedded { runtime: Arc<ProxyRuntime> },
625 Remote { client: Arc<KoiClient> },
626}
627
628impl ProxyHandle {
629 fn new_embedded(runtime: Arc<ProxyRuntime>) -> Self {
630 Self {
631 backend: ProxyBackend::Embedded { runtime },
632 }
633 }
634
635 fn new_remote(client: Arc<KoiClient>) -> Self {
636 Self {
637 backend: ProxyBackend::Remote { client },
638 }
639 }
640
641 pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError> {
642 match &self.backend {
643 ProxyBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
644 ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
645 }
646 }
647
648 pub fn core(&self) -> Result<Arc<koi_proxy::ProxyCore>, KoiError> {
649 match &self.backend {
650 ProxyBackend::Embedded { runtime } => Ok(runtime.core()),
651 ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
652 }
653 }
654
655 pub async fn entries(&self) -> Vec<ProxyEntry> {
656 match &self.backend {
657 ProxyBackend::Embedded { runtime } => runtime.core().entries().await,
658 ProxyBackend::Remote { client } => {
659 let client = Arc::clone(client);
660 tokio::task::spawn_blocking(move || client.proxy_list())
661 .await
662 .ok()
663 .and_then(|res| res.ok())
664 .and_then(|json| parse_proxy_entries(json).ok())
665 .unwrap_or_default()
666 }
667 }
668 }
669
670 pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError> {
671 match &self.backend {
672 ProxyBackend::Embedded { runtime } => Ok(runtime.core().upsert(entry).await?),
673 ProxyBackend::Remote { client } => {
674 let client = Arc::clone(client);
675 let entry = entry.clone();
676 let add_client = Arc::clone(&client);
677 tokio::task::spawn_blocking(move || {
678 add_client.proxy_add(
679 &entry.name,
680 entry.listen_port,
681 &entry.backend,
682 entry.allow_remote,
683 )
684 })
685 .await
686 .map_err(map_join_error)??;
687 let list = tokio::task::spawn_blocking(move || client.proxy_list())
688 .await
689 .map_err(map_join_error)??;
690 parse_proxy_entries(list)
691 }
692 }
693 }
694
695 pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError> {
696 match &self.backend {
697 ProxyBackend::Embedded { runtime } => Ok(runtime.core().remove(name).await?),
698 ProxyBackend::Remote { client } => {
699 let client = Arc::clone(client);
700 let name = name.to_string();
701 let remove_client = Arc::clone(&client);
702 tokio::task::spawn_blocking(move || remove_client.proxy_remove(&name))
703 .await
704 .map_err(map_join_error)??;
705 let list = tokio::task::spawn_blocking(move || client.proxy_list())
706 .await
707 .map_err(map_join_error)??;
708 parse_proxy_entries(list)
709 }
710 }
711 }
712
713 pub async fn start_all(&self) -> Result<(), KoiError> {
714 match &self.backend {
715 ProxyBackend::Embedded { runtime } => Ok(runtime.start_all().await?),
716 ProxyBackend::Remote { .. } => Ok(()),
717 }
718 }
719
720 pub async fn stop_all(&self) {
721 if let ProxyBackend::Embedded { runtime } = &self.backend {
722 runtime.stop_all().await;
723 }
724 }
725}
726
727fn parse_dns_lookup(json: serde_json::Value) -> Option<DnsLookupResult> {
728 let name = json.get("name").and_then(|v| v.as_str())?.to_string();
729 let source = json
730 .get("source")
731 .and_then(|v| v.as_str())
732 .unwrap_or("unknown")
733 .to_string();
734 let ips = json.get("ips").and_then(|v| v.as_array()).map(|arr| {
735 arr.iter()
736 .filter_map(|ip| ip.as_str())
737 .filter_map(|ip| ip.parse::<IpAddr>().ok())
738 .collect::<Vec<_>>()
739 })?;
740 Some(DnsLookupResult { name, ips, source })
741}
742
743fn parse_dns_entries(json: serde_json::Value) -> Result<Vec<DnsEntry>, KoiError> {
744 let entries = json.get("entries").ok_or_else(|| {
745 KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::other(
746 "missing entries",
747 )))
748 })?;
749 let entries = serde_json::from_value(entries.clone()).map_err(|e| {
750 KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::new(
751 std::io::ErrorKind::InvalidData,
752 e.to_string(),
753 )))
754 })?;
755 Ok(entries)
756}
757
758fn parse_proxy_entries(json: serde_json::Value) -> Result<Vec<ProxyEntry>, KoiError> {
759 let entries = json
760 .get("entries")
761 .ok_or_else(|| KoiError::Proxy(koi_proxy::ProxyError::Io("missing entries".to_string())))?
762 .clone();
763 serde_json::from_value(entries)
764 .map_err(|e| KoiError::Proxy(koi_proxy::ProxyError::Io(e.to_string())))
765}
766
767fn extract_capability_status(
768 json: serde_json::Value,
769) -> Option<koi_common::capability::CapabilityStatus> {
770 let caps = json.get("capabilities")?.as_array()?;
771 for cap in caps {
772 if cap.get("name")?.as_str()? == "certmesh" {
773 let name = cap.get("name")?.as_str()?.to_string();
774 let summary = cap
775 .get("summary")
776 .and_then(|v| v.as_str())
777 .unwrap_or("unknown")
778 .to_string();
779 let healthy = cap
780 .get("healthy")
781 .and_then(|v| v.as_bool())
782 .unwrap_or(false);
783 return Some(koi_common::capability::CapabilityStatus {
784 name,
785 summary,
786 healthy,
787 });
788 }
789 }
790 None
791}
792
793fn default_capability_status() -> koi_common::capability::CapabilityStatus {
794 koi_common::capability::CapabilityStatus {
795 name: "certmesh".to_string(),
796 summary: "unknown".to_string(),
797 healthy: false,
798 }
799}
800
801fn mdns_event_from_pipeline(json: serde_json::Value) -> Option<MdnsEvent> {
802 if let Some(found) = json.get("found") {
803 let record: ServiceRecord = serde_json::from_value(found.clone()).ok()?;
804 return Some(MdnsEvent::Found(record));
805 }
806 if let Some(resolved) = json.get("resolved") {
807 let record: ServiceRecord = serde_json::from_value(resolved.clone()).ok()?;
808 return Some(MdnsEvent::Resolved(record));
809 }
810 if let Some(event) = json.get("event") {
811 let kind: EventKind = serde_json::from_value(event.clone()).ok()?;
812 let service = json
813 .get("service")
814 .cloned()
815 .unwrap_or(serde_json::Value::Null);
816 let record: ServiceRecord = serde_json::from_value(service).ok()?;
817 return match kind {
818 EventKind::Found => Some(MdnsEvent::Found(record)),
819 EventKind::Resolved => Some(MdnsEvent::Resolved(record)),
820 EventKind::Removed => Some(MdnsEvent::Removed {
821 name: record.name,
822 service_type: record.service_type,
823 }),
824 };
825 }
826 None
827}