use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use mdns_sd::{ServiceDaemon, ServiceEvent, ServiceInfo};
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use crate::Identity;
/// DNS-SD service type used by default for both advertising and browsing
/// (it is the value returned by `LanDiscoveryConfig::default_service_type`).
pub const SERVICE_TYPE: &str = "_fips._udp.local.";
/// TXT property key under which a node advertises its npub.
pub const TXT_KEY_NPUB: &str = "npub";
/// TXT property key under which a node advertises its scope; only set when
/// the scope is non-empty.
pub const TXT_KEY_SCOPE: &str = "scope";
/// TXT property key under which `super::nostr::PROTOCOL_VERSION` is advertised.
pub const TXT_KEY_VERSION: &str = "v";
/// Errors that can occur while starting LAN (mDNS) discovery.
///
/// The `String` payloads hold the textual form of the underlying mDNS error.
#[derive(Debug, Error)]
pub enum LanDiscoveryError {
/// Creating the mDNS `ServiceDaemon` failed.
#[error("mDNS daemon init failed: {0}")]
Daemon(String),
/// Building the `ServiceInfo` or registering it with the daemon failed.
#[error("mDNS register failed: {0}")]
Register(String),
/// Starting the browse for the configured service type failed.
#[error("mDNS browse failed: {0}")]
Browse(String),
/// The caller passed an advertised port of `0`.
#[error("no advertised UDP port — start a UDP transport first")]
NoAdvertisedPort,
/// `LanDiscoveryConfig::enabled` is `false`.
#[error("LAN discovery disabled in config")]
Disabled,
}
/// One address at which a peer was seen via mDNS.
///
/// A resolved advert with several addresses yields one value per address,
/// all sharing the same `npub`, `scope` and `observed_at`.
#[derive(Debug, Clone)]
pub struct LanDiscoveredPeer {
/// Value of the peer's `npub` TXT property, copied as received.
pub npub: String,
/// Value of the peer's `scope` TXT property, if it advertised one.
pub scope: Option<String>,
/// A resolved IP address of the peer combined with its advertised port.
pub addr: SocketAddr,
/// Monotonic timestamp taken when the resolved advert was processed.
pub observed_at: Instant,
}
/// Events produced by the LAN discovery background task and handed out by
/// `LanDiscovery::drain_events`.
#[derive(Debug, Clone)]
pub enum LanEvent {
/// A peer advert was resolved and passed the self/scope filters.
Discovered(LanDiscoveredPeer),
}
/// User-facing configuration for LAN discovery.
///
/// Every field has a serde default, so a missing field deserializes to the
/// same value that `Default::default()` produces.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct LanDiscoveryConfig {
/// Whether LAN discovery may be started; defaults to `true`.
#[serde(default = "LanDiscoveryConfig::default_enabled")]
pub enabled: bool,
/// Service type to advertise and browse; defaults to `SERVICE_TYPE`.
#[serde(default = "LanDiscoveryConfig::default_service_type")]
pub service_type: String,
}
impl Default for LanDiscoveryConfig {
    /// Builds the configuration from the same per-field defaults that serde
    /// applies when a field is absent.
    fn default() -> Self {
        let enabled = Self::default_enabled();
        let service_type = Self::default_service_type();
        Self {
            enabled,
            service_type,
        }
    }
}
impl LanDiscoveryConfig {
    /// Default for `enabled`: discovery is on unless configured otherwise.
    fn default_enabled() -> bool {
        true
    }

