Skip to main content

koi_embedded/
handle.rs

1use std::net::IpAddr;
2use std::sync::Arc;
3
4use tokio::sync::{broadcast, mpsc, Mutex};
5use tokio::task::JoinHandle;
6use tokio_stream::wrappers::BroadcastStream;
7use tokio_util::sync::CancellationToken;
8
9use koi_client::KoiClient;
10use koi_common::capability::Capability;
11use koi_common::peer::Peer;
12use koi_common::types::{EventKind, ServiceRecord};
13use koi_config::state::DnsEntry;
14use koi_dns::{DnsLookupResult, DnsRuntime};
15use koi_health::{HealthCheck, HealthRuntime};
16use koi_mdns::protocol::{RegisterPayload, RegistrationResult};
17use koi_mdns::{BrowseSubscription as MdnsBrowseHandle, MdnsCore, MdnsEvent};
18use koi_proxy::{ProxyEntry, ProxyRuntime};
19
20use crate::{map_join_error, KoiError, KoiEvent};
21
22enum HandleBackend {
23    Embedded {
24        mdns: Option<Arc<MdnsCore>>,
25        dns: Option<Arc<DnsRuntime>>,
26        health: Option<Arc<HealthRuntime>>,
27        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
28        proxy: Option<Arc<ProxyRuntime>>,
29        udp: Option<Arc<koi_udp::UdpRuntime>>,
30        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
31    },
32    Remote {
33        client: Arc<KoiClient>,
34    },
35}
36
37pub struct KoiHandle {
38    backend: HandleBackend,
39    data_dir: Option<std::path::PathBuf>,
40    events: broadcast::Sender<KoiEvent>,
41    cancel: CancellationToken,
42    tasks: Vec<JoinHandle<()>>,
43    http_announce_id: Option<String>,
44}
45
46impl KoiHandle {
47    #[allow(clippy::too_many_arguments)]
48    pub(crate) fn new_embedded(
49        mdns: Option<Arc<MdnsCore>>,
50        dns: Option<Arc<DnsRuntime>>,
51        health: Option<Arc<HealthRuntime>>,
52        certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
53        proxy: Option<Arc<ProxyRuntime>>,
54        udp: Option<Arc<koi_udp::UdpRuntime>>,
55        runtime: Option<Arc<koi_runtime::RuntimeCore>>,
56        data_dir: Option<std::path::PathBuf>,
57        events: broadcast::Sender<KoiEvent>,
58        cancel: CancellationToken,
59        tasks: Vec<JoinHandle<()>>,
60        http_announce_id: Option<String>,
61    ) -> Self {
62        Self {
63            backend: HandleBackend::Embedded {
64                mdns,
65                dns,
66                health,
67                certmesh,
68                proxy,
69                udp,
70                runtime,
71            },
72            data_dir,
73            events,
74            cancel,
75            tasks,
76            http_announce_id,
77        }
78    }
79
80    pub(crate) fn new_remote(
81        client: Arc<KoiClient>,
82        events: broadcast::Sender<KoiEvent>,
83        cancel: CancellationToken,
84        tasks: Vec<JoinHandle<()>>,
85    ) -> Self {
86        Self {
87            backend: HandleBackend::Remote { client },
88            data_dir: None,
89            events,
90            cancel,
91            tasks,
92            http_announce_id: None,
93        }
94    }
95
96    pub fn events(&self) -> BroadcastStream<KoiEvent> {
97        BroadcastStream::new(self.events.subscribe())
98    }
99
100    /// Serve `router` on `addr` with the same-port posture dial (ADR-020 §5):
101    /// plain HTTP while this node is Open, mTLS once it is secure, flipping live
102    /// with **no dropped connections** as the posture changes. The consumer writes
103    /// one `serve` call and never branches on posture.
104    ///
105    /// Returns the supervisor's [`JoinHandle`]; the listener stops when this
106    /// handle's `cancel` is triggered (e.g. on [`shutdown`](Self::shutdown)) or
107    /// the passed `cancel` fires. Embedded only — a remote handle has no local
108    /// identity to serve mTLS with.
109    pub fn serve(
110        &self,
111        router: axum::Router,
112        addr: std::net::SocketAddr,
113        cancel: CancellationToken,
114    ) -> Result<JoinHandle<()>, KoiError> {
115        match &self.backend {
116            HandleBackend::Embedded { certmesh, .. } => {
117                let core = certmesh
118                    .as_ref()
119                    .ok_or(KoiError::DisabledCapability("certmesh"))?;
120                let core = Arc::clone(core);
121                Ok(tokio::spawn(async move {
122                    if let Err(e) = crate::serve::serve_adaptive(core, router, addr, cancel).await {
123                        tracing::error!(error = %e, "same-port serve failed to bind");
124                    }
125                }))
126            }
127            HandleBackend::Remote { .. } => {
128                Err(KoiError::DisabledCapability("certmesh (remote mode)"))
129            }
130        }
131    }
132
133    /// Become a fully-participating trusted service in one call (ADR-020 §13 — the
134    /// "3-line trusted service"):
135    ///
136    /// 1. acquire/maintain this node's identity (best-effort — an Open node with no
137    ///    way to enroll simply stays plaintext),
138    /// 2. announce `service_type` on the LAN at `addr`'s port with the node's
139    ///    posture stamped into the TXT, **kept current across posture flips**, and
140    /// 3. serve `router` on `addr` with the same-port dial ([`serve`](Self::serve)).
141    ///
142    /// The consumer never branches on posture and never wires identity, discovery,
143    /// and serving separately. Returns the serve supervisor's [`JoinHandle`].
144    /// Certificate *renewal* is handled by the certmesh background loops — enable
145    /// them with `Builder::certmesh_background(true)` on a long-running host.
146    /// Embedded only.
147    pub async fn participate(
148        &self,
149        router: axum::Router,
150        addr: std::net::SocketAddr,
151        service_type: &str,
152        cancel: CancellationToken,
153    ) -> Result<JoinHandle<()>, KoiError> {
154        let (certmesh, mdns) = match &self.backend {
155            HandleBackend::Embedded { certmesh, mdns, .. } => (
156                certmesh
157                    .as_ref()
158                    .ok_or(KoiError::DisabledCapability("certmesh"))?
159                    .clone(),
160                mdns.clone(),
161            ),
162            HandleBackend::Remote { .. } => {
163                return Err(KoiError::DisabledCapability("certmesh (remote mode)"))
164            }
165        };
166
167        // 1. Acquire/maintain identity. Open (no CA / not a member) stays plaintext.
168        let _ = certmesh.ensure_identity().await;
169
170        // 2. Announce with posture, refreshed on every flip so the LAN trust map
171        //    never goes stale (ADR-020 §13 "maintained across flips").
172        if let Some(mdns) = mdns {
173            spawn_participate_announce(
174                mdns,
175                Arc::clone(&certmesh),
176                service_type.to_string(),
177                addr.port(),
178                cancel.clone(),
179            );
180        } else {
181            tracing::debug!("participate: mDNS disabled — serving without announcing");
182        }
183
184        // 3. Serve with the same-port posture dial.
185        self.serve(router, addr, cancel)
186    }
187
188    pub fn subscribe(&self) -> broadcast::Receiver<KoiEvent> {
189        self.events.subscribe()
190    }
191
192    pub fn mdns(&self) -> Result<MdnsHandle, KoiError> {
193        match &self.backend {
194            HandleBackend::Embedded { mdns, .. } => {
195                let core = mdns.as_ref().ok_or(KoiError::DisabledCapability("mdns"))?;
196                Ok(MdnsHandle::new_embedded(
197                    Arc::clone(core),
198                    self.events.clone(),
199                ))
200            }
201            HandleBackend::Remote { client } => Ok(MdnsHandle::new_remote(
202                Arc::clone(client),
203                self.events.clone(),
204            )),
205        }
206    }
207
208    pub fn dns(&self) -> Result<DnsHandle, KoiError> {
209        match &self.backend {
210            HandleBackend::Embedded { dns, .. } => {
211                let runtime = dns.as_ref().ok_or(KoiError::DisabledCapability("dns"))?;
212                Ok(DnsHandle::new_embedded(Arc::clone(runtime)))
213            }
214            HandleBackend::Remote { client } => Ok(DnsHandle::new_remote(Arc::clone(client))),
215        }
216    }
217
218    pub fn health(&self) -> Result<HealthHandle, KoiError> {
219        match &self.backend {
220            HandleBackend::Embedded { health, .. } => {
221                let runtime = health
222                    .as_ref()
223                    .ok_or(KoiError::DisabledCapability("health"))?;
224                Ok(HealthHandle::new_embedded(Arc::clone(runtime)))
225            }
226            HandleBackend::Remote { client } => Ok(HealthHandle::new_remote(Arc::clone(client))),
227        }
228    }
229
230    pub fn certmesh(&self) -> Result<CertmeshHandle, KoiError> {
231        match &self.backend {
232            HandleBackend::Embedded { certmesh, .. } => {
233                let core = certmesh
234                    .as_ref()
235                    .ok_or(KoiError::DisabledCapability("certmesh"))?;
236                Ok(CertmeshHandle::new_embedded(Arc::clone(core)))
237            }
238            HandleBackend::Remote { client } => Ok(CertmeshHandle::new_remote(Arc::clone(client))),
239        }
240    }
241
242    /// Open the encrypted key-value vault for general-purpose secret storage.
243    ///
244    /// The vault uses platform credential binding (keyring) when available,
245    /// with a machine-bound fallback. Each call opens a fresh handle sharing
246    /// the same on-disk state.
247    pub fn vault(&self) -> Result<koi_crypto::vault::Vault, KoiError> {
248        let dir = self
249            .data_dir
250            .as_ref()
251            .ok_or(KoiError::DisabledCapability("vault (no data_dir)"))?;
252        koi_crypto::vault::Vault::open(dir)
253            .map_err(|e| KoiError::Io(std::io::Error::other(e.to_string())))
254    }
255
256    pub fn proxy(&self) -> Result<ProxyHandle, KoiError> {
257        match &self.backend {
258            HandleBackend::Embedded { proxy, .. } => {
259                let runtime = proxy
260                    .as_ref()
261                    .ok_or(KoiError::DisabledCapability("proxy"))?;
262                Ok(ProxyHandle::new_embedded(Arc::clone(runtime)))
263            }
264            HandleBackend::Remote { client } => Ok(ProxyHandle::new_remote(Arc::clone(client))),
265        }
266    }
267
268    /// Get the UDP runtime handle.
269    ///
270    /// Only available in embedded mode — remote mode does not support UDP bridging
271    /// (the remote daemon itself handles bindings).
272    pub fn udp(&self) -> Result<Arc<koi_udp::UdpRuntime>, KoiError> {
273        match &self.backend {
274            HandleBackend::Embedded { udp, .. } => {
275                let runtime = udp.as_ref().ok_or(KoiError::DisabledCapability("udp"))?;
276                Ok(Arc::clone(runtime))
277            }
278            HandleBackend::Remote { .. } => Err(KoiError::DisabledCapability("udp (remote mode)")),
279        }
280    }
281
282    /// Get the runtime adapter core.
283    ///
284    /// Only available in embedded mode when runtime is enabled.
285    pub fn runtime(&self) -> Result<Arc<koi_runtime::RuntimeCore>, KoiError> {
286        match &self.backend {
287            HandleBackend::Embedded { runtime, .. } => {
288                let core = runtime
289                    .as_ref()
290                    .ok_or(KoiError::DisabledCapability("runtime"))?;
291                Ok(Arc::clone(core))
292            }
293            HandleBackend::Remote { .. } => {
294                Err(KoiError::DisabledCapability("runtime (remote mode)"))
295            }
296        }
297    }
298
299    pub async fn shutdown(mut self) -> Result<(), KoiError> {
300        self.cancel.cancel();
301        for task in self.tasks.drain(..) {
302            let _ = task.await;
303        }
304
305        if let HandleBackend::Embedded {
306            mdns,
307            dns,
308            health,
309            proxy,
310            ..
311        } = &self.backend
312        {
313            if let Some(runtime) = proxy {
314                runtime.stop_all().await;
315            }
316            if let Some(runtime) = health {
317                let _ = runtime.stop().await;
318            }
319            if let Some(runtime) = dns {
320                let _ = runtime.stop().await;
321            }
322            if let Some(id) = &self.http_announce_id {
323                if let Some(core) = mdns {
324                    if let Err(e) = core.unregister(id) {
325                        tracing::warn!(error = %e, "Failed to withdraw HTTP mDNS announcement");
326                    }
327                }
328            }
329            if let Some(core) = mdns {
330                core.shutdown().await?;
331            }
332        }
333
334        Ok(())
335    }
336}
337
338pub struct KoiBrowseHandle {
339    backend: BrowseBackend,
340}
341
342enum BrowseBackend {
343    Embedded(MdnsBrowseHandle),
344    Remote(Mutex<mpsc::Receiver<MdnsEvent>>),
345}
346
347impl KoiBrowseHandle {
348    fn embedded(handle: MdnsBrowseHandle) -> Self {
349        Self {
350            backend: BrowseBackend::Embedded(handle),
351        }
352    }
353
354    fn remote(rx: mpsc::Receiver<MdnsEvent>) -> Self {
355        Self {
356            backend: BrowseBackend::Remote(Mutex::new(rx)),
357        }
358    }
359
360    pub async fn recv(&self) -> Option<MdnsEvent> {
361        match &self.backend {
362            BrowseBackend::Embedded(handle) => handle.recv().await,
363            BrowseBackend::Remote(rx) => rx.lock().await.recv().await,
364        }
365    }
366}
367
/// Collection window used by `discover` when the caller does not pick one
/// (ADR-020 §8): two seconds — long enough for mDNS resolution on a quiet LAN,
/// short enough to stay responsive, so the common `discover(type)` call needs
/// no tuning.
pub const DEFAULT_DISCOVER_WINDOW: std::time::Duration = std::time::Duration::from_millis(2_000);
372
373pub struct MdnsHandle {
374    backend: MdnsBackend,
375    events: broadcast::Sender<KoiEvent>,
376}
377
378enum MdnsBackend {
379    Embedded { core: Arc<MdnsCore> },
380    Remote { client: Arc<KoiClient> },
381}
382
383impl MdnsHandle {
384    fn new_embedded(core: Arc<MdnsCore>, events: broadcast::Sender<KoiEvent>) -> Self {
385        Self {
386            backend: MdnsBackend::Embedded { core },
387            events,
388        }
389    }
390
391    fn new_remote(client: Arc<KoiClient>, events: broadcast::Sender<KoiEvent>) -> Self {
392        Self {
393            backend: MdnsBackend::Remote { client },
394            events,
395        }
396    }
397
398    pub fn core(&self) -> Result<Arc<MdnsCore>, KoiError> {
399        match &self.backend {
400            MdnsBackend::Embedded { core } => Ok(Arc::clone(core)),
401            MdnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("mdns")),
402        }
403    }
404
405    pub async fn browse(&self, service_type: &str) -> Result<KoiBrowseHandle, KoiError> {
406        match &self.backend {
407            MdnsBackend::Embedded { core } => {
408                let handle = core.subscribe_type(service_type).await?;
409                Ok(KoiBrowseHandle::embedded(handle))
410            }
411            MdnsBackend::Remote { client } => {
412                let (tx, rx) = mpsc::channel(64);
413                let client = Arc::clone(client);
414                let service_type = service_type.to_string();
415                tokio::task::spawn_blocking(move || {
416                    let stream = match client.browse_stream(&service_type) {
417                        Ok(stream) => stream,
418                        Err(_) => return,
419                    };
420                    for item in stream {
421                        let Ok(json) = item else {
422                            break;
423                        };
424                        if let Some(event) = mdns_event_from_pipeline(json) {
425                            if tx.blocking_send(event).is_err() {
426                                break;
427                            }
428                        }
429                    }
430                });
431                Ok(KoiBrowseHandle::remote(rx))
432            }
433        }
434    }
435
436    /// Discover peers of `service_type`, each enriched with its advertised trust
437    /// posture, mesh anchor, and identity expiry (ADR-020 §8) — the fleet-wide
438    /// trust-legibility primitive. A snapshot collected over
439    /// [`DEFAULT_DISCOVER_WINDOW`]; for a custom window use
440    /// [`discover_for`](Self::discover_for).
441    ///
442    /// The posture each peer carries is an **untrusted hint** (ADR-016 §2);
443    /// `certmesh().verify(..)` / mTLS adjudicates actual trust. Works in both
444    /// embedded and remote mode (it layers on [`browse`](Self::browse)).
445    pub async fn discover(&self, service_type: &str) -> Result<Vec<Peer>, KoiError> {
446        self.discover_for(service_type, DEFAULT_DISCOVER_WINDOW)
447            .await
448    }
449
450    /// Like [`discover`](Self::discover) with an explicit collection `window`.
451    pub async fn discover_for(
452        &self,
453        service_type: &str,
454        window: std::time::Duration,
455    ) -> Result<Vec<Peer>, KoiError> {
456        let browse = self.browse(service_type).await?;
457        let mut events = Vec::new();
458        let deadline = tokio::time::sleep(window);
459        tokio::pin!(deadline);
460        loop {
461            tokio::select! {
462                _ = &mut deadline => break,
463                ev = browse.recv() => match ev {
464                    Some(e) => events.push(e),
465                    None => break,
466                },
467            }
468        }
469        Ok(fold_peers(events))
470    }
471
472    pub async fn resolve(&self, name: &str) -> Result<ServiceRecord, KoiError> {
473        match &self.backend {
474            MdnsBackend::Embedded { core } => Ok(core.resolve(name).await?),
475            MdnsBackend::Remote { client } => {
476                let name = name.to_string();
477                let client = Arc::clone(client);
478                let record = tokio::task::spawn_blocking(move || client.resolve(&name))
479                    .await
480                    .map_err(map_join_error)??;
481                Ok(record)
482            }
483        }
484    }
485
486    pub fn register(&self, payload: RegisterPayload) -> Result<RegistrationResult, KoiError> {
487        match &self.backend {
488            MdnsBackend::Embedded { core } => Ok(core.register(payload)?),
489            MdnsBackend::Remote { client } => Ok(client.register(&payload)?),
490        }
491    }
492
493    pub fn unregister(&self, id: &str) -> Result<(), KoiError> {
494        match &self.backend {
495            MdnsBackend::Embedded { core } => Ok(core.unregister(id)?),
496            MdnsBackend::Remote { client } => Ok(client.unregister(id)?),
497        }
498    }
499
500    /// Subscribe to the live mDNS lifecycle-event stream (Found / Resolved / Removed).
501    ///
502    /// Available only in **embedded** mode, where there is a local `MdnsCore` to subscribe
503    /// to. In **client (remote)** mode there is no all-types lifecycle stream to forward —
504    /// the daemon's `/v1/mdns/subscribe` requires a service type — so this returns
505    /// [`KoiError::RemoteUnsupported`]. For a remote event stream, use
506    /// [`MdnsHandle::browse`] with a specific service type (it forwards the daemon's SSE).
507    ///
508    /// Previously this silently returned a dead receiver in remote mode (it yielded nothing,
509    /// forever); the typed error makes the limitation visible instead of swallowing it.
510    pub fn subscribe(&self) -> Result<broadcast::Receiver<MdnsEvent>, KoiError> {
511        match &self.backend {
512            MdnsBackend::Embedded { core } => Ok(core.subscribe()),
513            MdnsBackend::Remote { .. } => Err(KoiError::RemoteUnsupported(
514                "mdns subscribe — use mdns.browse(service_type) for a remote event stream",
515            )),
516        }
517    }
518
519    pub fn emit_event(&self, event: KoiEvent) {
520        let _ = self.events.send(event);
521    }
522}
523
524pub struct DnsHandle {
525    backend: DnsBackend,
526}
527
528enum DnsBackend {
529    Embedded { runtime: Arc<DnsRuntime> },
530    Remote { client: Arc<KoiClient> },
531}
532
533impl DnsHandle {
534    fn new_embedded(runtime: Arc<DnsRuntime>) -> Self {
535        Self {
536            backend: DnsBackend::Embedded { runtime },
537        }
538    }
539
540    fn new_remote(client: Arc<KoiClient>) -> Self {
541        Self {
542            backend: DnsBackend::Remote { client },
543        }
544    }
545
546    pub fn runtime(&self) -> Result<Arc<DnsRuntime>, KoiError> {
547        match &self.backend {
548            DnsBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
549            DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
550        }
551    }
552
553    pub fn core(&self) -> Result<Arc<koi_dns::DnsCore>, KoiError> {
554        match &self.backend {
555            DnsBackend::Embedded { runtime } => Ok(runtime.core()),
556            DnsBackend::Remote { .. } => Err(KoiError::DisabledCapability("dns")),
557        }
558    }
559
560    pub async fn lookup(
561        &self,
562        name: &str,
563        record_type: hickory_proto::rr::RecordType,
564    ) -> Option<DnsLookupResult> {
565        match &self.backend {
566            DnsBackend::Embedded { runtime } => runtime.core().lookup(name, record_type).await,
567            DnsBackend::Remote { client } => {
568                let name = name.to_string();
569                let client = Arc::clone(client);
570                let result =
571                    tokio::task::spawn_blocking(move || client.dns_lookup(&name, record_type))
572                        .await
573                        .ok()
574                        .and_then(|res| res.ok());
575                let json = match result {
576                    Some(json) => json,
577                    None => return None,
578                };
579                parse_dns_lookup(json)
580            }
581        }
582    }
583
584    pub fn list_names(&self) -> Vec<String> {
585        match &self.backend {
586            DnsBackend::Embedded { runtime } => runtime.core().list_names(),
587            DnsBackend::Remote { client } => {
588                let result = client.dns_list();
589                let Ok(json) = result else {
590                    return Vec::new();
591                };
592                json.get("names")
593                    .and_then(|v| v.as_array())
594                    .map(|arr| {
595                        arr.iter()
596                            .filter_map(|name| name.as_str().map(|s| s.to_string()))
597                            .collect()
598                    })
599                    .unwrap_or_default()
600            }
601        }
602    }
603
604    pub async fn start(&self) -> Result<bool, KoiError> {
605        match &self.backend {
606            DnsBackend::Embedded { runtime } => Ok(runtime.start().await?),
607            DnsBackend::Remote { client } => {
608                let client = Arc::clone(client);
609                let started = tokio::task::spawn_blocking(move || client.dns_start())
610                    .await
611                    .map_err(map_join_error)??
612                    .get("started")
613                    .and_then(|v| v.as_bool())
614                    .unwrap_or(false);
615                Ok(started)
616            }
617        }
618    }
619
620    pub async fn stop(&self) -> bool {
621        match &self.backend {
622            DnsBackend::Embedded { runtime } => runtime.stop().await,
623            DnsBackend::Remote { client } => {
624                let client = Arc::clone(client);
625                tokio::task::spawn_blocking(move || client.dns_stop())
626                    .await
627                    .ok()
628                    .and_then(|res| res.ok())
629                    .and_then(|json| json.get("stopped").and_then(|v| v.as_bool()))
630                    .unwrap_or(false)
631            }
632        }
633    }
634
635    pub fn add_entry(&self, entry: DnsEntry) -> Result<Vec<DnsEntry>, KoiError> {
636        match &self.backend {
637            DnsBackend::Embedded { runtime } => Ok(runtime.core().add_entry(entry)?),
638            DnsBackend::Remote { client } => {
639                let json = client.dns_add(&entry.name, &entry.ip, entry.ttl)?;
640                parse_dns_entries(json)
641            }
642        }
643    }
644
645    pub fn remove_entry(&self, name: &str) -> Result<Vec<DnsEntry>, KoiError> {
646        match &self.backend {
647            DnsBackend::Embedded { runtime } => {
648                Ok(runtime.core().remove_entry(name)?.unwrap_or_default())
649            }
650            DnsBackend::Remote { client } => {
651                let json = client.dns_remove(name)?;
652                parse_dns_entries(json)
653            }
654        }
655    }
656}
657
658pub struct HealthHandle {
659    backend: HealthBackend,
660}
661
662enum HealthBackend {
663    Embedded { runtime: Arc<HealthRuntime> },
664    Remote { client: Arc<KoiClient> },
665}
666
667impl HealthHandle {
668    fn new_embedded(runtime: Arc<HealthRuntime>) -> Self {
669        Self {
670            backend: HealthBackend::Embedded { runtime },
671        }
672    }
673
674    fn new_remote(client: Arc<KoiClient>) -> Self {
675        Self {
676            backend: HealthBackend::Remote { client },
677        }
678    }
679
680    pub fn core(&self) -> Result<Arc<koi_health::HealthCore>, KoiError> {
681        match &self.backend {
682            HealthBackend::Embedded { runtime } => Ok(runtime.core()),
683            HealthBackend::Remote { .. } => Err(KoiError::DisabledCapability("health")),
684        }
685    }
686
687    pub async fn status(&self) -> koi_health::HealthSnapshot {
688        match &self.backend {
689            HealthBackend::Embedded { runtime } => runtime.core().snapshot().await,
690            HealthBackend::Remote { client } => {
691                let client = Arc::clone(client);
692                let json = tokio::task::spawn_blocking(move || client.health_status())
693                    .await
694                    .ok()
695                    .and_then(|res| res.ok());
696                json.and_then(|json| serde_json::from_value(json).ok())
697                    .unwrap_or_else(|| koi_health::HealthSnapshot {
698                        machines: Vec::new(),
699                        services: Vec::new(),
700                    })
701            }
702        }
703    }
704
705    pub async fn add_check(&self, check: HealthCheck) -> Result<(), KoiError> {
706        match &self.backend {
707            HealthBackend::Embedded { runtime } => Ok(runtime.core().add_check(check).await?),
708            HealthBackend::Remote { client } => {
709                let client = Arc::clone(client);
710                let check = check.clone();
711                tokio::task::spawn_blocking(move || {
712                    client.health_add_check(
713                        &check.name,
714                        check.kind,
715                        &check.target,
716                        check.interval_secs,
717                        check.timeout_secs,
718                    )
719                })
720                .await
721                .map_err(map_join_error)??;
722                Ok(())
723            }
724        }
725    }
726
727    pub async fn remove_check(&self, name: &str) -> Result<(), KoiError> {
728        match &self.backend {
729            HealthBackend::Embedded { runtime } => Ok(runtime.core().remove_check(name).await?),
730            HealthBackend::Remote { client } => {
731                let client = Arc::clone(client);
732                let name = name.to_string();
733                tokio::task::spawn_blocking(move || client.health_remove_check(&name))
734                    .await
735                    .map_err(map_join_error)??;
736                Ok(())
737            }
738        }
739    }
740
741    pub async fn start(&self) -> Result<bool, KoiError> {
742        match &self.backend {
743            HealthBackend::Embedded { runtime } => Ok(runtime.start().await?),
744            HealthBackend::Remote { .. } => Ok(false),
745        }
746    }
747
748    pub async fn stop(&self) -> bool {
749        match &self.backend {
750            HealthBackend::Embedded { runtime } => runtime.stop().await,
751            HealthBackend::Remote { .. } => false,
752        }
753    }
754}
755
756pub struct CertmeshHandle {
757    backend: CertmeshBackend,
758}
759
760enum CertmeshBackend {
761    Embedded {
762        core: Arc<koi_certmesh::CertmeshCore>,
763    },
764    Remote {
765        client: Arc<KoiClient>,
766    },
767}
768
769impl CertmeshHandle {
770    fn new_embedded(core: Arc<koi_certmesh::CertmeshCore>) -> Self {
771        Self {
772            backend: CertmeshBackend::Embedded { core },
773        }
774    }
775
776    fn new_remote(client: Arc<KoiClient>) -> Self {
777        Self {
778            backend: CertmeshBackend::Remote { client },
779        }
780    }
781
782    pub fn core(&self) -> Result<Arc<koi_certmesh::CertmeshCore>, KoiError> {
783        match &self.backend {
784            CertmeshBackend::Embedded { core } => Ok(Arc::clone(core)),
785            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
786        }
787    }
788
789    pub async fn status(&self) -> koi_common::capability::CapabilityStatus {
790        match &self.backend {
791            CertmeshBackend::Embedded { core } => core.status().await,
792            CertmeshBackend::Remote { client } => {
793                let client = Arc::clone(client);
794                let json = tokio::task::spawn_blocking(move || client.unified_status())
795                    .await
796                    .ok()
797                    .and_then(|res| res.ok());
798                json.and_then(extract_capability_status)
799                    .unwrap_or_else(default_capability_status)
800            }
801        }
802    }
803
804    /// This node's current trust posture — the mode oracle (ADR-020 §0).
805    ///
806    /// Embedded only: a remote handle has no endpoint to query the daemon's
807    /// posture yet (that arrives with the diagnose/status surface in a later
808    /// ADR-020 phase), so it returns `DisabledCapability`.
809    pub fn posture(&self) -> Result<koi_common::posture::Posture, KoiError> {
810        match &self.backend {
811            CertmeshBackend::Embedded { core } => Ok(core.posture()),
812            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
813        }
814    }
815
816    /// This node's live identity, or `None` if it is Open (ADR-020 §7).
817    /// Read-only; embedded only.
818    pub async fn local_identity(&self) -> Result<Option<koi_certmesh::Identity>, KoiError> {
819        match &self.backend {
820            CertmeshBackend::Embedded { core } => Ok(core.local_identity().await),
821            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
822        }
823    }
824
825    /// Ensure this node holds a current identity, then return it (ADR-020 §7).
826    /// Idempotent and mode-transparent; embedded only.
827    pub async fn ensure_identity(&self) -> Result<Option<koi_certmesh::Identity>, KoiError> {
828        match &self.backend {
829            CertmeshBackend::Embedded { core } => Ok(core.ensure_identity().await),
830            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
831        }
832    }
833
834    /// Sign `bytes` into an `Envelope` (ADR-020 §3). Mode-transparent: a
835    /// freshness-stamped passthrough when Open, ES256-signed when Authenticated.
836    /// Embedded only.
837    pub async fn sign(&self, bytes: &[u8]) -> Result<koi_common::envelope::Envelope, KoiError> {
838        match &self.backend {
839            CertmeshBackend::Embedded { core } => Ok(core.sign(bytes).await),
840            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
841        }
842    }
843
844    /// Verify an `Envelope`, returning an `Assurance` (ADR-020 §3). Read a trusted
845    /// identity only via `Assurance::identity()`. Embedded only.
846    pub async fn verify(
847        &self,
848        env: &koi_common::envelope::Envelope,
849    ) -> Result<koi_common::envelope::Assurance, KoiError> {
850        match &self.backend {
851            CertmeshBackend::Embedded { core } => Ok(core.verify(env).await),
852            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
853        }
854    }
855
856    /// Seal `bytes` into a `Sealed` (ADR-020 §4). The confidentiality rung, today a
857    /// signed-not-encrypted passthrough; the consumer codes against the final API
858    /// now. Embedded only.
859    pub async fn seal(&self, bytes: &[u8]) -> Result<koi_common::sealed::Sealed, KoiError> {
860        match &self.backend {
861            CertmeshBackend::Embedded { core } => Ok(core.seal(bytes).await),
862            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
863        }
864    }
865
866    /// Open a `Sealed` → `Opened` (recovered bytes + trust state, ADR-020 §4). A
867    /// tampered/rejected message errors rather than yielding bytes. Embedded only.
868    pub async fn open(
869        &self,
870        sealed: &koi_common::sealed::Sealed,
871    ) -> Result<koi_common::sealed::Opened, KoiError> {
872        match &self.backend {
873            CertmeshBackend::Embedded { core } => Ok(core.open(sealed).await?),
874            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
875        }
876    }
877
878    /// Run the trust-doctor (ADR-020 §13) → a structured `TrustDiagnosis`: posture,
879    /// identity + renewal health, on-disk-leaf integrity, self-revocation, and the
880    /// CA trust-install state, each with an exact remedy. `is_red()`/`exit_code()`
881    /// fail loud. Embedded only.
882    pub async fn diagnose(&self) -> Result<koi_common::diagnosis::TrustDiagnosis, KoiError> {
883        match &self.backend {
884            CertmeshBackend::Embedded { core } => Ok(core.diagnose().await),
885            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
886        }
887    }
888
889    /// Build a posture-keyed client to a discovered [`Peer`] (ADR-020 §6): plain
890    /// HTTP to an Open peer, mTLS to a secure peer — the caller writes one code
891    /// path. Embedded only (a remote handle has no local identity to present).
892    ///
893    /// Errors loudly (not via an opaque handshake failure) when the peer requires
894    /// authentication but this node is Open, or when the peer anchors to a
895    /// different mesh — see [`koi_certmesh::CertmeshCore::client_for`].
896    pub async fn client_for(&self, peer: &Peer) -> Result<koi_certmesh::PeerClient, KoiError> {
897        match &self.backend {
898            CertmeshBackend::Embedded { core } => Ok(core.client_for(peer).await?),
899            CertmeshBackend::Remote { .. } => Err(KoiError::DisabledCapability("certmesh")),
900        }
901    }
902}
903
904/// Announce this node's `service_type` on `port` with its current posture stamped
905/// into the TXT (ADR-020 §8). Returns the registration id, or `None` if mDNS
906/// registration failed. Used by [`participate`](KoiHandle::participate).
907async fn announce_once(
908    mdns: &Arc<MdnsCore>,
909    certmesh: &Arc<koi_certmesh::CertmeshCore>,
910    hostname: &str,
911    service_type: &str,
912    port: u16,
913) -> Option<String> {
914    let id = certmesh.local_identity().await;
915    let mut txt = std::collections::HashMap::new();
916    koi_common::peer::stamp(
917        &mut txt,
918        certmesh.posture(),
919        id.as_ref().map(|i| i.ca_fingerprint.as_str()),
920        id.as_ref().map(|i| i.renewal.expires_at),
921    );
922    let payload = RegisterPayload {
923        name: hostname.to_string(),
924        service_type: service_type.to_string(),
925        port,
926        ip: None,
927        lease_secs: None,
928        txt,
929    };
930    match mdns.register(payload) {
931        Ok(result) => Some(result.id),
932        Err(e) => {
933            tracing::warn!(error = %e, "participate: mDNS announce failed");
934            None
935        }
936    }
937}
938
939/// Maintain a posture-stamped mDNS announcement across posture flips until
940/// `cancel` (ADR-020 §13). Re-announces on every transition so a peer discovering
941/// this node always reads its *current* posture, then withdraws the record on
942/// shutdown.
943fn spawn_participate_announce(
944    mdns: Arc<MdnsCore>,
945    certmesh: Arc<koi_certmesh::CertmeshCore>,
946    service_type: String,
947    port: u16,
948    cancel: CancellationToken,
949) {
950    tokio::spawn(async move {
951        let hostname = hostname::get()
952            .ok()
953            .and_then(|os| os.into_string().ok())
954            .unwrap_or_else(|| "unknown".to_string());
955        let mut posture_rx = certmesh.watch_posture();
956        let mut current_id = announce_once(&mdns, &certmesh, &hostname, &service_type, port).await;
957        loop {
958            tokio::select! {
959                _ = cancel.cancelled() => break,
960                changed = posture_rx.changed() => {
961                    if changed.is_err() {
962                        break; // the certmesh core was dropped
963                    }
964                    // Posture flipped → re-announce so the advertised posture is current.
965                    if let Some(old) = current_id.take() {
966                        let _ = mdns.unregister(&old);
967                    }
968                    current_id =
969                        announce_once(&mdns, &certmesh, &hostname, &service_type, port).await;
970                }
971            }
972        }
973        if let Some(id) = current_id {
974            let _ = mdns.unregister(&id);
975        }
976    });
977}
978
979/// Fold a stream of mDNS lifecycle events into a deduplicated peer snapshot
980/// (ADR-020 §8). Resolved records (which carry TXT, hence the trust hints)
981/// overwrite an earlier Found for the same name; a Removed drops it. Ordered by
982/// name for deterministic output. Pure — unit-tested without the network.
983fn fold_peers(events: impl IntoIterator<Item = MdnsEvent>) -> Vec<Peer> {
984    use std::collections::BTreeMap;
985    let mut by_name: BTreeMap<String, ServiceRecord> = BTreeMap::new();
986    for ev in events {
987        match ev {
988            MdnsEvent::Found(rec) => {
989                by_name.entry(rec.name.clone()).or_insert(rec);
990            }
991            MdnsEvent::Resolved(rec) => {
992                by_name.insert(rec.name.clone(), rec);
993            }
994            MdnsEvent::Removed { name, .. } => {
995                by_name.remove(&name);
996            }
997        }
998    }
999    by_name.into_values().map(Peer::from_record).collect()
1000}
1001
1002pub struct ProxyHandle {
1003    backend: ProxyBackend,
1004}
1005
1006enum ProxyBackend {
1007    Embedded { runtime: Arc<ProxyRuntime> },
1008    Remote { client: Arc<KoiClient> },
1009}
1010
1011impl ProxyHandle {
1012    fn new_embedded(runtime: Arc<ProxyRuntime>) -> Self {
1013        Self {
1014            backend: ProxyBackend::Embedded { runtime },
1015        }
1016    }
1017
1018    fn new_remote(client: Arc<KoiClient>) -> Self {
1019        Self {
1020            backend: ProxyBackend::Remote { client },
1021        }
1022    }
1023
1024    pub fn runtime(&self) -> Result<Arc<ProxyRuntime>, KoiError> {
1025        match &self.backend {
1026            ProxyBackend::Embedded { runtime } => Ok(Arc::clone(runtime)),
1027            ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
1028        }
1029    }
1030
1031    pub fn core(&self) -> Result<Arc<koi_proxy::ProxyCore>, KoiError> {
1032        match &self.backend {
1033            ProxyBackend::Embedded { runtime } => Ok(runtime.core()),
1034            ProxyBackend::Remote { .. } => Err(KoiError::DisabledCapability("proxy")),
1035        }
1036    }
1037
1038    pub async fn entries(&self) -> Vec<ProxyEntry> {
1039        match &self.backend {
1040            ProxyBackend::Embedded { runtime } => runtime.core().entries().await,
1041            ProxyBackend::Remote { client } => {
1042                let client = Arc::clone(client);
1043                tokio::task::spawn_blocking(move || client.proxy_list())
1044                    .await
1045                    .ok()
1046                    .and_then(|res| res.ok())
1047                    .and_then(|json| parse_proxy_entries(json).ok())
1048                    .unwrap_or_default()
1049            }
1050        }
1051    }
1052
1053    pub async fn upsert(&self, entry: ProxyEntry) -> Result<Vec<ProxyEntry>, KoiError> {
1054        match &self.backend {
1055            ProxyBackend::Embedded { runtime } => Ok(runtime.core().upsert(entry).await?),
1056            ProxyBackend::Remote { client } => {
1057                let client = Arc::clone(client);
1058                let entry = entry.clone();
1059                let add_client = Arc::clone(&client);
1060                tokio::task::spawn_blocking(move || {
1061                    add_client.proxy_add(
1062                        &entry.name,
1063                        entry.listen_port,
1064                        &entry.backend,
1065                        entry.allow_remote,
1066                    )
1067                })
1068                .await
1069                .map_err(map_join_error)??;
1070                let list = tokio::task::spawn_blocking(move || client.proxy_list())
1071                    .await
1072                    .map_err(map_join_error)??;
1073                parse_proxy_entries(list)
1074            }
1075        }
1076    }
1077
1078    pub async fn remove(&self, name: &str) -> Result<Vec<ProxyEntry>, KoiError> {
1079        match &self.backend {
1080            ProxyBackend::Embedded { runtime } => Ok(runtime.core().remove(name).await?),
1081            ProxyBackend::Remote { client } => {
1082                let client = Arc::clone(client);
1083                let name = name.to_string();
1084                let remove_client = Arc::clone(&client);
1085                tokio::task::spawn_blocking(move || remove_client.proxy_remove(&name))
1086                    .await
1087                    .map_err(map_join_error)??;
1088                let list = tokio::task::spawn_blocking(move || client.proxy_list())
1089                    .await
1090                    .map_err(map_join_error)??;
1091                parse_proxy_entries(list)
1092            }
1093        }
1094    }
1095
1096    pub async fn start_all(&self) -> Result<(), KoiError> {
1097        match &self.backend {
1098            ProxyBackend::Embedded { runtime } => Ok(runtime.start_all().await?),
1099            ProxyBackend::Remote { .. } => Ok(()),
1100        }
1101    }
1102
1103    pub async fn stop_all(&self) {
1104        if let ProxyBackend::Embedded { runtime } = &self.backend {
1105            runtime.stop_all().await;
1106        }
1107    }
1108}
1109
1110fn parse_dns_lookup(json: serde_json::Value) -> Option<DnsLookupResult> {
1111    let name = json.get("name").and_then(|v| v.as_str())?.to_string();
1112    let source = json
1113        .get("source")
1114        .and_then(|v| v.as_str())
1115        .unwrap_or("unknown")
1116        .to_string();
1117    let ips = json.get("ips").and_then(|v| v.as_array()).map(|arr| {
1118        arr.iter()
1119            .filter_map(|ip| ip.as_str())
1120            .filter_map(|ip| ip.parse::<IpAddr>().ok())
1121            .collect::<Vec<_>>()
1122    })?;
1123    Some(DnsLookupResult { name, ips, source })
1124}
1125
1126fn parse_dns_entries(json: serde_json::Value) -> Result<Vec<DnsEntry>, KoiError> {
1127    let entries = json.get("entries").ok_or_else(|| {
1128        KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::other(
1129            "missing entries",
1130        )))
1131    })?;
1132    let entries = serde_json::from_value(entries.clone()).map_err(|e| {
1133        KoiError::Dns(koi_dns::DnsError::Io(std::io::Error::new(
1134            std::io::ErrorKind::InvalidData,
1135            e.to_string(),
1136        )))
1137    })?;
1138    Ok(entries)
1139}
1140
1141fn parse_proxy_entries(json: serde_json::Value) -> Result<Vec<ProxyEntry>, KoiError> {
1142    let entries = json
1143        .get("entries")
1144        .ok_or_else(|| KoiError::Proxy(koi_proxy::ProxyError::Io("missing entries".to_string())))?
1145        .clone();
1146    serde_json::from_value(entries)
1147        .map_err(|e| KoiError::Proxy(koi_proxy::ProxyError::Io(e.to_string())))
1148}
1149
1150fn extract_capability_status(
1151    json: serde_json::Value,
1152) -> Option<koi_common::capability::CapabilityStatus> {
1153    let caps = json.get("capabilities")?.as_array()?;
1154    for cap in caps {
1155        if cap.get("name")?.as_str()? == "certmesh" {
1156            let name = cap.get("name")?.as_str()?.to_string();
1157            let summary = cap
1158                .get("summary")
1159                .and_then(|v| v.as_str())
1160                .unwrap_or("unknown")
1161                .to_string();
1162            let healthy = cap
1163                .get("healthy")
1164                .and_then(|v| v.as_bool())
1165                .unwrap_or(false);
1166            return Some(koi_common::capability::CapabilityStatus {
1167                name,
1168                summary,
1169                healthy,
1170            });
1171        }
1172    }
1173    None
1174}
1175
1176fn default_capability_status() -> koi_common::capability::CapabilityStatus {
1177    koi_common::capability::CapabilityStatus {
1178        name: "certmesh".to_string(),
1179        summary: "unknown".to_string(),
1180        healthy: false,
1181    }
1182}
1183
1184fn mdns_event_from_pipeline(json: serde_json::Value) -> Option<MdnsEvent> {
1185    if let Some(found) = json.get("found") {
1186        let record: ServiceRecord = serde_json::from_value(found.clone()).ok()?;
1187        return Some(MdnsEvent::Found(record));
1188    }
1189    if let Some(resolved) = json.get("resolved") {
1190        let record: ServiceRecord = serde_json::from_value(resolved.clone()).ok()?;
1191        return Some(MdnsEvent::Resolved(record));
1192    }
1193    if let Some(event) = json.get("event") {
1194        let kind: EventKind = serde_json::from_value(event.clone()).ok()?;
1195        let service = json
1196            .get("service")
1197            .cloned()
1198            .unwrap_or(serde_json::Value::Null);
1199        let record: ServiceRecord = serde_json::from_value(service).ok()?;
1200        return match kind {
1201            EventKind::Found => Some(MdnsEvent::Found(record)),
1202            EventKind::Resolved => Some(MdnsEvent::Resolved(record)),
1203            EventKind::Removed => Some(MdnsEvent::Removed {
1204                name: record.name,
1205                service_type: record.service_type,
1206            }),
1207        };
1208    }
1209    None
1210}
1211
#[cfg(test)]
mod tests {
    use super::*;
    use koi_common::posture::PostureLevel;
    use std::collections::HashMap;

