// koi_embedded/handle.rs

1use std::net::IpAddr;
2use std::sync::Arc;
3
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::BroadcastStream;
7use tokio_util::sync::CancellationToken;
8
9use koi_client::KoiClient;
10use koi_common::capability::Capability;
11use koi_common::peer::Peer;
12use koi_common::types::{EventKind, ServiceRecord};
13use koi_config::state::DnsEntry;
14use koi_dns::{DnsLookupResult, DnsRuntime};
15use koi_health::{HealthCheck, HealthRuntime};
16use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
17use koi_mdns::{BrowseSubscription as MdnsBrowseHandle, MdnsCore, MdnsEvent};
18use koi_proxy::{ProxyEntry, ProxyRuntime};
19
20use crate::{map_join_error, KoiError, KoiEvent};
21
22/// Hard ceiling on ordered teardown — bounds the cancel/drain/join sequence so a wedged
23/// task can never hang the host application's shutdown (mirrors the daemon's limit).
24const SHUTDOWN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20);
25/// Grace period after cancellation for in-flight work to drain before tasks are joined.
26const SHUTDOWN_DRAIN: std::time::Duration = std::time::Duration::from_millis(500);
27
28enum HandleBackend {
29    Embedded {
30        mdns: Option<Arc<MdnsCore>>,
31        dns: Option<Arc<DnsRuntime>>,
32        health: Option<Arc<HealthRuntime>>,
33        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
34        proxy: Option<Arc<ProxyRuntime>>,
35        udp: Option<Arc<koi_udp::UdpRuntime>>,
36        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
37    },
38    Remote {
39        client: Arc<KoiClient>,
40    },
41}
42
43pub struct KoiHandle {
44    backend: HandleBackend,
45    /// The address the embedded HTTP adapter bound to (`None` if HTTP is disabled
46    /// or in remote mode). Populated even for a fixed port; with `http_port(0)` it
47    /// carries the OS-assigned ephemeral port.
48    http_addr: Option<std::net::SocketAddr>,
49    data_dir: Option<std::path::PathBuf>,
50    events: broadcast::Sender<KoiEvent>,
51    cancel: CancellationToken,
52    tasks: Vec<JoinHandle<()>>,
53}
54
55impl KoiHandle {
56    #[allow(clippy::too_many_arguments)]
57    pub(crate) fn new_embedded(
58        mdns: Option<Arc<MdnsCore>>,
59        dns: Option<Arc<DnsRuntime>>,
60        health: Option<Arc<HealthRuntime>>,
61        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
62        proxy: Option<Arc<ProxyRuntime>>,
63        udp: Option<Arc<koi_udp::UdpRuntime>>,
64        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
65        http_addr: Option<std::net::SocketAddr>,
66        data_dir: Option<std::path::PathBuf>,
67        events: broadcast::Sender<KoiEvent>,
68        cancel: CancellationToken,
69        tasks: Vec<JoinHandle<()>>,
70    ) -> Self {
71        Self {
72            backend: HandleBackend::Embedded {
73                mdns,
74                dns,
75                health,
76                certmesh,
77                proxy,
78                udp,
79                runtime,
80            },
81            http_addr,
82            data_dir,
83            events,
84            cancel,
85            tasks,
86        }
87    }
88
89    pub(crate) fn new_remote(
90        client: Arc<KoiClient>,
91        events: broadcast::Sender<KoiEvent>,
92        cancel: CancellationToken,
93        tasks: Vec<JoinHandle<()>>,
94    ) -> Self {
95        Self {
96            backend: HandleBackend::Remote { client },
97            http_addr: None,
98            data_dir: None,
99            events,
100            cancel,
101            tasks,
102        }
103    }
104
105    pub fn events(&self) -> BroadcastStream<KoiEvent> {
106        BroadcastStream::new(self.events.subscribe())
107    }
108
109    /// The address the embedded HTTP adapter bound to, or `None` when HTTP is
110    /// disabled or this is a remote handle. With `Builder::http_port(0)` this
111    /// reports the OS-assigned ephemeral port — the supported way to run an
112    /// embedded HTTP surface on a free port without racing to pick one.
113    pub fn http_addr(&self) -> Option<std::net::SocketAddr> {
114        self.http_addr
115    }
116
117    /// The port the embedded HTTP adapter bound to (convenience over
118    /// [`http_addr`](Self::http_addr)). `None` if HTTP is disabled / remote.
119    pub fn bound_http_port(&self) -> Option<u16> {
120        self.http_addr.map(|addr| addr.port())
121    }
122
123    /// Serve `router` on `addr` with the same-port posture dial (ADR-020 §5):
124    /// plain HTTP while this node is Open, mTLS once it is secure, flipping live
125    /// with **no dropped connections** as the posture changes. The consumer writes
126    /// one `serve` call and never branches on posture.
127    ///
128    /// Returns the supervisor's [`JoinHandle`]; the listener stops when this
129    /// handle's `cancel` is triggered (e.g. on [`shutdown`](Self::shutdown)) or
130    /// the passed `cancel` fires. Embedded only — a remote handle has no local
131    /// identity to serve mTLS with.
132    pub fn serve(
133        &self,
134        router: axum::Router,
135        addr: std::net::SocketAddr,
136        cancel: CancellationToken,
137    ) -> Result<JoinHandle<()>, KoiError> {
138        match &self.backend {
139            HandleBackend::Embedded { certmesh, .. } => {
140                let core = certmesh
141                    .as_ref()
142                    .ok_or(KoiError::DisabledCapability("certmesh"))?;
143                let core = Arc::clone(core);
144                Ok(tokio::spawn(async move {
145                    if let Err(e) = crate::serve::serve_adaptive(core, router, addr, cancel).await {
146                        tracing::error!(error = %e, "same-port serve failed to bind");
147                    }
148                }))
149            }
150            HandleBackend::Remote { .. } => {
151                Err(KoiError::DisabledCapability("certmesh (remote mode)"))
152            }
153        }
154    }
155
156    /// Become a fully-participating trusted service in one call (ADR-020 §13 — the
157    /// "3-line trusted service"):
158    ///
159    /// 1. acquire/maintain this node's identity (best-effort — an Open node with no
160    ///    way to enroll simply stays plaintext),
161    /// 2. announce `service_type` on the LAN at `addr`'s port with the node's
162    ///    posture stamped into the TXT, **kept current across posture flips**, and
163    /// 3. serve `router` on `addr` with the same-port dial ([`serve`](Self::serve)).
164    ///
165    /// The consumer never branches on posture and never wires identity, discovery,
166    /// and serving separately. Returns the serve supervisor's [`JoinHandle`].
167    /// Certificate *renewal* is handled by the certmesh background loops — enable
168    /// them with `Builder::certmesh_background(true)` on a long-running host.
169    /// Embedded only.
170    pub async fn participate(
171        &self,
172        router: axum::Router,
173        addr: std::net::SocketAddr,
174        service_type: &str,
175        cancel: CancellationToken,
176    ) -> Result<JoinHandle<()>, KoiError> {
177        let (certmesh, mdns) = match &self.backend {
178            HandleBackend::Embedded { certmesh, mdns, .. } => (
179                certmesh
180                    .as_ref()
181                    .ok_or(KoiError::DisabledCapability("certmesh"))?
182                    .clone(),
183                mdns.clone(),
184            ),
185            HandleBackend::Remote { .. } => {
186                return Err(KoiError::DisabledCapability("certmesh (remote mode)"))
187            }
188        };
189
190        // 1. Acquire/maintain identity. Open (no CA / not a member) stays plaintext.
191        let _ = certmesh.ensure_identity().await;
192
193        // 2. Announce with posture, refreshed on every flip so the LAN trust map
194        //    never goes stale (ADR-020 §13 "maintained across flips").
195        if let Some(mdns) = mdns {
196            spawn_participate_announce(
197                mdns,
198                Arc::clone(&certmesh),
199                service_type.to_string(),
200                addr.port(),
201                cancel.clone(),
202            );
203        } else {
204            tracing::debug!("participate: mDNS disabled — serving without announcing");
205        }
206
207        // 3. Serve with the same-port posture dial.
208        self.serve(router, addr, cancel)
209    }
210
211    pub fn subscribe(&self) -> broadcast::Receiver<KoiEvent> {
212        self.events.subscribe()
213    }
214
215    pub fn mdns(&self) -> Result<MdnsHandle, KoiError> {
216        match &self.backend {
217            HandleBackend::Embedded { mdns, .. } => {
218                let core = mdns.as_ref().ok_or(KoiError::DisabledCapability("mdns"))?;
219                Ok(MdnsHandle::new_embedded(
220                    Arc::clone(core),
221                    self.events.clone(),
222                ))
223            }
224            HandleBackend::Remote { client } => Ok(MdnsHandle::new_remote(
225                Arc::clone(client),
226                self.events.clone(),
227            )),
228        }
229    }
230
231    pub fn dns(&self) -> Result<DnsHandle, KoiError> {
232        match &self.backend {
233            HandleBackend::Embedded { dns, .. } => {
234                let runtime = dns.as_ref().ok_or(KoiError::DisabledCapability("dns"))?;
235                Ok(DnsHandle::new_embedded(Arc::clone(runtime)))
236            }
237            HandleBackend::Remote { client } => Ok(DnsHandle::new_remote(Arc::clone(client))),
238        }
239    }
240
241    pub fn health(&self) -> Result<HealthHandle, KoiError> {
242        match &self.backend {
243            HandleBackend::Embedded { health, .. } => {
244                let runtime = health
245                    .as_ref()
246                    .ok_or(KoiError::DisabledCapability("health"))?;
247                Ok(HealthHandle::new_embedded(Arc::clone(runtime)))
248            }
249            HandleBackend::Remote { client } => Ok(HealthHandle::new_remote(Arc::clone(client))),
250        }
251    }
252
253    pub fn certmesh(&self) -> Result<CertmeshHandle, KoiError> {
254        match &self.backend {
255            HandleBackend::Embedded { certmesh, .. } => {
256                let core = certmesh
257                    .as_ref()
258                    .ok_or(KoiError::DisabledCapability("certmesh"))?;
259                Ok(CertmeshHandle::new_embedded(Arc::clone(core)))
260            }
261            HandleBackend::Remote { client } => Ok(CertmeshHandle::new_remote(Arc::clone(client))),
262        }
263    }
264
265    /// Open the encrypted key-value vault for general-purpose secret storage.
266    ///
267    /// The vault uses platform credential binding (keyring) when available,
268    /// with a machine-bound fallback. Each call opens a fresh handle sharing
269    /// the same on-disk state.
270    pub fn vault(&self) -> Result<koi_crypto::vault::Vault, KoiError> {
271        let dir = self
272            .data_dir
273            .as_ref()
274            .ok_or(KoiError::DisabledCapability("vault (no data_dir)"))?;
275        koi_crypto::vault::Vault::open(dir)
276            .map_err(|e| KoiError::Io(std::io::Error::other(e.to_string())))
277    }
278
279    pub fn proxy(&self) -> Result<ProxyHandle, KoiError> {
280        match &self.backend {
281            HandleBackend::Embedded { proxy, .. } => {
282                let runtime = proxy
283                    .as_ref()
284                    .ok_or(KoiError::DisabledCapability("proxy"))?;
285                Ok(ProxyHandle::new_embedded(Arc::clone(runtime)))
286            }
287            HandleBackend::Remote { client } => Ok(ProxyHandle::new_remote(Arc::clone(client))),
288        }
289    }
290
291    /// Get the UDP runtime handle.
292    ///
293    /// Only available in embedded mode — remote mode does not support UDP bridging
294    /// (the remote daemon itself handles bindings).
295    pub fn udp(&self) -> Result<Arc<koi_udp::UdpRuntime>, KoiError> {
296        match &self.backend {
297            HandleBackend::Embedded { udp, .. } => {
298                let runtime = udp.as_ref().ok_or(KoiError::DisabledCapability("udp"))?;
299                Ok(Arc::clone(runtime))
300            }
301            HandleBackend::Remote { .. } => Err(KoiError::DisabledCapability("udp (remote mode)")),
302        }
303    }
304
305    /// Get the runtime adapter core.
306    ///
307    /// Only available in embedded mode when runtime is enabled.
308    pub fn runtime(&self) -> Result<Arc<koi_runtime::RuntimeCore>, KoiError> {
309        match &self.backend {
310            HandleBackend::Embedded { runtime, .. } => {
311                let core = runtime
312                    .as_ref()
313                    .ok_or(KoiError::DisabledCapability("runtime"))?;
314                Ok(Arc::clone(core))
315            }
316            HandleBackend::Remote { .. } => {
317                Err(KoiError::DisabledCapability("runtime (remote mode)"))
318            }
319        }
320    }
321
322    pub async fn shutdown(mut self) -> Result<(), KoiError> {
323        let tasks = std::mem::take(&mut self.tasks);
324
325        if let HandleBackend::Embedded {
326            mdns,
327            dns,
328            health,
329            certmesh,
330            proxy,
331            udp,
332            runtime,
333        } = &self.backend
334        {
335            // Route through the shared ordered teardown so embedded inherits the same
336            // cancel → drain → join → withdraw-announce → per-core goodbye sequence the
337            // daemon runs, including the UDP shutdown + drain + hard timeout it omitted.
338            let cores = koi_compose::cores::Cores {
339                mdns: mdns.clone(),
340                certmesh: certmesh.clone(),
341                dns: dns.clone(),
342                health: health.clone(),
343                proxy: proxy.clone(),
344                udp: udp.clone(),
345                runtime: runtime.clone(),
346                mdns_snapshot: None,
347            };
348            koi_compose::cores::ordered_shutdown(
349                &self.cancel,
350                tasks,
351                &cores,
352                SHUTDOWN_TIMEOUT,
353                SHUTDOWN_DRAIN,
354            )
355            .await;
356        } else {
357            // Remote backend: no local cores to tear down — just stop the tasks.
358            self.cancel.cancel();
359            for task in tasks {
360                let _ = task.await;
361            }
362        }
363
364        Ok(())
365    }
366}
367
368pub struct KoiBrowseHandle {
369    backend: BrowseBackend,
370}
371
372enum BrowseBackend {
373    Embedded(MdnsBrowseHandle),
374    Remote(Mutex<mpsc::Receiver<MdnsEvent>>),
375}
376
377impl KoiBrowseHandle {
378    fn embedded(handle: MdnsBrowseHandle) -> Self {
379        Self {
380            backend: BrowseBackend::Embedded(handle),
381        }
382    }
383
384    fn remote(rx: mpsc::Receiver<MdnsEvent>) -> Self {
385        Self {
386            backend: BrowseBackend::Remote(Mutex::new(rx)),
387        }
388    }
389
390    pub async fn recv(&self) -> Option<MdnsEvent> {
391        match &self.backend {
392            BrowseBackend::Embedded(handle) => handle.recv().await,
393            BrowseBackend::Remote(rx) => rx.lock().await.recv().await,
394        }
395    }
396}
397
398/// Default discovery window (ADR-020 §8): long enough for mDNS resolution on a
399/// quiet LAN, short enough to stay responsive — a sane default so the common
400/// `discover(type)` call needs no tuning.
401pub const DEFAULT_DISCOVER_WINDOW: std::time::Duration = std::time::Duration::from_secs(2);
402
403pub struct MdnsHandle {
404    backend: MdnsBackend,
405    events: broadcast::Sender<KoiEvent>,
406}
407
408enum MdnsBackend {
409    Embedded { core: Arc<MdnsCore> },
410    Remote { client: Arc<KoiClient> },
411}
412
413impl MdnsHandle {
414    fn new_embedded(core: Arc<MdnsCore>, events: broadcast::Sender<KoiEvent>) -> Self {
415        Self {
416            backend: MdnsBackend::Embedded { core },
417            events,
418        }
419    }
420
421    fn new_remote(client: Arc<KoiClient>, events: broadcast::Sender<KoiEvent>) -> Self {
422        Self {
423            backend: MdnsBackend::Remote { client },
424            events,
425        }
426    }
427
428    pub fn core(&self) -> Result<Arc<MdnsCore>, KoiError> {
429        match &self.backend {
430            MdnsBackend::Embedded { core } => Ok(Arc::clone(core)),
431            MdnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("mdns")),
432        }
433    }
434
435    pub async fn browse(&self, service_type: &str) -> Result<KoiBrowseHandle, KoiError> {
436        match &self.backend {
437            MdnsBackend::Embedded { core } => {
438                let handle = core.subscribe_type(service_type).await?;
439                Ok(KoiBrowseHandle::embedded(handle))
440            }
441            MdnsBackend::Remote { client } => {
442                let (tx, rx) = mpsc::channel(64);
443                let client = Arc::clone(client);
444                let service_type = service_type.to_string();
445                tokio::task::spawn_blocking(move || {
446                    let stream = match client.browse_stream(&service_type) {
447                        Ok(stream) => stream,
448                        Err(_) => return,
449                    };
450                    for item in stream {
451                        let Ok(json) = item else {
452                            break;
453                        };
454                        if let Some(event) = mdns_event_from_pipeline(json) {
455                            if tx.blocking_send(event).is_err() {
456                                break;
457                            }
458                        }
459                    }
460                });
461                Ok(KoiBrowseHandle::remote(rx))
462            }
463        }
464    }
465
466    /// Discover peers of `service_type`, each enriched with its advertised trust
467    /// posture, mesh anchor, and identity expiry (ADR-020 §8) — the fleet-wide
468    /// trust-legibility primitive. A snapshot collected over
469    /// [`DEFAULT_DISCOVER_WINDOW`]; for a custom window use
470    /// [`discover_for`](Self::discover_for).
471    ///
472    /// The posture each peer carries is an **untrusted hint** (ADR-016 §2);
473    /// `certmesh().verify(..)` / mTLS adjudicates actual trust. Works in both
474    /// embedded and remote mode (it layers on [`browse`](Self::browse)).
475    pub async fn discover(&self, service_type: &str) -> Result<Vec<Peer>, KoiError> {
476        self.discover_for(service_type, DEFAULT_DISCOVER_WINDOW)
477            .await
478    }
479
480    /// Like [`discover`](Self::discover) with an explicit collection `window`.
481    pub async fn discover_for(
482        &self,
483        service_type: &str,
484        window: std::time::Duration,
485    ) -> Result<Vec<Peer>, KoiError> {
486        let browse = self.browse(service_type).await?;
487        let mut events = Vec::new();
488        let deadline = tokio::time::sleep(window);
489        tokio::pin!(deadline);
490        loop {
491            tokio::select! {
492                _ = &mut deadline => break,
493                ev = browse.recv() => match ev {
494                    Some(e) => events.push(e),
495                    None => break,
496                },
497            }
498        }
499        Ok(fold_peers(events))
500    }
501
502    pub async fn resolve(&self, name: &str) -> Result<ServiceRecord, KoiError> {
503        match &self.backend {
504            MdnsBackend::Embedded { core } => Ok(core.resolve(name).await?),
505            MdnsBackend::Remote { client } => {
506                let name = name.to_string();
507                let client = Arc::clone(client);
508                let record = tokio::task::spawn_blocking(move || client.resolve(&name))
509                    .await
510                    .map_err(map_join_error)??;
511                Ok(record)
512            }
513        }
514    }
515
516    pub fn register(&self, payload: RegisterPayload) -> Result<RegistrationResult, KoiError> {
517        match &self.backend {
518            MdnsBackend::Embedded { core } => Ok(core.register(payload)?),
519            MdnsBackend::Remote { client } => Ok(client.register(&payload)?),
520        }
521    }
522
523    pub fn unregister(&self, id: &str) -> Result<(), KoiError> {
524        match &self.backend {
525            MdnsBackend::Embedded { core } => Ok(core.unregister(id)?),
526            MdnsBackend::Remote { client } => Ok(client.unregister(id)?),
527        }
528    }
529
530    /// Subscribe to the live mDNS lifecycle-event stream (Found / Resolved / Removed).
531    ///
532    /// Available only in **embedded** mode, where there is a local `MdnsCore` to subscribe
533    /// to. In **client (remote)** mode there is no all-types lifecycle stream to forward —
534    /// the daemon's `/v1/mdns/subscribe` requires a service type — so this returns
535    /// [`KoiError::RemoteUnsupported`]. For a remote event stream, use
536    /// [`MdnsHandle::browse`] with a specific service type (it forwards the daemon's SSE).
537    ///
538    /// Previously this silently returned a dead receiver in remote mode (it yielded nothing,
539    /// forever); the typed error makes the limitation visible instead of swallowing it.
540    pub fn subscribe(&self) -> Result<broadcast::Receiver<MdnsEvent>, KoiError> {
541        match &self.backend {
542            MdnsBackend::Embedded { core } => Ok(core.subscribe()),
543            MdnsBackend::Remote { .. } => Err(KoiError::RemoteUnsupported(
544                "mdns subscribe — use mdns.browse(service_type) for a remote event stream",
545            )),
546        }
547    }
548
549    pub fn emit_event(&self, event: KoiEvent) {
550        let _ = self.events.send(event);
551    }
552}
553
554pub struct DnsHandle {
555    backend: DnsBackend,
556}
557
558enum DnsBackend {
559    Embedded { runtime: Arc<DnsRuntime> },
560    Remote { client: Arc<KoiClient> },
561}
562
563impl DnsHandle {
564    fn new_embedded(runtime: Arc<DnsRuntime>) -> Self {
565        Self {
566            backend: DnsBackend::Embedded { runtime },
567        }
568    }
569
570    fn new_remote(client: Arc<KoiClient>) -> Self {
571        Self {
572            backend: DnsBackend::Remote { client },
573        }
574    }
575
576    pub fn runtime(&self) -> Result<Arc<DnsRuntime>, KoiError> {
577        match &self.backend {
578            DnsBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
579            DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
580        }
581    }
582
583    pub fn core(&self) -> Result<Arc<koi_dns::DnsCore>, KoiError> {
584        match &self.backend {
585            DnsBackend::Embedded { runtime } => Ok(runtime.core()),
586            DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
587        }
588    }
589
590    pub async fn lookup(
591        &self,
592        name: &str,
593        record_type: hickory_proto::rr::RecordType,
594    ) -> Option<DnsLookupResult> {
595        match &self.backend {
596            DnsBackend::Embedded { runtime } => runtime.core().lookup(name, record_type).await,
597            DnsBackend::Remote { client } => {
598                let name = name.to_string();
599                let client = Arc::clone(client);
600                let result =
601                    tokio::task::spawn_blocking(move || client.dns_lookup(&name, record_type))
602                        .await
603                        .ok()
604                        .and_then(|res| res.ok());
605                let json = match result {
606                    Some(json) => json,
607                    None => return None,
608                };
609                parse_dns_lookup(json)
610            }
611        }
612    }
613
614    pub fn list_names(&self) -> Vec<String> {
615        match &self.backend {
616            DnsBackend::Embedded { runtime } => runtime.core().list_names(),
617            DnsBackend::Remote { client } => {
618                let result = client.dns_list();
619                let Ok(json) = result else {
620                    return Vec::new();
621                };
622                json.get("names")
623                    .and_then(|v| v.as_array())
624                    .map(|arr| {
625                        arr.iter()
626                            .filter_map(|name| name.as_str().map(|s| s.to_string()))
627                            .collect()
628                    })
629                    .unwrap_or_default()
630            }
631        }
632    }
633
634    pub async fn start(&self) -> Result<bool, KoiError> {
635        match &self.backend {
636            DnsBackend::Embedded { runtime } => Ok(runtime.start().await?),
637            DnsBackend::Remote { client } => {
638                let client = Arc::clone(client);
639                let started = tokio::task::spawn_blocking(move || client.dns_start())
640                    .await
641                    .map_err(map_join_error)??
642                    .get("started")
643                    .and_then(|v| v.as_bool())
644                    .unwrap_or(false);
645                Ok(started)
646            }
647        }
648    }
649
650    pub async fn stop(&self) -> bool {
651        match &self.backend {
652            DnsBackend::Embedded { runtime } => runtime.stop().await,
653            DnsBackend::Remote { client } => {
654                let client = Arc::clone(client);
655                tokio::task::spawn_blocking(move || client.dns_stop())
656                    .await
657                    .ok()
658                    .and_then(|res| res.ok())
659                    .and_then(|json| json.get("stopped").and_then(|v| v.as_bool()))
660                    .unwrap_or(false)
661            }
662        }
663    }
664
665    pub fn add_entry(&self, entry: DnsEntry) -> Result<Vec<DnsEntry>, KoiError> {
666        match &self.backend {
667            DnsBackend::Embedded { runtime } => Ok(runtime.core().add_entry(entry)?),
668            DnsBackend::Remote { client } => {
669                let json = client.dns_add(&entry.name, &entry.ip, entry.ttl)?;
670                parse_dns_entries(json)
671            }
672        }
673    }
674
675    pub fn remove_entry(&self, name: &str) -> Result<Vec<DnsEntry>, KoiError> {
676        match &self.backend {
677            DnsBackend::Embedded { runtime } => {
678                Ok(runtime.core().remove_entry(name)?.unwrap_or_default())
679            }
680            DnsBackend::Remote { client } => {
681                let json = client.dns_remove(name)?;
682                parse_dns_entries(json)
683            }
684        }
685    }
686}
687
688pub struct HealthHandle {
689    backend: HealthBackend,
690}
691
692enum HealthBackend {
693    Embedded { runtime: Arc<HealthRuntime> },
694    Remote { client: Arc<KoiClient> },
695}
696
697impl HealthHandle {
698    fn new_embedded(runtime: Arc<HealthRuntime>) -> Self {
699        Self {
700            backend: HealthBackend::Embedded { runtime },
701        }
702    }
703
704    fn new_remote(client: Arc<KoiClient>) -> Self {
705        Self {
706            backend: HealthBackend::Remote { client },
707        }
708    }
709
710    pub fn core(&self) -> Result<Arc<koi_health::HealthCore>, KoiError> {
711        match &self.backend {
712            HealthBackend::Embedded { runtime } => Ok(runtime.core()),
713            HealthBackend::Remote { .. } => Err(KoiError::DisabledCapability("health")),
714        }
715    }
716
717    pub async fn status(&self) -> koi_health::HealthSnapshot {
718        match &self.backend {
719            HealthBackend::Embedded { runtime } => runtime.core().snapshot().await,
720            HealthBackend::Remote { client } => {
721                let client = Arc::clone(client);
722                let json = tokio::task::spawn_blocking(move || client.health_status())
723                    .await
724                    .ok()
725                    .and_then(|res| res.ok());
726                json.and_then(|json| serde_json::from_value(json).ok())
727                    .unwrap_or_else(|| koi_health::HealthSnapshot {
728                        machines: Vec::new(),
729                        services: Vec::new(),
730                    })
731            }
732        }
733    }
734
735    pub async fn add_check(&self, check: HealthCheck) -> Result<(), KoiError> {
736        match &self.backend {
737            HealthBackend::Embedded { runtime } => Ok(runtime.core().add_check(check).await?),
738            HealthBackend::Remote { client } => {
739                let client = Arc::clone(client);
740                let check = check.clone();
741                tokio::task::spawn_blocking(move || {
742                    client.health_add_check(
743                        &check.name,
744                        check.kind,
745                        &check.target,
746                        check.interval_secs,
747                        check.timeout_secs,
748                    )
749                })
750                .await
751                .map_err(map_join_error)??;
752                Ok(())
753            }
754        }
755    }
756
757    pub async fn remove_check(&self, name: &str) -> Result<(), KoiError> {
758        match &self.backend {
759            HealthBackend::Embedded { runtime } => Ok(runtime.core().remove_check(name).await?),
760            HealthBackend::Remote { client } => {
761                let client = Arc::clone(client);
762                let name = name.to_string();
763                tokio::task::spawn_blocking(move || client.health_remove_check(&name))
764                    .await
765                    .map_err(map_join_error)??;
766                Ok(())
767            }
768        }
769    }
770
771    pub async fn start(&self) -> Result<bool, KoiError> {
772        match &self.backend {
773            HealthBackend::Embedded { runtime } => Ok(runtime.start().await?),
774            HealthBackend::Remote { .. } => Ok(false),
775        }
776    }
777
778    pub async fn stop(&self) -> bool {
779        match &self.backend {
780            HealthBackend::Embedded { runtime } => runtime.stop().await,
781            HealthBackend::Remote { .. } => false,
782        }
783    }
784}
785
/// Handle to the certmesh capability, backed by an embedded core or a remote client.
pub struct CertmeshHandle {
    backend: CertmeshBackend,
}
789
/// Implementation behind a [`CertmeshHandle`].
enum CertmeshBackend {
    /// In-process certmesh core.
    Embedded {
        core: Arc<koi_certmesh::CertmeshCore>,
    },
    /// Calls forwarded through a shared client.
    Remote {
        client: Arc<KoiClient>,
    },
}
798
799impl CertmeshHandle {
800    fn new_embedded(core: Arc<koi_certmesh::CertmeshCore>) -> Self {
801        Self {
802            backend: CertmeshBackend::Embedded { core },
803        }
804    }
805
806    fn new_remote(client: Arc<KoiClient>) -> Self {
807        Self {
808            backend: CertmeshBackend::Remote { client },
809        }
810    }
811
812    pub fn core(&self) -> Result<Arc<koi_certmesh::CertmeshCore>, KoiError> {
813        match &self.backend {
814            CertmeshBackend::Embedded { core } => Ok(Arc::clone(core)),
815            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
816        }
817    }
818
819    pub async fn status(&self) -> koi_common::capability::CapabilityStatus {
820        match &self.backend {
821            CertmeshBackend::Embedded { core } => core.status().await,
822            CertmeshBackend::Remote { client } => {
823                let client = Arc::clone(client);
824                let json = tokio::task::spawn_blocking(move || client.unified_status())
825                    .await
826                    .ok()
827                    .and_then(|res| res.ok());
828                json.and_then(extract_capability_status)
829                    .unwrap_or_else(default_capability_status)
830            }
831        }
832    }
833
834    /// This node's current trust posture — the mode oracle (ADR-020 §0).
835    ///
836    /// Embedded only: a remote handle has no endpoint to query the daemon's
837    /// posture yet (that arrives with the diagnose/status surface in a later
838    /// ADR-020 phase), so it returns `DisabledCapability`.
839    pub fn posture(&self) -> Result<koi_common::posture::Posture, KoiError> {
840        match &self.backend {
841            CertmeshBackend::Embedded { core } => Ok(core.posture()),
842            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
843        }
844    }
845
846    /// This node's live identity, or `None` if it is Open (ADR-020 §7).
847    /// Read-only; embedded only.
848    pub async fn local_identity(&self) -> Result<Option<koi_certmesh::Identity>, KoiError> {
849        match &self.backend {
850            CertmeshBackend::Embedded { core } => Ok(core.local_identity().await),
851            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
852        }
853    }
854
855    /// Ensure this node holds a current identity, then return it (ADR-020 §7).
856    /// Idempotent and mode-transparent; embedded only.
857    pub async fn ensure_identity(&self) -> Result<Option<koi_certmesh::Identity>, KoiError> {
858        match &self.backend {
859            CertmeshBackend::Embedded { core } => Ok(core.ensure_identity().await),
860            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
861        }
862    }
863
864    /// Sign `bytes` into an `Envelope` (ADR-020 §3). Mode-transparent: a
865    /// freshness-stamped passthrough when Open, ES256-signed when Authenticated.
866    /// Embedded only.
867    pub async fn sign(&self, bytes: &[u8]) -> Result<koi_common::envelope::Envelope, KoiError> {
868        match &self.backend {
869            CertmeshBackend::Embedded { core } => Ok(core.sign(bytes).await),
870            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
871        }
872    }
873
874    /// Verify an `Envelope`, returning an `Assurance` (ADR-020 §3). Read a trusted
875    /// identity only via `Assurance::identity()`. Embedded only.
876    pub async fn verify(
877        &self,
878        env: &koi_common::envelope::Envelope,
879    ) -> Result<koi_common::envelope::Assurance, KoiError> {
880        match &self.backend {
881            CertmeshBackend::Embedded { core } => Ok(core.verify(env).await),
882            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
883        }
884    }
885
886    /// Seal `bytes` into a `Sealed` (ADR-020 §4). The confidentiality rung, today a
887    /// signed-not-encrypted passthrough; the consumer codes against the final API
888    /// now. Embedded only.
889    pub async fn seal(&self, bytes: &[u8]) -> Result<koi_common::sealed::Sealed, KoiError> {
890        match &self.backend {
891            CertmeshBackend::Embedded { core } => Ok(core.seal(bytes).await),
892            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
893        }
894    }
895
896    /// Open a `Sealed` → `Opened` (recovered bytes + trust state, ADR-020 §4). A
897    /// tampered/rejected message errors rather than yielding bytes. Embedded only.
898    pub async fn open(
899        &self,
900        sealed: &koi_common::sealed::Sealed,
901    ) -> Result<koi_common::sealed::Opened, KoiError> {
902        match &self.backend {
903            CertmeshBackend::Embedded { core } => Ok(core.open(sealed).await?),
904            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
905        }
906    }
907
908    /// Run the trust-doctor (ADR-020 §13) → a structured `TrustDiagnosis`: posture,
909    /// identity + renewal health, on-disk-leaf integrity, self-revocation, and the
910    /// CA trust-install state, each with an exact remedy. `is_red()`/`exit_code()`
911    /// fail loud. Embedded only.
912    pub async fn diagnose(&self) -> Result<koi_common::diagnosis::TrustDiagnosis, KoiError> {
913        match &self.backend {
914            CertmeshBackend::Embedded { core } => Ok(core.diagnose().await),
915            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
916        }
917    }
918
919    /// Build a posture-keyed client to a discovered [`Peer`] (ADR-020 §6): plain
920    /// HTTP to an Open peer, mTLS to a secure peer — the caller writes one code
921    /// path. Embedded only (a remote handle has no local identity to present).
922    ///
923    /// Errors loudly (not via an opaque handshake failure) when the peer requires
924    /// authentication but this node is Open, or when the peer anchors to a
925    /// different mesh — see [`koi_certmesh::CertmeshCore::client_for`].
926    pub async fn client_for(&self, peer: &Peer) -> Result<koi_certmesh::PeerClient, KoiError> {
927        match &self.backend {
928            CertmeshBackend::Embedded { core } => Ok(core.client_for(peer).await?),
929            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
930        }
931    }
932}
933
934/// Announce this node's `service_type` on `port` with its current posture stamped
935/// into the TXT (ADR-020 §8). Returns the registration id, or `None` if mDNS
936/// registration failed. Used by [`participate`](KoiHandle::participate).
937async fn announce_once(
938    mdns: &Arc<MdnsCore>,
939    certmesh: &Arc<koi_certmesh::CertmeshCore>,
940    hostname: &str,
941    service_type: &str,
942    port: u16,
943) -> Option<String> {
944    let id = certmesh.local_identity().await;
945    let mut txt = std::collections::HashMap::new();
946    koi_common::peer::stamp(
947        &mut txt,
948        certmesh.posture(),
949        id.as_ref().map(|i| i.ca_fingerprint.as_str()),
950        id.as_ref().map(|i| i.renewal.expires_at),
951    );
952    let payload = RegisterPayload {
953        name: hostname.to_string(),
954        service_type: service_type.to_string(),
955        port,
956        ip: None,
957        lease_secs: None,
958        txt,
959    };
960    match mdns.register(payload) {
961        Ok(result) => Some(result.id),
962        Err(e) => {
963            tracing::warn!(error = %e, "participate: mDNS announce failed");
964            None
965        }
966    }
967}
968
969/// Maintain a posture-stamped mDNS announcement across posture flips until
970/// `cancel` (ADR-020 §13). Re-announces on every transition so a peer discovering
971/// this node always reads its *current* posture, then withdraws the record on
972/// shutdown.
973fn spawn_participate_announce(
974    mdns: Arc<MdnsCore>,
975    certmesh: Arc<koi_certmesh::CertmeshCore>,
976    service_type: String,
977    port: u16,
978    cancel: CancellationToken,
979) {
980    tokio::spawn(async move {
981        let hostname = hostname::get()
982            .ok()
983            .and_then(|os| os.into_string().ok())
984            .unwrap_or_else(|| "unknown".to_string());
985        let mut posture_rx = certmesh.watch_posture();
986        let mut current_id = announce_once(&mdns, &certmesh, &hostname, &service_type, port).await;
987        loop {
988            tokio::select! {
989                _ = cancel.cancelled() => break,
990                changed = posture_rx.changed() => {
991                    if changed.is_err() {
992                        break; // the certmesh core was dropped
993                    }
994                    // Posture flipped → re-announce so the advertised posture is current.
995                    if let Some(old) = current_id.take() {
996                        let _ = mdns.unregister(&old);
997                    }
998                    current_id =
999                        announce_once(&mdns, &certmesh, &hostname, &service_type, port).await;
1000                }
1001            }
1002        }
1003        if let Some(id) = current_id {
1004            let _ = mdns.unregister(&id);
1005        }
1006    });
1007}
1008
1009/// Fold a stream of mDNS lifecycle events into a deduplicated peer snapshot
1010/// (ADR-020 §8). Resolved records (which carry TXT, hence the trust hints)
1011/// overwrite an earlier Found for the same name; a Removed drops it. Ordered by
1012/// name for deterministic output. Pure — unit-tested without the network.
1013fn fold_peers(events: impl IntoIterator<Item = MdnsEvent>) -> Vec<Peer> {
1014    use std::collections::BTreeMap;
1015    let mut by_name: BTreeMap<String, ServiceRecord> = BTreeMap::new();
1016    for ev in events {
1017        match ev {
1018            MdnsEvent::Found(rec) => {
1019                by_name.entry(rec.name.clone()).or_insert(rec);
1020            }
1021            MdnsEvent::Resolved(rec) => {
1022                by_name.insert(rec.name.clone(), rec);
1023            }
1024            MdnsEvent::Removed { name, .. } => {
1025                by_name.remove(&name);
1026            }
1027        }
1028    }
1029    by_name.into_values().map(Peer::from_record).collect()
1030}
1031
/// Handle to the proxy capability.
///
/// Calls are dispatched to whichever [`ProxyBackend`] the handle was built with:
/// an in-process runtime, or a `KoiClient`.
pub struct ProxyHandle {
    /// The implementation every method on this handle dispatches to.
    backend: ProxyBackend,
}
1035
/// Where a [`ProxyHandle`] sends its calls.
enum ProxyBackend {
    /// An in-process proxy runtime, shared via `Arc`.
    Embedded { runtime: Arc<ProxyRuntime> },
    /// A shared `KoiClient` used for the list/add/remove operations.
    Remote { client: Arc<KoiClient> },
}
1040
1041impl ProxyHandle {
1042    fn new_embedded(runtime: Arc<ProxyRuntime>) -> Self {
1043        Self {
1044            backend: ProxyBackend::Embedded { runtime },
1045        }
1046    }
1047
1048    fn new_remote(client: Arc<KoiClient>) -> Self {
1049        Self {
1050            backend: ProxyBackend::Remote { client },
1051        }
1052    }
1053
1054    pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError> {
1055        match &self.backend {
1056            ProxyBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
1057            ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
1058        }
1059    }
1060
1061    pub fn core(&self) -> Result<Arc<koi_proxy::ProxyCore>, KoiError> {
1062        match &self.backend {
1063            ProxyBackend::Embedded { runtime } => Ok(runtime.core()),
1064            ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
1065        }
1066    }
1067
1068    pub async fn entries(&self) -> Vec<ProxyEntry> {
1069        match &self.backend {
1070            ProxyBackend::Embedded { runtime } => runtime.core().entries().await,
1071            ProxyBackend::Remote { client } => {
1072                let client = Arc::clone(client);
1073                tokio::task::spawn_blocking(move || client.proxy_list())
1074                    .await
1075                    .ok()
1076                    .and_then(|res| res.ok())
1077                    .and_then(|json| parse_proxy_entries(json).ok())
1078                    .unwrap_or_default()
1079            }
1080        }
1081    }
1082
1083    pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError> {
1084        match &self.backend {
1085            ProxyBackend::Embedded { runtime } => Ok(runtime.core().upsert(entry).await?),
1086            ProxyBackend::Remote { client } => {
1087                let client = Arc::clone(client);
1088                let entry = entry.clone();
1089                let add_client = Arc::clone(&client);
1090                tokio::task::spawn_blocking(move || {
1091                    add_client.proxy_add(
1092                        &entry.name,
1093                        entry.listen_port,
1094                        &entry.backend,
1095                        entry.allow_remote,
1096                    )
1097                })
1098                .await
1099                .map_err(map_join_error)??;
1100                let list = tokio::task::spawn_blocking(move || client.proxy_list())
1101                    .await
1102                    .map_err(map_join_error)??;
1103                parse_proxy_entries(list)
1104            }
1105        }
1106    }
1107
1108    pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError> {
1109        match &self.backend {
1110            ProxyBackend::Embedded { runtime } => Ok(runtime.core().remove(name).await?),
1111            ProxyBackend::Remote { client } => {
1112                let client = Arc::clone(client);
1113                let name = name.to_string();
1114                let remove_client = Arc::clone(&client);
1115                tokio::task::spawn_blocking(move || remove_client.proxy_remove(&name))
1116                    .await
1117                    .map_err(map_join_error)??;
1118                let list = tokio::task::spawn_blocking(move || client.proxy_list())
1119                    .await
1120                    .map_err(map_join_error)??;
1121                parse_proxy_entries(list)
1122            }
1123        }
1124    }
1125
1126    pub async fn start_all(&self) -> Result<(), KoiError> {
1127        match &self.backend {
1128            ProxyBackend::Embedded { runtime } => Ok(runtime.start_all().await?),
1129            ProxyBackend::Remote { .. } => Ok(()),
1130        }
1131    }
1132
1133    pub async fn stop_all(&self) {
1134        if let ProxyBackend::Embedded { runtime } = &self.backend {
1135            runtime.stop_all().await;
1136        }
1137    }
1138}
1139
1140fn parse_dns_lookup(json: serde_json::Value) -> Option<DnsLookupResult> {
1141    let name = json.get("name").and_then(|v| v.as_str())?.to_string();
1142    let source = json
1143        .get("source")
1144        .and_then(|v| v.as_str())
1145        .unwrap_or("unknown")
1146        .to_string();
1147    let ips = json.get("ips").and_then(|v| v.as_array()).map(|arr| {
1148        arr.iter()
1149            .filter_map(|ip| ip.as_str())
1150            .filter_map(|ip| ip.parse::<IpAddr>().ok())
1151            .collect::<Vec<_>>()
1152    })?;
1153    Some(DnsLookupResult { name, ips, source })
1154}
1155
1156fn parse_dns_entries(json: serde_json::Value) -> Result<Vec<DnsEntry>, KoiError> {
1157    let entries = json.get("entries").ok_or_else(|| {
1158        KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::other(
1159            "missing entries",
1160        )))
1161    })?;
1162    let entries = serde_json::from_value(entries.clone()).map_err(|e| {
1163        KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::new(
1164            std::io::ErrorKind::InvalidData,
1165            e.to_string(),
1166        )))
1167    })?;
1168    Ok(entries)
1169}
1170
1171fn parse_proxy_entries(json: serde_json::Value) -> Result<Vec<ProxyEntry>, KoiError> {
1172    let entries = json
1173        .get("entries")
1174        .ok_or_else(|| KoiError::Proxy(koi_proxy::ProxyError::Io("missing entries".to_string())))?
1175        .clone();
1176    serde_json::from_value(entries)
1177        .map_err(|e| KoiError::Proxy(koi_proxy::ProxyError::Io(e.to_string())))
1178}
1179
1180fn extract_capability_status(
1181    json: serde_json::Value,
1182) -> Option<koi_common::capability::CapabilityStatus> {
1183    let caps = json.get("capabilities")?.as_array()?;
1184    for cap in caps {
1185        if cap.get("name")?.as_str()? == "certmesh" {
1186            let name = cap.get("name")?.as_str()?.to_string();
1187            let summary = cap
1188                .get("summary")
1189                .and_then(|v| v.as_str())
1190                .unwrap_or("unknown")
1191                .to_string();
1192            let healthy = cap
1193                .get("healthy")
1194                .and_then(|v| v.as_bool())
1195                .unwrap_or(false);
1196            return Some(koi_common::capability::CapabilityStatus {
1197                name,
1198                summary,
1199                healthy,
1200            });
1201        }
1202    }
1203    None
1204}
1205
1206fn default_capability_status() -> koi_common::capability::CapabilityStatus {
1207    koi_common::capability::CapabilityStatus {
1208        name: "certmesh".to_string(),
1209        summary: "unknown".to_string(),
1210        healthy: false,
1211    }
1212}
1213
1214fn mdns_event_from_pipeline(json: serde_json::Value) -> Option<MdnsEvent> {
1215    if let Some(found) = json.get("found") {
1216        let record: ServiceRecord = serde_json::from_value(found.clone()).ok()?;
1217        return Some(MdnsEvent::Found(record));
1218    }
1219    if let Some(resolved) = json.get("resolved") {
1220        let record: ServiceRecord = serde_json::from_value(resolved.clone()).ok()?;
1221        return Some(MdnsEvent::Resolved(record));
1222    }
1223    if let Some(event) = json.get("event") {
1224        let kind: EventKind = serde_json::from_value(event.clone()).ok()?;
1225        let service = json
1226            .get("service")
1227            .cloned()
1228            .unwrap_or(serde_json::Value::Null);
1229        let record: ServiceRecord = serde_json::from_value(service).ok()?;
1230        return match kind {
1231            EventKind::Found => Some(MdnsEvent::Found(record)),
1232            EventKind::Resolved => Some(MdnsEvent::Resolved(record)),
1233            EventKind::Removed => Some(MdnsEvent::Removed {
1234                name: record.name,
1235                service_type: record.service_type,
1236            }),
1237        };
1238    }
1239    None
1240}
1241
#[cfg(test)]
mod tests {
    use super::*;
    use koi_common::posture::PostureLevel;
    use std::collections::HashMap;

