// koi_common/browser.rs

//! mDNS browser adapter — live network service discovery explorer.
//!
//! Maintains an in-memory [`BrowserCache`] populated by a background
//! worker.  The worker runs a meta-browse to discover service types,
//! then spawns a per-type browse pump for each discovered type.
//!
//! Domain-specific mDNS operations are abstracted behind the
//! [`BrowseSource`] trait so that both the standalone daemon and
//! embedded mode can provide their own implementation.
//!
//! This is a **presentation adapter** — the cache is an adapter-level
//! read model, NOT a domain concept.
13
14use std::collections::HashMap;
15use std::convert::Infallible;
16use std::sync::Arc;
17use std::time::Instant;
18
19use axum::extract::Extension;
20use axum::response::sse::{Event, KeepAlive, Sse};
21use axum::response::{Html, Json};
22use axum::routing::get;
23use axum::Router;
24use chrono::Utc;
25use serde::Serialize;
26use tokio::sync::{broadcast, mpsc, RwLock};
27use tokio_stream::Stream;
28use tokio_util::sync::CancellationToken;
29
30use crate::types::META_QUERY;
31
32// ── HTML asset ──────────────────────────────────────────────────────
33
34const BROWSER_HTML: &str = include_str!("../assets/mdns-browser.html");
35
36// ── Domain-agnostic types ───────────────────────────────────────────
37
38/// A resolved service instance (domain-agnostic mirror of
39/// `koi_common::types::ServiceRecord` with guaranteed non-optional
40/// fields for the browser cache).
41#[derive(Clone, Debug, Serialize)]
42pub struct ResolvedService {
43    pub name: String,
44    pub service_type: String,
45    pub host: String,
46    pub ip: String,
47    pub port: u16,
48    pub txt: HashMap<String, String>,
49}
50
51/// Domain-agnostic browser event.
52#[derive(Clone, Debug)]
53pub enum BrowserEvent {
54    Found { name: String, service_type: String },
55    Resolved(ResolvedService),
56    Removed { name: String, service_type: String },
57}
58
59impl From<&crate::types::ServiceRecord> for ResolvedService {
60    fn from(record: &crate::types::ServiceRecord) -> Self {
61        Self {
62            name: record.name.clone(),
63            service_type: record.service_type.clone(),
64            host: record.host.clone().unwrap_or_default(),
65            ip: record.ip.clone().unwrap_or_default(),
66            port: record.port.unwrap_or(0),
67            txt: record.txt.clone(),
68        }
69    }
70}
71
72/// Error returned by [`BrowseSource::browse`].
73#[derive(Debug, thiserror::Error)]
74#[error("{0}")]
75pub struct BrowseError(pub String);
76
77/// Handle for receiving events from a single browse operation.
78pub struct BrowseHandle {
79    rx: mpsc::Receiver<BrowserEvent>,
80}
81
82impl BrowseHandle {
83    /// Create a new handle from an mpsc receiver.
84    pub fn new(rx: mpsc::Receiver<BrowserEvent>) -> Self {
85        Self { rx }
86    }
87
88    /// Receive the next event, or `None` if the browse stopped.
89    pub async fn recv(&mut self) -> Option<BrowserEvent> {
90        self.rx.recv().await
91    }
92}
93
94/// Trait abstracting mDNS browse operations.
95///
96/// Implemented by the caller wrapping their concrete `MdnsCore`.
97pub trait BrowseSource: Send + Sync {
98    /// Start browsing for the given service type.
99    ///
100    /// Returns a handle that yields events for this browse.
101    fn browse(
102        &self,
103        service_type: &str,
104    ) -> std::pin::Pin<
105        Box<dyn std::future::Future<Output = Result<BrowseHandle, BrowseError>> + Send + '_>,
106    >;
107
108    /// Subscribe to the global event broadcast channel.
109    fn subscribe(&self) -> broadcast::Receiver<BrowserEvent>;
110}
111
112// ── Cache model ─────────────────────────────────────────────────────
113
/// Upper bound on the number of cached instances, summed over all types.
/// Once exceeded, the instance with the oldest `last_seen` is evicted.
const MAX_INSTANCES: usize = 2_000;

