Skip to main content

koi_compose/
bridges.rs

1//! Bridge implementations that wrap domain cores and implement the cross-domain
2//! integration traits from `koi_common::integration`.
3//!
4//! These bridges are the only place where domains "see" each other — through the
5//! composition layer's wiring, never a direct domain→domain dependency. Moved here from
6//! the binary's `integrations.rs` (P07) so the daemon, the Windows service, and
7//! koi-embedded share one copy instead of three.
8
9use std::collections::{HashMap, HashSet};
10use std::net::IpAddr;
11use std::sync::{Arc, RwLock};
12
13use tokio_util::sync::CancellationToken;
14
15use koi_common::integration;
16use koi_common::types::{ServiceRecord, META_QUERY};
17
18// ── CertmeshBridge ─────────────────────────────────────────────────
19
20pub struct CertmeshBridge {
21    core: Arc<koi_certmesh::CertmeshCore>,
22}
23
24impl CertmeshBridge {
25    pub fn new(core: Arc<koi_certmesh::CertmeshCore>) -> Arc<Self> {
26        Arc::new(Self { core })
27    }
28}
29
30impl integration::CertmeshSnapshot for CertmeshBridge {
31    fn active_members(&self) -> Vec<integration::MemberSummary> {
32        let roster_path = self.core.paths().roster_path();
33        let Ok(roster) = koi_certmesh::roster::load_roster(&roster_path) else {
34            return Vec::new();
35        };
36        roster
37            .members
38            .into_iter()
39            .filter(|m| m.status == koi_certmesh::roster::MemberStatus::Active)
40            .map(|m| integration::MemberSummary {
41                hostname: m.hostname,
42                sans: m.cert_sans,
43                cert_expires: Some(m.cert_expires),
44                last_seen: m.last_seen,
45                status: "active".to_string(),
46                proxy_entries: m
47                    .proxy_entries
48                    .into_iter()
49                    .map(|p| integration::ProxyConfigSummary {
50                        name: p.name,
51                        listen_port: p.listen_port,
52                        backend: p.backend,
53                        allow_remote: p.allow_remote,
54                    })
55                    .collect(),
56            })
57            .collect()
58    }
59}
60
61// ── MdnsBridge ─────────────────────────────────────────────────────
62
63/// Maintains a polled cache of mDNS service records and exposes them
64/// through the `MdnsSnapshot` trait.
65pub struct MdnsBridge {
66    records: Arc<RwLock<HashMap<String, HashMap<String, ServiceRecord>>>>,
67    cancel: CancellationToken,
68}
69
70impl MdnsBridge {
71    /// Spawn a background browse task that keeps the cache warm.
72    pub async fn spawn(core: Arc<koi_mdns::MdnsCore>) -> Arc<Self> {
73        let records = Arc::new(RwLock::new(HashMap::new()));
74        let cancel = CancellationToken::new();
75
76        let meta_core = Arc::clone(&core);
77        let meta_records = Arc::clone(&records);
78        let meta_cancel = cancel.clone();
79        tokio::spawn(async move {
80            if let Ok(handle) = meta_core.subscribe_type(META_QUERY).await {
81                run_meta_browse(meta_core, handle, meta_records, meta_cancel).await;
82            }
83        });
84
85        Arc::new(Self { records, cancel })
86    }
87
88    fn snapshot_records(&self) -> Vec<ServiceRecord> {
89        let guard = self.records.read().unwrap_or_else(|e| e.into_inner());
90        guard
91            .values()
92            .flat_map(|map| map.values().cloned())
93            .collect()
94    }
95}
96
97impl Drop for MdnsBridge {
98    fn drop(&mut self) {
99        self.cancel.cancel();
100    }
101}
102
103impl integration::MdnsSnapshot for MdnsBridge {
104    fn host_ips(&self) -> HashMap<String, IpAddr> {
105        let records = self.snapshot_records();
106        let mut map = HashMap::new();
107        for record in &records {
108            let Some(host) = record.host.as_deref() else {
109                continue;
110            };
111            let Some(ip) = record.ip.as_deref().and_then(|ip| ip.parse().ok()) else {
112                continue;
113            };
114            let hostname = host.trim_end_matches('.').trim_end_matches(".local");
115            if !hostname.is_empty() {
116                map.insert(hostname.to_string(), ip);
117            }
118        }
119        map
120    }
121
122    fn cached_records(&self) -> Vec<ServiceRecord> {
123        self.snapshot_records()
124    }
125}
126
127// ── DnsBridge ──────────────────────────────────────────────────────
128
129pub struct DnsBridge {
130    runtime: Arc<koi_dns::DnsRuntime>,
131}
132
133impl DnsBridge {
134    pub fn new(runtime: Arc<koi_dns::DnsRuntime>) -> Arc<Self> {
135        Arc::new(Self { runtime })
136    }
137}
138
139impl integration::DnsProbe for DnsBridge {
140    fn resolve_local(&self, name: &str) -> Option<Vec<IpAddr>> {
141        use hickory_proto::rr::RecordType;
142        let core = self.runtime.core();
143        let result = core
144            .resolve_local(name, RecordType::A)
145            .or_else(|| core.resolve_local(name, RecordType::AAAA));
146        result.map(|r| r.ips)
147    }
148}
149
150// ── ProxyBridge ────────────────────────────────────────────────────
151
152pub struct ProxyBridge {
153    _core: Arc<koi_proxy::ProxyCore>,
154}
155
156impl ProxyBridge {
157    pub fn new(core: Arc<koi_proxy::ProxyCore>) -> Arc<Self> {
158        Arc::new(Self { _core: core })
159    }
160}
161
162impl integration::ProxySnapshot for ProxyBridge {
163    fn entries(&self) -> Vec<integration::ProxyEntrySummary> {
164        // Use the config module to load entries (sync operation is fine here).
165        let Ok(entries) = koi_proxy::config::load_entries() else {
166            return Vec::new();
167        };
168        entries
169            .into_iter()
170            .map(|e| integration::ProxyEntrySummary {
171                name: e.name,
172                listen_port: e.listen_port,
173                backend: e.backend,
174            })
175            .collect()
176    }
177}
178
179// ── AliasFeedbackBridge ────────────────────────────────────────────
180
181pub struct AliasFeedbackBridge {
182    core: Arc<koi_certmesh::CertmeshCore>,
183}
184
185impl AliasFeedbackBridge {
186    pub fn new(core: Arc<koi_certmesh::CertmeshCore>) -> Arc<Self> {
187        Arc::new(Self { core })
188    }
189}
190
191impl integration::AliasFeedback for AliasFeedbackBridge {
192    fn record_alias(&self, hostname: &str, alias: &str) {
193        let core = Arc::clone(&self.core);
194        let hostname = hostname.to_string();
195        let alias = alias.to_string();
196        // Fire and forget — alias feedback is best-effort.
197        tokio::spawn(async move {
198            let _ = core.add_alias_sans(&hostname, &[alias]).await;
199        });
200    }
201}
202
203// ── mDNS browse helpers (ported from koi-dns resolver.rs) ──────────
204
205async fn run_meta_browse(
206    core: Arc<koi_mdns::MdnsCore>,
207    handle: koi_mdns::BrowseSubscription,
208    records: Arc<RwLock<HashMap<String, HashMap<String, ServiceRecord>>>>,
209    cancel: CancellationToken,
210) {
211    // Dedup: browse each discovered service type once. This is plain
212    // bookkeeping now, not the old "never respawn" workaround — concurrent
213    // subscriptions share one real browse and survive resolve/subscriber churn.
214    let mut seen = HashSet::<String>::new();
215    loop {
216        tokio::select! {
217            _ = cancel.cancelled() => break,
218            event = handle.recv() => {
219                let Some(event) = event else { break; };
220                if let koi_mdns::events::MdnsEvent::Found(record) = event {
221                    let service_type = record.name;
222                    if seen.insert(service_type.clone()) {
223                        let c = Arc::clone(&core);
224                        let r = Arc::clone(&records);
225                        let t = service_type.clone();
226                        let cancel_child = cancel.clone();
227                        tokio::spawn(async move {
228                            if let Ok(handle) = c.subscribe_type(&t).await {
229                                run_type_browse(handle, r, cancel_child).await;
230                            }
231                        });
232                    }
233                }
234            }
235        }
236    }
237}
238
239async fn run_type_browse(
240    handle: koi_mdns::BrowseSubscription,
241    records: Arc<RwLock<HashMap<String, HashMap<String, ServiceRecord>>>>,
242    cancel: CancellationToken,
243) {
244    loop {
245        tokio::select! {
246            _ = cancel.cancelled() => break,
247            event = handle.recv() => {
248                let Some(event) = event else { break; };
249                match event {
250                    koi_mdns::events::MdnsEvent::Resolved(record) => {
251                        let mut guard = records.write().unwrap_or_else(|e| e.into_inner());
252                        let entry = guard.entry(record.service_type.clone()).or_default();
253                        entry.insert(record.name.clone(), record);
254                    }
255                    // Removed events now carry the instance name and service type
256                    // parsed at the mDNS boundary — no string re-parsing here.
257                    koi_mdns::events::MdnsEvent::Removed { name, service_type } => {
258                        if service_type.is_empty() {
259                            continue;
260                        }
261                        let mut guard = records.write().unwrap_or_else(|e| e.into_inner());
262                        if let Some(map) = guard.get_mut(&service_type) {
263                            map.remove(&name);
264                        }
265                    }
266                    _ => {}
267                }
268            }
269        }
270    }
271}