#![deny(unsafe_code)]
#[cfg(feature = "mdns")]
use iroh::address_lookup::{DiscoveryEvent, MdnsAddressLookup};
#[cfg(feature = "mdns")]
use std::sync::Arc;
#[derive(Debug)]
pub enum DiscoveryError {
Setup(String),
InvalidServiceName(String),
}
impl std::fmt::Display for DiscoveryError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DiscoveryError::Setup(msg) => write!(f, "mDNS setup failed: {msg}"),
DiscoveryError::InvalidServiceName(msg) => {
write!(f, "invalid mDNS service name: {msg}")
}
}
}
}
impl std::error::Error for DiscoveryError {}
#[derive(Debug, Clone)]
pub struct PeerDiscoveryEvent {
pub is_active: bool,
pub node_id: String,
pub addrs: Vec<String>,
}
#[cfg(feature = "mdns")]
pub struct BrowseSession {
rx: tokio::sync::mpsc::Receiver<DiscoveryEvent>,
_mdns: Arc<MdnsAddressLookup>,
}
#[cfg(feature = "mdns")]
impl BrowseSession {
pub async fn next_event(&mut self) -> Option<PeerDiscoveryEvent> {
use iroh::TransportAddr;
let ev = self.rx.recv().await?;
Some(match ev {
DiscoveryEvent::Discovered { endpoint_info, .. } => {
let node_id = endpoint_info.endpoint_id.to_string();
let mut addrs = Vec::new();
for a in endpoint_info.data.addrs() {
match a {
TransportAddr::Ip(sock) => addrs.push(sock.to_string()),
TransportAddr::Relay(url) => addrs.push(url.to_string()),
other => addrs.push(format!("{:?}", other)),
}
}
PeerDiscoveryEvent {
is_active: true,
node_id,
addrs,
}
}
DiscoveryEvent::Expired { endpoint_id } => PeerDiscoveryEvent {
is_active: false,
node_id: endpoint_id.to_string(),
addrs: Vec::new(),
},
})
}
}
#[cfg(feature = "mdns")]
pub async fn start_browse(
ep: &iroh::Endpoint,
service_name: &str,
) -> Result<BrowseSession, DiscoveryError> {
let mdns = Arc::new(
MdnsAddressLookup::builder()
.advertise(false)
.service_name(service_name)
.build(ep.id())
.map_err(|e| DiscoveryError::Setup(e.to_string()))?,
);
ep.address_lookup().add(Arc::clone(&mdns));
use futures::StreamExt;
let mut stream = mdns.subscribe().await;
let (tx, rx) = tokio::sync::mpsc::channel(64);
tokio::spawn(async move {
while let Some(ev) = stream.next().await {
if tx.send(ev).await.is_err() {
break;
}
}
});
Ok(BrowseSession { rx, _mdns: mdns })
}
#[cfg(feature = "mdns")]
pub struct AdvertiseSession {
_mdns: Arc<MdnsAddressLookup>,
}
#[cfg(feature = "mdns")]
pub fn start_advertise(
ep: &iroh::Endpoint,
service_name: &str,
) -> Result<AdvertiseSession, DiscoveryError> {
let mdns = Arc::new(
MdnsAddressLookup::builder()
.advertise(true)
.service_name(service_name)
.build(ep.id())
.map_err(|e| DiscoveryError::Setup(e.to_string()))?,
);
ep.address_lookup().add(Arc::clone(&mdns));
Ok(AdvertiseSession { _mdns: mdns })
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn peer_discovery_event_active_construction() {
let ev = PeerDiscoveryEvent {
is_active: true,
node_id: "node123".to_string(),
addrs: vec!["127.0.0.1:4000".to_string(), "relay://r.example.com".to_string()],
};
assert!(ev.is_active);
assert_eq!(ev.node_id, "node123");
assert_eq!(ev.addrs.len(), 2);
assert!(ev.addrs.iter().any(|a| a.contains("127.0.0.1")));
}
#[test]
fn peer_discovery_event_expired_construction() {
let ev = PeerDiscoveryEvent {
is_active: false,
node_id: "expired_node".to_string(),
addrs: vec![],
};
assert!(!ev.is_active);
assert_eq!(ev.node_id, "expired_node");
assert!(ev.addrs.is_empty(), "expired events carry no addresses");
}
#[test]
fn peer_discovery_event_clone_preserves_all_fields() {
let original = PeerDiscoveryEvent {
is_active: true,
node_id: "abc".to_string(),
addrs: vec!["10.0.0.1:1234".to_string()],
};
let cloned = original.clone();
assert_eq!(cloned.is_active, original.is_active);
assert_eq!(cloned.node_id, original.node_id);
assert_eq!(cloned.addrs, original.addrs);
}
#[tokio::test]
async fn channel_close_on_sender_drop() {
let (tx, mut rx) = tokio::sync::mpsc::channel::<()>(1);
drop(tx);
assert!(
rx.recv().await.is_none(),
"recv() must return None when all senders are dropped"
);
}
}