Skip to main content

aranet_core/
passive.rs

1//! Passive monitoring via BLE advertisements.
2//!
3//! This module provides functionality to monitor Aranet devices without
4//! establishing a connection, using BLE advertisement data instead.
5//!
6//! # Benefits
7//!
8//! - **Lower power consumption**: No connection overhead
9//! - **More devices**: Can monitor more than the BLE connection limit
10//! - **Simpler**: No connection management needed
11//!
12//! # Requirements
13//!
14//! Smart Home integration must be enabled on each device:
15//! - Go to device Settings > Smart Home > Enable
16//!
17//! # Example
18//!
19//! ```ignore
20//! use aranet_core::passive::{PassiveMonitor, PassiveMonitorOptions};
21//! use tokio_util::sync::CancellationToken;
22//!
//! let monitor = std::sync::Arc::new(PassiveMonitor::new(PassiveMonitorOptions::default()));
24//! let cancel = CancellationToken::new();
25//!
26//! // Start monitoring in background
27//! let handle = monitor.start(cancel.clone());
28//!
29//! // Receive readings
30//! let mut rx = monitor.subscribe();
31//! while let Ok(reading) = rx.recv().await {
32//!     println!("Device: {} CO2: {:?}", reading.device_name, reading.data.co2);
33//! }
34//! ```
35
36use std::collections::HashMap;
37use std::sync::Arc;
38use std::time::Duration;
39
40use btleplug::api::{Central, Peripheral as _, ScanFilter};
41use tokio::sync::{RwLock, broadcast};
42use tokio::time::sleep;
43use tokio_util::sync::CancellationToken;
44use tracing::{debug, info, warn};
45
46use crate::advertisement::{AdvertisementData, parse_advertisement_with_name};
47use crate::error::Result;
48use crate::scan::get_adapter;
49use crate::uuid::MANUFACTURER_ID;
50
/// Compare two optional floats by their raw IEEE-754 bit patterns.
///
/// Two `Some` values are equal only when their bits match exactly, so a NaN
/// equals an identical NaN while `0.0` and `-0.0` are considered different.
/// `None` equals only `None`.
fn opt_f32_eq(a: Option<f32>, b: Option<f32>) -> bool {
    let lhs_bits = a.map(f32::to_bits);
    let rhs_bits = b.map(f32::to_bits);
    lhs_bits == rhs_bits
}
59
/// A reading from passive advertisement monitoring.
///
/// One value is produced per device advertisement that the monitor decides to
/// emit, and it is delivered to subscribers over the broadcast channel.
#[derive(Debug, Clone)]
pub struct PassiveReading {
    /// Device identifier (MAC address or UUID).
    pub device_id: String,
    /// Device name if available (the advertised local name).
    pub device_name: Option<String>,
    /// RSSI signal strength, if the adapter reported one.
    pub rssi: Option<i16>,
    /// Parsed advertisement data.
    pub data: AdvertisementData,
    /// When this reading was received.
    ///
    /// This is stamped when the monitor processes the advertisement after a
    /// scan window, not when the radio packet arrived.
    pub received_at: std::time::Instant,
}
74
/// Options for passive monitoring.
///
/// All fields are public; the builder-style methods on this type cover the
/// most commonly adjusted ones.
#[derive(Debug, Clone)]
pub struct PassiveMonitorOptions {
    /// How long to scan between processing cycles.
    ///
    /// The monitor starts a scan, waits this long, stops the scan and then
    /// processes the peripherals the adapter reports.
    pub scan_duration: Duration,
    /// Delay between scan cycles (slept after every cycle, successful or not).
    pub scan_interval: Duration,
    /// Channel capacity for readings (passed to the broadcast channel).
    pub channel_capacity: usize,
    /// Only emit readings when values change (deduplicate).
    pub deduplicate: bool,
    /// Maximum age of cached readings before re-emitting (if deduplicate is true).
    pub max_reading_age: Duration,
    /// Filter to only these device IDs (empty = all Aranet devices).
    ///
    /// Entries are compared for exact string equality with the device
    /// identifier the monitor computes for each peripheral.
    pub device_filter: Vec<String>,
}
91
92impl Default for PassiveMonitorOptions {
93    fn default() -> Self {
94        Self {
95            scan_duration: Duration::from_secs(5),
96            scan_interval: Duration::from_secs(1),
97            channel_capacity: 100,
98            deduplicate: true,
99            max_reading_age: Duration::from_secs(60),
100            device_filter: Vec::new(),
101        }
102    }
103}
104
105impl PassiveMonitorOptions {
106    /// Create new options with default settings.
107    pub fn new() -> Self {
108        Self::default()
109    }
110
111    /// Set the scan duration.
112    pub fn scan_duration(mut self, duration: Duration) -> Self {
113        self.scan_duration = duration;
114        self
115    }
116
117    /// Set the interval between scan cycles.
118    pub fn scan_interval(mut self, interval: Duration) -> Self {
119        self.scan_interval = interval;
120        self
121    }
122
123    /// Enable or disable deduplication.
124    pub fn deduplicate(mut self, enable: bool) -> Self {
125        self.deduplicate = enable;
126        self
127    }
128
129    /// Filter to specific device IDs.
130    pub fn filter_devices(mut self, device_ids: Vec<String>) -> Self {
131        self.device_filter = device_ids;
132        self
133    }
134}
135
/// Cached reading for deduplication.
///
/// One entry is stored per device ID and overwritten each time a reading for
/// that device is emitted.
struct CachedReading {
    /// The advertisement data that was last emitted for the device.
    data: AdvertisementData,
    /// When that data was cached; compared against `max_reading_age`.
    received_at: std::time::Instant,
}
141
/// Passive monitor for Aranet devices using BLE advertisements.
///
/// This allows monitoring multiple devices without establishing connections,
/// which is useful for scenarios where:
/// - You need to monitor more devices than the BLE connection limit
/// - Low power consumption is important
/// - Real-time data isn't critical (advertisement interval is typically 4+ seconds)
///
/// The background task is started through an `Arc<PassiveMonitor>`, so wrap
/// the monitor in an `Arc` before calling `start`.
pub struct PassiveMonitor {
    /// Configuration supplied at construction time.
    options: PassiveMonitorOptions,
    /// Broadcast sender for readings.
    sender: broadcast::Sender<PassiveReading>,
    /// Cache of last readings for deduplication, keyed by device ID.
    cache: Arc<RwLock<HashMap<String, CachedReading>>>,
}
156
157impl PassiveMonitor {
158    /// Create a new passive monitor with the given options.
159    pub fn new(options: PassiveMonitorOptions) -> Self {
160        let (sender, _) = broadcast::channel(options.channel_capacity);
161        Self {
162            options,
163            sender,
164            cache: Arc::new(RwLock::new(HashMap::new())),
165        }
166    }
167
    /// Subscribe to passive readings.
    ///
    /// Returns a receiver that will receive readings as they are detected.
    /// Each call creates a separate receiver from the monitor's broadcast
    /// sender, so several consumers can listen at the same time.
    pub fn subscribe(&self) -> broadcast::Receiver<PassiveReading> {
        self.sender.subscribe()
    }
174
    /// Get the number of active subscribers.
    ///
    /// This is the receiver count reported by the broadcast sender.
    pub fn subscriber_count(&self) -> usize {
        self.sender.receiver_count()
    }
179
180    /// Start the passive monitor.
181    ///
182    /// This spawns a background task that continuously scans for BLE
183    /// advertisements and parses Aranet device data.
184    ///
185    /// The task runs until the cancellation token is triggered.
186    pub fn start(self: &Arc<Self>, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
187        let monitor = Arc::clone(self);
188
189        tokio::spawn(async move {
190            info!("Starting passive monitor");
191
192            // Acquire the adapter once and reuse across scan cycles.
193            // On persistent errors we re-acquire it in case the adapter
194            // was reset or the D-Bus connection was lost.
195            let mut adapter = loop {
196                match get_adapter().await {
197                    Ok(a) => break a,
198                    Err(e) => {
199                        warn!("Passive monitor failed to get adapter: {e} — retrying in 10s");
200                        tokio::select! {
201                            _ = cancel_token.cancelled() => {
202                                info!("Passive monitor cancelled while waiting for adapter");
203                                return;
204                            }
205                            _ = sleep(Duration::from_secs(10)) => {}
206                        }
207                    }
208                }
209            };
210            let mut consecutive_errors: u32 = 0;
211
212            loop {
213                tokio::select! {
214                    _ = cancel_token.cancelled() => {
215                        info!("Passive monitor cancelled");
216                        break;
217                    }
218                    result = monitor.scan_cycle_with_adapter(&adapter) => {
219                        match result {
220                            Ok(()) => {
221                                consecutive_errors = 0;
222                            }
223                            Err(e) => {
224                                consecutive_errors += 1;
225                                warn!(
226                                    "Passive monitor scan error ({consecutive_errors} consecutive): {e}"
227                                );
228                                // After several consecutive failures, try to
229                                // re-acquire the adapter — it may have been
230                                // reset or the D-Bus connection may have died.
231                                if consecutive_errors >= 5 {
232                                    warn!("Passive monitor: re-acquiring adapter after {} consecutive errors", consecutive_errors);
233                                    match get_adapter().await {
234                                        Ok(a) => {
235                                            adapter = a;
236                                            info!("Passive monitor: adapter re-acquired");
237                                            consecutive_errors = 0;
238                                        }
239                                        Err(e2) => {
240                                            // Adapter re-acquisition failed — back off
241                                            // longer to avoid thrashing when the adapter
242                                            // is permanently unavailable.
243                                            warn!("Passive monitor: failed to re-acquire adapter: {}. Backing off.", e2);
244                                            let backoff = std::cmp::min(
245                                                monitor.options.scan_interval.saturating_mul(consecutive_errors),
246                                                std::time::Duration::from_secs(300),
247                                            );
248                                            sleep(backoff).await;
249                                        }
250                                    }
251                                }
252                            }
253                        }
254                        // Wait before next scan cycle
255                        sleep(monitor.options.scan_interval).await;
256                    }
257                }
258            }
259        })
260    }
261
262    /// Perform a single scan cycle using a pre-existing adapter.
263    async fn scan_cycle_with_adapter(&self, adapter: &btleplug::platform::Adapter) -> Result<()> {
264        // Start scanning
265        adapter.start_scan(ScanFilter::default()).await?;
266        sleep(self.options.scan_duration).await;
267        adapter.stop_scan().await?;
268
269        // Process discovered peripherals
270        let peripherals = adapter.peripherals().await?;
271
272        for peripheral in peripherals {
273            if let Ok(Some(props)) = peripheral.properties().await {
274                // Check if this is an Aranet device by manufacturer data
275                if let Some(data) = props.manufacturer_data.get(&MANUFACTURER_ID) {
276                    let device_id = crate::util::create_identifier(
277                        &props.address.to_string(),
278                        &peripheral.id(),
279                    );
280
281                    // Check device filter
282                    if !self.options.device_filter.is_empty()
283                        && !self.options.device_filter.contains(&device_id)
284                    {
285                        continue;
286                    }
287
288                    // Try to parse the advertisement
289                    match parse_advertisement_with_name(data, props.local_name.as_deref()) {
290                        Ok(adv_data) => {
291                            // Check for deduplication
292                            let should_emit = if self.options.deduplicate {
293                                self.should_emit(&device_id, &adv_data).await
294                            } else {
295                                true
296                            };
297
298                            if should_emit {
299                                let reading = PassiveReading {
300                                    device_id: device_id.clone(),
301                                    device_name: props.local_name.clone(),
302                                    rssi: props.rssi,
303                                    data: adv_data.clone(),
304                                    received_at: std::time::Instant::now(),
305                                };
306
307                                // Update cache
308                                self.cache.write().await.insert(
309                                    device_id,
310                                    CachedReading {
311                                        data: adv_data,
312                                        received_at: std::time::Instant::now(),
313                                    },
314                                );
315
316                                // Send to subscribers (ignore if no receivers)
317                                let _ = self.sender.send(reading);
318                            }
319                        }
320                        Err(e) => {
321                            debug!("Failed to parse advertisement from {}: {}", device_id, e);
322                        }
323                    }
324                }
325            }
326        }
327
328        Ok(())
329    }
330
331    /// Check if a reading should be emitted (for deduplication).
332    async fn should_emit(&self, device_id: &str, data: &AdvertisementData) -> bool {
333        let cache = self.cache.read().await;
334
335        if let Some(cached) = cache.get(device_id) {
336            // Check if reading is too old
337            if cached.received_at.elapsed() > self.options.max_reading_age {
338                return true;
339            }
340
341            // Check if values have changed (use total_cmp for floats to handle NaN correctly)
342            if cached.data.co2 != data.co2
343                || !opt_f32_eq(cached.data.temperature, data.temperature)
344                || cached.data.humidity != data.humidity
345                || !opt_f32_eq(cached.data.pressure, data.pressure)
346                || cached.data.radon != data.radon
347                || !opt_f32_eq(cached.data.radiation_dose_rate, data.radiation_dose_rate)
348                || cached.data.battery != data.battery
349            {
350                return true;
351            }
352
353            // Check if counter changed (new measurement)
354            if cached.data.counter != data.counter {
355                return true;
356            }
357
358            false
359        } else {
360            // Not in cache, emit
361            true
362        }
363    }
364
365    /// Get the last known reading for a device.
366    pub async fn get_last_reading(&self, device_id: &str) -> Option<AdvertisementData> {
367        let cache = self.cache.read().await;
368        cache.get(device_id).map(|c| c.data.clone())
369    }
370
371    /// Get all known device IDs.
372    pub async fn known_devices(&self) -> Vec<String> {
373        let cache = self.cache.read().await;
374        cache.keys().cloned().collect()
375    }
376
377    /// Clear the reading cache.
378    pub async fn clear_cache(&self) {
379        self.cache.write().await.clear();
380    }
381}
382
383impl Default for PassiveMonitor {
384    fn default() -> Self {
385        Self::new(PassiveMonitorOptions::default())
386    }
387}
388
#[cfg(test)]
mod tests {
    use super::*;

