Skip to main content

clasp_discovery/
lib.rs

//! Clasp Discovery
//!
//! Provides device discovery mechanisms:
//! - mDNS/Bonjour for LAN auto-discovery
//! - UDP broadcast fallback
//! - Rendezvous server for WAN discovery
//! - Manual registration
8
9pub mod device;
10pub mod error;
11
12#[cfg(feature = "mdns")]
13pub mod mdns;
14
15#[cfg(feature = "broadcast")]
16pub mod broadcast;
17
18#[cfg(feature = "rendezvous")]
19pub mod rendezvous;
20
21pub use device::{Device, DeviceInfo};
22pub use error::{DiscoveryError, Result};
23
24#[cfg(feature = "rendezvous")]
25pub use rendezvous::{DeviceRegistration, RendezvousClient, RendezvousConfig, RendezvousServer};
26
27#[cfg(feature = "rendezvous")]
28use std::sync::Arc;
29use std::time::Duration;
30use tokio::sync::mpsc;
31
32/// Discovery event
33#[derive(Debug, Clone)]
34pub enum DiscoveryEvent {
35    /// Device discovered
36    Found(Box<Device>),
37    /// Device removed/lost
38    Lost(String), // Device ID
39    /// Error during discovery
40    Error(String),
41}
42
/// Identifies the mechanism through which a device was discovered.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DiscoverySource {
    /// Found via mDNS/Bonjour on the local network.
    Mdns,
    /// Found via UDP broadcast on the local network.
    Broadcast,
    /// Found via a rendezvous server (WAN).
    Rendezvous,
    /// Added manually rather than discovered.
    Manual,
}
55
/// Settings that control which discovery mechanisms run and how.
#[derive(Clone, Debug)]
pub struct DiscoveryConfig {
    /// Whether mDNS discovery is enabled.
    pub mdns: bool,
    /// Whether UDP broadcast discovery is enabled.
    pub broadcast: bool,
    /// Port used for UDP broadcast discovery.
    pub broadcast_port: u16,
    /// How long a one-shot scan waits for LAN results before giving up.
    pub timeout: Duration,
    /// URL of the rendezvous server used for WAN discovery
    /// (e.g., "https://relay.clasp.to"). `None` disables WAN discovery.
    pub rendezvous_url: Option<String>,
    /// How often the rendezvous registration is refreshed; should be shorter
    /// than the server-side TTL.
    pub rendezvous_refresh_interval: Duration,
    /// Optional tag used to filter rendezvous discovery results.
    pub rendezvous_tag: Option<String>,
}
74
75impl Default for DiscoveryConfig {
76    fn default() -> Self {
77        Self {
78            mdns: true,
79            broadcast: true,
80            broadcast_port: clasp_core::DEFAULT_DISCOVERY_PORT,
81            timeout: Duration::from_secs(5),
82            rendezvous_url: None,
83            rendezvous_refresh_interval: Duration::from_secs(120), // 2 minutes (< 5 min default TTL)
84            rendezvous_tag: None,
85        }
86    }
87}
88
/// State shared with the background task that keeps this device registered
/// with a rendezvous server.
#[cfg(feature = "rendezvous")]
struct RendezvousKeepalive {
    /// Client used for every register/refresh/unregister call.
    client: rendezvous::RendezvousClient,
    /// Registration payload that is (re-)submitted to the server.
    registration: rendezvous::DeviceRegistration,
    /// ID returned by the server for the current registration; `None` while
    /// not registered.
    device_id: parking_lot::RwLock<Option<String>>,
    /// Delay between refresh attempts.
    refresh_interval: Duration,
}
97
#[cfg(feature = "rendezvous")]
impl RendezvousKeepalive {
    /// Lower bound applied to the refresh period. `tokio::time::interval*`
    /// panics on a zero period, and the period comes from user configuration.
    const MIN_REFRESH_INTERVAL: Duration = Duration::from_secs(1);

    /// Builds keepalive state for the rendezvous server at `url`.
    ///
    /// Nothing is sent to the server until [`Self::register`] or
    /// [`Self::start_keepalive`] is called.
    fn new(
        url: &str,
        registration: rendezvous::DeviceRegistration,
        refresh_interval: Duration,
    ) -> Self {
        Self {
            client: rendezvous::RendezvousClient::new(url),
            registration,
            device_id: parking_lot::RwLock::new(None),
            refresh_interval,
        }
    }

