Skip to main content

koi_embedded/
handle.rs

1use std::net::IpAddr;
2use std::sync::Arc;
3
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::BroadcastStream;
7use tokio_util::sync::CancellationToken;
8
9use koi_client::KoiClient;
10use koi_common::capability::Capability;
11use koi_common::types::{EventKind, ServiceRecord};
12use koi_config::state::DnsEntry;
13use koi_dns::{DnsLookupResult, DnsRuntime};
14use koi_health::{HealthCheck, HealthRuntime};
15use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
16use koi_mdns::{BrowseHandle as MdnsBrowseHandle, MdnsCore, MdnsEvent};
17use koi_proxy::{ProxyEntry, ProxyRuntime};
18
19use crate::{map_join_error, KoiError, KoiEvent};
20
21enum HandleBackend {
22    Embedded {
23        mdns: Option<Arc<MdnsCore>>,
24        dns: Option<Arc<DnsRuntime>>,
25        health: Option<Arc<HealthRuntime>>,
26        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
27        proxy: Option<Arc<ProxyRuntime>>,
28        udp: Option<Arc<koi_udp::UdpRuntime>>,
29        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
30    },
31    Remote {
32        client: Arc<KoiClient>,
33    },
34}
35
36pub struct KoiHandle {
37    backend: HandleBackend,
38    data_dir: Option<std::path::PathBuf>,
39    events: broadcast::Sender<KoiEvent>,
40    cancel: CancellationToken,
41    tasks: Vec<JoinHandle<()>>,
42    http_announce_id: Option<String>,
43}
44
45impl KoiHandle {
46    #[allow(clippy::too_many_arguments)]
47    pub(crate) fn new_embedded(
48        mdns: Option<Arc<MdnsCore>>,
49        dns: Option<Arc<DnsRuntime>>,
50        health: Option<Arc<HealthRuntime>>,
51        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
52        proxy: Option<Arc<ProxyRuntime>>,
53        udp: Option<Arc<koi_udp::UdpRuntime>>,
54        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
55        data_dir: Option<std::path::PathBuf>,
56        events: broadcast::Sender<KoiEvent>,
57        cancel: CancellationToken,
58        tasks: Vec<JoinHandle<()>>,
59        http_announce_id: Option<String>,
60    ) -> Self {
61        Self {
62            backend: HandleBackend::Embedded {
63                mdns,
64                dns,
65                health,
66                certmesh,
67                proxy,
68                udp,
69                runtime,
70            },
71            data_dir,
72            events,
73            cancel,
74            tasks,
75            http_announce_id,
76        }
77    }
78
79    pub(crate) fn new_remote(
80        client: Arc<KoiClient>,
81        events: broadcast::Sender<KoiEvent>,
82        cancel: CancellationToken,
83        tasks: Vec<JoinHandle<()>>,
84    ) -> Self {
85        Self {
86            backend: HandleBackend::Remote { client },
87            data_dir: None,
88            events,
89            cancel,
90            tasks,
91            http_announce_id: None,
92        }
93    }
94
95    pub fn events(&self) -> BroadcastStream<KoiEvent> {
96        BroadcastStream::new(self.events.subscribe())
97    }
98
99    pub fn subscribe(&self) -> broadcast::Receiver<KoiEvent> {
100        self.events.subscribe()
101    }
102
103    pub fn mdns(&self) -> Result<MdnsHandle, KoiError> {
104        match &self.backend {
105            HandleBackend::Embedded { mdns, .. } => {
106                let core = mdns.as_ref().ok_or(KoiError::DisabledCapability("mdns"))?;
107                Ok(MdnsHandle::new_embedded(
108                    Arc::clone(core),
109                    self.events.clone(),
110                ))
111            }
112            HandleBackend::Remote { client } => Ok(MdnsHandle::new_remote(
113                Arc::clone(client),
114                self.events.clone(),
115            )),
116        }
117    }
118
119    pub fn dns(&self) -> Result<DnsHandle, KoiError> {
120        match &self.backend {
121            HandleBackend::Embedded { dns, .. } => {
122                let runtime = dns.as_ref().ok_or(KoiError::DisabledCapability("dns"))?;
123                Ok(DnsHandle::new_embedded(Arc::clone(runtime)))
124            }
125            HandleBackend::Remote { client } => Ok(DnsHandle::new_remote(Arc::clone(client))),
126        }
127    }
128
129    pub fn health(&self) -> Result<HealthHandle, KoiError> {
130        match &self.backend {
131            HandleBackend::Embedded { health, .. } => {
132                let runtime = health
133                    .as_ref()
134                    .ok_or(KoiError::DisabledCapability("health"))?;
135                Ok(HealthHandle::new_embedded(Arc::clone(runtime)))
136            }
137            HandleBackend::Remote { client } => Ok(HealthHandle::new_remote(Arc::clone(client))),
138        }
139    }
140
141    pub fn certmesh(&self) -> Result<CertmeshHandle, KoiError> {
142        match &self.backend {
143            HandleBackend::Embedded { certmesh, .. } => {
144                let core = certmesh
145                    .as_ref()
146                    .ok_or(KoiError::DisabledCapability("certmesh"))?;
147                Ok(CertmeshHandle::new_embedded(Arc::clone(core)))
148            }
149            HandleBackend::Remote { client } => Ok(CertmeshHandle::new_remote(Arc::clone(client))),
150        }
151    }
152
153    /// Open the encrypted key-value vault for general-purpose secret storage.
154    ///
155    /// The vault uses platform credential binding (keyring) when available,
156    /// with a machine-bound fallback. Each call opens a fresh handle sharing
157    /// the same on-disk state.
158    pub fn vault(&self) -> Result<koi_crypto::vault::Vault, KoiError> {
159        let dir = self
160            .data_dir
161            .as_ref()
162            .ok_or(KoiError::DisabledCapability("vault (no data_dir)"))?;
163        koi_crypto::vault::Vault::open(dir)
164            .map_err(|e| KoiError::Io(std::io::Error::other(e.to_string())))
165    }
166
167    pub fn proxy(&self) -> Result<ProxyHandle, KoiError> {
168        match &self.backend {
169            HandleBackend::Embedded { proxy, .. } => {
170                let runtime = proxy
171                    .as_ref()
172                    .ok_or(KoiError::DisabledCapability("proxy"))?;
173                Ok(ProxyHandle::new_embedded(Arc::clone(runtime)))
174            }
175            HandleBackend::Remote { client } => Ok(ProxyHandle::new_remote(Arc::clone(client))),
176        }
177    }
178
179    /// Get the UDP runtime handle.
180    ///
181    /// Only available in embedded mode — remote mode does not support UDP bridging
182    /// (the remote daemon itself handles bindings).
183    pub fn udp(&self) -> Result<Arc<koi_udp::UdpRuntime>, KoiError> {
184        match &self.backend {
185            HandleBackend::Embedded { udp, .. } => {
186                let runtime = udp.as_ref().ok_or(KoiError::DisabledCapability("udp"))?;
187                Ok(Arc::clone(runtime))
188            }
189            HandleBackend::Remote { .. } => Err(KoiError::DisabledCapability("udp (remote mode)")),
190        }
191    }
192
193    /// Get the runtime adapter core.
194    ///
195    /// Only available in embedded mode when runtime is enabled.
196    pub fn runtime(&self) -> Result<Arc<koi_runtime::RuntimeCore>, KoiError> {
197        match &self.backend {
198            HandleBackend::Embedded { runtime, .. } => {
199                let core = runtime
200                    .as_ref()
201                    .ok_or(KoiError::DisabledCapability("runtime"))?;
202                Ok(Arc::clone(core))
203            }
204            HandleBackend::Remote { .. } => {
205                Err(KoiError::DisabledCapability("runtime (remote mode)"))
206            }
207        }
208    }
209
210    pub async fn shutdown(mut self) -> Result<(), KoiError> {
211        self.cancel.cancel();
212        for task in self.tasks.drain(..) {
213            let _ = task.await;
214        }
215
216        if let HandleBackend::Embedded {
217            mdns,
218            dns,
219            health,
220            proxy,
221            ..
222        } = &self.backend
223        {
224            if let Some(runtime) = proxy {
225                runtime.stop_all().await;
226            }
227            if let Some(runtime) = health {
228                let _ = runtime.stop().await;
229            }
230            if let Some(runtime) = dns {
231                let _ = runtime.stop().await;
232            }
233            if let Some(id) = &self.http_announce_id {
234                if let Some(core) = mdns {
235                    if let Err(e) = core.unregister(id) {
236                        tracing::warn!(error = %e, "Failed to withdraw HTTP mDNS announcement");
237                    }
238                }
239            }
240            if let Some(core) = mdns {
241                core.shutdown().await?;
242            }
243        }
244
245        Ok(())
246    }
247}
248
249pub struct KoiBrowseHandle {
250    backend: BrowseBackend,
251}
252
253enum BrowseBackend {
254    Embedded(MdnsBrowseHandle),
255    Remote(Mutex<mpsc::Receiver<MdnsEvent>>),
256}
257
258impl KoiBrowseHandle {
259    fn embedded(handle: MdnsBrowseHandle) -> Self {
260        Self {
261            backend: BrowseBackend::Embedded(handle),
262        }
263    }
264
265    fn remote(rx: mpsc::Receiver<MdnsEvent>) -> Self {
266        Self {
267            backend: BrowseBackend::Remote(Mutex::new(rx)),
268        }
269    }
270
271    pub async fn recv(&self) -> Option<MdnsEvent> {
272        match &self.backend {
273            BrowseBackend::Embedded(handle) => handle.recv().await,
274            BrowseBackend::Remote(rx) => rx.lock().await.recv().await,
275        }
276    }
277}
278
279pub struct MdnsHandle {
280    backend: MdnsBackend,
281    events: broadcast::Sender<KoiEvent>,
282}
283
284enum MdnsBackend {
285    Embedded { core: Arc<MdnsCore> },
286    Remote { client: Arc<KoiClient> },
287}
288
289impl MdnsHandle {
290    fn new_embedded(core: Arc<MdnsCore>, events: broadcast::Sender<KoiEvent>) -> Self {
291        Self {
292            backend: MdnsBackend::Embedded { core },
293            events,
294        }
295    }
296
297    fn new_remote(client: Arc<KoiClient>, events: broadcast::Sender<KoiEvent>) -> Self {
298        Self {
299            backend: MdnsBackend::Remote { client },
300            events,
301        }
302    }
303
304    pub fn core(&self) -> Result<Arc<MdnsCore>, KoiError> {
305        match &self.backend {
306            MdnsBackend::Embedded { core } => Ok(Arc::clone(core)),
307            MdnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("mdns")),
308        }
309    }
310
311    pub async fn browse(&self, service_type: &str) -> Result<KoiBrowseHandle, KoiError> {
312        match &self.backend {
313            MdnsBackend::Embedded { core } => {
314                let handle = core.browse(service_type).await?;
315                Ok(KoiBrowseHandle::embedded(handle))
316            }
317            MdnsBackend::Remote { client } => {
318                let (tx, rx) = mpsc::channel(64);
319                let client = Arc::clone(client);
320                let service_type = service_type.to_string();
321                tokio::task::spawn_blocking(move || {
322                    let stream = match client.browse_stream(&service_type) {
323                        Ok(stream) => stream,
324                        Err(_) => return,
325                    };
326                    for item in stream {
327                        let Ok(json) = item else {
328                            break;
329                        };
330                        if let Some(event) = mdns_event_from_pipeline(json) {
331                            if tx.blocking_send(event).is_err() {
332                                break;
333                            }
334                        }
335                    }
336                });
337                Ok(KoiBrowseHandle::remote(rx))
338            }
339        }
340    }
341
342    pub async fn resolve(&self, name: &str) -> Result<ServiceRecord, KoiError> {
343        match &self.backend {
344            MdnsBackend::Embedded { core } => Ok(core.resolve(name).await?),
345            MdnsBackend::Remote { client } => {
346                let name = name.to_string();
347                let client = Arc::clone(client);
348                let record = tokio::task::spawn_blocking(move || client.resolve(&name))
349                    .await
350                    .map_err(map_join_error)??;
351                Ok(record)
352            }
353        }
354    }
355
356    pub fn register(&self, payload: RegisterPayload) -> Result<RegistrationResult, KoiError> {
357        match &self.backend {
358            MdnsBackend::Embedded { core } => Ok(core.register(payload)?),
359            MdnsBackend::Remote { client } => Ok(client.register(&payload)?),
360        }
361    }
362
363    pub fn unregister(&self, id: &str) -> Result<(), KoiError> {
364        match &self.backend {
365            MdnsBackend::Embedded { core } => Ok(core.unregister(id)?),
366            MdnsBackend::Remote { client } => Ok(client.unregister(id)?),
367        }
368    }
369
370    pub fn subscribe(&self) -> broadcast::Receiver<MdnsEvent> {
371        match &self.backend {
372            MdnsBackend::Embedded { core } => core.subscribe(),
373            MdnsBackend::Remote { .. } => {
374                let (_tx, rx) = broadcast::channel(1);
375                rx
376            }
377        }
378    }
379
380    pub fn emit_event(&self, event: KoiEvent) {
381        let _ = self.events.send(event);
382    }
383}
384
385pub struct DnsHandle {
386    backend: DnsBackend,
387}
388
389enum DnsBackend {
390    Embedded { runtime: Arc<DnsRuntime> },
391    Remote { client: Arc<KoiClient> },
392}
393
394impl DnsHandle {
395    fn new_embedded(runtime: Arc<DnsRuntime>) -> Self {
396        Self {
397            backend: DnsBackend::Embedded { runtime },
398        }
399    }
400
401    fn new_remote(client: Arc<KoiClient>) -> Self {
402        Self {
403            backend: DnsBackend::Remote { client },
404        }
405    }
406
407    pub fn runtime(&self) -> Result<Arc<DnsRuntime>, KoiError> {
408        match &self.backend {
409            DnsBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
410            DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
411        }
412    }
413
414    pub fn core(&self) -> Result<Arc<koi_dns::DnsCore>, KoiError> {
415        match &self.backend {
416            DnsBackend::Embedded { runtime } => Ok(runtime.core()),
417            DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
418        }
419    }
420
421    pub async fn lookup(
422        &self,
423        name: &str,
424        record_type: hickory_proto::rr::RecordType,
425    ) -> Option<DnsLookupResult> {
426        match &self.backend {
427            DnsBackend::Embedded { runtime } => runtime.core().lookup(name, record_type).await,
428            DnsBackend::Remote { client } => {
429                let name = name.to_string();
430                let client = Arc::clone(client);
431                let result =
432                    tokio::task::spawn_blocking(move || client.dns_lookup(&name, record_type))
433                        .await
434                        .ok()
435                        .and_then(|res| res.ok());
436                let json = match result {
437                    Some(json) => json,
438                    None => return None,
439                };
440                parse_dns_lookup(json)
441            }
442        }
443    }
444
445    pub fn list_names(&self) -> Vec<String> {
446        match &self.backend {
447            DnsBackend::Embedded { runtime } => runtime.core().list_names(),
448            DnsBackend::Remote { client } => {
449                let result = client.dns_list();
450                let Ok(json) = result else {
451                    return Vec::new();
452                };
453                json.get("names")
454                    .and_then(|v| v.as_array())
455                    .map(|arr| {
456                        arr.iter()
457                            .filter_map(|name| name.as_str().map(|s| s.to_string()))
458                            .collect()
459                    })
460                    .unwrap_or_default()
461            }
462        }
463    }
464
465    pub async fn start(&self) -> Result<bool, KoiError> {
466        match &self.backend {
467            DnsBackend::Embedded { runtime } => Ok(runtime.start().await?),
468            DnsBackend::Remote { client } => {
469                let client = Arc::clone(client);
470                let started = tokio::task::spawn_blocking(move || client.dns_start())
471                    .await
472                    .map_err(map_join_error)??
473                    .get("started")
474                    .and_then(|v| v.as_bool())
475                    .unwrap_or(false);
476                Ok(started)
477            }
478        }
479    }
480
481    pub async fn stop(&self) -> bool {
482        match &self.backend {
483            DnsBackend::Embedded { runtime } => runtime.stop().await,
484            DnsBackend::Remote { client } => {
485                let client = Arc::clone(client);
486                tokio::task::spawn_blocking(move || client.dns_stop())
487                    .await
488                    .ok()
489                    .and_then(|res| res.ok())
490                    .and_then(|json| json.get("stopped").and_then(|v| v.as_bool()))
491                    .unwrap_or(false)
492            }
493        }
494    }
495
496    pub fn add_entry(&self, entry: DnsEntry) -> Result<Vec<DnsEntry>, KoiError> {
497        match &self.backend {
498            DnsBackend::Embedded { runtime } => Ok(runtime.core().add_entry(entry)?),
499            DnsBackend::Remote { client } => {
500                let json = client.dns_add(&entry.name, &entry.ip, entry.ttl)?;
501                parse_dns_entries(json)
502            }
503        }
504    }
505
506    pub fn remove_entry(&self, name: &str) -> Result<Vec<DnsEntry>, KoiError> {
507        match &self.backend {
508            DnsBackend::Embedded { runtime } => {
509                Ok(runtime.core().remove_entry(name)?.unwrap_or_default())
510            }
511            DnsBackend::Remote { client } => {
512                let json = client.dns_remove(name)?;
513                parse_dns_entries(json)
514            }
515        }
516    }
517}
518
519pub struct HealthHandle {
520    backend: HealthBackend,
521}
522
523enum HealthBackend {
524    Embedded { runtime: Arc<HealthRuntime> },
525    Remote { client: Arc<KoiClient> },
526}
527
528impl HealthHandle {
529    fn new_embedded(runtime: Arc<HealthRuntime>) -> Self {
530        Self {
531            backend: HealthBackend::Embedded { runtime },
532        }
533    }
534
535    fn new_remote(client: Arc<KoiClient>) -> Self {
536        Self {
537            backend: HealthBackend::Remote { client },
538        }
539    }
540
541    pub fn core(&self) -> Result<Arc<koi_health::HealthCore>, KoiError> {
542        match &self.backend {
543            HealthBackend::Embedded { runtime } => Ok(runtime.core()),
544            HealthBackend::Remote { .. } => Err(KoiError::DisabledCapability("health")),
545        }
546    }
547
548    pub async fn status(&self) -> koi_health::HealthSnapshot {
549        match &self.backend {
550            HealthBackend::Embedded { runtime } => runtime.core().snapshot().await,
551            HealthBackend::Remote { client } => {
552                let client = Arc::clone(client);
553                let json = tokio::task::spawn_blocking(move || client.health_status())
554                    .await
555                    .ok()
556                    .and_then(|res| res.ok());
557                json.and_then(|json| serde_json::from_value(json).ok())
558                    .unwrap_or_else(|| koi_health::HealthSnapshot {
559                        machines: Vec::new(),
560                        services: Vec::new(),
561                    })
562            }
563        }
564    }
565
566    pub async fn add_check(&self, check: HealthCheck) -> Result<(), KoiError> {
567        match &self.backend {
568            HealthBackend::Embedded { runtime } => Ok(runtime.core().add_check(check).await?),
569            HealthBackend::Remote { client } => {
570                let client = Arc::clone(client);
571                let check = check.clone();
572                tokio::task::spawn_blocking(move || {
573                    client.health_add_check(
574                        &check.name,
575                        check.kind,
576                        &check.target,
577                        check.interval_secs,
578                        check.timeout_secs,
579                    )
580                })
581                .await
582                .map_err(map_join_error)??;
583                Ok(())
584            }
585        }
586    }
587
588    pub async fn remove_check(&self, name: &str) -> Result<(), KoiError> {
589        match &self.backend {
590            HealthBackend::Embedded { runtime } => Ok(runtime.core().remove_check(name).await?),
591            HealthBackend::Remote { client } => {
592                let client = Arc::clone(client);
593                let name = name.to_string();
594                tokio::task::spawn_blocking(move || client.health_remove_check(&name))
595                    .await
596                    .map_err(map_join_error)??;
597                Ok(())
598            }
599        }
600    }
601
602    pub async fn start(&self) -> Result<bool, KoiError> {
603        match &self.backend {
604            HealthBackend::Embedded { runtime } => Ok(runtime.start().await?),
605            HealthBackend::Remote { .. } => Ok(false),
606        }
607    }
608
609    pub async fn stop(&self) -> bool {
610        match &self.backend {
611            HealthBackend::Embedded { runtime } => runtime.stop().await,
612            HealthBackend::Remote { .. } => false,
613        }
614    }
615}
616
617pub struct CertmeshHandle {
618    backend: CertmeshBackend,
619}
620
621enum CertmeshBackend {
622    Embedded {
623        core: Arc<koi_certmesh::CertmeshCore>,
624    },
625    Remote {
626        client: Arc<KoiClient>,
627    },
628}
629
630impl CertmeshHandle {
631    fn new_embedded(core: Arc<koi_certmesh::CertmeshCore>) -> Self {
632        Self {
633            backend: CertmeshBackend::Embedded { core },
634        }
635    }
636
637    fn new_remote(client: Arc<KoiClient>) -> Self {
638        Self {
639            backend: CertmeshBackend::Remote { client },
640        }
641    }
642
643    pub fn core(&self) -> Result<Arc<koi_certmesh::CertmeshCore>, KoiError> {
644        match &self.backend {
645            CertmeshBackend::Embedded { core } => Ok(Arc::clone(core)),
646            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
647        }
648    }
649
650    pub async fn status(&self) -> koi_common::capability::CapabilityStatus {
651        match &self.backend {
652            CertmeshBackend::Embedded { core } => core.status(),
653            CertmeshBackend::Remote { client } => {
654                let client = Arc::clone(client);
655                let json = tokio::task::spawn_blocking(move || client.unified_status())
656                    .await
657                    .ok()
658                    .and_then(|res| res.ok());
659                json.and_then(extract_capability_status)
660                    .unwrap_or_else(default_capability_status)
661            }
662        }
663    }
664}
665
666pub struct ProxyHandle {
667    backend: ProxyBackend,
668}
669
670enum ProxyBackend {
671    Embedded { runtime: Arc<ProxyRuntime> },
672    Remote { client: Arc<KoiClient> },
673}
674
675impl ProxyHandle {
676    fn new_embedded(runtime: Arc<ProxyRuntime>) -> Self {
677        Self {
678            backend: ProxyBackend::Embedded { runtime },
679        }
680    }
681
682    fn new_remote(client: Arc<KoiClient>) -> Self {
683        Self {
684            backend: ProxyBackend::Remote { client },
685        }
686    }
687
688    pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError> {
689        match &self.backend {
690            ProxyBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
691            ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
692        }
693    }
694
695    pub fn core(&self) -> Result<Arc<koi_proxy::ProxyCore>, KoiError> {
696        match &self.backend {
697            ProxyBackend::Embedded { runtime } => Ok(runtime.core()),
698            ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
699        }
700    }
701
702    pub async fn entries(&self) -> Vec<ProxyEntry> {
703        match &self.backend {
704            ProxyBackend::Embedded { runtime } => runtime.core().entries().await,
705            ProxyBackend::Remote { client } => {
706                let client = Arc::clone(client);
707                tokio::task::spawn_blocking(move || client.proxy_list())
708                    .await
709                    .ok()
710                    .and_then(|res| res.ok())
711                    .and_then(|json| parse_proxy_entries(json).ok())
712                    .unwrap_or_default()
713            }
714        }
715    }
716
717    pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError> {
718        match &self.backend {
719            ProxyBackend::Embedded { runtime } => Ok(runtime.core().upsert(entry).await?),
720            ProxyBackend::Remote { client } => {
721                let client = Arc::clone(client);
722                let entry = entry.clone();
723                let add_client = Arc::clone(&client);
724                tokio::task::spawn_blocking(move || {
725                    add_client.proxy_add(
726                        &entry.name,
727                        entry.listen_port,
728                        &entry.backend,
729                        entry.allow_remote,
730                    )
731                })
732                .await
733                .map_err(map_join_error)??;
734                let list = tokio::task::spawn_blocking(move || client.proxy_list())
735                    .await
736                    .map_err(map_join_error)??;
737                parse_proxy_entries(list)
738            }
739        }
740    }
741
742    pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError> {
743        match &self.backend {
744            ProxyBackend::Embedded { runtime } => Ok(runtime.core().remove(name).await?),
745            ProxyBackend::Remote { client } => {
746                let client = Arc::clone(client);
747                let name = name.to_string();
748                let remove_client = Arc::clone(&client);
749                tokio::task::spawn_blocking(move || remove_client.proxy_remove(&name))
750                    .await
751                    .map_err(map_join_error)??;
752                let list = tokio::task::spawn_blocking(move || client.proxy_list())
753                    .await
754                    .map_err(map_join_error)??;
755                parse_proxy_entries(list)
756            }
757        }
758    }
759
760    pub async fn start_all(&self) -> Result<(), KoiError> {
761        match &self.backend {
762            ProxyBackend::Embedded { runtime } => Ok(runtime.start_all().await?),
763            ProxyBackend::Remote { .. } => Ok(()),
764        }
765    }
766
767    pub async fn stop_all(&self) {
768        if let ProxyBackend::Embedded { runtime } = &self.backend {
769            runtime.stop_all().await;
770        }
771    }
772}
773
774fn parse_dns_lookup(json: serde_json::Value) -> Option<DnsLookupResult> {
775    let name = json.get("name").and_then(|v| v.as_str())?.to_string();
776    let source = json
777        .get("source")
778        .and_then(|v| v.as_str())
779        .unwrap_or("unknown")
780        .to_string();
781    let ips = json.get("ips").and_then(|v| v.as_array()).map(|arr| {
782        arr.iter()
783            .filter_map(|ip| ip.as_str())
784            .filter_map(|ip| ip.parse::<IpAddr>().ok())
785            .collect::<Vec<_>>()
786    })?;
787    Some(DnsLookupResult { name, ips, source })
788}
789
790fn parse_dns_entries(json: serde_json::Value) -> Result<Vec<DnsEntry>, KoiError> {
791    let entries = json.get("entries").ok_or_else(|| {
792        KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::other(
793            "missing entries",
794        )))
795    })?;
796    let entries = serde_json::from_value(entries.clone()).map_err(|e| {
797        KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::new(
798            std::io::ErrorKind::InvalidData,
799            e.to_string(),
800        )))
801    })?;
802    Ok(entries)
803}
804
805fn parse_proxy_entries(json: serde_json::Value) -> Result<Vec<ProxyEntry>, KoiError> {
806    let entries = json
807        .get("entries")
808        .ok_or_else(|| KoiError::Proxy(koi_proxy::ProxyError::Io("missing entries".to_string())))?
809        .clone();
810    serde_json::from_value(entries)
811        .map_err(|e| KoiError::Proxy(koi_proxy::ProxyError::Io(e.to_string())))
812}
813
814fn extract_capability_status(
815    json: serde_json::Value,
816) -> Option<koi_common::capability::CapabilityStatus> {
817    let caps = json.get("capabilities")?.as_array()?;
818    for cap in caps {
819        if cap.get("name")?.as_str()? == "certmesh" {
820            let name = cap.get("name")?.as_str()?.to_string();
821            let summary = cap
822                .get("summary")
823                .and_then(|v| v.as_str())
824                .unwrap_or("unknown")
825                .to_string();
826            let healthy = cap
827                .get("healthy")
828                .and_then(|v| v.as_bool())
829                .unwrap_or(false);
830            return Some(koi_common::capability::CapabilityStatus {
831                name,
832                summary,
833                healthy,
834            });
835        }
836    }
837    None
838}
839
840fn default_capability_status() -> koi_common::capability::CapabilityStatus {
841    koi_common::capability::CapabilityStatus {
842        name: "certmesh".to_string(),
843        summary: "unknown".to_string(),
844        healthy: false,
845    }
846}
847
848fn mdns_event_from_pipeline(json: serde_json::Value) -> Option<MdnsEvent> {
849    if let Some(found) = json.get("found") {
850        let record: ServiceRecord = serde_json::from_value(found.clone()).ok()?;
851        return Some(MdnsEvent::Found(record));
852    }
853    if let Some(resolved) = json.get("resolved") {
854        let record: ServiceRecord = serde_json::from_value(resolved.clone()).ok()?;
855        return Some(MdnsEvent::Resolved(record));
856    }
857    if let Some(event) = json.get("event") {
858        let kind: EventKind = serde_json::from_value(event.clone()).ok()?;
859        let service = json
860            .get("service")
861            .cloned()
862            .unwrap_or(serde_json::Value::Null);
863        let record: ServiceRecord = serde_json::from_value(service).ok()?;
864        return match kind {
865            EventKind::Found => Some(MdnsEvent::Found(record)),
866            EventKind::Resolved => Some(MdnsEvent::Resolved(record)),
867            EventKind::Removed => Some(MdnsEvent::Removed {
868                name: record.name,
869                service_type: record.service_type,
870            }),
871        };
872    }
873    None
874}