/// How long (in seconds) a removed instance stays in the cache before
/// the periodic purge drops it.
const PURGE_AFTER_SECS: i64 = 2 * 60;
119
120/// Top-level cache: types -> instances tree.
121#[derive(Clone)]
122pub struct BrowserCache {
123    inner: Arc<RwLock<CacheInner>>,
124    started_at: Instant,
125}
126
127struct CacheInner {
128    types: HashMap<String, DiscoveredType>,
129}
130
131#[derive(Debug, Clone, Serialize)]
132struct DiscoveredType {
133    service_type: String,
134    first_seen: String,
135    instances: HashMap<String, ServiceInstance>,
136}
137
138#[derive(Debug, Clone, Serialize)]
139struct ServiceInstance {
140    name: String,
141    instance_name: String,
142    service_type: String,
143    host: String,
144    ip: String,
145    port: u16,
146    txt: HashMap<String, String>,
147    first_seen: String,
148    last_seen: String,
149    resolved: bool,
150    #[serde(skip_serializing_if = "Option::is_none")]
151    removed_at: Option<String>,
152}
153
154impl BrowserCache {
155    pub fn new() -> Self {
156        Self {
157            inner: Arc::new(RwLock::new(CacheInner {
158                types: HashMap::new(),
159            })),
160            started_at: Instant::now(),
161        }
162    }
163
164    async fn record_type(&self, type_name: &str) {
165        let now = Utc::now().to_rfc3339();
166        let normalized = normalize_type(type_name);
167        let mut inner = self.inner.write().await;
168        inner
169            .types
170            .entry(normalized.clone())
171            .or_insert_with(|| DiscoveredType {
172                service_type: normalized,
173                first_seen: now,
174                instances: HashMap::new(),
175            });
176    }
177
178    async fn record_resolved(&self, record: &ResolvedService) {
179        let now = Utc::now().to_rfc3339();
180        let mut inner = self.inner.write().await;
181
182        let svc_type = normalize_type(&record.service_type);
183
184        let type_entry = inner
185            .types
186            .entry(svc_type.clone())
187            .or_insert_with(|| DiscoveredType {
188                service_type: svc_type.clone(),
189                first_seen: now.clone(),
190                instances: HashMap::new(),
191            });
192
193        let full_name = record.name.clone();
194        type_entry
195            .instances
196            .entry(full_name.clone())
197            .and_modify(|inst| {
198                inst.host.clone_from(&record.host);
199                inst.ip.clone_from(&record.ip);
200                inst.port = record.port;
201                inst.txt.clone_from(&record.txt);
202                inst.resolved = true;
203                inst.last_seen.clone_from(&now);
204                inst.removed_at = None;
205            })
206            .or_insert_with(|| ServiceInstance {
207                name: short_name(&full_name, &svc_type),
208                instance_name: full_name,
209                service_type: svc_type,
210                host: record.host.clone(),
211                ip: record.ip.clone(),
212                port: record.port,
213                txt: record.txt.clone(),
214                first_seen: now.clone(),
215                last_seen: now,
216                resolved: true,
217                removed_at: None,
218            });
219
220        let total: usize = inner.types.values().map(|t| t.instances.len()).sum();
221        if total > MAX_INSTANCES {
222            evict_oldest_instance(&mut inner.types);
223        }
224    }
225
226    async fn record_removed(&self, full_name: &str) {
227        let now = Utc::now().to_rfc3339();
228        let mut inner = self.inner.write().await;
229        for dtype in inner.types.values_mut() {
230            if let Some(inst) = dtype.instances.get_mut(full_name) {
231                inst.removed_at = Some(now);
232                return;
233            }
234        }
235    }
236
237    async fn purge_stale(&self) {
238        let now = Utc::now();
239        let mut inner = self.inner.write().await;
240        for dtype in inner.types.values_mut() {
241            dtype.instances.retain(|_, inst| {
242                if let Some(ref removed) = inst.removed_at {
243                    if let Ok(removed_time) = chrono::DateTime::parse_from_rfc3339(removed) {
244                        let elapsed = now.signed_duration_since(removed_time);
245                        return elapsed.num_seconds() < PURGE_AFTER_SECS;
246                    }
247                }
248                true
249            });
250        }
251        inner.types.retain(|_, dtype| !dtype.instances.is_empty());
252    }
253
254    async fn snapshot(&self) -> BrowserSnapshot {
255        let inner = self.inner.read().await;
256
257        let mut all_instances = Vec::new();
258        let mut type_summaries = Vec::new();
259
260        for dtype in inner.types.values() {
261            let live_count = dtype
262                .instances
263                .values()
264                .filter(|i| i.removed_at.is_none())
265                .count();
266            type_summaries.push(TypeSummary {
267                service_type: dtype.service_type.clone(),
268                count: live_count,
269                first_seen: dtype.first_seen.clone(),
270            });
271            for inst in dtype.instances.values() {
272                all_instances.push(inst.clone());
273            }
274        }
275
276        type_summaries.sort_by(|a, b| b.count.cmp(&a.count));
277        all_instances.sort_by(|a, b| a.last_seen.cmp(&b.last_seen).reverse());
278
279        BrowserSnapshot {
280            total_types: type_summaries.len(),
281            total_instances: all_instances
282                .iter()
283                .filter(|i| i.removed_at.is_none())
284                .count(),
285            service_types: type_summaries,
286            instances: all_instances,
287            cache_age_secs: self.started_at.elapsed().as_secs(),
288        }
289    }
290}
291
292impl Default for BrowserCache {
293    fn default() -> Self {
294        Self::new()
295    }
296}
297
/// Reduces a service type to its bare form.
///
/// All trailing dots are removed first, then every trailing `.local`
/// label, e.g. `_http._tcp.local.` becomes `_http._tcp`.
fn normalize_type(t: &str) -> String {
    let without_root_dot = t.trim_end_matches('.');
    let bare = without_root_dot.trim_end_matches(".local");
    bare.to_owned()
}
303
/// Derives a short display name from a full instance name.
///
/// Trailing dots are ignored.  If the name ends in `.<service_type>.local`
/// or `.<service_type>`, the part before that suffix is returned, provided
/// it is not empty.  Otherwise the name is returned with any trailing
/// `.local` labels removed.
fn short_name(full_name: &str, service_type: &str) -> String {
    let trimmed = full_name.trim_end_matches('.');
    let suffixes = [
        format!(".{service_type}.local"),
        format!(".{service_type}"),
    ];

    suffixes
        .iter()
        .filter_map(|suffix| trimmed.strip_suffix(suffix.as_str()))
        .map(|head| head.trim_end_matches('.'))
        .find(|head| !head.is_empty())
        .map(str::to_owned)
        .unwrap_or_else(|| trimmed.trim_end_matches(".local").to_string())
}
316
317fn evict_oldest_instance(types: &mut HashMap<String, DiscoveredType>) {
318    let mut oldest_key = None;
319    let mut oldest_type = None;
320    let mut oldest_time = String::new();
321
322    for (tname, dtype) in types.iter() {
323        for (iname, inst) in &dtype.instances {
324            if oldest_key.is_none() || inst.last_seen < oldest_time {
325                oldest_time.clone_from(&inst.last_seen);
326                oldest_key = Some(iname.clone());
327                oldest_type = Some(tname.clone());
328            }
329        }
330    }
331
332    if let (Some(tname), Some(iname)) = (oldest_type, oldest_key) {
333        if let Some(dtype) = types.get_mut(&tname) {
334            dtype.instances.remove(&iname);
335        }
336    }
337}
338
339// ── Snapshot types ──────────────────────────────────────────────────
340
341#[derive(Debug, Serialize)]
342pub struct BrowserSnapshot {
343    total_types: usize,
344    total_instances: usize,
345    service_types: Vec<TypeSummary>,
346    instances: Vec<ServiceInstance>,
347    cache_age_secs: u64,
348}
349
350#[derive(Debug, Serialize)]
351struct TypeSummary {
352    service_type: String,
353    count: usize,
354    first_seen: String,
355}
356
357// ── Background worker ───────────────────────────────────────────────
358
359/// Spawns the browser worker that populates the [`BrowserCache`].
360///
361/// The caller is responsible for spawning this as a tokio task.
362pub async fn worker(source: Arc<dyn BrowseSource>, cache: BrowserCache, cancel: CancellationToken) {
363    tracing::info!("mDNS browser worker starting");
364
365    let mut meta_handle = match source.browse(META_QUERY).await {
366        Ok(handle) => Some(handle),
367        Err(e) => {
368            tracing::warn!(error = %e, "Failed to start meta-browse");
369            None
370        }
371    };
372
373    let mut discovered_types = std::collections::HashSet::new();
374    let mut pump_tasks: Vec<tokio::task::JoinHandle<()>> = Vec::new();
375
376    let mut purge_interval = tokio::time::interval(std::time::Duration::from_secs(30));
377    purge_interval.tick().await;
378
379    loop {
380        tokio::select! {
381            _ = cancel.cancelled() => break,
382
383            Some(event) = async {
384                match meta_handle.as_mut() {
385                    Some(h) => h.recv().await,
386                    None => std::future::pending::<Option<BrowserEvent>>().await,
387                }
388            } => {
389                if let BrowserEvent::Found { ref name, .. } = event {
390                    let type_name = name.clone();
391                    if type_name.is_empty() {
392                        continue;
393                    }
394
395                    cache.record_type(&type_name).await;
396
397                    let normalized = normalize_type(&type_name);
398                    if discovered_types.insert(normalized.clone()) {
399                        tracing::debug!(service_type = %normalized, "Discovered service type, starting per-type browse");
400
401                        let browse_type = format!("{normalized}.local.");
402                        match source.browse(&browse_type).await {
403                            Ok(mut handle) => {
404                                let cache_clone = cache.clone();
405                                let cancel_clone = cancel.clone();
406
407                                let task = tokio::spawn(async move {
408                                    loop {
409                                        tokio::select! {
410                                            _ = cancel_clone.cancelled() => break,
411                                            result = handle.recv() => {
412                                                match result {
413                                                    Some(BrowserEvent::Resolved(record)) => {
414                                                        cache_clone.record_resolved(&record).await;
415                                                    }
416                                                    Some(BrowserEvent::Removed { name, .. }) => {
417                                                        cache_clone.record_removed(&name).await;
418                                                    }
419                                                    Some(BrowserEvent::Found { .. }) => {}
420                                                    None => break,
421                                                }
422                                            }
423                                        }
424                                    }
425                                });
426                                pump_tasks.push(task);
427                                pump_tasks.retain(|h| !h.is_finished());
428                            }
429                            Err(e) => {
430                                tracing::debug!(error = %e, browse_type, "Failed to browse type");
431                            }
432                        }
433                    }
434                }
435            }
436
437            _ = purge_interval.tick() => {
438                cache.purge_stale().await;
439            }
440        }
441    }
442
443    for task in &pump_tasks {
444        task.abort();
445    }
446
447    tracing::info!("mDNS browser worker stopped");
448}
449
450// ── SSE stream ──────────────────────────────────────────────────────
451
452fn browser_event_stream(
453    source: Arc<dyn BrowseSource>,
454    cache: BrowserCache,
455) -> impl Stream<Item = Result<Event, Infallible>> {
456    async_stream::stream! {
457        let mut rx = source.subscribe();
458        let mut heartbeat = tokio::time::interval(std::time::Duration::from_secs(15));
459        heartbeat.tick().await;
460
461        loop {
462            tokio::select! {
463                result = rx.recv() => {
464                    match result {
465                        Ok(event) => {
466                            let sse = match &event {
467                                BrowserEvent::Found { name, service_type } => {
468                                    if service_type.is_empty() {
469                                        Event::default()
470                                            .event("type_found")
471                                            .id(uuid::Uuid::now_v7().to_string())
472                                            .json_data(serde_json::json!({
473                                                "service_type": name,
474                                            })).ok()
475                                    } else {
476                                        None
477                                    }
478                                }
479                                BrowserEvent::Resolved(record) => {
480                                    Event::default()
481                                        .event("resolved")
482                                        .id(uuid::Uuid::now_v7().to_string())
483                                        .json_data(record).ok()
484                                }
485                                BrowserEvent::Removed { name, service_type } => {
486                                    Event::default()
487                                        .event("removed")
488                                        .id(uuid::Uuid::now_v7().to_string())
489                                        .json_data(serde_json::json!({
490                                            "name": name,
491                                            "service_type": service_type
492                                        })).ok()
493                                }
494                            };
495                            if let Some(ev) = sse {
496                                yield Ok(ev);
497                            }
498                        }
499                        Err(broadcast::error::RecvError::Lagged(n)) => {
500                            tracing::warn!(dropped = n, "Browser SSE stream lagged");
501                            continue;
502                        }
503                        Err(broadcast::error::RecvError::Closed) => break,
504                    }
505                },
506                _ = heartbeat.tick() => {
507                    let snap = cache.snapshot().await;
508                    if let Ok(ev) = Event::default()
509                        .event("heartbeat")
510                        .json_data(serde_json::json!({
511                            "total_types": snap.total_types,
512                            "total_instances": snap.total_instances
513                        }))
514                    {
515                        yield Ok(ev);
516                    }
517                },
518            }
519        }
520    }
521}
522
523// ── Shared state ────────────────────────────────────────────────────
524
525/// Shared state for the browser routes.
526#[derive(Clone)]
527pub struct BrowserState {
528    pub source: Arc<dyn BrowseSource>,
529    pub cache: BrowserCache,
530}
531
532// ── Routes ──────────────────────────────────────────────────────────
533
534/// Build the browser sub-router mounted at `/v1/mdns/browser`.
535pub fn routes(state: BrowserState) -> Router {
536    Router::new()
537        .route("/snapshot", get(get_snapshot))
538        .route("/events", get(get_events))
539        .layer(axum::Extension(state))
540}
541
542// ── Handlers ────────────────────────────────────────────────────────
543
544/// `GET /mdns-browser` — Serve the mDNS browser SPA.
545pub async fn get_page() -> Html<&'static str> {
546    Html(BROWSER_HTML)
547}
548
549/// `GET /v1/mdns/browser/snapshot` — Full browser cache as JSON.
550async fn get_snapshot(Extension(state): Extension<BrowserState>) -> Json<BrowserSnapshot> {
551    Json(state.cache.snapshot().await)
552}
553
554/// `GET /v1/mdns/browser/events` — SSE stream of discovery events.
555async fn get_events(
556    Extension(state): Extension<BrowserState>,
557) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
558    Sse::new(browser_event_stream(
559        state.source.clone(),
560        state.cache.clone(),
561    ))
562    .keep_alive(KeepAlive::default())
563}