Skip to main content

aranet_core/
passive.rs

1//! Passive monitoring via BLE advertisements.
2//!
3//! This module provides functionality to monitor Aranet devices without
4//! establishing a connection, using BLE advertisement data instead.
5//!
6//! # Benefits
7//!
8//! - **Lower power consumption**: No connection overhead
9//! - **More devices**: Can monitor more than the BLE connection limit
10//! - **Simpler**: No connection management needed
11//!
12//! # Requirements
13//!
14//! Smart Home integration must be enabled on each device:
15//! - Go to device Settings > Smart Home > Enable
16//!
17//! # Example
18//!
19//! ```ignore
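//! use std::sync::Arc;
//!
//! use aranet_core::passive::{PassiveMonitor, PassiveMonitorOptions};
//! use tokio_util::sync::CancellationToken;
//!
//! // `start` takes `&Arc<Self>`, so the monitor has to live inside an `Arc`.
//! let monitor = Arc::new(PassiveMonitor::new(PassiveMonitorOptions::default()));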
24//! let cancel = CancellationToken::new();
25//!
26//! // Start monitoring in background
27//! let handle = monitor.start(cancel.clone());
28//!
29//! // Receive readings
30//! let mut rx = monitor.subscribe();
31//! while let Ok(reading) = rx.recv().await {
32//!     println!("Device: {} CO2: {:?}", reading.device_name, reading.data.co2);
33//! }
34//! ```
35
36use std::collections::HashMap;
37use std::sync::Arc;
38use std::time::Duration;
39
40use btleplug::api::{Central, Peripheral as _, ScanFilter};
41use tokio::sync::{RwLock, broadcast};
42use tokio::time::sleep;
43use tokio_util::sync::CancellationToken;
44use tracing::{debug, info, warn};
45
46use crate::advertisement::{AdvertisementData, parse_advertisement_with_name};
47use crate::error::Result;
48use crate::scan::get_adapter;
49use crate::uuid::MANUFACTURER_ID;
50
51/// A reading from passive advertisement monitoring.
52#[derive(Debug, Clone)]
53pub struct PassiveReading {
54    /// Device identifier (MAC address or UUID).
55    pub device_id: String,
56    /// Device name if available.
57    pub device_name: Option<String>,
58    /// RSSI signal strength.
59    pub rssi: Option<i16>,
60    /// Parsed advertisement data.
61    pub data: AdvertisementData,
62    /// When this reading was received.
63    pub received_at: std::time::Instant,
64}
65
/// Options for passive monitoring.
///
/// Start from [`PassiveMonitorOptions::new`] (or `Default::default()`) and
/// chain the builder methods; every field has a matching setter.
#[derive(Debug, Clone)]
pub struct PassiveMonitorOptions {
    /// How long each scan window stays open before results are processed.
    pub scan_duration: Duration,
    /// Pause between the end of one scan cycle and the start of the next.
    pub scan_interval: Duration,
    /// Capacity of the broadcast channel that carries readings to subscribers.
    pub channel_capacity: usize,
    /// Only emit readings when values change (deduplicate).
    pub deduplicate: bool,
    /// Maximum age of a cached reading before it is re-emitted even though
    /// nothing changed (only relevant when `deduplicate` is true).
    pub max_reading_age: Duration,
    /// Restrict monitoring to these device IDs (empty = all Aranet devices).
    pub device_filter: Vec<String>,
}

impl Default for PassiveMonitorOptions {
    /// Defaults: 5 s scan window, 1 s pause between cycles, channel capacity
    /// of 100, deduplication enabled with a 60 s re-emit age, no device filter.
    fn default() -> Self {
        Self {
            scan_duration: Duration::from_secs(5),
            scan_interval: Duration::from_secs(1),
            channel_capacity: 100,
            deduplicate: true,
            max_reading_age: Duration::from_secs(60),
            device_filter: Vec::new(),
        }
    }
}

impl PassiveMonitorOptions {
    /// Create new options with default settings.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the scan duration.
    #[must_use]
    pub fn scan_duration(mut self, duration: Duration) -> Self {
        self.scan_duration = duration;
        self
    }

    /// Set the interval between scan cycles.
    #[must_use]
    pub fn scan_interval(mut self, interval: Duration) -> Self {
        self.scan_interval = interval;
        self
    }

    /// Set the capacity of the broadcast channel used to deliver readings.
    #[must_use]
    pub fn channel_capacity(mut self, capacity: usize) -> Self {
        self.channel_capacity = capacity;
        self
    }

    /// Enable or disable deduplication.
    #[must_use]
    pub fn deduplicate(mut self, enable: bool) -> Self {
        self.deduplicate = enable;
        self
    }

    /// Set how old a cached reading may get before an unchanged reading is
    /// emitted again (only used when deduplication is enabled).
    #[must_use]
    pub fn max_reading_age(mut self, age: Duration) -> Self {
        self.max_reading_age = age;
        self
    }

    /// Filter to specific device IDs.
    #[must_use]
    pub fn filter_devices(mut self, device_ids: Vec<String>) -> Self {
        self.device_filter = device_ids;
        self
    }
}
126
127/// Cached reading for deduplication.
128struct CachedReading {
129    data: AdvertisementData,
130    received_at: std::time::Instant,
131}
132
133/// Passive monitor for Aranet devices using BLE advertisements.
134///
135/// This allows monitoring multiple devices without establishing connections,
136/// which is useful for scenarios where:
137/// - You need to monitor more devices than the BLE connection limit
138/// - Low power consumption is important
139/// - Real-time data isn't critical (advertisement interval is typically 4+ seconds)
140pub struct PassiveMonitor {
141    options: PassiveMonitorOptions,
142    /// Broadcast sender for readings.
143    sender: broadcast::Sender<PassiveReading>,
144    /// Cache of last readings for deduplication.
145    cache: Arc<RwLock<HashMap<String, CachedReading>>>,
146}
147
148impl PassiveMonitor {
149    /// Create a new passive monitor with the given options.
150    pub fn new(options: PassiveMonitorOptions) -> Self {
151        let (sender, _) = broadcast::channel(options.channel_capacity);
152        Self {
153            options,
154            sender,
155            cache: Arc::new(RwLock::new(HashMap::new())),
156        }
157    }
158
159    /// Subscribe to passive readings.
160    ///
161    /// Returns a receiver that will receive readings as they are detected.
162    pub fn subscribe(&self) -> broadcast::Receiver<PassiveReading> {
163        self.sender.subscribe()
164    }
165
166    /// Get the number of active subscribers.
167    pub fn subscriber_count(&self) -> usize {
168        self.sender.receiver_count()
169    }
170
171    /// Start the passive monitor.
172    ///
173    /// This spawns a background task that continuously scans for BLE
174    /// advertisements and parses Aranet device data.
175    ///
176    /// The task runs until the cancellation token is triggered.
177    pub fn start(self: &Arc<Self>, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
178        let monitor = Arc::clone(self);
179
180        tokio::spawn(async move {
181            info!("Starting passive monitor");
182
183            loop {
184                tokio::select! {
185                    _ = cancel_token.cancelled() => {
186                        info!("Passive monitor cancelled");
187                        break;
188                    }
189                    result = monitor.scan_cycle() => {
190                        if let Err(e) = result {
191                            warn!("Passive monitor scan error: {}", e);
192                        }
193                        // Wait before next scan cycle
194                        sleep(monitor.options.scan_interval).await;
195                    }
196                }
197            }
198        })
199    }
200
201    /// Perform a single scan cycle.
202    async fn scan_cycle(&self) -> Result<()> {
203        let adapter = get_adapter().await?;
204
205        // Start scanning
206        adapter.start_scan(ScanFilter::default()).await?;
207        sleep(self.options.scan_duration).await;
208        adapter.stop_scan().await?;
209
210        // Process discovered peripherals
211        let peripherals = adapter.peripherals().await?;
212
213        for peripheral in peripherals {
214            if let Ok(Some(props)) = peripheral.properties().await {
215                // Check if this is an Aranet device by manufacturer data
216                if let Some(data) = props.manufacturer_data.get(&MANUFACTURER_ID) {
217                    let device_id = crate::util::create_identifier(
218                        &props.address.to_string(),
219                        &peripheral.id(),
220                    );
221
222                    // Check device filter
223                    if !self.options.device_filter.is_empty()
224                        && !self.options.device_filter.contains(&device_id)
225                    {
226                        continue;
227                    }
228
229                    // Try to parse the advertisement
230                    match parse_advertisement_with_name(data, props.local_name.as_deref()) {
231                        Ok(adv_data) => {
232                            // Check for deduplication
233                            let should_emit = if self.options.deduplicate {
234                                self.should_emit(&device_id, &adv_data).await
235                            } else {
236                                true
237                            };
238
239                            if should_emit {
240                                let reading = PassiveReading {
241                                    device_id: device_id.clone(),
242                                    device_name: props.local_name.clone(),
243                                    rssi: props.rssi,
244                                    data: adv_data.clone(),
245                                    received_at: std::time::Instant::now(),
246                                };
247
248                                // Update cache
249                                self.cache.write().await.insert(
250                                    device_id,
251                                    CachedReading {
252                                        data: adv_data,
253                                        received_at: std::time::Instant::now(),
254                                    },
255                                );
256
257                                // Send to subscribers (ignore if no receivers)
258                                let _ = self.sender.send(reading);
259                            }
260                        }
261                        Err(e) => {
262                            debug!("Failed to parse advertisement from {}: {}", device_id, e);
263                        }
264                    }
265                }
266            }
267        }
268
269        Ok(())
270    }
271
272    /// Check if a reading should be emitted (for deduplication).
273    async fn should_emit(&self, device_id: &str, data: &AdvertisementData) -> bool {
274        let cache = self.cache.read().await;
275
276        if let Some(cached) = cache.get(device_id) {
277            // Check if reading is too old
278            if cached.received_at.elapsed() > self.options.max_reading_age {
279                return true;
280            }
281
282            // Check if values have changed
283            if cached.data.co2 != data.co2
284                || cached.data.temperature != data.temperature
285                || cached.data.humidity != data.humidity
286                || cached.data.pressure != data.pressure
287                || cached.data.radon != data.radon
288                || cached.data.radiation_dose_rate != data.radiation_dose_rate
289                || cached.data.battery != data.battery
290            {
291                return true;
292            }
293
294            // Check if counter changed (new measurement)
295            if cached.data.counter != data.counter {
296                return true;
297            }
298
299            false
300        } else {
301            // Not in cache, emit
302            true
303        }
304    }
305
306    /// Get the last known reading for a device.
307    pub async fn get_last_reading(&self, device_id: &str) -> Option<AdvertisementData> {
308        let cache = self.cache.read().await;
309        cache.get(device_id).map(|c| c.data.clone())
310    }
311
312    /// Get all known device IDs.
313    pub async fn known_devices(&self) -> Vec<String> {
314        let cache = self.cache.read().await;
315        cache.keys().cloned().collect()
316    }
317
318    /// Clear the reading cache.
319    pub async fn clear_cache(&self) {
320        self.cache.write().await.clear();
321    }
322}
323
324impl Default for PassiveMonitor {
325    fn default() -> Self {
326        Self::new(PassiveMonitorOptions::default())
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// Default options: 5 s scan window, deduplication on, no device filter.
    #[test]
    fn test_passive_monitor_options_default() {
        let PassiveMonitorOptions {
            scan_duration,
            deduplicate,
            device_filter,
            ..
        } = PassiveMonitorOptions::default();

        assert_eq!(scan_duration, Duration::from_secs(5));
        assert!(deduplicate);
        assert!(device_filter.is_empty());
    }

    /// Builder setters override the corresponding defaults.
    #[test]
    fn test_passive_monitor_options_builder() {
        let wanted_duration = Duration::from_secs(10);
        let wanted_filter = vec!["device1".to_string()];

        let built = PassiveMonitorOptions::new()
            .scan_duration(wanted_duration)
            .deduplicate(false)
            .filter_devices(wanted_filter);

        assert_eq!(built.scan_duration, Duration::from_secs(10));
        assert!(!built.deduplicate);
        assert_eq!(built.device_filter, vec!["device1"]);
    }

    /// Each live receiver is counted as a subscriber.
    #[test]
    fn test_passive_monitor_subscribe() {
        let monitor = Arc::new(PassiveMonitor::default());
        let receivers = [monitor.subscribe(), monitor.subscribe()];
        assert_eq!(monitor.subscriber_count(), receivers.len());
    }
}