    /// Build an `_http._tcp` service record called `name` carrying `txt`.
    fn rec(name: &str, txt: &[(&str, &str)]) -> ServiceRecord {
        ServiceRecord {
            name: name.to_string(),
            service_type: "_http._tcp".to_string(),
            host: Some(format!("{name}.local")),
            ip: Some("10.0.0.9".to_string()),
            port: Some(8443),
            txt: txt
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect::<HashMap<_, _>>(),
        }
    }

    /// Path of a per-process scratch data dir for `tag`, with any stale copy
    /// removed. The name embeds the process id, so every run gets a new
    /// directory — callers must delete it again when they are done.
    fn scratch_dir(tag: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!("koi-emb-{tag}-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir);
        dir
    }

    #[test]
    fn fold_resolved_overwrites_found_for_txt_enrichment() {
        // Found arrives first (no TXT), then Resolved carries the trust hints.
        let peers = fold_peers([
            MdnsEvent::Found(rec("a", &[])),
            MdnsEvent::Resolved(rec("a", &[("fp", "CAFP"), ("posture", "authenticated")])),
        ]);
        assert_eq!(peers.len(), 1, "the two events collapse to one peer");
        assert_eq!(peers[0].level(), PostureLevel::Authenticated);
        assert_eq!(peers[0].fp.as_deref(), Some("CAFP"));
    }

    #[test]
    fn fold_removed_drops_the_peer() {
        let peers = fold_peers([
            MdnsEvent::Found(rec("b", &[])),
            MdnsEvent::Removed {
                name: "b".to_string(),
                service_type: "_http._tcp".to_string(),
            },
        ]);
        assert!(peers.is_empty(), "a removed peer is not in the snapshot");
    }

    #[test]
    fn fold_orders_peers_by_name() {
        let peers = fold_peers([
            MdnsEvent::Resolved(rec("z", &[])),
            MdnsEvent::Resolved(rec("a", &[])),
            MdnsEvent::Resolved(rec("m", &[])),
        ]);
        let names: Vec<_> = peers.iter().map(|p| p.record.name.clone()).collect();
        assert_eq!(names, vec!["a", "m", "z"]);
    }

    #[test]
    fn fold_open_peer_has_open_posture() {
        let peers = fold_peers([MdnsEvent::Resolved(rec("plain", &[]))]);
        assert_eq!(peers.len(), 1);
        assert_eq!(peers[0].level(), PostureLevel::Open);
        assert!(!peers[0].is_secure());
    }

    // ── participate (ADR-020 §13) ───────────────────────────────────

    #[tokio::test]
    async fn participate_remote_handle_is_disabled() {
        let client = Arc::new(KoiClient::new("http://127.0.0.1:1"));
        let (tx, _) = broadcast::channel(8);
        let handle = KoiHandle::new_remote(client, tx, CancellationToken::new(), Vec::new());
        let router = axum::Router::new();
        let addr: std::net::SocketAddr = ([127, 0, 0, 1], 0).into();
        let err = handle
            .participate(router, addr, "_x._tcp", CancellationToken::new())
            .await
            .unwrap_err();
        assert!(matches!(err, KoiError::DisabledCapability(_)));
    }

    #[tokio::test]
    async fn participate_open_node_serves_plaintext() {
        // certmesh on (but no CA → Open), mDNS off (participate just serves plain),
        // isolated data dir. The Open node serves the consumer's router in plaintext
        // with no posture branching by the caller.
        let dir = scratch_dir("participate");
        let koi = crate::Builder::new()
            .data_dir(&dir)
            .service_mode(crate::ServiceMode::EmbeddedOnly)
            .mdns(false)
            .dns_enabled(false)
            .health(false)
            .certmesh(true)
            .proxy(false)
            .build()
            .expect("build");
        let handle = koi.start().await.expect("start");

        // Bind to port 0 to learn a free port, then release it for `participate`.
        let addr = {
            let l = tokio::net::TcpListener::bind(("127.0.0.1", 0))
                .await
                .unwrap();
            l.local_addr().unwrap()
        };
        let router = axum::Router::new().route("/ping", axum::routing::get(|| async { "pong" }));
        let cancel = CancellationToken::new();
        let _server = handle
            .participate(router, addr, "_koi-test._tcp", cancel.clone())
            .await
            .expect("participate");
        tokio::time::sleep(std::time::Duration::from_millis(75)).await;

        let (status, body) = koi_certmesh::mtls::get(&addr.ip().to_string(), addr.port(), "/ping")
            .await
            .expect("plain GET to an Open participating node");
        assert_eq!(status, 200);
        assert_eq!(body, "pong");

        cancel.cancel();
        handle.shutdown().await.expect("shutdown");
        // The dir name is per-PID, so without this every run would leave one behind.
        let _ = std::fs::remove_dir_all(&dir);
    }

    // ── seal/open (ADR-020 §4) ──────────────────────────────────────

    #[tokio::test]
    async fn seal_open_round_trip_on_open_node() {
        use koi_common::sealed::Confidentiality;
        let dir = scratch_dir("seal");
        let koi = crate::Builder::new()
            .data_dir(&dir)
            .service_mode(crate::ServiceMode::EmbeddedOnly)
            .mdns(false)
            .dns_enabled(false)
            .health(false)
            .certmesh(true)
            .proxy(false)
            .build()
            .expect("build");
        let handle = koi.start().await.expect("start");
        let cm = handle.certmesh().expect("certmesh handle");

        // Open node: seal is a passthrough (signed-not-encrypted); the same code path
        // round-trips the bytes back with an anonymous assurance.
        let sealed = cm.seal(b"hello seal").await.expect("seal");
        assert_eq!(sealed.confidentiality(), Confidentiality::None);
        let opened = cm.open(&sealed).await.expect("open");
        assert_eq!(opened.payload, b"hello seal");
        assert_eq!(opened.confidentiality, Confidentiality::None);
        assert!(
            opened.assurance.identity().is_none(),
            "an Open node's seal is anonymous, not a trusted identity"
        );

        handle.shutdown().await.expect("shutdown");
        // The dir name is per-PID, so without this every run would leave one behind.
        let _ = std::fs::remove_dir_all(&dir);
    }
}