Skip to main content

koi_embedded/
handle.rs

1use std::net::IpAddr;
2use std::sync::Arc;
3
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::BroadcastStream;
7use tokio_util::sync::CancellationToken;
8
9use koi_client::KoiClient;
10use koi_common::capability::Capability;
11use koi_common::types::{EventKind, ServiceRecord};
12use koi_config::state::DnsEntry;
13use koi_dns::{DnsLookupResult, DnsRuntime};
14use koi_health::{HealthCheck, HealthRuntime};
15use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
16use koi_mdns::{BrowseSubscription as MdnsBrowseHandle, MdnsCore, MdnsEvent};
17use koi_proxy::{ProxyEntry, ProxyRuntime};
18
19use crate::{map_join_error, KoiError, KoiEvent};
20
21enum HandleBackend {
22    Embedded {
23        mdns: Option<Arc<MdnsCore>>,
24        dns: Option<Arc<DnsRuntime>>,
25        health: Option<Arc<HealthRuntime>>,
26        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
27        proxy: Option<Arc<ProxyRuntime>>,
28        udp: Option<Arc<koi_udp::UdpRuntime>>,
29        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
30    },
31    Remote {
32        client: Arc<KoiClient>,
33    },
34}
35
36pub struct KoiHandle {
37    backend: HandleBackend,
38    data_dir: Option<std::path::PathBuf>,
39    events: broadcast::Sender<KoiEvent>,
40    cancel: CancellationToken,
41    tasks: Vec<JoinHandle<()>>,
42    http_announce_id: Option<String>,
43}
44
45impl KoiHandle {
46    #[allow(clippy::too_many_arguments)]
47    pub(crate) fn new_embedded(
48        mdns: Option<Arc<MdnsCore>>,
49        dns: Option<Arc<DnsRuntime>>,
50        health: Option<Arc<HealthRuntime>>,
51        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
52        proxy: Option<Arc<ProxyRuntime>>,
53        udp: Option<Arc<koi_udp::UdpRuntime>>,
54        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
55        data_dir: Option<std::path::PathBuf>,
56        events: broadcast::Sender<KoiEvent>,
57        cancel: CancellationToken,
58        tasks: Vec<JoinHandle<()>>,
59        http_announce_id: Option<String>,
60    ) -> Self {
61        Self {
62            backend: HandleBackend::Embedded {
63                mdns,
64                dns,
65                health,
66                certmesh,
67                proxy,
68                udp,
69                runtime,
70            },
71            data_dir,
72            events,
73            cancel,
74            tasks,
75            http_announce_id,
76        }
77    }
78
79    pub(crate) fn new_remote(
80        client: Arc<KoiClient>,
81        events: broadcast::Sender<KoiEvent>,
82        cancel: CancellationToken,
83        tasks: Vec<JoinHandle<()>>,
84    ) -> Self {
85        Self {
86            backend: HandleBackend::Remote { client },
87            data_dir: None,
88            events,
89            cancel,
90            tasks,
91            http_announce_id: None,
92        }
93    }
94
95    pub fn events(&self) -> BroadcastStream<KoiEvent> {
96        BroadcastStream::new(self.events.subscribe())
97    }
98
99    pub fn subscribe(&self) -> broadcast::Receiver<KoiEvent> {
100        self.events.subscribe()
101    }
102
103    pub fn mdns(&self) -> Result<MdnsHandle, KoiError> {
104        match &self.backend {
105            HandleBackend::Embedded { mdns, .. } => {
106                let core = mdns.as_ref().ok_or(KoiError::DisabledCapability("mdns"))?;
107                Ok(MdnsHandle::new_embedded(
108                    Arc::clone(core),
109                    self.events.clone(),
110                ))
111            }
112            HandleBackend::Remote { client } => Ok(MdnsHandle::new_remote(
113                Arc::clone(client),
114                self.events.clone(),
115            )),
116        }
117    }
118
119    pub fn dns(&self) -> Result<DnsHandle, KoiError> {
120        match &self.backend {
121            HandleBackend::Embedded { dns, .. } => {
122                let runtime = dns.as_ref().ok_or(KoiError::DisabledCapability("dns"))?;
123                Ok(DnsHandle::new_embedded(Arc::clone(runtime)))
124            }
125            HandleBackend::Remote { client } => Ok(DnsHandle::new_remote(Arc::clone(client))),
126        }
127    }
128
129    pub fn health(&self) -> Result<HealthHandle, KoiError> {
130        match &self.backend {
131            HandleBackend::Embedded { health, .. } => {
132                let runtime = health
133                    .as_ref()
134                    .ok_or(KoiError::DisabledCapability("health"))?;
135                Ok(HealthHandle::new_embedded(Arc::clone(runtime)))
136            }
137            HandleBackend::Remote { client } => Ok(HealthHandle::new_remote(Arc::clone(client))),
138        }
139    }
140
141    pub fn certmesh(&self) -> Result<CertmeshHandle, KoiError> {
142        match &self.backend {
143            HandleBackend::Embedded { certmesh, .. } => {
144                let core = certmesh
145                    .as_ref()
146                    .ok_or(KoiError::DisabledCapability("certmesh"))?;
147                Ok(CertmeshHandle::new_embedded(Arc::clone(core)))
148            }
149            HandleBackend::Remote { client } => Ok(CertmeshHandle::new_remote(Arc::clone(client))),
150        }
151    }
152
153    /// Open the encrypted key-value vault for general-purpose secret storage.
154    ///
155    /// The vault uses platform credential binding (keyring) when available,
156    /// with a machine-bound fallback. Each call opens a fresh handle sharing
157    /// the same on-disk state.
158    pub fn vault(&self) -> Result<koi_crypto::vault::Vault, KoiError> {
159        let dir = self
160            .data_dir
161            .as_ref()
162            .ok_or(KoiError::DisabledCapability("vault (no data_dir)"))?;
163        koi_crypto::vault::Vault::open(dir)
164            .map_err(|e| KoiError::Io(std::io::Error::other(e.to_string())))
165    }
166
167    pub fn proxy(&self) -> Result<ProxyHandle, KoiError> {
168        match &self.backend {
169            HandleBackend::Embedded { proxy, .. } => {
170                let runtime = proxy
171                    .as_ref()
172                    .ok_or(KoiError::DisabledCapability("proxy"))?;
173                Ok(ProxyHandle::new_embedded(Arc::clone(runtime)))
174            }
175            HandleBackend::Remote { client } => Ok(ProxyHandle::new_remote(Arc::clone(client))),
176        }
177    }
178
179    /// Get the UDP runtime handle.
180    ///
181    /// Only available in embedded mode — remote mode does not support UDP bridging
182    /// (the remote daemon itself handles bindings).
183    pub fn udp(&self) -> Result<Arc<koi_udp::UdpRuntime>, KoiError> {
184        match &self.backend {
185            HandleBackend::Embedded { udp, .. } => {
186                let runtime = udp.as_ref().ok_or(KoiError::DisabledCapability("udp"))?;
187                Ok(Arc::clone(runtime))
188            }
189            HandleBackend::Remote { .. } => Err(KoiError::DisabledCapability("udp (remote mode)")),
190        }
191    }
192
193    /// Get the runtime adapter core.
194    ///
195    /// Only available in embedded mode when runtime is enabled.
196    pub fn runtime(&self) -> Result<Arc<koi_runtime::RuntimeCore>, KoiError> {
197        match &self.backend {
198            HandleBackend::Embedded { runtime, .. } => {
199                let core = runtime
200                    .as_ref()
201                    .ok_or(KoiError::DisabledCapability("runtime"))?;
202                Ok(Arc::clone(core))
203            }
204            HandleBackend::Remote { .. } => {
205                Err(KoiError::DisabledCapability("runtime (remote mode)"))
206            }
207        }
208    }
209
210    pub async fn shutdown(mut self) -> Result<(), KoiError> {
211        self.cancel.cancel();
212        for task in self.tasks.drain(..) {
213            let _ = task.await;
214        }
215
216        if let HandleBackend::Embedded {
217            mdns,
218            dns,
219            health,
220            proxy,
221            ..
222        } = &self.backend
223        {
224            if let Some(runtime) = proxy {
225                runtime.stop_all().await;
226            }
227            if let Some(runtime) = health {
228                let _ = runtime.stop().await;
229            }
230            if let Some(runtime) = dns {
231                let _ = runtime.stop().await;
232            }
233            if let Some(id) = &self.http_announce_id {
234                if let Some(core) = mdns {
235                    if let Err(e) = core.unregister(id) {
236                        tracing::warn!(error = %e, "Failed to withdraw HTTP mDNS announcement");
237                    }
238                }
239            }
240            if let Some(core) = mdns {
241                core.shutdown().await?;
242            }
243        }
244
245        Ok(())
246    }
247}
248
249pub struct KoiBrowseHandle {
250    backend: BrowseBackend,
251}
252
253enum BrowseBackend {
254    Embedded(MdnsBrowseHandle),
255    Remote(Mutex<mpsc::Receiver<MdnsEvent>>),
256}
257
258impl KoiBrowseHandle {
259    fn embedded(handle: MdnsBrowseHandle) -> Self {
260        Self {
261            backend: BrowseBackend::Embedded(handle),
262        }
263    }
264
265    fn remote(rx: mpsc::Receiver<MdnsEvent>) -> Self {
266        Self {
267            backend: BrowseBackend::Remote(Mutex::new(rx)),
268        }
269    }
270
271    pub async fn recv(&self) -> Option<MdnsEvent> {
272        match &self.backend {
273            BrowseBackend::Embedded(handle) => handle.recv().await,
274            BrowseBackend::Remote(rx) => rx.lock().await.recv().await,
275        }
276    }
277}
278
279pub struct MdnsHandle {
280    backend: MdnsBackend,
281    events: broadcast::Sender<KoiEvent>,
282}
283
284enum MdnsBackend {
285    Embedded { core: Arc<MdnsCore> },
286    Remote { client: Arc<KoiClient> },
287}
288
289impl MdnsHandle {
290    fn new_embedded(core: Arc<MdnsCore>, events: broadcast::Sender<KoiEvent>) -> Self {
291        Self {
292            backend: MdnsBackend::Embedded { core },
293            events,
294        }
295    }
296
297    fn new_remote(client: Arc<KoiClient>, events: broadcast::Sender<KoiEvent>) -> Self {
298        Self {
299            backend: MdnsBackend::Remote { client },
300            events,
301        }
302    }
303
304    pub fn core(&self) -> Result<Arc<MdnsCore>, KoiError> {
305        match &self.backend {
306            MdnsBackend::Embedded { core } => Ok(Arc::clone(core)),
307            MdnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("mdns")),
308        }
309    }
310
311    pub async fn browse(&self, service_type: &str) -> Result<KoiBrowseHandle, KoiError> {
312        match &self.backend {
313            MdnsBackend::Embedded { core } => {
314                let handle = core.subscribe_type(service_type).await?;
315                Ok(KoiBrowseHandle::embedded(handle))
316            }
317            MdnsBackend::Remote { client } => {
318                let (tx, rx) = mpsc::channel(64);
319                let client = Arc::clone(client);
320                let service_type = service_type.to_string();
321                tokio::task::spawn_blocking(move || {
322                    let stream = match client.browse_stream(&service_type) {
323                        Ok(stream) => stream,
324                        Err(_) => return,
325                    };
326                    for item in stream {
327                        let Ok(json) = item else {
328                            break;
329                        };
330                        if let Some(event) = mdns_event_from_pipeline(json) {
331                            if tx.blocking_send(event).is_err() {
332                                break;
333                            }
334                        }
335                    }
336                });
337                Ok(KoiBrowseHandle::remote(rx))
338            }
339        }
340    }
341
342    pub async fn resolve(&self, name: &str) -> Result<ServiceRecord, KoiError> {
343        match &self.backend {
344            MdnsBackend::Embedded { core } => Ok(core.resolve(name).await?),
345            MdnsBackend::Remote { client } => {
346                let name = name.to_string();
347                let client = Arc::clone(client);
348                let record = tokio::task::spawn_blocking(move || client.resolve(&name))
349                    .await
350                    .map_err(map_join_error)??;
351                Ok(record)
352            }
353        }
354    }
355
356    pub fn register(&self, payload: RegisterPayload) -> Result<RegistrationResult, KoiError> {
357        match &self.backend {
358            MdnsBackend::Embedded { core } => Ok(core.register(payload)?),
359            MdnsBackend::Remote { client } => Ok(client.register(&payload)?),
360        }
361    }
362
363    pub fn unregister(&self, id: &str) -> Result<(), KoiError> {
364        match &self.backend {
365            MdnsBackend::Embedded { core } => Ok(core.unregister(id)?),
366            MdnsBackend::Remote { client } => Ok(client.unregister(id)?),
367        }
368    }
369
370    /// Subscribe to the live mDNS lifecycle-event stream (Found / Resolved / Removed).
371    ///
372    /// Available only in **embedded** mode, where there is a local `MdnsCore` to subscribe
373    /// to. In **client (remote)** mode there is no all-types lifecycle stream to forward —
374    /// the daemon's `/v1/mdns/subscribe` requires a service type — so this returns
375    /// [`KoiError::RemoteUnsupported`]. For a remote event stream, use
376    /// [`MdnsHandle::browse`] with a specific service type (it forwards the daemon's SSE).
377    ///
378    /// Previously this silently returned a dead receiver in remote mode (it yielded nothing,
379    /// forever); the typed error makes the limitation visible instead of swallowing it.
380    pub fn subscribe(&self) -> Result<broadcast::Receiver<MdnsEvent>, KoiError> {
381        match &self.backend {
382            MdnsBackend::Embedded { core } => Ok(core.subscribe()),
383            MdnsBackend::Remote { .. } => Err(KoiError::RemoteUnsupported(
384                "mdns subscribe — use mdns.browse(service_type) for a remote event stream",
385            )),
386        }
387    }
388
389    pub fn emit_event(&self, event: KoiEvent) {
390        let _ = self.events.send(event);
391    }
392}
393
394pub struct DnsHandle {
395    backend: DnsBackend,
396}
397
398enum DnsBackend {
399    Embedded { runtime: Arc<DnsRuntime> },
400    Remote { client: Arc<KoiClient> },
401}
402
403impl DnsHandle {
404    fn new_embedded(runtime: Arc<DnsRuntime>) -> Self {
405        Self {
406            backend: DnsBackend::Embedded { runtime },
407        }
408    }
409
410    fn new_remote(client: Arc<KoiClient>) -> Self {
411        Self {
412            backend: DnsBackend::Remote { client },
413        }
414    }
415
416    pub fn runtime(&self) -> Result<Arc<DnsRuntime>, KoiError> {
417        match &self.backend {
418            DnsBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
419            DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
420        }
421    }
422
423    pub fn core(&self) -> Result<Arc<koi_dns::DnsCore>, KoiError> {
424        match &self.backend {
425            DnsBackend::Embedded { runtime } => Ok(runtime.core()),
426            DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
427        }
428    }
429
430    pub async fn lookup(
431        &self,
432        name: &str,
433        record_type: hickory_proto::rr::RecordType,
434    ) -> Option<DnsLookupResult> {
435        match &self.backend {
436            DnsBackend::Embedded { runtime } => runtime.core().lookup(name, record_type).await,
437            DnsBackend::Remote { client } => {
438                let name = name.to_string();
439                let client = Arc::clone(client);
440                let result =
441                    tokio::task::spawn_blocking(move || client.dns_lookup(&name, record_type))
442                        .await
443                        .ok()
444                        .and_then(|res| res.ok());
445                let json = match result {
446                    Some(json) => json,
447                    None => return None,
448                };
449                parse_dns_lookup(json)
450            }
451        }
452    }
453
454    pub fn list_names(&self) -> Vec<String> {
455        match &self.backend {
456            DnsBackend::Embedded { runtime } => runtime.core().list_names(),
457            DnsBackend::Remote { client } => {
458                let result = client.dns_list();
459                let Ok(json) = result else {
460                    return Vec::new();
461                };
462                json.get("names")
463                    .and_then(|v| v.as_array())
464                    .map(|arr| {
465                        arr.iter()
466                            .filter_map(|name| name.as_str().map(|s| s.to_string()))
467                            .collect()
468                    })
469                    .unwrap_or_default()
470            }
471        }
472    }
473
474    pub async fn start(&self) -> Result<bool, KoiError> {
475        match &self.backend {
476            DnsBackend::Embedded { runtime } => Ok(runtime.start().await?),
477            DnsBackend::Remote { client } => {
478                let client = Arc::clone(client);
479                let started = tokio::task::spawn_blocking(move || client.dns_start())
480                    .await
481                    .map_err(map_join_error)??
482                    .get("started")
483                    .and_then(|v| v.as_bool())
484                    .unwrap_or(false);
485                Ok(started)
486            }
487        }
488    }
489
490    pub async fn stop(&self) -> bool {
491        match &self.backend {
492            DnsBackend::Embedded { runtime } => runtime.stop().await,
493            DnsBackend::Remote { client } => {
494                let client = Arc::clone(client);
495                tokio::task::spawn_blocking(move || client.dns_stop())
496                    .await
497                    .ok()
498                    .and_then(|res| res.ok())
499                    .and_then(|json| json.get("stopped").and_then(|v| v.as_bool()))
500                    .unwrap_or(false)
501            }
502        }
503    }
504
505    pub fn add_entry(&self, entry: DnsEntry) -> Result<Vec<DnsEntry>, KoiError> {
506        match &self.backend {
507            DnsBackend::Embedded { runtime } => Ok(runtime.core().add_entry(entry)?),
508            DnsBackend::Remote { client } => {
509                let json = client.dns_add(&entry.name, &entry.ip, entry.ttl)?;
510                parse_dns_entries(json)
511            }
512        }
513    }
514
515    pub fn remove_entry(&self, name: &str) -> Result<Vec<DnsEntry>, KoiError> {
516        match &self.backend {
517            DnsBackend::Embedded { runtime } => {
518                Ok(runtime.core().remove_entry(name)?.unwrap_or_default())
519            }
520            DnsBackend::Remote { client } => {
521                let json = client.dns_remove(name)?;
522                parse_dns_entries(json)
523            }
524        }
525    }
526}
527
528pub struct HealthHandle {
529    backend: HealthBackend,
530}
531
532enum HealthBackend {
533    Embedded { runtime: Arc<HealthRuntime> },
534    Remote { client: Arc<KoiClient> },
535}
536
537impl HealthHandle {
538    fn new_embedded(runtime: Arc<HealthRuntime>) -> Self {
539        Self {
540            backend: HealthBackend::Embedded { runtime },
541        }
542    }
543
544    fn new_remote(client: Arc<KoiClient>) -> Self {
545        Self {
546            backend: HealthBackend::Remote { client },
547        }
548    }
549
550    pub fn core(&self) -> Result<Arc<koi_health::HealthCore>, KoiError> {
551        match &self.backend {
552            HealthBackend::Embedded { runtime } => Ok(runtime.core()),
553            HealthBackend::Remote { .. } => Err(KoiError::DisabledCapability("health")),
554        }
555    }
556
557    pub async fn status(&self) -> koi_health::HealthSnapshot {
558        match &self.backend {
559            HealthBackend::Embedded { runtime } => runtime.core().snapshot().await,
560            HealthBackend::Remote { client } => {
561                let client = Arc::clone(client);
562                let json = tokio::task::spawn_blocking(move || client.health_status())
563                    .await
564                    .ok()
565                    .and_then(|res| res.ok());
566                json.and_then(|json| serde_json::from_value(json).ok())
567                    .unwrap_or_else(|| koi_health::HealthSnapshot {
568                        machines: Vec::new(),
569                        services: Vec::new(),
570                    })
571            }
572        }
573    }
574
575    pub async fn add_check(&self, check: HealthCheck) -> Result<(), KoiError> {
576        match &self.backend {
577            HealthBackend::Embedded { runtime } => Ok(runtime.core().add_check(check).await?),
578            HealthBackend::Remote { client } => {
579                let client = Arc::clone(client);
580                let check = check.clone();
581                tokio::task::spawn_blocking(move || {
582                    client.health_add_check(
583                        &check.name,
584                        check.kind,
585                        &check.target,
586                        check.interval_secs,
587                        check.timeout_secs,
588                    )
589                })
590                .await
591                .map_err(map_join_error)??;
592                Ok(())
593            }
594        }
595    }
596
597    pub async fn remove_check(&self, name: &str) -> Result<(), KoiError> {
598        match &self.backend {
599            HealthBackend::Embedded { runtime } => Ok(runtime.core().remove_check(name).await?),
600            HealthBackend::Remote { client } => {
601                let client = Arc::clone(client);
602                let name = name.to_string();
603                tokio::task::spawn_blocking(move || client.health_remove_check(&name))
604                    .await
605                    .map_err(map_join_error)??;
606                Ok(())
607            }
608        }
609    }
610
611    pub async fn start(&self) -> Result<bool, KoiError> {
612        match &self.backend {
613            HealthBackend::Embedded { runtime } => Ok(runtime.start().await?),
614            HealthBackend::Remote { .. } => Ok(false),
615        }
616    }
617
618    pub async fn stop(&self) -> bool {
619        match &self.backend {
620            HealthBackend::Embedded { runtime } => runtime.stop().await,
621            HealthBackend::Remote { .. } => false,
622        }
623    }
624}
625
626pub struct CertmeshHandle {
627    backend: CertmeshBackend,
628}
629
630enum CertmeshBackend {
631    Embedded {
632        core: Arc<koi_certmesh::CertmeshCore>,
633    },
634    Remote {
635        client: Arc<KoiClient>,
636    },
637}
638
639impl CertmeshHandle {
640    fn new_embedded(core: Arc<koi_certmesh::CertmeshCore>) -> Self {
641        Self {
642            backend: CertmeshBackend::Embedded { core },
643        }
644    }
645
646    fn new_remote(client: Arc<KoiClient>) -> Self {
647        Self {
648            backend: CertmeshBackend::Remote { client },
649        }
650    }
651
652    pub fn core(&self) -> Result<Arc<koi_certmesh::CertmeshCore>, KoiError> {
653        match &self.backend {
654            CertmeshBackend::Embedded { core } => Ok(Arc::clone(core)),
655            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
656        }
657    }
658
659    pub async fn status(&self) -> koi_common::capability::CapabilityStatus {
660        match &self.backend {
661            CertmeshBackend::Embedded { core } => core.status(),
662            CertmeshBackend::Remote { client } => {
663                let client = Arc::clone(client);
664                let json = tokio::task::spawn_blocking(move || client.unified_status())
665                    .await
666                    .ok()
667                    .and_then(|res| res.ok());
668                json.and_then(extract_capability_status)
669                    .unwrap_or_else(default_capability_status)
670            }
671        }
672    }
673}
674
675pub struct ProxyHandle {
676    backend: ProxyBackend,
677}
678
679enum ProxyBackend {
680    Embedded { runtime: Arc<ProxyRuntime> },
681    Remote { client: Arc<KoiClient> },
682}
683
684impl ProxyHandle {
685    fn new_embedded(runtime: Arc<ProxyRuntime>) -> Self {
686        Self {
687            backend: ProxyBackend::Embedded { runtime },
688        }
689    }
690
691    fn new_remote(client: Arc<KoiClient>) -> Self {
692        Self {
693            backend: ProxyBackend::Remote { client },
694        }
695    }
696
697    pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError> {
698        match &self.backend {
699            ProxyBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
700            ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
701        }
702    }
703
704    pub fn core(&self) -> Result<Arc<koi_proxy::ProxyCore>, KoiError> {
705        match &self.backend {
706            ProxyBackend::Embedded { runtime } => Ok(runtime.core()),
707            ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
708        }
709    }
710
711    pub async fn entries(&self) -> Vec<ProxyEntry> {
712        match &self.backend {
713            ProxyBackend::Embedded { runtime } => runtime.core().entries().await,
714            ProxyBackend::Remote { client } => {
715                let client = Arc::clone(client);
716                tokio::task::spawn_blocking(move || client.proxy_list())
717                    .await
718                    .ok()
719                    .and_then(|res| res.ok())
720                    .and_then(|json| parse_proxy_entries(json).ok())
721                    .unwrap_or_default()
722            }
723        }
724    }
725
726    pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError> {
727        match &self.backend {
728            ProxyBackend::Embedded { runtime } => Ok(runtime.core().upsert(entry).await?),
729            ProxyBackend::Remote { client } => {
730                let client = Arc::clone(client);
731                let entry = entry.clone();
732                let add_client = Arc::clone(&client);
733                tokio::task::spawn_blocking(move || {
734                    add_client.proxy_add(
735                        &entry.name,
736                        entry.listen_port,
737                        &entry.backend,
738                        entry.allow_remote,
739                    )
740                })
741                .await
742                .map_err(map_join_error)??;
743                let list = tokio::task::spawn_blocking(move || client.proxy_list())
744                    .await
745                    .map_err(map_join_error)??;
746                parse_proxy_entries(list)
747            }
748        }
749    }
750
751    pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError> {
752        match &self.backend {
753            ProxyBackend::Embedded { runtime } => Ok(runtime.core().remove(name).await?),
754            ProxyBackend::Remote { client } => {
755                let client = Arc::clone(client);
756                let name = name.to_string();
757                let remove_client = Arc::clone(&client);
758                tokio::task::spawn_blocking(move || remove_client.proxy_remove(&name))
759                    .await
760                    .map_err(map_join_error)??;
761                let list = tokio::task::spawn_blocking(move || client.proxy_list())
762                    .await
763                    .map_err(map_join_error)??;
764                parse_proxy_entries(list)
765            }
766        }
767    }
768
769    pub async fn start_all(&self) -> Result<(), KoiError> {
770        match &self.backend {
771            ProxyBackend::Embedded { runtime } => Ok(runtime.start_all().await?),
772            ProxyBackend::Remote { .. } => Ok(()),
773        }
774    }
775
776    pub async fn stop_all(&self) {
777        if let ProxyBackend::Embedded { runtime } = &self.backend {
778            runtime.stop_all().await;
779        }
780    }
781}
782
783fn parse_dns_lookup(json: serde_json::Value) -> Option<DnsLookupResult> {
784    let name = json.get("name").and_then(|v| v.as_str())?.to_string();
785    let source = json
786        .get("source")
787        .and_then(|v| v.as_str())
788        .unwrap_or("unknown")
789        .to_string();
790    let ips = json.get("ips").and_then(|v| v.as_array()).map(|arr| {
791        arr.iter()
792            .filter_map(|ip| ip.as_str())
793            .filter_map(|ip| ip.parse::<IpAddr>().ok())
794            .collect::<Vec<_>>()
795    })?;
796    Some(DnsLookupResult { name, ips, source })
797}
798
799fn parse_dns_entries(json: serde_json::Value) -> Result<Vec<DnsEntry>, KoiError> {
800    let entries = json.get("entries").ok_or_else(|| {
801        KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::other(
802            "missing entries",
803        )))
804    })?;
805    let entries = serde_json::from_value(entries.clone()).map_err(|e| {
806        KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::new(
807            std::io::ErrorKind::InvalidData,
808            e.to_string(),
809        )))
810    })?;
811    Ok(entries)
812}
813
814fn parse_proxy_entries(json: serde_json::Value) -> Result<Vec<ProxyEntry>, KoiError> {
815    let entries = json
816        .get("entries")
817        .ok_or_else(|| KoiError::Proxy(koi_proxy::ProxyError::Io("missing entries".to_string())))?
818        .clone();
819    serde_json::from_value(entries)
820        .map_err(|e| KoiError::Proxy(koi_proxy::ProxyError::Io(e.to_string())))
821}
822
823fn extract_capability_status(
824    json: serde_json::Value,
825) -> Option<koi_common::capability::CapabilityStatus> {
826    let caps = json.get("capabilities")?.as_array()?;
827    for cap in caps {
828        if cap.get("name")?.as_str()? == "certmesh" {
829            let name = cap.get("name")?.as_str()?.to_string();
830            let summary = cap
831                .get("summary")
832                .and_then(|v| v.as_str())
833                .unwrap_or("unknown")
834                .to_string();
835            let healthy = cap
836                .get("healthy")
837                .and_then(|v| v.as_bool())
838                .unwrap_or(false);
839            return Some(koi_common::capability::CapabilityStatus {
840                name,
841                summary,
842                healthy,
843            });
844        }
845    }
846    None
847}
848
849fn default_capability_status() -> koi_common::capability::CapabilityStatus {
850    koi_common::capability::CapabilityStatus {
851        name: "certmesh".to_string(),
852        summary: "unknown".to_string(),
853        healthy: false,
854    }
855}
856
857fn mdns_event_from_pipeline(json: serde_json::Value) -> Option<MdnsEvent> {
858    if let Some(found) = json.get("found") {
859        let record: ServiceRecord = serde_json::from_value(found.clone()).ok()?;
860        return Some(MdnsEvent::Found(record));
861    }
862    if let Some(resolved) = json.get("resolved") {
863        let record: ServiceRecord = serde_json::from_value(resolved.clone()).ok()?;
864        return Some(MdnsEvent::Resolved(record));
865    }
866    if let Some(event) = json.get("event") {
867        let kind: EventKind = serde_json::from_value(event.clone()).ok()?;
868        let service = json
869            .get("service")
870            .cloned()
871            .unwrap_or(serde_json::Value::Null);
872        let record: ServiceRecord = serde_json::from_value(service).ok()?;
873        return match kind {
874            EventKind::Found => Some(MdnsEvent::Found(record)),
875            EventKind::Resolved => Some(MdnsEvent::Resolved(record)),
876            EventKind::Removed => Some(MdnsEvent::Removed {
877                name: record.name,
878                service_type: record.service_type,
879            }),
880        };
881    }
882    None
883}