1pub mod device;
10pub mod error;
11
12#[cfg(feature = "mdns")]
13pub mod mdns;
14
15#[cfg(feature = "broadcast")]
16pub mod broadcast;
17
18#[cfg(feature = "rendezvous")]
19pub mod rendezvous;
20
21pub use device::{Device, DeviceInfo};
22pub use error::{DiscoveryError, Result};
23
24#[cfg(feature = "rendezvous")]
25pub use rendezvous::{DeviceRegistration, RendezvousClient, RendezvousConfig, RendezvousServer};
26
27#[cfg(feature = "rendezvous")]
28use std::sync::Arc;
29use std::time::Duration;
30use tokio::sync::mpsc;
31
32#[derive(Debug, Clone)]
34pub enum DiscoveryEvent {
35 Found(Box<Device>),
37 Lost(String), Error(String),
41}
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub enum DiscoverySource {
46 Mdns,
48 Broadcast,
50 Rendezvous,
52 Manual,
54}
55
56#[derive(Debug, Clone)]
58pub struct DiscoveryConfig {
59 pub mdns: bool,
61 pub broadcast: bool,
63 pub broadcast_port: u16,
65 pub timeout: Duration,
67 pub rendezvous_url: Option<String>,
69 pub rendezvous_refresh_interval: Duration,
71 pub rendezvous_tag: Option<String>,
73}
74
75impl Default for DiscoveryConfig {
76 fn default() -> Self {
77 Self {
78 mdns: true,
79 broadcast: true,
80 broadcast_port: clasp_core::DEFAULT_DISCOVERY_PORT,
81 timeout: Duration::from_secs(5),
82 rendezvous_url: None,
83 rendezvous_refresh_interval: Duration::from_secs(120), rendezvous_tag: None,
85 }
86 }
87}
88
89#[cfg(feature = "rendezvous")]
91struct RendezvousKeepalive {
92 client: rendezvous::RendezvousClient,
93 registration: rendezvous::DeviceRegistration,
94 device_id: parking_lot::RwLock<Option<String>>,
95 refresh_interval: Duration,
96}
97
98#[cfg(feature = "rendezvous")]
99impl RendezvousKeepalive {
100 fn new(
101 url: &str,
102 registration: rendezvous::DeviceRegistration,
103 refresh_interval: Duration,
104 ) -> Self {
105 Self {
106 client: rendezvous::RendezvousClient::new(url),
107 registration,
108 device_id: parking_lot::RwLock::new(None),
109 refresh_interval,
110 }
111 }
112
113 async fn register(&self) -> Result<()> {
114 let response = self
115 .client
116 .register(self.registration.clone())
117 .await
118 .map_err(|e| DiscoveryError::Other(format!("Rendezvous registration failed: {}", e)))?;
119
120 *self.device_id.write() = Some(response.id);
121 tracing::info!("Registered with rendezvous server (TTL: {}s)", response.ttl);
122 Ok(())
123 }
124
125 async fn refresh(&self) -> Result<bool> {
126 let device_id: Option<String> = self.device_id.read().clone();
127 if let Some(ref id) = device_id {
128 let success =
129 self.client.refresh(id).await.map_err(|e| {
130 DiscoveryError::Other(format!("Rendezvous refresh failed: {}", e))
131 })?;
132
133 if !success {
134 tracing::warn!("Rendezvous registration expired, re-registering");
136 *self.device_id.write() = None;
137 self.register().await?;
138 }
139 Ok(true)
140 } else {
141 self.register().await?;
143 Ok(true)
144 }
145 }
146
147 #[allow(dead_code)]
148 async fn unregister(&self) -> Result<()> {
149 let device_id: Option<String> = self.device_id.write().take();
150 if let Some(ref id) = device_id {
151 let _ = self.client.unregister(id).await;
152 tracing::info!("Unregistered from rendezvous server");
153 }
154 Ok(())
155 }
156
157 fn start_keepalive(self: Arc<Self>) {
159 let keepalive = Arc::clone(&self);
160 tokio::spawn(async move {
161 if let Err(e) = keepalive.register().await {
163 tracing::error!("Initial rendezvous registration failed: {}", e);
164 }
165
166 let mut interval = tokio::time::interval(keepalive.refresh_interval);
168 loop {
169 interval.tick().await;
170 if let Err(e) = keepalive.refresh().await {
171 tracing::warn!("Rendezvous refresh failed: {}", e);
172 }
173 }
174 });
175 }
176}
177
178pub struct Discovery {
180 config: DiscoveryConfig,
181 devices: std::collections::HashMap<String, Device>,
182 #[cfg(feature = "rendezvous")]
183 rendezvous_keepalive: Option<Arc<RendezvousKeepalive>>,
184}
185
186impl Discovery {
187 pub fn new() -> Self {
188 Self {
189 config: DiscoveryConfig::default(),
190 devices: std::collections::HashMap::new(),
191 #[cfg(feature = "rendezvous")]
192 rendezvous_keepalive: None,
193 }
194 }
195
196 pub fn with_config(config: DiscoveryConfig) -> Self {
197 Self {
198 config,
199 devices: std::collections::HashMap::new(),
200 #[cfg(feature = "rendezvous")]
201 rendezvous_keepalive: None,
202 }
203 }
204
205 #[cfg(feature = "rendezvous")]
207 pub fn register_with_rendezvous(&mut self, registration: rendezvous::DeviceRegistration) {
208 if let Some(ref url) = self.config.rendezvous_url {
209 let keepalive = Arc::new(RendezvousKeepalive::new(
210 url,
211 registration,
212 self.config.rendezvous_refresh_interval,
213 ));
214 keepalive.clone().start_keepalive();
215 self.rendezvous_keepalive = Some(keepalive);
216 } else {
217 tracing::warn!("Cannot register with rendezvous: no URL configured");
218 }
219 }
220
221 #[cfg(feature = "rendezvous")]
223 pub async fn discover_wan(&self) -> Result<Vec<Device>> {
224 let url = self
225 .config
226 .rendezvous_url
227 .as_ref()
228 .ok_or_else(|| DiscoveryError::Other("No rendezvous URL configured".to_string()))?;
229
230 let client = rendezvous::RendezvousClient::new(url);
231 let tag = self.config.rendezvous_tag.as_deref();
232 let registered_devices = client
233 .discover(tag)
234 .await
235 .map_err(|e| DiscoveryError::Other(format!("Rendezvous discovery failed: {}", e)))?;
236
237 let devices: Vec<Device> = registered_devices
239 .into_iter()
240 .map(|rd| {
241 let mut meta = rd.metadata.clone();
242 if !rd.tags.is_empty() {
244 meta.insert("tags".to_string(), rd.tags.join(","));
245 }
246
247 let info = DeviceInfo {
248 version: clasp_core::PROTOCOL_VERSION,
249 features: rd.features,
250 bridge: false,
251 bridge_protocol: None,
252 meta,
253 entity_id: None,
254 };
255
256 let now = std::time::Instant::now();
257 Device {
258 id: rd.id,
259 name: rd.name,
260 info,
261 endpoints: rd.endpoints,
262 discovered_at: now,
263 last_seen: now,
264 }
265 })
266 .collect();
267
268 Ok(devices)
269 }
270
271 pub async fn discover_all(&mut self) -> Result<Vec<Device>> {
275 let (tx, mut rx) = mpsc::channel(100);
276 let mut all_devices = Vec::new();
277 let mut seen_ids = std::collections::HashSet::new();
278
279 #[cfg(feature = "mdns")]
281 if self.config.mdns {
282 let tx_clone = tx.clone();
283 tokio::spawn(async move {
284 if let Err(e) = mdns::discover(tx_clone).await {
285 tracing::warn!("mDNS discovery error: {}", e);
286 }
287 });
288 }
289
290 #[cfg(feature = "broadcast")]
291 if self.config.broadcast {
292 let tx_clone = tx.clone();
293 let port = self.config.broadcast_port;
294 tokio::spawn(async move {
295 if let Err(e) = broadcast::discover(port, tx_clone).await {
296 tracing::warn!("Broadcast discovery error: {}", e);
297 }
298 });
299 }
300
301 let timeout = self.config.timeout;
303 let deadline = tokio::time::Instant::now() + timeout;
304 drop(tx); loop {
307 tokio::select! {
308 event = rx.recv() => {
309 match event {
310 Some(DiscoveryEvent::Found(device)) => {
311 let device = *device;
312 if seen_ids.insert(device.id.clone()) {
313 self.devices.insert(device.id.clone(), device.clone());
314 all_devices.push(device);
315 }
316 }
317 Some(DiscoveryEvent::Error(e)) => {
318 tracing::warn!("Discovery error: {}", e);
319 }
320 Some(DiscoveryEvent::Lost(_)) | None => break,
321 }
322 }
323 _ = tokio::time::sleep_until(deadline) => {
324 tracing::debug!("LAN discovery timeout");
325 break;
326 }
327 }
328 }
329
330 #[cfg(feature = "rendezvous")]
332 if self.config.rendezvous_url.is_some() {
333 match self.discover_wan().await {
334 Ok(wan_devices) => {
335 for device in wan_devices {
336 if seen_ids.insert(device.id.clone()) {
337 self.devices.insert(device.id.clone(), device.clone());
338 all_devices.push(device);
339 }
340 }
341 }
342 Err(e) => {
343 tracing::warn!("WAN discovery failed: {}", e);
344 }
345 }
346 }
347
348 Ok(all_devices)
349 }
350
351 pub async fn start(&mut self) -> Result<mpsc::Receiver<DiscoveryEvent>> {
353 let (tx, rx) = mpsc::channel(100);
354
355 #[cfg(feature = "mdns")]
356 if self.config.mdns {
357 let tx_clone = tx.clone();
358 tokio::spawn(async move {
359 if let Err(e) = mdns::discover(tx_clone).await {
360 tracing::warn!("mDNS discovery error: {}", e);
361 }
362 });
363 }
364
365 #[cfg(feature = "broadcast")]
366 if self.config.broadcast {
367 let tx_clone = tx.clone();
368 let port = self.config.broadcast_port;
369 tokio::spawn(async move {
370 if let Err(e) = broadcast::discover(port, tx_clone).await {
371 tracing::warn!("Broadcast discovery error: {}", e);
372 }
373 });
374 }
375
376 #[cfg(feature = "rendezvous")]
378 if self.config.rendezvous_url.is_some() {
379 let tx_clone = tx.clone();
380 let config = self.config.clone();
381 tokio::spawn(async move {
382 let url = config.rendezvous_url.as_ref().unwrap();
383 let client = rendezvous::RendezvousClient::new(url);
384 let tag = config.rendezvous_tag.as_deref();
385
386 match client.discover(tag).await {
387 Ok(devices) => {
388 for rd in devices {
389 let mut meta = rd.metadata.clone();
390 if !rd.tags.is_empty() {
391 meta.insert("tags".to_string(), rd.tags.join(","));
392 }
393
394 let info = DeviceInfo {
395 version: clasp_core::PROTOCOL_VERSION,
396 features: rd.features,
397 bridge: false,
398 bridge_protocol: None,
399 meta,
400 entity_id: None,
401 };
402
403 let now = std::time::Instant::now();
404 let device = Device {
405 id: rd.id,
406 name: rd.name,
407 info,
408 endpoints: rd.endpoints,
409 discovered_at: now,
410 last_seen: now,
411 };
412 let _ = tx_clone.send(DiscoveryEvent::Found(Box::new(device))).await;
413 }
414 }
415 Err(e) => {
416 tracing::warn!("Rendezvous discovery error: {}", e);
417 let _ = tx_clone
418 .send(DiscoveryEvent::Error(format!(
419 "Rendezvous discovery failed: {}",
420 e
421 )))
422 .await;
423 }
424 }
425 });
426 }
427
428 Ok(rx)
429 }
430
431 pub fn devices(&self) -> impl Iterator<Item = &Device> {
433 self.devices.values()
434 }
435
436 pub fn get(&self, id: &str) -> Option<&Device> {
438 self.devices.get(id)
439 }
440
441 pub fn add(&mut self, device: Device) {
443 self.devices.insert(device.id.clone(), device);
444 }
445
446 pub fn remove(&mut self, id: &str) -> Option<Device> {
448 self.devices.remove(id)
449 }
450}
451
452impl Default for Discovery {
453 fn default() -> Self {
454 Self::new()
455 }
456}