    /// Submits the registration and stores the ID returned by the server.
    ///
    /// # Errors
    /// Returns `DiscoveryError::Other` if the client call fails.
    async fn register(&self) -> Result<()> {
        let response = self
            .client
            .register(self.registration.clone())
            .await
            .map_err(|e| DiscoveryError::Other(format!("Rendezvous registration failed: {}", e)))?;

        *self.device_id.write() = Some(response.id);
        tracing::info!("Registered with rendezvous server (TTL: {}s)", response.ttl);
        Ok(())
    }

    /// Refreshes the current registration, registering (again) when there is
    /// no registration or the server no longer knows it.
    ///
    /// Returns `Ok(true)` whenever the device ends up registered.
    ///
    /// # Errors
    /// Returns `DiscoveryError::Other` if the refresh or the follow-up
    /// registration fails.
    async fn refresh(&self) -> Result<bool> {
        // Clone the ID out so the lock guard is released before any `.await`.
        let device_id: Option<String> = self.device_id.read().clone();

        match device_id {
            Some(id) => {
                let still_registered = self.client.refresh(&id).await.map_err(|e| {
                    DiscoveryError::Other(format!("Rendezvous refresh failed: {}", e))
                })?;

                if !still_registered {
                    // Device was removed server-side, re-register.
                    tracing::warn!("Rendezvous registration expired, re-registering");
                    *self.device_id.write() = None;
                    self.register().await?;
                }
            }
            // Not registered yet, register now.
            None => self.register().await?,
        }

        Ok(true)
    }

    /// Removes the registration from the server, if there is one.
    ///
    /// Best-effort: a failed request is logged and the locally stored ID is
    /// cleared either way, so this always returns `Ok(())`.
    // Not called from within this file yet; kept for shutdown handling.
    #[allow(dead_code)]
    async fn unregister(&self) -> Result<()> {
        let device_id: Option<String> = self.device_id.write().take();
        if let Some(id) = device_id {
            match self.client.unregister(&id).await {
                Ok(_) => tracing::info!("Unregistered from rendezvous server"),
                Err(e) => tracing::warn!("Rendezvous unregister failed: {}", e),
            }
        }
        Ok(())
    }