    use std::time::Instant;

    /// Device ID used by the cache-based tests.
    const DEVICE: &str = "device-1";

    /// Build advertisement data with sensible defaults for an Aranet4.
    fn make_adv_data() -> AdvertisementData {
        AdvertisementData {
            device_type: aranet_types::DeviceType::Aranet4,
            co2: Some(800),
            temperature: Some(22.5),
            pressure: Some(1013.2),
            humidity: Some(45),
            battery: 85,
            status: aranet_types::Status::Green,
            interval: 300,
            age: 120,
            radon: None,
            radiation_dose_rate: None,
            counter: Some(5),
            flags: 0x22,
        }
    }

    /// Store a copy of `data` in the monitor's cache under `device_id`,
    /// stamped with `received_at`.
    async fn seed_cache(
        monitor: &PassiveMonitor,
        device_id: &str,
        data: &AdvertisementData,
        received_at: Instant,
    ) {
        monitor.cache.write().await.insert(
            device_id.to_string(),
            CachedReading {
                data: data.clone(),
                received_at,
            },
        );
    }

    #[test]
    fn test_passive_monitor_options_default() {
        let opts = PassiveMonitorOptions::default();
        assert_eq!(opts.scan_duration, Duration::from_secs(5));
        assert!(opts.deduplicate);
        assert!(opts.device_filter.is_empty());
    }

