1use std::collections::HashMap;
15use std::convert::Infallible;
16use std::sync::Arc;
17use std::time::Instant;
18
19use axum::extract::Extension;
20use axum::response::sse::{Event, KeepAlive, Sse};
21use axum::response::{Html, Json};
22use axum::routing::get;
23use axum::Router;
24use chrono::Utc;
25use serde::Serialize;
26use tokio::sync::{broadcast, mpsc, RwLock};
27use tokio_stream::Stream;
28use tokio_util::sync::CancellationToken;
29
30use crate::types::META_QUERY;
31
/// HTML of the browser page, embedded into the binary at compile time with
/// `include_str!` and returned verbatim by `get_page`.
const BROWSER_HTML: &str = include_str!("../assets/mdns-browser.html");
35
/// A service instance together with its connection details.
///
/// Serialized as the JSON payload of the `resolved` SSE event and stored in
/// the cache by `BrowserCache::record_resolved`.
#[derive(Clone, Debug, Serialize)]
pub struct ResolvedService {
    /// Instance name as reported by the source; the cache uses it as the
    /// per-type key.
    pub name: String,
    /// Service type of the instance (normalized with `normalize_type` before
    /// it is cached).
    pub service_type: String,
    /// Host name; empty when converted from a `ServiceRecord` without one.
    pub host: String,
    /// IP address as text; empty when converted from a `ServiceRecord`
    /// without one.
    pub ip: String,
    /// Port; `0` when converted from a `ServiceRecord` without one.
    pub port: u16,
    /// TXT record key/value pairs.
    pub txt: HashMap<String, String>,
}
50
/// Event delivered by a `BrowseSource`, either through a `BrowseHandle` or
/// through the broadcast channel returned by `BrowseSource::subscribe`.
#[derive(Clone, Debug)]
pub enum BrowserEvent {
    /// Something was discovered but not resolved.
    ///
    /// In this file a `Found` coming from the meta-browse carries the service
    /// type in `name`; the SSE stream only forwards `Found` events whose
    /// `service_type` is empty, reporting `name` as the new service type.
    Found { name: String, service_type: String },
    /// An instance was resolved to host, address, port and TXT data.
    Resolved(ResolvedService),
    /// An instance is no longer advertised.
    Removed { name: String, service_type: String },
}
58
59impl From<&crate::types::ServiceRecord> for ResolvedService {
60 fn from(record: &crate::types::ServiceRecord) -> Self {
61 Self {
62 name: record.name.clone(),
63 service_type: record.service_type.clone(),
64 host: record.host.clone().unwrap_or_default(),
65 ip: record.ip.clone().unwrap_or_default(),
66 port: record.port.unwrap_or(0),
67 txt: record.txt.clone(),
68 }
69 }
70}
71
/// Error returned by `BrowseSource::browse`.
///
/// Wraps a plain message; its `Display` output is exactly that message.
#[derive(Debug, thiserror::Error)]
#[error("{0}")]
pub struct BrowseError(pub String);
76
/// Receiving end of a single browse operation started with
/// `BrowseSource::browse`.
pub struct BrowseHandle {
    /// Channel on which the source delivers events for this browse.
    rx: mpsc::Receiver<BrowserEvent>,
}
81
82impl BrowseHandle {
83 pub fn new(rx: mpsc::Receiver<BrowserEvent>) -> Self {
85 Self { rx }
86 }
87
88 pub async fn recv(&mut self) -> Option<BrowserEvent> {
90 self.rx.recv().await
91 }
92}
93
/// Abstraction over the component that produces browse events.
///
/// The `Send + Sync` bound lets an implementation be shared as
/// `Arc<dyn BrowseSource>` between the worker and the HTTP handlers.
pub trait BrowseSource: Send + Sync {
    /// Starts browsing for `service_type`.
    ///
    /// Resolves to a `BrowseHandle` that yields the events of this browse, or
    /// to a `BrowseError` when the browse could not be started. The future is
    /// boxed, which keeps the trait usable as `dyn BrowseSource`.
    fn browse(
        &self,
        service_type: &str,
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<BrowseHandle, BrowseError>> + Send + '_>,
    >;

    /// Returns a new broadcast receiver of browser events; the SSE stream
    /// calls this once per connected client.
    fn subscribe(&self) -> broadcast::Receiver<BrowserEvent>;
}
111
/// Upper bound on cached instances, summed over all service types. When a
/// newly recorded instance pushes the total above this value, the instance
/// with the oldest `last_seen` is evicted.
const MAX_INSTANCES: usize = 2000;

/// Number of seconds an instance stays cached after it was marked removed;
/// `purge_stale` drops it once this much time has passed.
const PURGE_AFTER_SECS: i64 = 120;
119
/// In-memory cache of discovered service types and their instances.
///
/// Clones share the same underlying state through the `Arc`.
#[derive(Clone)]
pub struct BrowserCache {
    /// Shared, lock-protected cache contents.
    inner: Arc<RwLock<CacheInner>>,
    /// Moment the cache was created; the source of `cache_age_secs` in
    /// snapshots.
    started_at: Instant,
}
126
/// Lock-protected contents of a `BrowserCache`.
struct CacheInner {
    /// Discovered service types, keyed by normalized type name.
    types: HashMap<String, DiscoveredType>,
}
130
/// One discovered service type and the instances recorded under it.
#[derive(Debug, Clone, Serialize)]
struct DiscoveredType {
    /// Normalized service type name (same value as the map key).
    service_type: String,
    /// RFC 3339 timestamp (UTC) of when the type was first recorded.
    first_seen: String,
    /// Instances of this type, keyed by full instance name.
    instances: HashMap<String, ServiceInstance>,
}
137
/// Cached state of a single service instance, as exposed in snapshots.
#[derive(Debug, Clone, Serialize)]
struct ServiceInstance {
    /// Short display name derived from the full name by `short_name`.
    name: String,
    /// Full instance name; also the key in `DiscoveredType::instances`.
    instance_name: String,
    /// Normalized service type this instance belongs to.
    service_type: String,
    /// Host name from the most recent resolve.
    host: String,
    /// IP address (as text) from the most recent resolve.
    ip: String,
    /// Port from the most recent resolve.
    port: u16,
    /// TXT key/value pairs from the most recent resolve.
    txt: HashMap<String, String>,
    /// RFC 3339 timestamp (UTC) of the first resolve.
    first_seen: String,
    /// RFC 3339 timestamp (UTC) of the most recent resolve.
    last_seen: String,
    /// Set to `true` by every resolve; nothing in this file stores `false`.
    resolved: bool,
    /// RFC 3339 timestamp (UTC) of when the instance was marked removed.
    /// `None` while the instance is live, in which case the field is left out
    /// of the serialized output.
    #[serde(skip_serializing_if = "Option::is_none")]
    removed_at: Option<String>,
}
153
154impl BrowserCache {
155 pub fn new() -> Self {
156 Self {
157 inner: Arc::new(RwLock::new(CacheInner {
158 types: HashMap::new(),
159 })),
160 started_at: Instant::now(),
161 }
162 }
163
164 async fn record_type(&self, type_name: &str) {
165 let now = Utc::now().to_rfc3339();
166 let normalized = normalize_type(type_name);
167 let mut inner = self.inner.write().await;
168 inner
169 .types
170 .entry(normalized.clone())
171 .or_insert_with(|| DiscoveredType {
172 service_type: normalized,
173 first_seen: now,
174 instances: HashMap::new(),
175 });
176 }
177
178 async fn record_resolved(&self, record: &ResolvedService) {
179 let now = Utc::now().to_rfc3339();
180 let mut inner = self.inner.write().await;
181
182 let svc_type = normalize_type(&record.service_type);
183
184 let type_entry = inner
185 .types
186 .entry(svc_type.clone())
187 .or_insert_with(|| DiscoveredType {
188 service_type: svc_type.clone(),
189 first_seen: now.clone(),
190 instances: HashMap::new(),
191 });
192
193 let full_name = record.name.clone();
194 type_entry
195 .instances
196 .entry(full_name.clone())
197 .and_modify(|inst| {
198 inst.host.clone_from(&record.host);
199 inst.ip.clone_from(&record.ip);
200 inst.port = record.port;
201 inst.txt.clone_from(&record.txt);
202 inst.resolved = true;
203 inst.last_seen.clone_from(&now);
204 inst.removed_at = None;
205 })
206 .or_insert_with(|| ServiceInstance {
207 name: short_name(&full_name, &svc_type),
208 instance_name: full_name,
209 service_type: svc_type,
210 host: record.host.clone(),
211 ip: record.ip.clone(),
212 port: record.port,
213 txt: record.txt.clone(),
214 first_seen: now.clone(),
215 last_seen: now,
216 resolved: true,
217 removed_at: None,
218 });
219
220 let total: usize = inner.types.values().map(|t| t.instances.len()).sum();
221 if total > MAX_INSTANCES {
222 evict_oldest_instance(&mut inner.types);
223 }
224 }
225
226 async fn record_removed(&self, full_name: &str) {
227 let now = Utc::now().to_rfc3339();
228 let mut inner = self.inner.write().await;
229 for dtype in inner.types.values_mut() {
230 if let Some(inst) = dtype.instances.get_mut(full_name) {
231 inst.removed_at = Some(now);
232 return;
233 }
234 }
235 }
236
237 async fn purge_stale(&self) {
238 let now = Utc::now();
239 let mut inner = self.inner.write().await;
240 for dtype in inner.types.values_mut() {
241 dtype.instances.retain(|_, inst| {
242 if let Some(ref removed) = inst.removed_at {
243 if let Ok(removed_time) = chrono::DateTime::parse_from_rfc3339(removed) {
244 let elapsed = now.signed_duration_since(removed_time);
245 return elapsed.num_seconds() < PURGE_AFTER_SECS;
246 }
247 }
248 true
249 });
250 }
251 inner.types.retain(|_, dtype| !dtype.instances.is_empty());
252 }
253
254 async fn snapshot(&self) -> BrowserSnapshot {
255 let inner = self.inner.read().await;
256
257 let mut all_instances = Vec::new();
258 let mut type_summaries = Vec::new();
259
260 for dtype in inner.types.values() {
261 let live_count = dtype
262 .instances
263 .values()
264 .filter(|i| i.removed_at.is_none())
265 .count();
266 type_summaries.push(TypeSummary {
267 service_type: dtype.service_type.clone(),
268 count: live_count,
269 first_seen: dtype.first_seen.clone(),
270 });
271 for inst in dtype.instances.values() {
272 all_instances.push(inst.clone());
273 }
274 }
275
276 type_summaries.sort_by(|a, b| b.count.cmp(&a.count));
277 all_instances.sort_by(|a, b| a.last_seen.cmp(&b.last_seen).reverse());
278
279 BrowserSnapshot {
280 total_types: type_summaries.len(),
281 total_instances: all_instances
282 .iter()
283 .filter(|i| i.removed_at.is_none())
284 .count(),
285 service_types: type_summaries,
286 instances: all_instances,
287 cache_age_secs: self.started_at.elapsed().as_secs(),
288 }
289 }
290}
291
292impl Default for BrowserCache {
293 fn default() -> Self {
294 Self::new()
295 }
296}
297
/// Returns `t` without trailing dots and without any trailing `.local`
/// labels, e.g. `_http._tcp.local.` becomes `_http._tcp`.
fn normalize_type(t: &str) -> String {
    let mut rest = t.trim_end_matches('.');
    // Peel the suffix repeatedly so stacked `.local.local` endings vanish too.
    while let Some(shorter) = rest.strip_suffix(".local") {
        rest = shorter;
    }
    rest.to_owned()
}
303
/// Derives a display name from a full instance name.
///
/// Trailing dots are ignored. If the name ends in `.{service_type}.local` or
/// `.{service_type}`, the part in front of that suffix is returned, provided
/// it is not empty. Otherwise the name is returned with any trailing `.local`
/// removed.
fn short_name(full_name: &str, service_type: &str) -> String {
    let trimmed = full_name.trim_end_matches('.');
    let suffixes = [
        format!(".{service_type}.local"),
        format!(".{service_type}"),
    ];

    suffixes
        .iter()
        .filter_map(|suffix| trimmed.strip_suffix(suffix.as_str()))
        .map(|head| head.trim_end_matches('.'))
        .find(|head| !head.is_empty())
        .unwrap_or_else(|| trimmed.trim_end_matches(".local"))
        .to_string()
}
316
317fn evict_oldest_instance(types: &mut HashMap<String, DiscoveredType>) {
318 let mut oldest_key = None;
319 let mut oldest_type = None;
320 let mut oldest_time = String::new();
321
322 for (tname, dtype) in types.iter() {
323 for (iname, inst) in &dtype.instances {
324 if oldest_key.is_none() || inst.last_seen < oldest_time {
325 oldest_time.clone_from(&inst.last_seen);
326 oldest_key = Some(iname.clone());
327 oldest_type = Some(tname.clone());
328 }
329 }
330 }
331
332 if let (Some(tname), Some(iname)) = (oldest_type, oldest_key) {
333 if let Some(dtype) = types.get_mut(&tname) {
334 dtype.instances.remove(&iname);
335 }
336 }
337}
338
/// Serializable view of the cache, returned by the `/snapshot` endpoint.
#[derive(Debug, Serialize)]
pub struct BrowserSnapshot {
    /// Number of cached service types.
    total_types: usize,
    /// Number of cached instances that are not marked removed.
    total_instances: usize,
    /// One summary per cached service type.
    service_types: Vec<TypeSummary>,
    /// All cached instances, including ones marked removed but not purged yet.
    instances: Vec<ServiceInstance>,
    /// Whole seconds elapsed since the cache was created.
    cache_age_secs: u64,
}
349
/// Per-type summary line of a `BrowserSnapshot`.
#[derive(Debug, Serialize)]
struct TypeSummary {
    /// Normalized service type name.
    service_type: String,
    /// Number of instances of this type that are not marked removed.
    count: usize,
    /// RFC 3339 timestamp (UTC) of when the type was first recorded.
    first_seen: String,
}
356
357pub async fn worker(source: Arc<dyn BrowseSource>, cache: BrowserCache, cancel: CancellationToken) {
363 tracing::info!("mDNS browser worker starting");
364
365 let mut meta_handle = match source.browse(META_QUERY).await {
366 Ok(handle) => Some(handle),
367 Err(e) => {
368 tracing::warn!(error = %e, "Failed to start meta-browse");
369 None
370 }
371 };
372
373 let mut discovered_types = std::collections::HashSet::new();
374 let mut pump_tasks: Vec<tokio::task::JoinHandle<()>> = Vec::new();
375
376 let mut purge_interval = tokio::time::interval(std::time::Duration::from_secs(30));
377 purge_interval.tick().await;
378
379 loop {
380 tokio::select! {
381 _ = cancel.cancelled() => break,
382
383 Some(event) = async {
384 match meta_handle.as_mut() {
385 Some(h) => h.recv().await,
386 None => std::future::pending::<Option<BrowserEvent>>().await,
387 }
388 } => {
389 if let BrowserEvent::Found { ref name, .. } = event {
390 let type_name = name.clone();
391 if type_name.is_empty() {
392 continue;
393 }
394
395 cache.record_type(&type_name).await;
396
397 let normalized = normalize_type(&type_name);
398 if discovered_types.insert(normalized.clone()) {
399 tracing::debug!(service_type = %normalized, "Discovered service type, starting per-type browse");
400
401 let browse_type = format!("{normalized}.local.");
402 match source.browse(&browse_type).await {
403 Ok(mut handle) => {
404 let cache_clone = cache.clone();
405 let cancel_clone = cancel.clone();
406
407 let task = tokio::spawn(async move {
408 loop {
409 tokio::select! {
410 _ = cancel_clone.cancelled() => break,
411 result = handle.recv() => {
412 match result {
413 Some(BrowserEvent::Resolved(record)) => {
414 cache_clone.record_resolved(&record).await;
415 }
416 Some(BrowserEvent::Removed { name, .. }) => {
417 cache_clone.record_removed(&name).await;
418 }
419 Some(BrowserEvent::Found { .. }) => {}
420 None => break,
421 }
422 }
423 }
424 }
425 });
426 pump_tasks.push(task);
427 pump_tasks.retain(|h| !h.is_finished());
428 }
429 Err(e) => {
430 tracing::debug!(error = %e, browse_type, "Failed to browse type");
431 }
432 }
433 }
434 }
435 }
436
437 _ = purge_interval.tick() => {
438 cache.purge_stale().await;
439 }
440 }
441 }
442
443 for task in &pump_tasks {
444 task.abort();
445 }
446
447 tracing::info!("mDNS browser worker stopped");
448}
449
450fn browser_event_stream(
453 source: Arc<dyn BrowseSource>,
454 cache: BrowserCache,
455) -> impl Stream<Item = Result<Event, Infallible>> {
456 async_stream::stream! {
457 let mut rx = source.subscribe();
458 let mut heartbeat = tokio::time::interval(std::time::Duration::from_secs(15));
459 heartbeat.tick().await;
460
461 loop {
462 tokio::select! {
463 result = rx.recv() => {
464 match result {
465 Ok(event) => {
466 let sse = match &event {
467 BrowserEvent::Found { name, service_type } => {
468 if service_type.is_empty() {
469 Event::default()
470 .event("type_found")
471 .id(uuid::Uuid::now_v7().to_string())
472 .json_data(serde_json::json!({
473 "service_type": name,
474 })).ok()
475 } else {
476 None
477 }
478 }
479 BrowserEvent::Resolved(record) => {
480 Event::default()
481 .event("resolved")
482 .id(uuid::Uuid::now_v7().to_string())
483 .json_data(record).ok()
484 }
485 BrowserEvent::Removed { name, service_type } => {
486 Event::default()
487 .event("removed")
488 .id(uuid::Uuid::now_v7().to_string())
489 .json_data(serde_json::json!({
490 "name": name,
491 "service_type": service_type
492 })).ok()
493 }
494 };
495 if let Some(ev) = sse {
496 yield Ok(ev);
497 }
498 }
499 Err(broadcast::error::RecvError::Lagged(n)) => {
500 tracing::warn!(dropped = n, "Browser SSE stream lagged");
501 continue;
502 }
503 Err(broadcast::error::RecvError::Closed) => break,
504 }
505 },
506 _ = heartbeat.tick() => {
507 let snap = cache.snapshot().await;
508 if let Ok(ev) = Event::default()
509 .event("heartbeat")
510 .json_data(serde_json::json!({
511 "total_types": snap.total_types,
512 "total_instances": snap.total_instances
513 }))
514 {
515 yield Ok(ev);
516 }
517 },
518 }
519 }
520 }
521}
522
/// State shared with the HTTP handlers; `routes` installs it as an axum
/// `Extension`.
#[derive(Clone)]
pub struct BrowserState {
    /// Event source that the SSE endpoint subscribes to.
    pub source: Arc<dyn BrowseSource>,
    /// Cache read by the snapshot endpoint and by SSE heartbeats.
    pub cache: BrowserCache,
}
531
532pub fn routes(state: BrowserState) -> Router {
536 Router::new()
537 .route("/snapshot", get(get_snapshot))
538 .route("/events", get(get_events))
539 .layer(axum::Extension(state))
540}
541
/// Handler that returns the embedded browser page (`BROWSER_HTML`) as HTML.
///
/// `routes` does not mount this handler.
pub async fn get_page() -> Html<&'static str> {
    Html(BROWSER_HTML)
}
548
549async fn get_snapshot(Extension(state): Extension<BrowserState>) -> Json<BrowserSnapshot> {
551 Json(state.cache.snapshot().await)
552}
553
554async fn get_events(
556 Extension(state): Extension<BrowserState>,
557) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
558 Sse::new(browser_event_stream(
559 state.source.clone(),
560 state.cache.clone(),
561 ))
562 .keep_alive(KeepAlive::default())
563}