Skip to main content

koi_embedded/
handle.rs

1use std::net::IpAddr;
2use std::sync::Arc;
3
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::BroadcastStream;
7use tokio_util::sync::CancellationToken;
8
9use koi_client::KoiClient;
10use koi_common::capability::Capability;
11use koi_common::peer::Peer;
12use koi_common::types::{EventKind, ServiceRecord};
13use koi_config::state::DnsEntry;
14use koi_dns::{DnsLookupResult, DnsRuntime};
15use koi_health::{HealthCheck, HealthRuntime};
16use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
17use koi_mdns::{BrowseSubscription as MdnsBrowseHandle, MdnsCore, MdnsEvent};
18use koi_proxy::{ProxyEntry, ProxyRuntime};
19
20use crate::{map_join_error, KoiError, KoiEvent};
21
22/// Hard ceiling on ordered teardown — bounds the cancel/drain/join sequence so a wedged
23/// task can never hang the host application's shutdown (mirrors the daemon's limit).
24const SHUTDOWN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20);
25/// Grace period after cancellation for in-flight work to drain before tasks are joined.
26const SHUTDOWN_DRAIN: std::time::Duration = std::time::Duration::from_millis(500);
27
28enum HandleBackend {
29    Embedded {
30        mdns: Option<Arc<MdnsCore>>,
31        dns: Option<Arc<DnsRuntime>>,
32        health: Option<Arc<HealthRuntime>>,
33        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
34        proxy: Option<Arc<ProxyRuntime>>,
35        udp: Option<Arc<koi_udp::UdpRuntime>>,
36        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
37    },
38    Remote {
39        client: Arc<KoiClient>,
40    },
41}
42
43pub struct KoiHandle {
44    backend: HandleBackend,
45    /// The address the embedded HTTP adapter bound to (`None` if HTTP is disabled
46    /// or in remote mode). Populated even for a fixed port; with `http_port(0)` it
47    /// carries the OS-assigned ephemeral port.
48    http_addr: Option<std::net::SocketAddr>,
49    data_dir: Option<std::path::PathBuf>,
50    events: broadcast::Sender<KoiEvent>,
51    cancel: CancellationToken,
52    tasks: Vec<JoinHandle<()>>,
53}
54
55impl KoiHandle {
56    #[allow(clippy::too_many_arguments)]
57    pub(crate) fn new_embedded(
58        mdns: Option<Arc<MdnsCore>>,
59        dns: Option<Arc<DnsRuntime>>,
60        health: Option<Arc<HealthRuntime>>,
61        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
62        proxy: Option<Arc<ProxyRuntime>>,
63        udp: Option<Arc<koi_udp::UdpRuntime>>,
64        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
65        http_addr: Option<std::net::SocketAddr>,
66        data_dir: Option<std::path::PathBuf>,
67        events: broadcast::Sender<KoiEvent>,
68        cancel: CancellationToken,
69        tasks: Vec<JoinHandle<()>>,
70    ) -> Self {
71        Self {
72            backend: HandleBackend::Embedded {
73                mdns,
74                dns,
75                health,
76                certmesh,
77                proxy,
78                udp,
79                runtime,
80            },
81            http_addr,
82            data_dir,
83            events,
84            cancel,
85            tasks,
86        }
87    }
88
89    pub(crate) fn new_remote(
90        client: Arc<KoiClient>,
91        events: broadcast::Sender<KoiEvent>,
92        cancel: CancellationToken,
93        tasks: Vec<JoinHandle<()>>,
94    ) -> Self {
95        Self {
96            backend: HandleBackend::Remote { client },
97            http_addr: None,
98            data_dir: None,
99            events,
100            cancel,
101            tasks,
102        }
103    }
104
105    pub fn events(&self) -> BroadcastStream<KoiEvent> {
106        BroadcastStream::new(self.events.subscribe())
107    }
108
109    /// The address the embedded HTTP adapter bound to, or `None` when HTTP is
110    /// disabled or this is a remote handle. With `Builder::http_port(0)` this
111    /// reports the OS-assigned ephemeral port — the supported way to run an
112    /// embedded HTTP surface on a free port without racing to pick one.
113    pub fn http_addr(&self) -> Option<std::net::SocketAddr> {
114        self.http_addr
115    }
116
117    /// The port the embedded HTTP adapter bound to (convenience over
118    /// [`http_addr`](Self::http_addr)). `None` if HTTP is disabled / remote.
119    pub fn bound_http_port(&self) -> Option<u16> {
120        self.http_addr.map(|addr| addr.port())
121    }
122
123    /// Serve `router` on `addr` with the same-port posture dial (ADR-020 §5):
124    /// plain HTTP while this node is Open, mTLS once it is secure, flipping live
125    /// with **no dropped connections** as the posture changes. The consumer writes
126    /// one `serve` call and never branches on posture.
127    ///
128    /// Returns the supervisor's [`JoinHandle`]; the listener stops when this
129    /// handle's `cancel` is triggered (e.g. on [`shutdown`](Self::shutdown)) or
130    /// the passed `cancel` fires. Embedded only — a remote handle has no local
131    /// identity to serve mTLS with.
132    pub fn serve(
133        &self,
134        router: axum::Router,
135        addr: std::net::SocketAddr,
136        cancel: CancellationToken,
137    ) -> Result<JoinHandle<()>, KoiError> {
138        match &self.backend {
139            HandleBackend::Embedded { certmesh, .. } => {
140                let core = certmesh
141                    .as_ref()
142                    .ok_or(KoiError::DisabledCapability("certmesh"))?;
143                let core = Arc::clone(core);
144                Ok(tokio::spawn(async move {
145                    if let Err(e) = crate::serve::serve_adaptive(core, router, addr, cancel).await {
146                        tracing::error!(error = %e, "same-port serve failed to bind");
147                    }
148                }))
149            }
150            HandleBackend::Remote { .. } => {
151                Err(KoiError::DisabledCapability("certmesh (remote mode)"))
152            }
153        }
154    }
155
156    /// Like [`serve`](Self::serve) but returns the bind error instead of swallowing
157    /// it (wishlist 5.1). The caller learns immediately if the listener never came
158    /// up, without polling `bound_http_port()` or waiting for a timeout.
159    ///
160    /// The `Err` is an `io::Error` from `TcpListener::bind`. On success the returned
161    /// `JoinHandle` resolves when the listener exits.
162    pub fn try_serve(
163        &self,
164        router: axum::Router,
165        addr: std::net::SocketAddr,
166        cancel: CancellationToken,
167    ) -> Result<JoinHandle<Result<(), std::io::Error>>, KoiError> {
168        match &self.backend {
169            HandleBackend::Embedded { certmesh, .. } => {
170                let core = certmesh
171                    .as_ref()
172                    .ok_or(KoiError::DisabledCapability("certmesh"))?;
173                let core = Arc::clone(core);
174                Ok(tokio::spawn(async move {
175                    crate::serve::serve_adaptive(core, router, addr, cancel).await
176                }))
177            }
178            HandleBackend::Remote { .. } => {
179                Err(KoiError::DisabledCapability("certmesh (remote mode)"))
180            }
181        }
182    }
183
184    /// Become a fully-participating trusted service in one call (ADR-020 §13 — the
185    /// "3-line trusted service"):
186    ///
187    /// 1. acquire/maintain this node's identity (best-effort — an Open node with no
188    ///    way to enroll simply stays plaintext),
189    /// 2. announce `service_type` on the LAN at `addr`'s port with the node's
190    ///    posture stamped into the TXT, **kept current across posture flips**, and
191    /// 3. serve `router` on `addr` with the same-port dial ([`serve`](Self::serve)).
192    ///
193    /// The consumer never branches on posture and never wires identity, discovery,
194    /// and serving separately. Returns the serve supervisor's [`JoinHandle`].
195    /// Certificate *renewal* is handled by the certmesh background loops — enable
196    /// them with `Builder::certmesh_background(true)` on a long-running host.
197    /// Embedded only.
198    pub async fn participate(
199        &self,
200        router: axum::Router,
201        addr: std::net::SocketAddr,
202        service_type: &str,
203        cancel: CancellationToken,
204    ) -> Result<JoinHandle<()>, KoiError> {
205        let (certmesh, mdns) = match &self.backend {
206            HandleBackend::Embedded { certmesh, mdns, .. } => (
207                certmesh
208                    .as_ref()
209                    .ok_or(KoiError::DisabledCapability("certmesh"))?
210                    .clone(),
211                mdns.clone(),
212            ),
213            HandleBackend::Remote { .. } => {
214                return Err(KoiError::DisabledCapability("certmesh (remote mode)"))
215            }
216        };
217
218        // 1. Acquire/maintain identity. Open (no CA / not a member) stays plaintext.
219        let _ = certmesh.ensure_identity().await;
220
221        // 2. Announce with posture, refreshed on every flip so the LAN trust map
222        //    never goes stale (ADR-020 §13 "maintained across flips").
223        if let Some(mdns) = mdns {
224            spawn_participate_announce(
225                mdns,
226                Arc::clone(&certmesh),
227                service_type.to_string(),
228                addr.port(),
229                cancel.clone(),
230            );
231        } else {
232            tracing::debug!("participate: mDNS disabled — serving without announcing");
233        }
234
235        // 3. Serve with the same-port posture dial.
236        self.serve(router, addr, cancel)
237    }
238
239    pub fn subscribe(&self) -> broadcast::Receiver<KoiEvent> {
240        self.events.subscribe()
241    }
242
243    pub fn mdns(&self) -> Result<MdnsHandle, KoiError> {
244        match &self.backend {
245            HandleBackend::Embedded { mdns, .. } => {
246                let core = mdns.as_ref().ok_or(KoiError::DisabledCapability("mdns"))?;
247                Ok(MdnsHandle::new_embedded(
248                    Arc::clone(core),
249                    self.events.clone(),
250                ))
251            }
252            HandleBackend::Remote { client } => Ok(MdnsHandle::new_remote(
253                Arc::clone(client),
254                self.events.clone(),
255            )),
256        }
257    }
258
259    pub fn dns(&self) -> Result<DnsHandle, KoiError> {
260        match &self.backend {
261            HandleBackend::Embedded { dns, .. } => {
262                let runtime = dns.as_ref().ok_or(KoiError::DisabledCapability("dns"))?;
263                Ok(DnsHandle::new_embedded(Arc::clone(runtime)))
264            }
265            HandleBackend::Remote { client } => Ok(DnsHandle::new_remote(Arc::clone(client))),
266        }
267    }
268
269    pub fn health(&self) -> Result<HealthHandle, KoiError> {
270        match &self.backend {
271            HandleBackend::Embedded { health, .. } => {
272                let runtime = health
273                    .as_ref()
274                    .ok_or(KoiError::DisabledCapability("health"))?;
275                Ok(HealthHandle::new_embedded(Arc::clone(runtime)))
276            }
277            HandleBackend::Remote { client } => Ok(HealthHandle::new_remote(Arc::clone(client))),
278        }
279    }
280
281    pub fn certmesh(&self) -> Result<CertmeshHandle, KoiError> {
282        match &self.backend {
283            HandleBackend::Embedded { certmesh, .. } => {
284                let core = certmesh
285                    .as_ref()
286                    .ok_or(KoiError::DisabledCapability("certmesh"))?;
287                Ok(CertmeshHandle::new_embedded(Arc::clone(core)))
288            }
289            HandleBackend::Remote { client } => Ok(CertmeshHandle::new_remote(Arc::clone(client))),
290        }
291    }
292
293    /// Open the encrypted key-value vault for general-purpose secret storage.
294    ///
295    /// The vault uses platform credential binding (keyring) when available,
296    /// with a machine-bound fallback. Each call opens a fresh handle sharing
297    /// the same on-disk state.
298    pub fn vault(&self) -> Result<koi_crypto::vault::Vault, KoiError> {
299        let dir = self
300            .data_dir
301            .as_ref()
302            .ok_or(KoiError::DisabledCapability("vault (no data_dir)"))?;
303        koi_crypto::vault::Vault::open(dir)
304            .map_err(|e| KoiError::Io(std::io::Error::other(e.to_string())))
305    }
306
307    pub fn proxy(&self) -> Result<ProxyHandle, KoiError> {
308        match &self.backend {
309            HandleBackend::Embedded { proxy, .. } => {
310                let runtime = proxy
311                    .as_ref()
312                    .ok_or(KoiError::DisabledCapability("proxy"))?;
313                Ok(ProxyHandle::new_embedded(Arc::clone(runtime)))
314            }
315            HandleBackend::Remote { client } => Ok(ProxyHandle::new_remote(Arc::clone(client))),
316        }
317    }
318
319    /// Get the UDP runtime handle.
320    ///
321    /// Only available in embedded mode — remote mode does not support UDP bridging
322    /// (the remote daemon itself handles bindings).
323    pub fn udp(&self) -> Result<Arc<koi_udp::UdpRuntime>, KoiError> {
324        match &self.backend {
325            HandleBackend::Embedded { udp, .. } => {
326                let runtime = udp.as_ref().ok_or(KoiError::DisabledCapability("udp"))?;
327                Ok(Arc::clone(runtime))
328            }
329            HandleBackend::Remote { .. } => Err(KoiError::DisabledCapability("udp (remote mode)")),
330        }
331    }
332
333    /// Get the runtime adapter core.
334    ///
335    /// Only available in embedded mode when runtime is enabled.
336    pub fn runtime(&self) -> Result<Arc<koi_runtime::RuntimeCore>, KoiError> {
337        match &self.backend {
338            HandleBackend::Embedded { runtime, .. } => {
339                let core = runtime
340                    .as_ref()
341                    .ok_or(KoiError::DisabledCapability("runtime"))?;
342                Ok(Arc::clone(core))
343            }
344            HandleBackend::Remote { .. } => {
345                Err(KoiError::DisabledCapability("runtime (remote mode)"))
346            }
347        }
348    }
349
350    pub async fn shutdown(mut self) -> Result<(), KoiError> {
351        let tasks = std::mem::take(&mut self.tasks);
352
353        if let HandleBackend::Embedded {
354            mdns,
355            dns,
356            health,
357            certmesh,
358            proxy,
359            udp,
360            runtime,
361        } = &self.backend
362        {
363            // Route through the shared ordered teardown so embedded inherits the same
364            // cancel → drain → join → withdraw-announce → per-core goodbye sequence the
365            // daemon runs, including the UDP shutdown + drain + hard timeout it omitted.
366            let cores = koi_compose::cores::Cores {
367                mdns: mdns.clone(),
368                certmesh: certmesh.clone(),
369                dns: dns.clone(),
370                health: health.clone(),
371                proxy: proxy.clone(),
372                udp: udp.clone(),
373                runtime: runtime.clone(),
374                mdns_snapshot: None,
375            };
376            koi_compose::cores::ordered_shutdown(
377                &self.cancel,
378                tasks,
379                &cores,
380                SHUTDOWN_TIMEOUT,
381                SHUTDOWN_DRAIN,
382            )
383            .await;
384        } else {
385            // Remote backend: no local cores to tear down — just stop the tasks.
386            self.cancel.cancel();
387            for task in tasks {
388                let _ = task.await;
389            }
390        }
391
392        Ok(())
393    }
394}
395
396pub struct KoiBrowseHandle {
397    backend: BrowseBackend,
398}
399
400enum BrowseBackend {
401    Embedded(MdnsBrowseHandle),
402    Remote(Mutex<mpsc::Receiver<MdnsEvent>>),
403}
404
405impl KoiBrowseHandle {
406    fn embedded(handle: MdnsBrowseHandle) -> Self {
407        Self {
408            backend: BrowseBackend::Embedded(handle),
409        }
410    }
411
412    fn remote(rx: mpsc::Receiver<MdnsEvent>) -> Self {
413        Self {
414            backend: BrowseBackend::Remote(Mutex::new(rx)),
415        }
416    }
417
418    pub async fn recv(&self) -> Option<MdnsEvent> {
419        match &self.backend {
420            BrowseBackend::Embedded(handle) => handle.recv().await,
421            BrowseBackend::Remote(rx) => rx.lock().await.recv().await,
422        }
423    }
424}
425
426/// Default discovery window (ADR-020 §8): long enough for mDNS resolution on a
427/// quiet LAN, short enough to stay responsive — a sane default so the common
428/// `discover(type)` call needs no tuning.
429pub const DEFAULT_DISCOVER_WINDOW: std::time::Duration = std::time::Duration::from_secs(2);
430
431pub struct MdnsHandle {
432    backend: MdnsBackend,
433    events: broadcast::Sender<KoiEvent>,
434}
435
436enum MdnsBackend {
437    Embedded { core: Arc<MdnsCore> },
438    Remote { client: Arc<KoiClient> },
439}
440
441impl MdnsHandle {
442    fn new_embedded(core: Arc<MdnsCore>, events: broadcast::Sender<KoiEvent>) -> Self {
443        Self {
444            backend: MdnsBackend::Embedded { core },
445            events,
446        }
447    }
448
449    fn new_remote(client: Arc<KoiClient>, events: broadcast::Sender<KoiEvent>) -> Self {
450        Self {
451            backend: MdnsBackend::Remote { client },
452            events,
453        }
454    }
455
456    pub fn core(&self) -> Result<Arc<MdnsCore>, KoiError> {
457        match &self.backend {
458            MdnsBackend::Embedded { core } => Ok(Arc::clone(core)),
459            MdnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("mdns")),
460        }
461    }
462
463    pub async fn browse(&self, service_type: &str) -> Result<KoiBrowseHandle, KoiError> {
464        match &self.backend {
465            MdnsBackend::Embedded { core } => {
466                let handle = core.subscribe_type(service_type).await?;
467                Ok(KoiBrowseHandle::embedded(handle))
468            }
469            MdnsBackend::Remote { client } => {
470                let (tx, rx) = mpsc::channel(64);
471                let client = Arc::clone(client);
472                let service_type = service_type.to_string();
473                tokio::task::spawn_blocking(move || {
474                    let stream = match client.browse_stream(&service_type) {
475                        Ok(stream) => stream,
476                        Err(_) => return,
477                    };
478                    for item in stream {
479                        let Ok(json) = item else {
480                            break;
481                        };
482                        if let Some(event) = mdns_event_from_pipeline(json) {
483                            if tx.blocking_send(event).is_err() {
484                                break;
485                            }
486                        }
487                    }
488                });
489                Ok(KoiBrowseHandle::remote(rx))
490            }
491        }
492    }
493
494    /// Discover peers of `service_type`, each enriched with its advertised trust
495    /// posture, mesh anchor, and identity expiry (ADR-020 §8) — the fleet-wide
496    /// trust-legibility primitive. A snapshot collected over
497    /// [`DEFAULT_DISCOVER_WINDOW`]; for a custom window use
498    /// [`discover_for`](Self::discover_for).
499    ///
500    /// The posture each peer carries is an **untrusted hint** (ADR-016 §2);
501    /// `certmesh().verify(..)` / mTLS adjudicates actual trust. Works in both
502    /// embedded and remote mode (it layers on [`browse`](Self::browse)).
503    pub async fn discover(&self, service_type: &str) -> Result<Vec<Peer>, KoiError> {
504        self.discover_for(service_type, DEFAULT_DISCOVER_WINDOW)
505            .await
506    }
507
508    /// Like [`discover`](Self::discover) with an explicit collection `window`.
509    pub async fn discover_for(
510        &self,
511        service_type: &str,
512        window: std::time::Duration,
513    ) -> Result<Vec<Peer>, KoiError> {
514        let browse = self.browse(service_type).await?;
515        let mut events = Vec::new();
516        let deadline = tokio::time::sleep(window);
517        tokio::pin!(deadline);
518        loop {
519            tokio::select! {
520                _ = &mut deadline => break,
521                ev = browse.recv() => match ev {
522                    Some(e) => events.push(e),
523                    None => break,
524                },
525            }
526        }
527        Ok(fold_peers(events))
528    }
529
530    pub async fn resolve(&self, name: &str) -> Result<ServiceRecord, KoiError> {
531        match &self.backend {
532            MdnsBackend::Embedded { core } => Ok(core.resolve(name).await?),
533            MdnsBackend::Remote { client } => {
534                let name = name.to_string();
535                let client = Arc::clone(client);
536                let record = tokio::task::spawn_blocking(move || client.resolve(&name))
537                    .await
538                    .map_err(map_join_error)??;
539                Ok(record)
540            }
541        }
542    }
543
544    pub fn register(&self, payload: RegisterPayload) -> Result<RegistrationResult, KoiError> {
545        match &self.backend {
546            MdnsBackend::Embedded { core } => Ok(core.register(payload)?),
547            MdnsBackend::Remote { client } => Ok(client.register(&payload)?),
548        }
549    }
550
551    pub fn unregister(&self, id: &str) -> Result<(), KoiError> {
552        match &self.backend {
553            MdnsBackend::Embedded { core } => Ok(core.unregister(id)?),
554            MdnsBackend::Remote { client } => Ok(client.unregister(id)?),
555        }
556    }
557
558    /// Subscribe to the live mDNS lifecycle-event stream (Found / Resolved / Removed).
559    ///
560    /// Available only in **embedded** mode, where there is a local `MdnsCore` to subscribe
561    /// to. In **client (remote)** mode there is no all-types lifecycle stream to forward —
562    /// the daemon's `/v1/mdns/subscribe` requires a service type — so this returns
563    /// [`KoiError::RemoteUnsupported`]. For a remote event stream, use
564    /// [`MdnsHandle::browse`] with a specific service type (it forwards the daemon's SSE).
565    ///
566    /// Previously this silently returned a dead receiver in remote mode (it yielded nothing,
567    /// forever); the typed error makes the limitation visible instead of swallowing it.
568    pub fn subscribe(&self) -> Result<broadcast::Receiver<MdnsEvent>, KoiError> {
569        match &self.backend {
570            MdnsBackend::Embedded { core } => Ok(core.subscribe()),
571            MdnsBackend::Remote { .. } => Err(KoiError::RemoteUnsupported(
572                "mdns subscribe — use mdns.browse(service_type) for a remote event stream",
573            )),
574        }
575    }
576
577    pub fn emit_event(&self, event: KoiEvent) {
578        let _ = self.events.send(event);
579    }
580}
581
582pub struct DnsHandle {
583    backend: DnsBackend,
584}
585
586enum DnsBackend {
587    Embedded { runtime: Arc<DnsRuntime> },
588    Remote { client: Arc<KoiClient> },
589}
590
591impl DnsHandle {
592    fn new_embedded(runtime: Arc<DnsRuntime>) -> Self {
593        Self {
594            backend: DnsBackend::Embedded { runtime },
595        }
596    }
597
598    fn new_remote(client: Arc<KoiClient>) -> Self {
599        Self {
600            backend: DnsBackend::Remote { client },
601        }
602    }
603
604    pub fn runtime(&self) -> Result<Arc<DnsRuntime>, KoiError> {
605        match &self.backend {
606            DnsBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
607            DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
608        }
609    }
610
611    pub fn core(&self) -> Result<Arc<koi_dns::DnsCore>, KoiError> {
612        match &self.backend {
613            DnsBackend::Embedded { runtime } => Ok(runtime.core()),
614            DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
615        }
616    }
617
618    pub async fn lookup(
619        &self,
620        name: &str,
621        record_type: hickory_proto::rr::RecordType,
622    ) -> Option<DnsLookupResult> {
623        match &self.backend {
624            DnsBackend::Embedded { runtime } => runtime.core().lookup(name, record_type).await,
625            DnsBackend::Remote { client } => {
626                let name = name.to_string();
627                let client = Arc::clone(client);
628                let result =
629                    tokio::task::spawn_blocking(move || client.dns_lookup(&name, record_type))
630                        .await
631                        .ok()
632                        .and_then(|res| res.ok());
633                let json = match result {
634                    Some(json) => json,
635                    None => return None,
636                };
637                parse_dns_lookup(json)
638            }
639        }
640    }
641
642    pub fn list_names(&self) -> Vec<String> {
643        match &self.backend {
644            DnsBackend::Embedded { runtime } => runtime.core().list_names(),
645            DnsBackend::Remote { client } => {
646                let result = client.dns_list();
647                let Ok(json) = result else {
648                    return Vec::new();
649                };
650                json.get("names")
651                    .and_then(|v| v.as_array())
652                    .map(|arr| {
653                        arr.iter()
654                            .filter_map(|name| name.as_str().map(|s| s.to_string()))
655                            .collect()
656                    })
657                    .unwrap_or_default()
658            }
659        }
660    }
661
662    pub async fn start(&self) -> Result<bool, KoiError> {
663        match &self.backend {
664            DnsBackend::Embedded { runtime } => Ok(runtime.start().await?),
665            DnsBackend::Remote { client } => {
666                let client = Arc::clone(client);
667                let started = tokio::task::spawn_blocking(move || client.dns_start())
668                    .await
669                    .map_err(map_join_error)??
670                    .get("started")
671                    .and_then(|v| v.as_bool())
672                    .unwrap_or(false);
673                Ok(started)
674            }
675        }
676    }
677
678    pub async fn stop(&self) -> bool {
679        match &self.backend {
680            DnsBackend::Embedded { runtime } => runtime.stop().await,
681            DnsBackend::Remote { client } => {
682                let client = Arc::clone(client);
683                tokio::task::spawn_blocking(move || client.dns_stop())
684                    .await
685                    .ok()
686                    .and_then(|res| res.ok())
687                    .and_then(|json| json.get("stopped").and_then(|v| v.as_bool()))
688                    .unwrap_or(false)
689            }
690        }
691    }
692
693    pub fn add_entry(&self, entry: DnsEntry) -> Result<Vec<DnsEntry>, KoiError> {
694        match &self.backend {
695            DnsBackend::Embedded { runtime } => Ok(runtime.core().add_entry(entry)?),
696            DnsBackend::Remote { client } => {
697                let json = client.dns_add(&entry.name, &entry.ip, entry.ttl)?;
698                parse_dns_entries(json)
699            }
700        }
701    }
702
703    pub fn remove_entry(&self, name: &str) -> Result<Vec<DnsEntry>, KoiError> {
704        match &self.backend {
705            DnsBackend::Embedded { runtime } => {
706                Ok(runtime.core().remove_entry(name)?.unwrap_or_default())
707            }
708            DnsBackend::Remote { client } => {
709                let json = client.dns_remove(name)?;
710                parse_dns_entries(json)
711            }
712        }
713    }
714}
715
716pub struct HealthHandle {
717    backend: HealthBackend,
718}
719
720enum HealthBackend {
721    Embedded { runtime: Arc<HealthRuntime> },
722    Remote { client: Arc<KoiClient> },
723}
724
725impl HealthHandle {
726    fn new_embedded(runtime: Arc<HealthRuntime>) -> Self {
727        Self {
728            backend: HealthBackend::Embedded { runtime },
729        }
730    }
731
732    fn new_remote(client: Arc<KoiClient>) -> Self {
733        Self {
734            backend: HealthBackend::Remote { client },
735        }
736    }
737
738    pub fn core(&self) -> Result<Arc<koi_health::HealthCore>, KoiError> {
739        match &self.backend {
740            HealthBackend::Embedded { runtime } => Ok(runtime.core()),
741            HealthBackend::Remote { .. } => Err(KoiError::DisabledCapability("health")),
742        }
743    }
744
745    pub async fn status(&self) -> koi_health::HealthSnapshot {
746        match &self.backend {
747            HealthBackend::Embedded { runtime } => runtime.core().snapshot().await,
748            HealthBackend::Remote { client } => {
749                let client = Arc::clone(client);
750                let json = tokio::task::spawn_blocking(move || client.health_status())
751                    .await
752                    .ok()
753                    .and_then(|res| res.ok());
754                json.and_then(|json| serde_json::from_value(json).ok())
755                    .unwrap_or_else(|| koi_health::HealthSnapshot {
756                        machines: Vec::new(),
757                        services: Vec::new(),
758                    })
759            }
760        }
761    }
762
763    pub async fn add_check(&self, check: HealthCheck) -> Result<(), KoiError> {
764        match &self.backend {
765            HealthBackend::Embedded { runtime } => Ok(runtime.core().add_check(check).await?),
766            HealthBackend::Remote { client } => {
767                let client = Arc::clone(client);
768                let check = check.clone();
769                tokio::task::spawn_blocking(move || {
770                    client.health_add_check(
771                        &check.name,
772                        check.kind,
773                        &check.target,
774                        check.interval_secs,
775                        check.timeout_secs,
776                    )
777                })
778                .await
779                .map_err(map_join_error)??;
780                Ok(())
781            }
782        }
783    }
784
785    pub async fn remove_check(&self, name: &str) -> Result<(), KoiError> {
786        match &self.backend {
787            HealthBackend::Embedded { runtime } => Ok(runtime.core().remove_check(name).await?),
788            HealthBackend::Remote { client } => {
789                let client = Arc::clone(client);
790                let name = name.to_string();
791                tokio::task::spawn_blocking(move || client.health_remove_check(&name))
792                    .await
793                    .map_err(map_join_error)??;
794                Ok(())
795            }
796        }
797    }
798
799    pub async fn start(&self) -> Result<bool, KoiError> {
800        match &self.backend {
801            HealthBackend::Embedded { runtime } => Ok(runtime.start().await?),
802            HealthBackend::Remote { .. } => Ok(false),
803        }
804    }
805
806    pub async fn stop(&self) -> bool {
807        match &self.backend {
808            HealthBackend::Embedded { runtime } => runtime.stop().await,
809            HealthBackend::Remote { .. } => false,
810        }
811    }
812}
813
814pub struct CertmeshHandle {
815    backend: CertmeshBackend,
816}
817
818enum CertmeshBackend {
819    Embedded {
820        core: Arc<koi_certmesh::CertmeshCore>,
821    },
822    Remote {
823        client: Arc<KoiClient>,
824    },
825}
826
827impl CertmeshHandle {
828    fn new_embedded(core: Arc<koi_certmesh::CertmeshCore>) -> Self {
829        Self {
830            backend: CertmeshBackend::Embedded { core },
831        }
832    }
833
834    fn new_remote(client: Arc<KoiClient>) -> Self {
835        Self {
836            backend: CertmeshBackend::Remote { client },
837        }
838    }
839
840    pub fn core(&self) -> Result<Arc<koi_certmesh::CertmeshCore>, KoiError> {
841        match &self.backend {
842            CertmeshBackend::Embedded { core } => Ok(Arc::clone(core)),
843            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
844        }
845    }
846
847    pub async fn status(&self) -> koi_common::capability::CapabilityStatus {
848        match &self.backend {
849            CertmeshBackend::Embedded { core } => core.status().await,
850            CertmeshBackend::Remote { client } => {
851                let client = Arc::clone(client);
852                let json = tokio::task::spawn_blocking(move || client.unified_status())
853                    .await
854                    .ok()
855                    .and_then(|res| res.ok());
856                json.and_then(extract_capability_status)
857                    .unwrap_or_else(default_capability_status)
858            }
859        }
860    }
861
862    /// This node's current trust posture — the mode oracle (ADR-020 §0).
863    ///
864    /// Works in **both** modes (wishlist 1.3): embedded reads the live watch value;
865    /// a remote handle queries the daemon's `GET /v1/certmesh/posture` (DAT-gated, so
866    /// the handle must carry a token — adopted from the local breadcrumb or set via
867    /// `Builder::service_token`). A remote query needs the network, hence `async`.
868    pub async fn posture(&self) -> Result<koi_common::posture::Posture, KoiError> {
869        match &self.backend {
870            CertmeshBackend::Embedded { core } => Ok(core.posture()),
871            CertmeshBackend::Remote { client } => {
872                let client = Arc::clone(client);
873                let json = tokio::task::spawn_blocking(move || {
874                    client.get_json(koi_certmesh::http::paths::POSTURE)
875                })
876                .await
877                .map_err(map_join_error)??;
878                let signed = json
879                    .get("signed")
880                    .and_then(|v| v.as_bool())
881                    .unwrap_or(false);
882                let encrypted = json
883                    .get("encrypted")
884                    .and_then(|v| v.as_bool())
885                    .unwrap_or(false);
886                Ok(koi_common::posture::Posture::new(signed, encrypted))
887            }
888        }
889    }
890
891    /// Subscribe to live posture transitions — a `tokio::sync::watch` receiver
892    /// that fires on every Open↔Authenticated flip (ADR-020 §5 / wishlist 5.2).
893    ///
894    /// Ergonomic shorthand for `certmesh()?.core()?.watch_posture()`. Embedded
895    /// only; returns `DisabledCapability` in Remote mode.
896    pub fn on_posture(
897        &self,
898    ) -> Result<tokio::sync::watch::Receiver<koi_common::posture::Posture>, KoiError> {
899        match &self.backend {
900            CertmeshBackend::Embedded { core } => Ok(core.watch_posture()),
901            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
902        }
903    }
904
905    /// This node's live identity, or `None` if it is Open (ADR-020 §7).
906    /// Read-only; embedded only.
907    pub async fn local_identity(&self) -> Result<Option<koi_certmesh::Identity>, KoiError> {
908        match &self.backend {
909            CertmeshBackend::Embedded { core } => Ok(core.local_identity().await),
910            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
911        }
912    }
913
914    /// Ensure this node holds a current identity, then return it (ADR-020 §7).
915    /// Idempotent and mode-transparent; embedded only.
916    pub async fn ensure_identity(&self) -> Result<Option<koi_certmesh::Identity>, KoiError> {
917        match &self.backend {
918            CertmeshBackend::Embedded { core } => Ok(core.ensure_identity().await),
919            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
920        }
921    }
922
923    /// Sign `bytes` into an `Envelope` (ADR-020 §3). Mode-transparent: a
924    /// freshness-stamped passthrough when Open, ES256-signed when Authenticated.
925    /// Embedded only.
926    pub async fn sign(&self, bytes: &[u8]) -> Result<koi_common::envelope::Envelope, KoiError> {
927        match &self.backend {
928            CertmeshBackend::Embedded { core } => Ok(core.sign(bytes).await),
929            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
930        }
931    }
932
933    /// Verify an `Envelope`, returning an `Assurance` (ADR-020 §3). Read a trusted
934    /// identity only via `Assurance::identity()`. Embedded only.
935    pub async fn verify(
936        &self,
937        env: &koi_common::envelope::Envelope,
938    ) -> Result<koi_common::envelope::Assurance, KoiError> {
939        match &self.backend {
940            CertmeshBackend::Embedded { core } => Ok(core.verify(env).await),
941            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
942        }
943    }
944
945    /// Seal `bytes` into a `Sealed` (ADR-020 §4). The confidentiality rung, today a
946    /// signed-not-encrypted passthrough; the consumer codes against the final API
947    /// now. Embedded only.
948    pub async fn seal(&self, bytes: &[u8]) -> Result<koi_common::sealed::Sealed, KoiError> {
949        match &self.backend {
950            CertmeshBackend::Embedded { core } => Ok(core.seal(bytes).await),
951            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
952        }
953    }
954
955    /// Open a `Sealed` → `Opened` (recovered bytes + trust state, ADR-020 §4). A
956    /// tampered/rejected message errors rather than yielding bytes. Embedded only.
957    pub async fn open(
958        &self,
959        sealed: &koi_common::sealed::Sealed,
960    ) -> Result<koi_common::sealed::Opened, KoiError> {
961        match &self.backend {
962            CertmeshBackend::Embedded { core } => Ok(core.open(sealed).await?),
963            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
964        }
965    }
966
967    /// Run the trust-doctor (ADR-020 §13) → a structured `TrustDiagnosis`: posture,
968    /// identity + renewal health, on-disk-leaf integrity, self-revocation, and the
969    /// CA trust-install state, each with an exact remedy. `is_red()`/`exit_code()`
970    /// fail loud. Embedded only.
971    pub async fn diagnose(&self) -> Result<koi_common::diagnosis::TrustDiagnosis, KoiError> {
972        match &self.backend {
973            CertmeshBackend::Embedded { core } => Ok(core.diagnose().await),
974            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
975        }
976    }
977
978    /// Build a posture-keyed client to a discovered [`Peer`] (ADR-020 §6): plain
979    /// HTTP to an Open peer, mTLS to a secure peer — the caller writes one code
980    /// path. Embedded only (a remote handle has no local identity to present).
981    ///
982    /// Errors loudly (not via an opaque handshake failure) when the peer requires
983    /// authentication but this node is Open, or when the peer anchors to a
984    /// different mesh — see [`koi_certmesh::CertmeshCore::client_for`].
985    pub async fn client_for(&self, peer: &Peer) -> Result<koi_certmesh::PeerClient, KoiError> {
986        match &self.backend {
987            CertmeshBackend::Embedded { core } => Ok(core.client_for(peer).await?),
988            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
989        }
990    }
991
992    /// Build a posture-keyed [`reqwest::Client`] for a discovered [`Peer`] — the
993    /// full-traffic dual of [`client_for`](Self::client_for) (wishlist 3.1).
994    ///
995    /// Unlike [`PeerClient`](koi_certmesh::PeerClient) (GET + JSON-POST only), the
996    /// returned `reqwest::Client` carries koi's *transport policy* (plain HTTP to an
997    /// Open peer; mTLS presenting this node's leaf + pinning the mesh CA to a secure
998    /// peer) while the consumer drives the full request surface itself — every verb,
999    /// custom headers, SSE/streaming, large bodies. One mode-transparent client for
1000    /// *all* inter-node traffic, not just trivial GETs.
1001    ///
1002    /// An Open peer yields a plain `reqwest::Client` (no TLS); a secure peer yields
1003    /// one configured with `use_preconfigured_tls`. Same loud errors as `client_for`
1004    /// (missing identity, different mesh). Embedded only.
1005    ///
1006    /// The raw `rustls::ClientConfig` is available via
1007    /// `certmesh().core()?.tls_client_config_for(peer)` for consumers driving hyper
1008    /// or a tower service directly.
1009    pub async fn reqwest_client_for(&self, peer: &Peer) -> Result<reqwest::Client, KoiError> {
1010        let core = match &self.backend {
1011            CertmeshBackend::Embedded { core } => core,
1012            CertmeshBackend::Remote { .. } => return Err(KoiError::DisabledCapability("certmesh")),
1013        };
1014        let builder = match core.tls_client_config_for(peer).await? {
1015            // Secure peer → hand the posture-keyed rustls config to reqwest. The
1016            // workspace pins a single rustls version, so the `Any` downcast matches.
1017            Some(config) => reqwest::Client::builder().use_preconfigured_tls(config),
1018            // Open peer → plain HTTP, no TLS.
1019            None => reqwest::Client::builder(),
1020        };
1021        builder
1022            .build()
1023            .map_err(|e| KoiError::Certmesh(koi_certmesh::CertmeshError::Internal(e.to_string())))
1024    }
1025}
1026
1027/// Announce this node's `service_type` on `port` with its current posture stamped
1028/// into the TXT (ADR-020 §8). Returns the registration id, or `None` if mDNS
1029/// registration failed. Used by [`participate`](KoiHandle::participate).
1030async fn announce_once(
1031    mdns: &Arc<MdnsCore>,
1032    certmesh: &Arc<koi_certmesh::CertmeshCore>,
1033    hostname: &str,
1034    service_type: &str,
1035    port: u16,
1036) -> Option<String> {
1037    let id = certmesh.local_identity().await;
1038    let mut txt = std::collections::HashMap::new();
1039    koi_common::peer::stamp(
1040        &mut txt,
1041        certmesh.posture(),
1042        id.as_ref().map(|i| i.ca_fingerprint.as_str()),
1043        id.as_ref().map(|i| i.renewal.expires_at),
1044    );
1045    let payload = RegisterPayload {
1046        name: hostname.to_string(),
1047        service_type: service_type.to_string(),
1048        port,
1049        ip: None,
1050        lease_secs: None,
1051        txt,
1052    };
1053    match mdns.register(payload) {
1054        Ok(result) => Some(result.id),
1055        Err(e) => {
1056            tracing::warn!(error = %e, "participate: mDNS announce failed");
1057            None
1058        }
1059    }
1060}
1061
1062/// Maintain a posture-stamped mDNS announcement across posture flips until
1063/// `cancel` (ADR-020 §13). Re-announces on every transition so a peer discovering
1064/// this node always reads its *current* posture, then withdraws the record on
1065/// shutdown.
1066fn spawn_participate_announce(
1067    mdns: Arc<MdnsCore>,
1068    certmesh: Arc<koi_certmesh::CertmeshCore>,
1069    service_type: String,
1070    port: u16,
1071    cancel: CancellationToken,
1072) {
1073    tokio::spawn(async move {
1074        let hostname = hostname::get()
1075            .ok()
1076            .and_then(|os| os.into_string().ok())
1077            .unwrap_or_else(|| "unknown".to_string());
1078        let mut posture_rx = certmesh.watch_posture();
1079        let mut current_id = announce_once(&mdns, &certmesh, &hostname, &service_type, port).await;
1080        loop {
1081            tokio::select! {
1082                _ = cancel.cancelled() => break,
1083                changed = posture_rx.changed() => {
1084                    if changed.is_err() {
1085                        break; // the certmesh core was dropped
1086                    }
1087                    // Posture flipped → re-announce so the advertised posture is current.
1088                    if let Some(old) = current_id.take() {
1089                        let _ = mdns.unregister(&old);
1090                    }
1091                    current_id =
1092                        announce_once(&mdns, &certmesh, &hostname, &service_type, port).await;
1093                }
1094            }
1095        }
1096        if let Some(id) = current_id {
1097            let _ = mdns.unregister(&id);
1098        }
1099    });
1100}
1101
1102/// Fold a stream of mDNS lifecycle events into a deduplicated peer snapshot
1103/// (ADR-020 §8). Resolved records (which carry TXT, hence the trust hints)
1104/// overwrite an earlier Found for the same name; a Removed drops it. Ordered by
1105/// name for deterministic output. Pure — unit-tested without the network.
1106fn fold_peers(events: impl IntoIterator<Item = MdnsEvent>) -> Vec<Peer> {
1107    use std::collections::BTreeMap;
1108    let mut by_name: BTreeMap<String, ServiceRecord> = BTreeMap::new();
1109    for ev in events {
1110        match ev {
1111            MdnsEvent::Found(rec) => {
1112                by_name.entry(rec.name.clone()).or_insert(rec);
1113            }
1114            MdnsEvent::Resolved(rec) => {
1115                by_name.insert(rec.name.clone(), rec);
1116            }
1117            MdnsEvent::Removed { name, .. } => {
1118                by_name.remove(&name);
1119            }
1120        }
1121    }
1122    by_name.into_values().map(Peer::from_record).collect()
1123}
1124
1125pub struct ProxyHandle {
1126    backend: ProxyBackend,
1127}
1128
1129enum ProxyBackend {
1130    Embedded { runtime: Arc<ProxyRuntime> },
1131    Remote { client: Arc<KoiClient> },
1132}
1133
1134impl ProxyHandle {
1135    fn new_embedded(runtime: Arc<ProxyRuntime>) -> Self {
1136        Self {
1137            backend: ProxyBackend::Embedded { runtime },
1138        }
1139    }
1140
1141    fn new_remote(client: Arc<KoiClient>) -> Self {
1142        Self {
1143            backend: ProxyBackend::Remote { client },
1144        }
1145    }
1146
1147    pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError> {
1148        match &self.backend {
1149            ProxyBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
1150            ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
1151        }
1152    }
1153
1154    pub fn core(&self) -> Result<Arc<koi_proxy::ProxyCore>, KoiError> {
1155        match &self.backend {
1156            ProxyBackend::Embedded { runtime } => Ok(runtime.core()),
1157            ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
1158        }
1159    }
1160
1161    pub async fn entries(&self) -> Vec<ProxyEntry> {
1162        match &self.backend {
1163            ProxyBackend::Embedded { runtime } => runtime.core().entries().await,
1164            ProxyBackend::Remote { client } => {
1165                let client = Arc::clone(client);
1166                tokio::task::spawn_blocking(move || client.proxy_list())
1167                    .await
1168                    .ok()
1169                    .and_then(|res| res.ok())
1170                    .and_then(|json| parse_proxy_entries(json).ok())
1171                    .unwrap_or_default()
1172            }
1173        }
1174    }
1175
1176    pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError> {
1177        match &self.backend {
1178            ProxyBackend::Embedded { runtime } => Ok(runtime.core().upsert(entry).await?),
1179            ProxyBackend::Remote { client } => {
1180                let client = Arc::clone(client);
1181                let entry = entry.clone();
1182                let add_client = Arc::clone(&client);
1183                tokio::task::spawn_blocking(move || {
1184                    add_client.proxy_add(
1185                        &entry.name,
1186                        entry.listen_port,
1187                        &entry.backend,
1188                        entry.allow_remote,
1189                    )
1190                })
1191                .await
1192                .map_err(map_join_error)??;
1193                let list = tokio::task::spawn_blocking(move || client.proxy_list())
1194                    .await
1195                    .map_err(map_join_error)??;
1196                parse_proxy_entries(list)
1197            }
1198        }
1199    }
1200
1201    pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError> {
1202        match &self.backend {
1203            ProxyBackend::Embedded { runtime } => Ok(runtime.core().remove(name).await?),
1204            ProxyBackend::Remote { client } => {
1205                let client = Arc::clone(client);
1206                let name = name.to_string();
1207                let remove_client = Arc::clone(&client);
1208                tokio::task::spawn_blocking(move || remove_client.proxy_remove(&name))
1209                    .await
1210                    .map_err(map_join_error)??;
1211                let list = tokio::task::spawn_blocking(move || client.proxy_list())
1212                    .await
1213                    .map_err(map_join_error)??;
1214                parse_proxy_entries(list)
1215            }
1216        }
1217    }
1218
1219    pub async fn start_all(&self) -> Result<(), KoiError> {
1220        match &self.backend {
1221            ProxyBackend::Embedded { runtime } => Ok(runtime.start_all().await?),
1222            ProxyBackend::Remote { .. } => Ok(()),
1223        }
1224    }
1225
1226    pub async fn stop_all(&self) {
1227        if let ProxyBackend::Embedded { runtime } = &self.backend {
1228            runtime.stop_all().await;
1229        }
1230    }
1231}
1232
1233fn parse_dns_lookup(json: serde_json::Value) -> Option<DnsLookupResult> {
1234    let name = json.get("name").and_then(|v| v.as_str())?.to_string();
1235    let source = json
1236        .get("source")
1237        .and_then(|v| v.as_str())
1238        .unwrap_or("unknown")
1239        .to_string();
1240    let ips = json.get("ips").and_then(|v| v.as_array()).map(|arr| {
1241        arr.iter()
1242            .filter_map(|ip| ip.as_str())
1243            .filter_map(|ip| ip.parse::<IpAddr>().ok())
1244            .collect::<Vec<_>>()
1245    })?;
1246    Some(DnsLookupResult { name, ips, source })
1247}
1248
1249fn parse_dns_entries(json: serde_json::Value) -> Result<Vec<DnsEntry>, KoiError> {
1250    let entries = json.get("entries").ok_or_else(|| {
1251        KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::other(
1252            "missing entries",
1253        )))
1254    })?;
1255    let entries = serde_json::from_value(entries.clone()).map_err(|e| {
1256        KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::new(
1257            std::io::ErrorKind::InvalidData,
1258            e.to_string(),
1259        )))
1260    })?;
1261    Ok(entries)
1262}
1263
1264fn parse_proxy_entries(json: serde_json::Value) -> Result<Vec<ProxyEntry>, KoiError> {
1265    let entries = json
1266        .get("entries")
1267        .ok_or_else(|| KoiError::Proxy(koi_proxy::ProxyError::Io("missing entries".to_string())))?
1268        .clone();
1269    serde_json::from_value(entries)
1270        .map_err(|e| KoiError::Proxy(koi_proxy::ProxyError::Io(e.to_string())))
1271}
1272
1273fn extract_capability_status(
1274    json: serde_json::Value,
1275) -> Option<koi_common::capability::CapabilityStatus> {
1276    let caps = json.get("capabilities")?.as_array()?;
1277    for cap in caps {
1278        if cap.get("name")?.as_str()? == "certmesh" {
1279            let name = cap.get("name")?.as_str()?.to_string();
1280            let summary = cap
1281                .get("summary")
1282                .and_then(|v| v.as_str())
1283                .unwrap_or("unknown")
1284                .to_string();
1285            let healthy = cap
1286                .get("healthy")
1287                .and_then(|v| v.as_bool())
1288                .unwrap_or(false);
1289            return Some(koi_common::capability::CapabilityStatus {
1290                name,
1291                summary,
1292                healthy,
1293            });
1294        }
1295    }
1296    None
1297}
1298
1299fn default_capability_status() -> koi_common::capability::CapabilityStatus {
1300    koi_common::capability::CapabilityStatus {
1301        name: "certmesh".to_string(),
1302        summary: "unknown".to_string(),
1303        healthy: false,
1304    }
1305}
1306
1307fn mdns_event_from_pipeline(json: serde_json::Value) -> Option<MdnsEvent> {
1308    if let Some(found) = json.get("found") {
1309        let record: ServiceRecord = serde_json::from_value(found.clone()).ok()?;
1310        return Some(MdnsEvent::Found(record));
1311    }
1312    if let Some(resolved) = json.get("resolved") {
1313        let record: ServiceRecord = serde_json::from_value(resolved.clone()).ok()?;
1314        return Some(MdnsEvent::Resolved(record));
1315    }
1316    if let Some(event) = json.get("event") {
1317        let kind: EventKind = serde_json::from_value(event.clone()).ok()?;
1318        let service = json
1319            .get("service")
1320            .cloned()
1321            .unwrap_or(serde_json::Value::Null);
1322        let record: ServiceRecord = serde_json::from_value(service).ok()?;
1323        return match kind {
1324            EventKind::Found => Some(MdnsEvent::Found(record)),
1325            EventKind::Resolved => Some(MdnsEvent::Resolved(record)),
1326            EventKind::Removed => Some(MdnsEvent::Removed {
1327                name: record.name,
1328                service_type: record.service_type,
1329            }),
1330        };
1331    }
1332    None
1333}
1334
1335#[cfg(test)]
1336mod tests {
1337    use super::*;
1338    use koi_common::posture::PostureLevel;
1339    use std::collections::HashMap;
1340
1341    fn rec(name: &str, txt: &[(&str, &str)]) -> ServiceRecord {
1342        ServiceRecord {
1343            name: name.to_string(),
1344            service_type: "_http._tcp".to_string(),
1345            host: Some(format!("{name}.local")),
1346            ip: Some("10.0.0.9".to_string()),
1347            port: Some(8443),
1348            txt: txt
1349                .iter()
1350                .map(|(k, v)| (k.to_string(), v.to_string()))
1351                .collect::<HashMap<_, _>>(),
1352        }
1353    }
1354
1355    #[test]
1356    fn fold_resolved_overwrites_found_for_txt_enrichment() {
1357        // Found arrives first (no TXT), then Resolved carries the trust hints.
1358        let peers = fold_peers([
1359            MdnsEvent::Found(rec("a", &[])),
1360            MdnsEvent::Resolved(rec("a", &[("fp", "CAFP"), ("posture", "authenticated")])),
1361        ]);
1362        assert_eq!(peers.len(), 1, "the two events collapse to one peer");
1363        assert_eq!(peers[0].level(), PostureLevel::Authenticated);
1364        assert_eq!(peers[0].fp.as_deref(), Some("CAFP"));
1365    }
1366
1367    #[test]
1368    fn fold_removed_drops_the_peer() {
1369        let peers = fold_peers([
1370            MdnsEvent::Found(rec("b", &[])),
1371            MdnsEvent::Removed {
1372                name: "b".to_string(),
1373                service_type: "_http._tcp".to_string(),
1374            },
1375        ]);
1376        assert!(peers.is_empty(), "a removed peer is not in the snapshot");
1377    }
1378
1379    #[test]
1380    fn fold_orders_peers_by_name() {
1381        let peers = fold_peers([
1382            MdnsEvent::Resolved(rec("z", &[])),
1383            MdnsEvent::Resolved(rec("a", &[])),
1384            MdnsEvent::Resolved(rec("m", &[])),
1385        ]);
1386        let names: Vec<_> = peers.iter().map(|p| p.record.name.clone()).collect();
1387        assert_eq!(names, vec!["a", "m", "z"]);
1388    }
1389
1390    #[test]
1391    fn fold_open_peer_has_open_posture() {
1392        let peers = fold_peers([MdnsEvent::Resolved(rec("plain", &[]))]);
1393        assert_eq!(peers.len(), 1);
1394        assert_eq!(peers[0].level(), PostureLevel::Open);
1395        assert!(!peers[0].is_secure());
1396    }
1397
1398    // ── participate (ADR-020 §13) ───────────────────────────────────
1399
1400    #[tokio::test]
1401    async fn participate_remote_handle_is_disabled() {
1402        let client = Arc::new(KoiClient::new("http://127.0.0.1:1"));
1403        let (tx, _) = broadcast::channel(8);
1404        let handle = KoiHandle::new_remote(client, tx, CancellationToken::new(), Vec::new());
1405        let router = axum::Router::new();
1406        let addr: std::net::SocketAddr = ([127, 0, 0, 1], 0).into();
1407        let err = handle
1408            .participate(router, addr, "_x._tcp", CancellationToken::new())
1409            .await
1410            .unwrap_err();
1411        assert!(matches!(err, KoiError::DisabledCapability(_)));
1412    }
1413
1414    #[tokio::test]
1415    async fn participate_open_node_serves_plaintext() {
1416        // certmesh on (but no CA → Open), mDNS off (participate just serves plain),
1417        // isolated data dir. The Open node serves the consumer's router in plaintext
1418        // with no posture branching by the caller.
1419        let dir = std::env::temp_dir().join(format!("koi-emb-participate-{}", std::process::id()));
1420        let _ = std::fs::remove_dir_all(&dir);
1421        let koi = crate::Builder::new()
1422            .data_dir(&dir)
1423            .service_mode(crate::ServiceMode::EmbeddedOnly)
1424            .mdns(false)
1425            .dns_enabled(false)
1426            .health(false)
1427            .certmesh(true)
1428            .proxy(false)
1429            .build()
1430            .expect("build");
1431        let handle = koi.start().await.expect("start");
1432
1433        let addr = {
1434            let l = tokio::net::TcpListener::bind(("127.0.0.1", 0))
1435                .await
1436                .unwrap();
1437            l.local_addr().unwrap()
1438        };
1439        let router = axum::Router::new().route("/ping", axum::routing::get(|| async { "pong" }));
1440        let cancel = CancellationToken::new();
1441        let _server = handle
1442            .participate(router, addr, "_koi-test._tcp", cancel.clone())
1443            .await
1444            .expect("participate");
1445        tokio::time::sleep(std::time::Duration::from_millis(75)).await;
1446
1447        let (status, body) = koi_certmesh::mtls::get(&addr.ip().to_string(), addr.port(), "/ping")
1448            .await
1449            .expect("plain GET to an Open participating node");
1450        assert_eq!(status, 200);
1451        assert_eq!(body, "pong");
1452
1453        cancel.cancel();
1454        handle.shutdown().await.expect("shutdown");
1455    }
1456
1457    // ── seal/open (ADR-020 §4) ──────────────────────────────────────
1458
1459    #[tokio::test]
1460    async fn seal_open_round_trip_on_open_node() {
1461        use koi_common::sealed::Confidentiality;
1462        let dir = std::env::temp_dir().join(format!("koi-emb-seal-{}", std::process::id()));
1463        let _ = std::fs::remove_dir_all(&dir);
1464        let koi = crate::Builder::new()
1465            .data_dir(&dir)
1466            .service_mode(crate::ServiceMode::EmbeddedOnly)
1467            .mdns(false)
1468            .dns_enabled(false)
1469            .health(false)
1470            .certmesh(true)
1471            .proxy(false)
1472            .build()
1473            .expect("build");
1474        let handle = koi.start().await.expect("start");
1475        let cm = handle.certmesh().expect("certmesh handle");
1476
1477        // Open node: seal is a passthrough (signed-not-encrypted); the same code path
1478        // round-trips the bytes back with an anonymous assurance.
1479        let sealed = cm.seal(b"hello seal").await.expect("seal");
1480        assert_eq!(sealed.confidentiality(), Confidentiality::None);
1481        let opened = cm.open(&sealed).await.expect("open");
1482        assert_eq!(opened.payload, b"hello seal");
1483        assert_eq!(opened.confidentiality, Confidentiality::None);
1484        assert!(
1485            opened.assurance.identity().is_none(),
1486            "an Open node's seal is anonymous, not a trusted identity"
1487        );
1488
1489        handle.shutdown().await.expect("shutdown");
1490    }
1491}