    #[test]
    fn test_passive_monitor_options_builder() {
        let opts = PassiveMonitorOptions::new()
            .scan_duration(Duration::from_secs(10))
            .deduplicate(false)
            .filter_devices(vec!["device1".to_string()]);

        assert_eq!(opts.scan_duration, Duration::from_secs(10));
        assert!(!opts.deduplicate);
        assert_eq!(opts.device_filter, vec!["device1"]);
    }

    #[test]
    fn test_passive_monitor_subscribe() {
        let monitor = Arc::new(PassiveMonitor::default());
        let _rx1 = monitor.subscribe();
        let _rx2 = monitor.subscribe();
        assert_eq!(monitor.subscriber_count(), 2);
    }

    #[tokio::test]
    async fn test_should_emit_first_reading() {
        let monitor = PassiveMonitor::default();

        // With an empty cache, the very first reading must be emitted.
        assert!(monitor.should_emit(DEVICE, &make_adv_data()).await);
    }

    #[tokio::test]
    async fn test_should_emit_duplicate_suppressed() {
        let monitor = PassiveMonitor::default();
        let data = make_adv_data();
        seed_cache(&monitor, DEVICE, &data, Instant::now()).await;

        // A reading identical to the fresh cache entry is suppressed.
        assert!(!monitor.should_emit(DEVICE, &data).await);
    }

