Skip to main content

koi_embedded/
handle.rs

1use std::net::IpAddr;
2use std::sync::Arc;
3
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::BroadcastStream;
7use tokio_util::sync::CancellationToken;
8
9use koi_client::KoiClient;
10use koi_common::capability::Capability;
11use koi_common::types::{EventKind, ServiceRecord};
12use koi_config::state::{load_dns_state, save_dns_state, DnsEntry};
13use koi_dns::{DnsLookupResult, DnsRuntime};
14use koi_health::{HealthCheck, HealthRuntime};
15use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
16use koi_mdns::{BrowseHandle as MdnsBrowseHandle, MdnsCore, MdnsEvent};
17use koi_proxy::{ProxyEntry, ProxyRuntime};
18
19use crate::{map_join_error, KoiError, KoiEvent};
20
21enum HandleBackend {
22    Embedded {
23        mdns: Option<Arc<MdnsCore>>,
24        dns: Option<Arc<DnsRuntime>>,
25        health: Option<Arc<HealthRuntime>>,
26        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
27        proxy: Option<Arc<ProxyRuntime>>,
28    },
29    Remote {
30        client: Arc<KoiClient>,
31    },
32}
33
34pub struct KoiHandle {
35    backend: HandleBackend,
36    events: broadcast::Sender<KoiEvent>,
37    cancel: CancellationToken,
38    tasks: Vec<JoinHandle<()>>,
39}
40
41impl KoiHandle {
42    #[allow(clippy::too_many_arguments)]
43    pub(crate) fn new_embedded(
44        mdns: Option<Arc<MdnsCore>>,
45        dns: Option<Arc<DnsRuntime>>,
46        health: Option<Arc<HealthRuntime>>,
47        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
48        proxy: Option<Arc<ProxyRuntime>>,
49        events: broadcast::Sender<KoiEvent>,
50        cancel: CancellationToken,
51        tasks: Vec<JoinHandle<()>>,
52    ) -> Self {
53        Self {
54            backend: HandleBackend::Embedded {
55                mdns,
56                dns,
57                health,
58                certmesh,
59                proxy,
60            },
61            events,
62            cancel,
63            tasks,
64        }
65    }
66
67    pub(crate) fn new_remote(
68        client: Arc<KoiClient>,
69        events: broadcast::Sender<KoiEvent>,
70        cancel: CancellationToken,
71        tasks: Vec<JoinHandle<()>>,
72    ) -> Self {
73        Self {
74            backend: HandleBackend::Remote { client },
75            events,
76            cancel,
77            tasks,
78        }
79    }
80
81    pub fn events(&self) -> BroadcastStream<KoiEvent> {
82        BroadcastStream::new(self.events.subscribe())
83    }
84
85    pub fn subscribe(&self) -> broadcast::Receiver<KoiEvent> {
86        self.events.subscribe()
87    }
88
89    pub fn mdns(&self) -> Result<MdnsHandle, KoiError> {
90        match &self.backend {
91            HandleBackend::Embedded { mdns, .. } => {
92                let core = mdns.as_ref().ok_or(KoiError::DisabledCapability("mdns"))?;
93                Ok(MdnsHandle::new_embedded(
94                    Arc::clone(core),
95                    self.events.clone(),
96                ))
97            }
98            HandleBackend::Remote { client } => Ok(MdnsHandle::new_remote(
99                Arc::clone(client),
100                self.events.clone(),
101            )),
102        }
103    }
104
105    pub fn dns(&self) -> Result<DnsHandle, KoiError> {
106        match &self.backend {
107            HandleBackend::Embedded { dns, .. } => {
108                let runtime = dns.as_ref().ok_or(KoiError::DisabledCapability("dns"))?;
109                Ok(DnsHandle::new_embedded(Arc::clone(runtime)))
110            }
111            HandleBackend::Remote { client } => Ok(DnsHandle::new_remote(Arc::clone(client))),
112        }
113    }
114
115    pub fn health(&self) -> Result<HealthHandle, KoiError> {
116        match &self.backend {
117            HandleBackend::Embedded { health, .. } => {
118                let runtime = health
119                    .as_ref()
120                    .ok_or(KoiError::DisabledCapability("health"))?;
121                Ok(HealthHandle::new_embedded(Arc::clone(runtime)))
122            }
123            HandleBackend::Remote { client } => Ok(HealthHandle::new_remote(Arc::clone(client))),
124        }
125    }
126
127    pub fn certmesh(&self) -> Result<CertmeshHandle, KoiError> {
128        match &self.backend {
129            HandleBackend::Embedded { certmesh, .. } => {
130                let core = certmesh
131                    .as_ref()
132                    .ok_or(KoiError::DisabledCapability("certmesh"))?;
133                Ok(CertmeshHandle::new_embedded(Arc::clone(core)))
134            }
135            HandleBackend::Remote { client } => Ok(CertmeshHandle::new_remote(Arc::clone(client))),
136        }
137    }
138
139    pub fn proxy(&self) -> Result<ProxyHandle, KoiError> {
140        match &self.backend {
141            HandleBackend::Embedded { proxy, .. } => {
142                let runtime = proxy
143                    .as_ref()
144                    .ok_or(KoiError::DisabledCapability("proxy"))?;
145                Ok(ProxyHandle::new_embedded(Arc::clone(runtime)))
146            }
147            HandleBackend::Remote { client } => Ok(ProxyHandle::new_remote(Arc::clone(client))),
148        }
149    }
150
151    pub async fn shutdown(mut self) -> Result<(), KoiError> {
152        self.cancel.cancel();
153        for task in self.tasks.drain(..) {
154            let _ = task.await;
155        }
156
157        if let HandleBackend::Embedded {
158            mdns,
159            dns,
160            health,
161            proxy,
162            ..
163        } = &self.backend
164        {
165            if let Some(runtime) = proxy {
166                runtime.stop_all().await;
167            }
168            if let Some(runtime) = health {
169                let _ = runtime.stop().await;
170            }
171            if let Some(runtime) = dns {
172                let _ = runtime.stop().await;
173            }
174            if let Some(core) = mdns {
175                core.shutdown().await?;
176            }
177        }
178
179        Ok(())
180    }
181}
182
183pub struct KoiBrowseHandle {
184    backend: BrowseBackend,
185}
186
187enum BrowseBackend {
188    Embedded(MdnsBrowseHandle),
189    Remote(Mutex<mpsc::Receiver<MdnsEvent>>),
190}
191
192impl KoiBrowseHandle {
193    fn embedded(handle: MdnsBrowseHandle) -> Self {
194        Self {
195            backend: BrowseBackend::Embedded(handle),
196        }
197    }
198
199    fn remote(rx: mpsc::Receiver<MdnsEvent>) -> Self {
200        Self {
201            backend: BrowseBackend::Remote(Mutex::new(rx)),
202        }
203    }
204
205    pub async fn recv(&self) -> Option<MdnsEvent> {
206        match &self.backend {
207            BrowseBackend::Embedded(handle) => handle.recv().await,
208            BrowseBackend::Remote(rx) => rx.lock().await.recv().await,
209        }
210    }
211}
212
213pub struct MdnsHandle {
214    backend: MdnsBackend,
215    events: broadcast::Sender<KoiEvent>,
216}
217
218enum MdnsBackend {
219    Embedded { core: Arc<MdnsCore> },
220    Remote { client: Arc<KoiClient> },
221}
222
223impl MdnsHandle {
224    fn new_embedded(core: Arc<MdnsCore>, events: broadcast::Sender<KoiEvent>) -> Self {
225        Self {
226            backend: MdnsBackend::Embedded { core },
227            events,
228        }
229    }
230
231    fn new_remote(client: Arc<KoiClient>, events: broadcast::Sender<KoiEvent>) -> Self {
232        Self {
233            backend: MdnsBackend::Remote { client },
234            events,
235        }
236    }
237
238    pub fn core(&self) -> Result<Arc<MdnsCore>, KoiError> {
239        match &self.backend {
240            MdnsBackend::Embedded { core } => Ok(Arc::clone(core)),
241            MdnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("mdns")),
242        }
243    }
244
245    pub async fn browse(&self, service_type: &str) -> Result<KoiBrowseHandle, KoiError> {
246        match &self.backend {
247            MdnsBackend::Embedded { core } => {
248                let handle = core.browse(service_type).await?;
249                Ok(KoiBrowseHandle::embedded(handle))
250            }
251            MdnsBackend::Remote { client } => {
252                let (tx, rx) = mpsc::channel(64);
253                let client = Arc::clone(client);
254                let service_type = service_type.to_string();
255                tokio::task::spawn_blocking(move || {
256                    let stream = match client.browse_stream(&service_type) {
257                        Ok(stream) => stream,
258                        Err(_) => return,
259                    };
260                    for item in stream {
261                        let Ok(json) = item else {
262                            break;
263                        };
264                        if let Some(event) = mdns_event_from_pipeline(json) {
265                            if tx.blocking_send(event).is_err() {
266                                break;
267                            }
268                        }
269                    }
270                });
271                Ok(KoiBrowseHandle::remote(rx))
272            }
273        }
274    }
275
276    pub async fn resolve(&self, name: &str) -> Result<ServiceRecord, KoiError> {
277        match &self.backend {
278            MdnsBackend::Embedded { core } => Ok(core.resolve(name).await?),
279            MdnsBackend::Remote { client } => {
280                let name = name.to_string();
281                let client = Arc::clone(client);
282                let record = tokio::task::spawn_blocking(move || client.resolve(&name))
283                    .await
284                    .map_err(map_join_error)??;
285                Ok(record)
286            }
287        }
288    }
289
290    pub fn register(&self, payload: RegisterPayload) -> Result<RegistrationResult, KoiError> {
291        match &self.backend {
292            MdnsBackend::Embedded { core } => Ok(core.register(payload)?),
293            MdnsBackend::Remote { client } => Ok(client.register(&payload)?),
294        }
295    }
296
297    pub fn unregister(&self, id: &str) -> Result<(), KoiError> {
298        match &self.backend {
299            MdnsBackend::Embedded { core } => Ok(core.unregister(id)?),
300            MdnsBackend::Remote { client } => Ok(client.unregister(id)?),
301        }
302    }
303
304    pub fn subscribe(&self) -> broadcast::Receiver<MdnsEvent> {
305        match &self.backend {
306            MdnsBackend::Embedded { core } => core.subscribe(),
307            MdnsBackend::Remote { .. } => {
308                let (_tx, rx) = broadcast::channel(1);
309                rx
310            }
311        }
312    }
313
314    pub fn emit_event(&self, event: KoiEvent) {
315        let _ = self.events.send(event);
316    }
317}
318
319pub struct DnsHandle {
320    backend: DnsBackend,
321}
322
323enum DnsBackend {
324    Embedded { runtime: Arc<DnsRuntime> },
325    Remote { client: Arc<KoiClient> },
326}
327
328impl DnsHandle {
329    fn new_embedded(runtime: Arc<DnsRuntime>) -> Self {
330        Self {
331            backend: DnsBackend::Embedded { runtime },
332        }
333    }
334
335    fn new_remote(client: Arc<KoiClient>) -> Self {
336        Self {
337            backend: DnsBackend::Remote { client },
338        }
339    }
340
341    pub fn runtime(&self) -> Result<Arc<DnsRuntime>, KoiError> {
342        match &self.backend {
343            DnsBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
344            DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
345        }
346    }
347
348    pub fn core(&self) -> Result<Arc<koi_dns::DnsCore>, KoiError> {
349        match &self.backend {
350            DnsBackend::Embedded { runtime } => Ok(runtime.core()),
351            DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
352        }
353    }
354
355    pub async fn lookup(
356        &self,
357        name: &str,
358        record_type: hickory_proto::rr::RecordType,
359    ) -> Option<DnsLookupResult> {
360        match &self.backend {
361            DnsBackend::Embedded { runtime } => runtime.core().lookup(name, record_type).await,
362            DnsBackend::Remote { client } => {
363                let name = name.to_string();
364                let client = Arc::clone(client);
365                let result =
366                    tokio::task::spawn_blocking(move || client.dns_lookup(&name, record_type))
367                        .await
368                        .ok()
369                        .and_then(|res| res.ok());
370                let json = match result {
371                    Some(json) => json,
372                    None => return None,
373                };
374                parse_dns_lookup(json)
375            }
376        }
377    }
378
379    pub fn list_names(&self) -> Vec<String> {
380        match &self.backend {
381            DnsBackend::Embedded { runtime } => runtime.core().list_names(),
382            DnsBackend::Remote { client } => {
383                let result = client.dns_list();
384                let Ok(json) = result else {
385                    return Vec::new();
386                };
387                json.get("names")
388                    .and_then(|v| v.as_array())
389                    .map(|arr| {
390                        arr.iter()
391                            .filter_map(|name| name.as_str().map(|s| s.to_string()))
392                            .collect()
393                    })
394                    .unwrap_or_default()
395            }
396        }
397    }
398
399    pub async fn start(&self) -> Result<bool, KoiError> {
400        match &self.backend {
401            DnsBackend::Embedded { runtime } => Ok(runtime.start().await?),
402            DnsBackend::Remote { client } => {
403                let client = Arc::clone(client);
404                let started = tokio::task::spawn_blocking(move || client.dns_start())
405                    .await
406                    .map_err(map_join_error)??
407                    .get("started")
408                    .and_then(|v| v.as_bool())
409                    .unwrap_or(false);
410                Ok(started)
411            }
412        }
413    }
414
415    pub async fn stop(&self) -> bool {
416        match &self.backend {
417            DnsBackend::Embedded { runtime } => runtime.stop().await,
418            DnsBackend::Remote { client } => {
419                let client = Arc::clone(client);
420                tokio::task::spawn_blocking(move || client.dns_stop())
421                    .await
422                    .ok()
423                    .and_then(|res| res.ok())
424                    .and_then(|json| json.get("stopped").and_then(|v| v.as_bool()))
425                    .unwrap_or(false)
426            }
427        }
428    }
429
430    pub fn add_entry(&self, entry: DnsEntry) -> Result<Vec<DnsEntry>, KoiError> {
431        match &self.backend {
432            DnsBackend::Embedded { runtime } => {
433                let mut state = load_dns_state().unwrap_or_default();
434                if let Some(existing) = state.entries.iter_mut().find(|e| e.name == entry.name) {
435                    *existing = entry.clone();
436                } else {
437                    state.entries.push(entry.clone());
438                }
439                save_dns_state(&state)?;
440                runtime.core().emit(koi_dns::DnsEvent::EntryUpdated {
441                    name: entry.name,
442                    ip: entry.ip,
443                });
444                Ok(state.entries)
445            }
446            DnsBackend::Remote { client } => {
447                let json = client.dns_add(&entry.name, &entry.ip, entry.ttl)?;
448                parse_dns_entries(json)
449            }
450        }
451    }
452
453    pub fn remove_entry(&self, name: &str) -> Result<Vec<DnsEntry>, KoiError> {
454        match &self.backend {
455            DnsBackend::Embedded { runtime } => {
456                let mut state = load_dns_state().unwrap_or_default();
457                state.entries.retain(|entry| entry.name != name);
458                save_dns_state(&state)?;
459                runtime.core().emit(koi_dns::DnsEvent::EntryRemoved {
460                    name: name.to_string(),
461                });
462                Ok(state.entries)
463            }
464            DnsBackend::Remote { client } => {
465                let json = client.dns_remove(name)?;
466                parse_dns_entries(json)
467            }
468        }
469    }
470}
471
472pub struct HealthHandle {
473    backend: HealthBackend,
474}
475
476enum HealthBackend {
477    Embedded { runtime: Arc<HealthRuntime> },
478    Remote { client: Arc<KoiClient> },
479}
480
481impl HealthHandle {
482    fn new_embedded(runtime: Arc<HealthRuntime>) -> Self {
483        Self {
484            backend: HealthBackend::Embedded { runtime },
485        }
486    }
487
488    fn new_remote(client: Arc<KoiClient>) -> Self {
489        Self {
490            backend: HealthBackend::Remote { client },
491        }
492    }
493
494    pub fn core(&self) -> Result<Arc<koi_health::HealthCore>, KoiError> {
495        match &self.backend {
496            HealthBackend::Embedded { runtime } => Ok(runtime.core()),
497            HealthBackend::Remote { .. } => Err(KoiError::DisabledCapability("health")),
498        }
499    }
500
501    pub async fn status(&self) -> koi_health::HealthSnapshot {
502        match &self.backend {
503            HealthBackend::Embedded { runtime } => runtime.core().snapshot().await,
504            HealthBackend::Remote { client } => {
505                let client = Arc::clone(client);
506                let json = tokio::task::spawn_blocking(move || client.health_status())
507                    .await
508                    .ok()
509                    .and_then(|res| res.ok());
510                json.and_then(|json| serde_json::from_value(json).ok())
511                    .unwrap_or_else(|| koi_health::HealthSnapshot {
512                        machines: Vec::new(),
513                        services: Vec::new(),
514                    })
515            }
516        }
517    }
518
519    pub async fn add_check(&self, check: HealthCheck) -> Result<(), KoiError> {
520        match &self.backend {
521            HealthBackend::Embedded { runtime } => Ok(runtime.core().add_check(check).await?),
522            HealthBackend::Remote { client } => {
523                let client = Arc::clone(client);
524                let check = check.clone();
525                tokio::task::spawn_blocking(move || {
526                    client.health_add_check(
527                        &check.name,
528                        check.kind,
529                        &check.target,
530                        check.interval_secs,
531                        check.timeout_secs,
532                    )
533                })
534                .await
535                .map_err(map_join_error)??;
536                Ok(())
537            }
538        }
539    }
540
541    pub async fn remove_check(&self, name: &str) -> Result<(), KoiError> {
542        match &self.backend {
543            HealthBackend::Embedded { runtime } => Ok(runtime.core().remove_check(name).await?),
544            HealthBackend::Remote { client } => {
545                let client = Arc::clone(client);
546                let name = name.to_string();
547                tokio::task::spawn_blocking(move || client.health_remove_check(&name))
548                    .await
549                    .map_err(map_join_error)??;
550                Ok(())
551            }
552        }
553    }
554
555    pub async fn start(&self) -> Result<bool, KoiError> {
556        match &self.backend {
557            HealthBackend::Embedded { runtime } => Ok(runtime.start().await?),
558            HealthBackend::Remote { .. } => Ok(false),
559        }
560    }
561
562    pub async fn stop(&self) -> bool {
563        match &self.backend {
564            HealthBackend::Embedded { runtime } => runtime.stop().await,
565            HealthBackend::Remote { .. } => false,
566        }
567    }
568}
569
570pub struct CertmeshHandle {
571    backend: CertmeshBackend,
572}
573
574enum CertmeshBackend {
575    Embedded {
576        core: Arc<koi_certmesh::CertmeshCore>,
577    },
578    Remote {
579        client: Arc<KoiClient>,
580    },
581}
582
583impl CertmeshHandle {
584    fn new_embedded(core: Arc<koi_certmesh::CertmeshCore>) -> Self {
585        Self {
586            backend: CertmeshBackend::Embedded { core },
587        }
588    }
589
590    fn new_remote(client: Arc<KoiClient>) -> Self {
591        Self {
592            backend: CertmeshBackend::Remote { client },
593        }
594    }
595
596    pub fn core(&self) -> Result<Arc<koi_certmesh::CertmeshCore>, KoiError> {
597        match &self.backend {
598            CertmeshBackend::Embedded { core } => Ok(Arc::clone(core)),
599            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
600        }
601    }
602
603    pub async fn status(&self) -> koi_common::capability::CapabilityStatus {
604        match &self.backend {
605            CertmeshBackend::Embedded { core } => core.status(),
606            CertmeshBackend::Remote { client } => {
607                let client = Arc::clone(client);
608                let json = tokio::task::spawn_blocking(move || client.unified_status())
609                    .await
610                    .ok()
611                    .and_then(|res| res.ok());
612                json.and_then(extract_capability_status)
613                    .unwrap_or_else(default_capability_status)
614            }
615        }
616    }
617}
618
619pub struct ProxyHandle {
620    backend: ProxyBackend,
621}
622
623enum ProxyBackend {
624    Embedded { runtime: Arc<ProxyRuntime> },
625    Remote { client: Arc<KoiClient> },
626}
627
628impl ProxyHandle {
629    fn new_embedded(runtime: Arc<ProxyRuntime>) -> Self {
630        Self {
631            backend: ProxyBackend::Embedded { runtime },
632        }
633    }
634
635    fn new_remote(client: Arc<KoiClient>) -> Self {
636        Self {
637            backend: ProxyBackend::Remote { client },
638        }
639    }
640
641    pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError> {
642        match &self.backend {
643            ProxyBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
644            ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
645        }
646    }
647
648    pub fn core(&self) -> Result<Arc<koi_proxy::ProxyCore>, KoiError> {
649        match &self.backend {
650            ProxyBackend::Embedded { runtime } => Ok(runtime.core()),
651            ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
652        }
653    }
654
655    pub async fn entries(&self) -> Vec<ProxyEntry> {
656        match &self.backend {
657            ProxyBackend::Embedded { runtime } => runtime.core().entries().await,
658            ProxyBackend::Remote { client } => {
659                let client = Arc::clone(client);
660                tokio::task::spawn_blocking(move || client.proxy_list())
661                    .await
662                    .ok()
663                    .and_then(|res| res.ok())
664                    .and_then(|json| parse_proxy_entries(json).ok())
665                    .unwrap_or_default()
666            }
667        }
668    }
669
670    pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError> {
671        match &self.backend {
672            ProxyBackend::Embedded { runtime } => Ok(runtime.core().upsert(entry).await?),
673            ProxyBackend::Remote { client } => {
674                let client = Arc::clone(client);
675                let entry = entry.clone();
676                let add_client = Arc::clone(&client);
677                tokio::task::spawn_blocking(move || {
678                    add_client.proxy_add(
679                        &entry.name,
680                        entry.listen_port,
681                        &entry.backend,
682                        entry.allow_remote,
683                    )
684                })
685                .await
686                .map_err(map_join_error)??;
687                let list = tokio::task::spawn_blocking(move || client.proxy_list())
688                    .await
689                    .map_err(map_join_error)??;
690                parse_proxy_entries(list)
691            }
692        }
693    }
694
695    pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError> {
696        match &self.backend {
697            ProxyBackend::Embedded { runtime } => Ok(runtime.core().remove(name).await?),
698            ProxyBackend::Remote { client } => {
699                let client = Arc::clone(client);
700                let name = name.to_string();
701                let remove_client = Arc::clone(&client);
702                tokio::task::spawn_blocking(move || remove_client.proxy_remove(&name))
703                    .await
704                    .map_err(map_join_error)??;
705                let list = tokio::task::spawn_blocking(move || client.proxy_list())
706                    .await
707                    .map_err(map_join_error)??;
708                parse_proxy_entries(list)
709            }
710        }
711    }
712
713    pub async fn start_all(&self) -> Result<(), KoiError> {
714        match &self.backend {
715            ProxyBackend::Embedded { runtime } => Ok(runtime.start_all().await?),
716            ProxyBackend::Remote { .. } => Ok(()),
717        }
718    }
719
720    pub async fn stop_all(&self) {
721        if let ProxyBackend::Embedded { runtime } = &self.backend {
722            runtime.stop_all().await;
723        }
724    }
725}
726
727fn parse_dns_lookup(json: serde_json::Value) -> Option<DnsLookupResult> {
728    let name = json.get("name").and_then(|v| v.as_str())?.to_string();
729    let source = json
730        .get("source")
731        .and_then(|v| v.as_str())
732        .unwrap_or("unknown")
733        .to_string();
734    let ips = json.get("ips").and_then(|v| v.as_array()).map(|arr| {
735        arr.iter()
736            .filter_map(|ip| ip.as_str())
737            .filter_map(|ip| ip.parse::<IpAddr>().ok())
738            .collect::<Vec<_>>()
739    })?;
740    Some(DnsLookupResult { name, ips, source })
741}
742
743fn parse_dns_entries(json: serde_json::Value) -> Result<Vec<DnsEntry>, KoiError> {
744    let entries = json.get("entries").ok_or_else(|| {
745        KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::other(
746            "missing entries",
747        )))
748    })?;
749    let entries = serde_json::from_value(entries.clone()).map_err(|e| {
750        KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::new(
751            std::io::ErrorKind::InvalidData,
752            e.to_string(),
753        )))
754    })?;
755    Ok(entries)
756}
757
758fn parse_proxy_entries(json: serde_json::Value) -> Result<Vec<ProxyEntry>, KoiError> {
759    let entries = json
760        .get("entries")
761        .ok_or_else(|| KoiError::Proxy(koi_proxy::ProxyError::Io("missing entries".to_string())))?
762        .clone();
763    serde_json::from_value(entries)
764        .map_err(|e| KoiError::Proxy(koi_proxy::ProxyError::Io(e.to_string())))
765}
766
767fn extract_capability_status(
768    json: serde_json::Value,
769) -> Option<koi_common::capability::CapabilityStatus> {
770    let caps = json.get("capabilities")?.as_array()?;
771    for cap in caps {
772        if cap.get("name")?.as_str()? == "certmesh" {
773            let name = cap.get("name")?.as_str()?.to_string();
774            let summary = cap
775                .get("summary")
776                .and_then(|v| v.as_str())
777                .unwrap_or("unknown")
778                .to_string();
779            let healthy = cap
780                .get("healthy")
781                .and_then(|v| v.as_bool())
782                .unwrap_or(false);
783            return Some(koi_common::capability::CapabilityStatus {
784                name,
785                summary,
786                healthy,
787            });
788        }
789    }
790    None
791}
792
793fn default_capability_status() -> koi_common::capability::CapabilityStatus {
794    koi_common::capability::CapabilityStatus {
795        name: "certmesh".to_string(),
796        summary: "unknown".to_string(),
797        healthy: false,
798    }
799}
800
801fn mdns_event_from_pipeline(json: serde_json::Value) -> Option<MdnsEvent> {
802    if let Some(found) = json.get("found") {
803        let record: ServiceRecord = serde_json::from_value(found.clone()).ok()?;
804        return Some(MdnsEvent::Found(record));
805    }
806    if let Some(resolved) = json.get("resolved") {
807        let record: ServiceRecord = serde_json::from_value(resolved.clone()).ok()?;
808        return Some(MdnsEvent::Resolved(record));
809    }
810    if let Some(event) = json.get("event") {
811        let kind: EventKind = serde_json::from_value(event.clone()).ok()?;
812        let service = json
813            .get("service")
814            .cloned()
815            .unwrap_or(serde_json::Value::Null);
816        let record: ServiceRecord = serde_json::from_value(service).ok()?;
817        return match kind {
818            EventKind::Found => Some(MdnsEvent::Found(record)),
819            EventKind::Resolved => Some(MdnsEvent::Resolved(record)),
820            EventKind::Removed => Some(MdnsEvent::Removed {
821                name: record.name,
822                service_type: record.service_type,
823            }),
824        };
825    }
826    None
827}