    /// Default for `service_type`: an owned copy of `SERVICE_TYPE`.
    fn default_service_type() -> String {
        String::from(SERVICE_TYPE)
    }
}
/// Handle to a running LAN discovery session: the mDNS daemon, our own
/// registration, and the background task that turns browse results into
/// `LanEvent`s.
pub struct LanDiscovery {
// Handle to the mDNS daemon used for register/browse/unregister/shutdown.
daemon: ServiceDaemon,
// Our own npub; adverts carrying this value are ignored by the event pump.
own_npub: String,
// Full name of our registered service instance, needed to unregister it.
instance_fullname: String,
// Receiving end of the unbounded event queue filled by the event pump;
// wrapped in a mutex so it can be drained through `&self`.
events_rx: Mutex<tokio::sync::mpsc::UnboundedReceiver<LanEvent>>,
// Background task forwarding browse results; aborted on shutdown.
event_pump: tokio::task::JoinHandle<()>,
}
impl LanDiscovery {
pub async fn start(
identity: &Identity,
scope: Option<String>,
advertised_port: u16,
config: LanDiscoveryConfig,
) -> Result<Arc<Self>, LanDiscoveryError> {
if !config.enabled {
return Err(LanDiscoveryError::Disabled);
}
if advertised_port == 0 {
return Err(LanDiscoveryError::NoAdvertisedPort);
}
let daemon = ServiceDaemon::new().map_err(|e| LanDiscoveryError::Daemon(e.to_string()))?;
let npub = identity.npub();
let label_npub = &npub[..16.min(npub.len())];
let instance_name = format!("fips-{label_npub}");
let host_name = format!("{instance_name}.local.");
let mut props: HashMap<String, String> = HashMap::new();
props.insert(TXT_KEY_NPUB.to_string(), npub.clone());
if let Some(s) = scope.as_deref()
&& !s.is_empty()
{
props.insert(TXT_KEY_SCOPE.to_string(), s.to_string());
}
props.insert(
TXT_KEY_VERSION.to_string(),
super::nostr::PROTOCOL_VERSION.to_string(),
);
let service_info = ServiceInfo::new(
&config.service_type,
&instance_name,
&host_name,
"127.0.0.1",
advertised_port,
Some(props),
)
.map_err(|e| LanDiscoveryError::Register(e.to_string()))?
.enable_addr_auto();
let instance_fullname = service_info.get_fullname().to_string();
daemon
.register(service_info)
.map_err(|e| LanDiscoveryError::Register(e.to_string()))?;
let browse_rx = daemon
.browse(&config.service_type)
.map_err(|e| LanDiscoveryError::Browse(e.to_string()))?;
let (events_tx, events_rx) = tokio::sync::mpsc::unbounded_channel();
let own_npub = npub.clone();
let scope_filter = scope.clone().filter(|s| !s.is_empty());
let event_pump = tokio::spawn(async move {
loop {
let event = match browse_rx.recv_async().await {
Ok(e) => e,
Err(_) => break,
};
match event {
ServiceEvent::ServiceResolved(info) => {
let mut peer_npub: Option<String> = None;
let mut peer_scope: Option<String> = None;
for prop in info.get_properties().iter() {
match prop.key() {
TXT_KEY_NPUB => {
peer_npub = Some(prop.val_str().to_string());
}
TXT_KEY_SCOPE => {
peer_scope = Some(prop.val_str().to_string());
}
_ => {}
}
}
let Some(peer_npub) = peer_npub else {
debug!(
instance = info.get_fullname(),
"lan: skip advert without npub TXT"
);
continue;
};
if peer_npub == own_npub {
continue;
}
if scope_filter.is_some() && scope_filter != peer_scope {
debug!(
npub = %short(&peer_npub),
their_scope = ?peer_scope,
our_scope = ?scope_filter,
"lan: skip cross-scope advert"
);
continue;
}
let port = info.get_port();
if port == 0 {
continue;
}
let observed_at = Instant::now();
for scoped in info.get_addresses() {
let ip = scoped.to_ip_addr();
let addr = SocketAddr::new(ip, port);
if events_tx
.send(LanEvent::Discovered(LanDiscoveredPeer {
npub: peer_npub.clone(),
scope: peer_scope.clone(),
addr,
observed_at,
}))
.is_err()
{
return;
}
}
}
ServiceEvent::ServiceRemoved(_, fullname) => {
debug!(fullname = %fullname, "lan: service removed");
}
other => {
debug!(?other, "lan: mDNS event");
}
}
}
});
info!(
instance = %instance_fullname,
port = advertised_port,
scope = ?scope,
"lan: mDNS discovery started"
);
Ok(Arc::new(Self {
daemon,
own_npub: npub,
instance_fullname,
events_rx: Mutex::new(events_rx),
event_pump,
}))
}
pub fn own_npub(&self) -> &str {
&self.own_npub
}
pub async fn drain_events(&self) -> Vec<LanEvent> {
let mut rx = self.events_rx.lock().await;
let mut events = Vec::new();
while let Ok(event) = rx.try_recv() {
events.push(event);
}
events
}
pub async fn shutdown(self: &Arc<Self>) {
if let Err(e) = self.daemon.unregister(&self.instance_fullname) {
warn!(error = %e, "lan: unregister failed");
}
if let Err(e) = self.daemon.shutdown() {
warn!(error = %e, "lan: daemon shutdown failed");
}
self.event_pump.abort();
}
}
/// Returns at most the first 16 bytes of `npub`, for compact names and logs.
///
/// The cut is moved back to the nearest UTF-8 character boundary, so this
/// never panics on non-ASCII input such as an npub value read from a peer's
/// TXT record. Strings of 16 bytes or fewer are returned unchanged.
fn short(npub: &str) -> &str {
    let mut end = npub.len().min(16);
    // Index 0 is always a character boundary, so this loop terminates.
    while !npub.is_char_boundary(end) {
        end -= 1;
    }
    &npub[..end]
}
#[cfg(test)]
mod tests;