pub mod device;
pub mod error;
#[cfg(feature = "mdns")]
pub mod mdns;
#[cfg(feature = "broadcast")]
pub mod broadcast;
#[cfg(feature = "rendezvous")]
pub mod rendezvous;
pub use device::{Device, DeviceInfo};
pub use error::{DiscoveryError, Result};
#[cfg(feature = "rendezvous")]
pub use rendezvous::{DeviceRegistration, RendezvousClient, RendezvousConfig, RendezvousServer};
#[cfg(feature = "rendezvous")]
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
#[derive(Debug, Clone)]
pub enum DiscoveryEvent {
Found(Box<Device>),
Lost(String), Error(String),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DiscoverySource {
Mdns,
Broadcast,
Rendezvous,
Manual,
}
#[derive(Debug, Clone)]
pub struct DiscoveryConfig {
pub mdns: bool,
pub broadcast: bool,
pub broadcast_port: u16,
pub timeout: Duration,
pub rendezvous_url: Option<String>,
pub rendezvous_refresh_interval: Duration,
pub rendezvous_tag: Option<String>,
}
impl Default for DiscoveryConfig {
fn default() -> Self {
Self {
mdns: true,
broadcast: true,
broadcast_port: clasp_core::DEFAULT_DISCOVERY_PORT,
timeout: Duration::from_secs(5),
rendezvous_url: None,
rendezvous_refresh_interval: Duration::from_secs(120), rendezvous_tag: None,
}
}
}
#[cfg(feature = "rendezvous")]
struct RendezvousKeepalive {
client: rendezvous::RendezvousClient,
registration: rendezvous::DeviceRegistration,
device_id: parking_lot::RwLock<Option<String>>,
refresh_interval: Duration,
}
#[cfg(feature = "rendezvous")]
impl RendezvousKeepalive {
fn new(
url: &str,
registration: rendezvous::DeviceRegistration,
refresh_interval: Duration,
) -> Self {
Self {
client: rendezvous::RendezvousClient::new(url),
registration,
device_id: parking_lot::RwLock::new(None),
refresh_interval,
}
}
async fn register(&self) -> Result<()> {
let response = self
.client
.register(self.registration.clone())
.await
.map_err(|e| DiscoveryError::Other(format!("Rendezvous registration failed: {}", e)))?;
*self.device_id.write() = Some(response.id);
tracing::info!("Registered with rendezvous server (TTL: {}s)", response.ttl);
Ok(())
}
async fn refresh(&self) -> Result<bool> {
let device_id: Option<String> = self.device_id.read().clone();
if let Some(ref id) = device_id {
let success =
self.client.refresh(id).await.map_err(|e| {
DiscoveryError::Other(format!("Rendezvous refresh failed: {}", e))
})?;
if !success {
tracing::warn!("Rendezvous registration expired, re-registering");
*self.device_id.write() = None;
self.register().await?;
}
Ok(true)
} else {
self.register().await?;
Ok(true)
}
}
#[allow(dead_code)]
async fn unregister(&self) -> Result<()> {
let device_id: Option<String> = self.device_id.write().take();
if let Some(ref id) = device_id {
let _ = self.client.unregister(id).await;
tracing::info!("Unregistered from rendezvous server");
}
Ok(())
}
fn start_keepalive(self: Arc<Self>) {
let keepalive = Arc::clone(&self);
tokio::spawn(async move {
if let Err(e) = keepalive.register().await {
tracing::error!("Initial rendezvous registration failed: {}", e);
}
let mut interval = tokio::time::interval(keepalive.refresh_interval);
loop {
interval.tick().await;
if let Err(e) = keepalive.refresh().await {
tracing::warn!("Rendezvous refresh failed: {}", e);
}
}
});
}
}
pub struct Discovery {
config: DiscoveryConfig,
devices: std::collections::HashMap<String, Device>,
#[cfg(feature = "rendezvous")]
rendezvous_keepalive: Option<Arc<RendezvousKeepalive>>,
}
impl Discovery {
pub fn new() -> Self {
Self {
config: DiscoveryConfig::default(),
devices: std::collections::HashMap::new(),
#[cfg(feature = "rendezvous")]
rendezvous_keepalive: None,
}
}
pub fn with_config(config: DiscoveryConfig) -> Self {
Self {
config,
devices: std::collections::HashMap::new(),
#[cfg(feature = "rendezvous")]
rendezvous_keepalive: None,
}
}
#[cfg(feature = "rendezvous")]
pub fn register_with_rendezvous(&mut self, registration: rendezvous::DeviceRegistration) {
if let Some(ref url) = self.config.rendezvous_url {
let keepalive = Arc::new(RendezvousKeepalive::new(
url,
registration,
self.config.rendezvous_refresh_interval,
));
keepalive.clone().start_keepalive();
self.rendezvous_keepalive = Some(keepalive);
} else {
tracing::warn!("Cannot register with rendezvous: no URL configured");
}
}
#[cfg(feature = "rendezvous")]
pub async fn discover_wan(&self) -> Result<Vec<Device>> {
let url = self
.config
.rendezvous_url
.as_ref()
.ok_or_else(|| DiscoveryError::Other("No rendezvous URL configured".to_string()))?;
let client = rendezvous::RendezvousClient::new(url);
let tag = self.config.rendezvous_tag.as_deref();
let registered_devices = client
.discover(tag)
.await
.map_err(|e| DiscoveryError::Other(format!("Rendezvous discovery failed: {}", e)))?;
let devices: Vec<Device> = registered_devices
.into_iter()
.map(|rd| {
let mut meta = rd.metadata.clone();
if !rd.tags.is_empty() {
meta.insert("tags".to_string(), rd.tags.join(","));
}
let info = DeviceInfo {
version: clasp_core::PROTOCOL_VERSION,
features: rd.features,
bridge: false,
bridge_protocol: None,
meta,
entity_id: None,
};
let now = std::time::Instant::now();
Device {
id: rd.id,
name: rd.name,
info,
endpoints: rd.endpoints,
discovered_at: now,
last_seen: now,
}
})
.collect();
Ok(devices)
}
pub async fn discover_all(&mut self) -> Result<Vec<Device>> {
let (tx, mut rx) = mpsc::channel(100);
let mut all_devices = Vec::new();
let mut seen_ids = std::collections::HashSet::new();
#[cfg(feature = "mdns")]
if self.config.mdns {
let tx_clone = tx.clone();
tokio::spawn(async move {
if let Err(e) = mdns::discover(tx_clone).await {
tracing::warn!("mDNS discovery error: {}", e);
}
});
}
#[cfg(feature = "broadcast")]
if self.config.broadcast {
let tx_clone = tx.clone();
let port = self.config.broadcast_port;
tokio::spawn(async move {
if let Err(e) = broadcast::discover(port, tx_clone).await {
tracing::warn!("Broadcast discovery error: {}", e);
}
});
}
let timeout = self.config.timeout;
let deadline = tokio::time::Instant::now() + timeout;
drop(tx);
loop {
tokio::select! {
event = rx.recv() => {
match event {
Some(DiscoveryEvent::Found(device)) => {
let device = *device;
if seen_ids.insert(device.id.clone()) {
self.devices.insert(device.id.clone(), device.clone());
all_devices.push(device);
}
}
Some(DiscoveryEvent::Error(e)) => {
tracing::warn!("Discovery error: {}", e);
}
Some(DiscoveryEvent::Lost(_)) | None => break,
}
}
_ = tokio::time::sleep_until(deadline) => {
tracing::debug!("LAN discovery timeout");
break;
}
}
}
#[cfg(feature = "rendezvous")]
if self.config.rendezvous_url.is_some() {
match self.discover_wan().await {
Ok(wan_devices) => {
for device in wan_devices {
if seen_ids.insert(device.id.clone()) {
self.devices.insert(device.id.clone(), device.clone());
all_devices.push(device);
}
}
}
Err(e) => {
tracing::warn!("WAN discovery failed: {}", e);
}
}
}
Ok(all_devices)
}
pub async fn start(&mut self) -> Result<mpsc::Receiver<DiscoveryEvent>> {
let (tx, rx) = mpsc::channel(100);
#[cfg(feature = "mdns")]
if self.config.mdns {
let tx_clone = tx.clone();
tokio::spawn(async move {
if let Err(e) = mdns::discover(tx_clone).await {
tracing::warn!("mDNS discovery error: {}", e);
}
});
}
#[cfg(feature = "broadcast")]
if self.config.broadcast {
let tx_clone = tx.clone();
let port = self.config.broadcast_port;
tokio::spawn(async move {
if let Err(e) = broadcast::discover(port, tx_clone).await {
tracing::warn!("Broadcast discovery error: {}", e);
}
});
}
#[cfg(feature = "rendezvous")]
if self.config.rendezvous_url.is_some() {
let tx_clone = tx.clone();
let config = self.config.clone();
tokio::spawn(async move {
let url = config.rendezvous_url.as_ref().unwrap();
let client = rendezvous::RendezvousClient::new(url);
let tag = config.rendezvous_tag.as_deref();
match client.discover(tag).await {
Ok(devices) => {
for rd in devices {
let mut meta = rd.metadata.clone();
if !rd.tags.is_empty() {
meta.insert("tags".to_string(), rd.tags.join(","));
}
let info = DeviceInfo {
version: clasp_core::PROTOCOL_VERSION,
features: rd.features,
bridge: false,
bridge_protocol: None,
meta,
entity_id: None,
};
let now = std::time::Instant::now();
let device = Device {
id: rd.id,
name: rd.name,
info,
endpoints: rd.endpoints,
discovered_at: now,
last_seen: now,
};
let _ = tx_clone.send(DiscoveryEvent::Found(Box::new(device))).await;
}
}
Err(e) => {
tracing::warn!("Rendezvous discovery error: {}", e);
let _ = tx_clone
.send(DiscoveryEvent::Error(format!(
"Rendezvous discovery failed: {}",
e
)))
.await;
}
}
});
}
Ok(rx)
}
pub fn devices(&self) -> impl Iterator<Item = &Device> {
self.devices.values()
}
pub fn get(&self, id: &str) -> Option<&Device> {
self.devices.get(id)
}
pub fn add(&mut self, device: Device) {
self.devices.insert(device.id.clone(), device);
}
pub fn remove(&mut self, id: &str) -> Option<Device> {
self.devices.remove(id)
}
}
impl Default for Discovery {
fn default() -> Self {
Self::new()
}
}