    /// Per-test scratch data directory under the system temp dir.
    ///
    /// Any leftover from a previous run is wiped on creation, and the directory is
    /// removed again when the guard drops — including on a panicking assertion —
    /// so test runs do not accumulate `koi-emb-*` directories.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        /// `<tmp>/koi-emb-<tag>-<pid>`, guaranteed not to hold stale state.
        fn new(tag: &str) -> Self {
            let path = std::env::temp_dir().join(format!("koi-emb-{tag}-{}", std::process::id()));
            let _ = std::fs::remove_dir_all(&path);
            Self(path)
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best-effort: the directory may never have been created.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// A resolved-looking `_http._tcp` record called `name` carrying `txt`.
    fn rec(name: &str, txt: &[(&str, &str)]) -> ServiceRecord {
        ServiceRecord {
            name: name.to_string(),
            service_type: "_http._tcp".to_string(),
            host: Some(format!("{name}.local")),
            ip: Some("10.0.0.9".to_string()),
            port: Some(8443),
            txt: txt
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect::<HashMap<_, _>>(),
        }
    }

    #[test]
    fn fold_resolved_overwrites_found_for_txt_enrichment() {
        // Found arrives first (no TXT), then Resolved carries the trust hints.
        let peers = fold_peers([
            MdnsEvent::Found(rec("a", &[])),
            MdnsEvent::Resolved(rec("a", &[("fp", "CAFP"), ("posture", "authenticated")])),
        ]);
        assert_eq!(peers.len(), 1, "the two events collapse to one peer");
        assert_eq!(peers[0].level(), PostureLevel::Authenticated);
        assert_eq!(peers[0].fp.as_deref(), Some("CAFP"));
    }

    #[test]
    fn fold_removed_drops_the_peer() {
        let peers = fold_peers([
            MdnsEvent::Found(rec("b", &[])),
            MdnsEvent::Removed {
                name: "b".to_string(),
                service_type: "_http._tcp".to_string(),
            },
        ]);
        assert!(peers.is_empty(), "a removed peer is not in the snapshot");
    }

    #[test]
    fn fold_orders_peers_by_name() {
        let peers = fold_peers([
            MdnsEvent::Resolved(rec("z", &[])),
            MdnsEvent::Resolved(rec("a", &[])),
            MdnsEvent::Resolved(rec("m", &[])),
        ]);
        let names: Vec<_> = peers.iter().map(|p| p.record.name.clone()).collect();
        assert_eq!(names, vec!["a", "m", "z"]);
    }

    #[test]
    fn fold_open_peer_has_open_posture() {
        let peers = fold_peers([MdnsEvent::Resolved(rec("plain", &[]))]);
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].level(), PostureLevel::Open);
        assert!(!peers[0].is_secure());
    }

    // ── participate (ADR-020 §13) ───────────────────────────────────

    #[tokio::test]
    async fn participate_remote_handle_is_disabled() {
        let client = Arc::new(KoiClient::new("http://127.0.0.1:1"));
        let (tx, _) = broadcast::channel(8);
        let handle = KoiHandle::new_remote(client, tx, CancellationToken::new(), Vec::new());
        let router = axum::Router::new();
        let addr: std::net::SocketAddr = ([127, 0, 0, 1], 0).into();
        let err = handle
            .participate(router, addr, "_x._tcp", CancellationToken::new())
            .await
            .unwrap_err();
        assert!(matches!(err, KoiError::DisabledCapability(_)));
    }

    #[tokio::test]
    async fn participate_open_node_serves_plaintext() {
        // certmesh on (but no CA → Open), mDNS off (participate just serves plain),
        // isolated data dir. The Open node serves the consumer's router in plaintext
        // with no posture branching by the caller.
        let scratch = ScratchDir::new("participate");
        let koi = crate::Builder::new()
            .data_dir(&scratch.0)
            .service_mode(crate::ServiceMode::EmbeddedOnly)
            .mdns(false)
            .dns_enabled(false)
            .health(false)
            .certmesh(true)
            .proxy(false)
            .build()
            .expect("build");
        let handle = koi.start().await.expect("start");

        // Pick a free loopback port: bind to port 0, note the address, and let the
        // listener drop at the end of the block so `participate` can bind it.
        let addr = {
            let l = tokio::net::TcpListener::bind(("127.0.0.1", 0))
                .await
                .unwrap();
            l.local_addr().unwrap()
        };
        let router = axum::Router::new().route("/ping", axum::routing::get(|| async { "pong" }));
        let cancel = CancellationToken::new();
        let _server = handle
            .participate(router, addr, "_koi-test._tcp", cancel.clone())
            .await
            .expect("participate");
        // NOTE(review): presumably a grace period for the server to begin accepting
        // connections before the first request — confirm against `participate`.
        tokio::time::sleep(std::time::Duration::from_millis(75)).await;

        let (status, body) = koi_certmesh::mtls::get(&addr.ip().to_string(), addr.port(), "/ping")
            .await
            .expect("plain GET to an Open participating node");
        assert_eq!(status, 200);
        assert_eq!(body, "pong");

        cancel.cancel();
        handle.shutdown().await.expect("shutdown");
    }

    // ── seal/open (ADR-020 §4) ──────────────────────────────────────

    #[tokio::test]
    async fn seal_open_round_trip_on_open_node() {
        use koi_common::sealed::Confidentiality;
        let scratch = ScratchDir::new("seal");
        let koi = crate::Builder::new()
            .data_dir(&scratch.0)
            .service_mode(crate::ServiceMode::EmbeddedOnly)
            .mdns(false)
            .dns_enabled(false)
            .health(false)
            .certmesh(true)
            .proxy(false)
            .build()
            .expect("build");
        let handle = koi.start().await.expect("start");
        let cm = handle.certmesh().expect("certmesh handle");

        // Open node: seal is a passthrough (signed-not-encrypted); the same code path
        // round-trips the bytes back with an anonymous assurance.
        let sealed = cm.seal(b"hello seal").await.expect("seal");
        assert_eq!(sealed.confidentiality(), Confidentiality::None);
        let opened = cm.open(&sealed).await.expect("open");
        assert_eq!(opened.payload, b"hello seal");
        assert_eq!(opened.confidentiality, Confidentiality::None);
        assert!(
            opened.assurance.identity().is_none(),
            "an Open node's seal is anonymous, not a trusted identity"
        );

        handle.shutdown().await.expect("shutdown");
    }
}