    /// Start the keepalive loop.
    ///
    /// Spawns a detached tokio task that registers once and then refreshes
    /// every `refresh_interval` (clamped to at least
    /// [`Self::MIN_REFRESH_INTERVAL`]). The task's handle is not retained, so
    /// the loop keeps running until the runtime shuts down.
    fn start_keepalive(self: Arc<Self>) {
        let keepalive = self;
        tokio::spawn(async move {
            // Initial registration
            if let Err(e) = keepalive.register().await {
                tracing::error!("Initial rendezvous registration failed: {}", e);
            }

            // A plain `interval` fires its first tick immediately, which would
            // issue a pointless refresh right after registering; start the
            // schedule one full period from now instead.
            let period = keepalive.refresh_interval.max(Self::MIN_REFRESH_INTERVAL);
            let first_tick = tokio::time::Instant::now() + period;
            let mut interval = tokio::time::interval_at(first_tick, period);

            // Refresh loop
            loop {
                interval.tick().await;
                if let Err(e) = keepalive.refresh().await {
                    tracing::warn!("Rendezvous refresh failed: {}", e);
                }
            }
        });
    }
}
177
178/// Discover Clasp devices
179pub struct Discovery {
180    config: DiscoveryConfig,
181    devices: std::collections::HashMap<String, Device>,
182    #[cfg(feature = "rendezvous")]
183    rendezvous_keepalive: Option<Arc<RendezvousKeepalive>>,
184}
185
186impl Discovery {
187    pub fn new() -> Self {
188        Self {
189            config: DiscoveryConfig::default(),
190            devices: std::collections::HashMap::new(),
191            #[cfg(feature = "rendezvous")]
192            rendezvous_keepalive: None,
193        }
194    }
195
196    pub fn with_config(config: DiscoveryConfig) -> Self {
197        Self {
198            config,
199            devices: std::collections::HashMap::new(),
200            #[cfg(feature = "rendezvous")]
201            rendezvous_keepalive: None,
202        }
203    }
204
205    /// Register this device with the rendezvous server and start keepalive
206    #[cfg(feature = "rendezvous")]
207    pub fn register_with_rendezvous(&mut self, registration: rendezvous::DeviceRegistration) {
208        if let Some(ref url) = self.config.rendezvous_url {
209            let keepalive = Arc::new(RendezvousKeepalive::new(
210                url,
211                registration,
212                self.config.rendezvous_refresh_interval,
213            ));
214            keepalive.clone().start_keepalive();
215            self.rendezvous_keepalive = Some(keepalive);
216        } else {
217            tracing::warn!("Cannot register with rendezvous: no URL configured");
218        }
219    }
220
221    /// Discover devices from the rendezvous server (WAN discovery)
222    #[cfg(feature = "rendezvous")]
223    pub async fn discover_wan(&self) -> Result<Vec<Device>> {
224        let url = self
225            .config
226            .rendezvous_url
227            .as_ref()
228            .ok_or_else(|| DiscoveryError::Other("No rendezvous URL configured".to_string()))?;
229
230        let client = rendezvous::RendezvousClient::new(url);
231        let tag = self.config.rendezvous_tag.as_deref();
232        let registered_devices = client
233            .discover(tag)
234            .await
235            .map_err(|e| DiscoveryError::Other(format!("Rendezvous discovery failed: {}", e)))?;
236
237        // Convert RegisteredDevice to Device
238        let devices: Vec<Device> = registered_devices
239            .into_iter()
240            .map(|rd| {
241                let mut meta = rd.metadata.clone();
242                // Add tags to metadata
243                if !rd.tags.is_empty() {
244                    meta.insert("tags".to_string(), rd.tags.join(","));
245                }
246
247                let info = DeviceInfo {
248                    version: clasp_core::PROTOCOL_VERSION,
249                    features: rd.features,
250                    bridge: false,
251                    bridge_protocol: None,
252                    meta,
253                    entity_id: None,
254                };
255
256                let now = std::time::Instant::now();
257                Device {
258                    id: rd.id,
259                    name: rd.name,
260                    info,
261                    endpoints: rd.endpoints,
262                    discovered_at: now,
263                    last_seen: now,
264                }
265            })
266            .collect();
267
268        Ok(devices)
269    }
270
271    /// Discover all devices using all available methods (cascade discovery)
272    /// Tries: mDNS → broadcast → rendezvous
273    /// Returns devices from all successful discovery methods
274    pub async fn discover_all(&mut self) -> Result<Vec<Device>> {
275        let (tx, mut rx) = mpsc::channel(100);
276        let mut all_devices = Vec::new();
277        let mut seen_ids = std::collections::HashSet::new();
278
279        // Start LAN discovery
280        #[cfg(feature = "mdns")]
281        if self.config.mdns {
282            let tx_clone = tx.clone();
283            tokio::spawn(async move {
284                if let Err(e) = mdns::discover(tx_clone).await {
285                    tracing::warn!("mDNS discovery error: {}", e);
286                }
287            });
288        }
289
290        #[cfg(feature = "broadcast")]
291        if self.config.broadcast {
292            let tx_clone = tx.clone();
293            let port = self.config.broadcast_port;
294            tokio::spawn(async move {
295                if let Err(e) = broadcast::discover(port, tx_clone).await {
296                    tracing::warn!("Broadcast discovery error: {}", e);
297                }
298            });
299        }
300
301        // Collect LAN results with timeout
302        let timeout = self.config.timeout;
303        let deadline = tokio::time::Instant::now() + timeout;
304        drop(tx); // Close sender so rx completes when all spawned tasks finish
305
306        loop {
307            tokio::select! {
308                event = rx.recv() => {
309                    match event {
310                        Some(DiscoveryEvent::Found(device)) => {
311                            let device = *device;
312                            if seen_ids.insert(device.id.clone()) {
313                                self.devices.insert(device.id.clone(), device.clone());
314                                all_devices.push(device);
315                            }
316                        }
317                        Some(DiscoveryEvent::Error(e)) => {
318                            tracing::warn!("Discovery error: {}", e);
319                        }
320                        Some(DiscoveryEvent::Lost(_)) | None => break,
321                    }
322                }
323                _ = tokio::time::sleep_until(deadline) => {
324                    tracing::debug!("LAN discovery timeout");
325                    break;
326                }
327            }
328        }
329
330        // Try WAN discovery if configured
331        #[cfg(feature = "rendezvous")]
332        if self.config.rendezvous_url.is_some() {
333            match self.discover_wan().await {
334                Ok(wan_devices) => {
335                    for device in wan_devices {
336                        if seen_ids.insert(device.id.clone()) {
337                            self.devices.insert(device.id.clone(), device.clone());
338                            all_devices.push(device);
339                        }
340                    }
341                }
342                Err(e) => {
343                    tracing::warn!("WAN discovery failed: {}", e);
344                }
345            }
346        }
347
348        Ok(all_devices)
349    }
350
351    /// Start discovery and return a receiver for events
352    pub async fn start(&mut self) -> Result<mpsc::Receiver<DiscoveryEvent>> {
353        let (tx, rx) = mpsc::channel(100);
354
355        #[cfg(feature = "mdns")]
356        if self.config.mdns {
357            let tx_clone = tx.clone();
358            tokio::spawn(async move {
359                if let Err(e) = mdns::discover(tx_clone).await {
360                    tracing::warn!("mDNS discovery error: {}", e);
361                }
362            });
363        }
364
365        #[cfg(feature = "broadcast")]
366        if self.config.broadcast {
367            let tx_clone = tx.clone();
368            let port = self.config.broadcast_port;
369            tokio::spawn(async move {
370                if let Err(e) = broadcast::discover(port, tx_clone).await {
371                    tracing::warn!("Broadcast discovery error: {}", e);
372                }
373            });
374        }
375
376        // Start WAN discovery if configured
377        #[cfg(feature = "rendezvous")]
378        if self.config.rendezvous_url.is_some() {
379            let tx_clone = tx.clone();
380            let config = self.config.clone();
381            tokio::spawn(async move {
382                let url = config.rendezvous_url.as_ref().unwrap();
383                let client = rendezvous::RendezvousClient::new(url);
384                let tag = config.rendezvous_tag.as_deref();
385
386                match client.discover(tag).await {
387                    Ok(devices) => {
388                        for rd in devices {
389                            let mut meta = rd.metadata.clone();
390                            if !rd.tags.is_empty() {
391                                meta.insert("tags".to_string(), rd.tags.join(","));
392                            }
393
394                            let info = DeviceInfo {
395                                version: clasp_core::PROTOCOL_VERSION,
396                                features: rd.features,
397                                bridge: false,
398                                bridge_protocol: None,
399                                meta,
400                                entity_id: None,
401                            };
402
403                            let now = std::time::Instant::now();
404                            let device = Device {
405                                id: rd.id,
406                                name: rd.name,
407                                info,
408                                endpoints: rd.endpoints,
409                                discovered_at: now,
410                                last_seen: now,
411                            };
412                            let _ = tx_clone.send(DiscoveryEvent::Found(Box::new(device))).await;
413                        }
414                    }
415                    Err(e) => {
416                        tracing::warn!("Rendezvous discovery error: {}", e);
417                        let _ = tx_clone
418                            .send(DiscoveryEvent::Error(format!(
419                                "Rendezvous discovery failed: {}",
420                                e
421                            )))
422                            .await;
423                    }
424                }
425            });
426        }
427
428        Ok(rx)
429    }
430
431    /// Get currently known devices
432    pub fn devices(&self) -> impl Iterator<Item = &Device> {
433        self.devices.values()
434    }
435
436    /// Get a device by ID
437    pub fn get(&self, id: &str) -> Option<&Device> {
438        self.devices.get(id)
439    }
440
441    /// Manually add a device
442    pub fn add(&mut self, device: Device) {
443        self.devices.insert(device.id.clone(), device);
444    }
445
446    /// Remove a device
447    pub fn remove(&mut self, id: &str) -> Option<Device> {
448        self.devices.remove(id)
449    }
450}
451
452impl Default for Discovery {
453    fn default() -> Self {
454        Self::new()
455    }
456}