    #[tokio::test]
    async fn test_should_emit_on_value_change() {
        let monitor = PassiveMonitor::default();
        let data = make_adv_data();
        seed_cache(&monitor, DEVICE, &data, Instant::now()).await;

        // A different CO2 value triggers emission.
        let co2_changed = AdvertisementData {
            co2: Some(900),
            ..data.clone()
        };
        assert!(monitor.should_emit(DEVICE, &co2_changed).await);

        // A different battery level triggers emission.
        let battery_changed = AdvertisementData {
            battery: 50,
            ..data.clone()
        };
        assert!(monitor.should_emit(DEVICE, &battery_changed).await);

        // A different temperature triggers emission.
        let temperature_changed = AdvertisementData {
            temperature: Some(23.0),
            ..data
        };
        assert!(monitor.should_emit(DEVICE, &temperature_changed).await);
    }

    #[tokio::test]
    async fn test_should_emit_on_counter_change() {
        let monitor = PassiveMonitor::default();
        let data = make_adv_data();
        seed_cache(&monitor, DEVICE, &data, Instant::now()).await;

        // A new counter value marks a new measurement even when values match.
        let counter_changed = AdvertisementData {
            counter: Some(6),
            ..data
        };
        assert!(monitor.should_emit(DEVICE, &counter_changed).await);
    }

    #[tokio::test]
    async fn test_should_emit_on_stale_cache() {
        let monitor = PassiveMonitor::new(PassiveMonitorOptions {
            max_reading_age: Duration::from_millis(10),
            ..Default::default()
        });
        let data = make_adv_data();

        // Back-date the cache entry so it is already past the maximum age.
        let stale = Instant::now() - Duration::from_millis(50);
        seed_cache(&monitor, DEVICE, &data, stale).await;

        // Identical data is emitted again because the cache entry expired.
        assert!(monitor.should_emit(DEVICE, &data).await);
    }

    #[tokio::test]
    async fn test_should_emit_different_device() {
        let monitor = PassiveMonitor::default();
        let data = make_adv_data();
        seed_cache(&monitor, DEVICE, &data, Instant::now()).await;

        // device-2 has no cache entry, so identical data is still emitted.
        assert!(monitor.should_emit("device-2", &data).await);
    }
}