1use std::collections::HashMap;
37use std::sync::Arc;
38use std::time::Duration;
39
40use btleplug::api::{Central, Peripheral as _, ScanFilter};
41use tokio::sync::{RwLock, broadcast};
42use tokio::time::sleep;
43use tokio_util::sync::CancellationToken;
44use tracing::{debug, info, warn};
45
46use crate::advertisement::{AdvertisementData, parse_advertisement_with_name};
47use crate::error::Result;
48use crate::scan::get_adapter;
49use crate::uuid::MANUFACTURER_ID;
50
/// Compares two optional `f32` values by their exact bit patterns.
///
/// Both sides are mapped to their raw `u32` representation before comparing,
/// so two `NaN`s with the same bits are equal while `0.0` and `-0.0` are not.
/// `None` only equals `None`.
fn opt_f32_eq(a: Option<f32>, b: Option<f32>) -> bool {
    a.map(f32::to_bits) == b.map(f32::to_bits)
}
59
60#[derive(Debug, Clone)]
62pub struct PassiveReading {
63 pub device_id: String,
65 pub device_name: Option<String>,
67 pub rssi: Option<i16>,
69 pub data: AdvertisementData,
71 pub received_at: std::time::Instant,
73}
74
/// Configuration for [`PassiveMonitor`].
///
/// Construct with [`PassiveMonitorOptions::new`] (or `Default`) and refine
/// with the chained setters; every field has a matching setter.
#[derive(Debug, Clone)]
pub struct PassiveMonitorOptions {
    /// How long each scan is left running before it is stopped and the
    /// discovered peripherals are inspected.
    pub scan_duration: Duration,
    /// Pause between two scan cycles; also the base unit for the error backoff.
    pub scan_interval: Duration,
    /// Capacity handed to the broadcast channel that delivers readings.
    pub channel_capacity: usize,
    /// When `true`, a reading is only emitted if it differs from the cached
    /// one for that device (or the cached one is older than `max_reading_age`).
    pub deduplicate: bool,
    /// Age after which a cached reading no longer suppresses a duplicate.
    pub max_reading_age: Duration,
    /// Device identifiers to accept; an empty list accepts every device.
    pub device_filter: Vec<String>,
}

impl Default for PassiveMonitorOptions {
    /// 5 s scans, 1 s between cycles, a 100-slot channel, deduplication on,
    /// 60 s maximum cached age and no device filter.
    fn default() -> Self {
        Self {
            scan_duration: Duration::from_secs(5),
            scan_interval: Duration::from_secs(1),
            channel_capacity: 100,
            deduplicate: true,
            max_reading_age: Duration::from_secs(60),
            device_filter: Vec::new(),
        }
    }
}

impl PassiveMonitorOptions {
    /// Creates options populated with the default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets how long each scan runs before results are collected.
    #[must_use]
    pub fn scan_duration(mut self, duration: Duration) -> Self {
        self.scan_duration = duration;
        self
    }

    /// Sets the pause between consecutive scan cycles.
    #[must_use]
    pub fn scan_interval(mut self, interval: Duration) -> Self {
        self.scan_interval = interval;
        self
    }

    /// Sets the capacity of the broadcast channel used to deliver readings.
    #[must_use]
    pub fn channel_capacity(mut self, capacity: usize) -> Self {
        self.channel_capacity = capacity;
        self
    }

    /// Enables or disables suppression of unchanged readings.
    #[must_use]
    pub fn deduplicate(mut self, enable: bool) -> Self {
        self.deduplicate = enable;
        self
    }

    /// Sets the age after which a cached reading stops suppressing duplicates.
    #[must_use]
    pub fn max_reading_age(mut self, age: Duration) -> Self {
        self.max_reading_age = age;
        self
    }

    /// Restricts monitoring to the given device identifiers.
    ///
    /// Passing an empty vector removes the restriction.
    #[must_use]
    pub fn filter_devices(mut self, device_ids: Vec<String>) -> Self {
        self.device_filter = device_ids;
        self
    }
}
135
136struct CachedReading {
138 data: AdvertisementData,
139 received_at: std::time::Instant,
140}
141
142pub struct PassiveMonitor {
150 options: PassiveMonitorOptions,
151 sender: broadcast::Sender<PassiveReading>,
153 cache: Arc<RwLock<HashMap<String, CachedReading>>>,
155}
156
157impl PassiveMonitor {
158 pub fn new(options: PassiveMonitorOptions) -> Self {
160 let (sender, _) = broadcast::channel(options.channel_capacity);
161 Self {
162 options,
163 sender,
164 cache: Arc::new(RwLock::new(HashMap::new())),
165 }
166 }
167
168 pub fn subscribe(&self) -> broadcast::Receiver<PassiveReading> {
172 self.sender.subscribe()
173 }
174
175 pub fn subscriber_count(&self) -> usize {
177 self.sender.receiver_count()
178 }
179
180 pub fn start(self: &Arc<Self>, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
187 let monitor = Arc::clone(self);
188
189 tokio::spawn(async move {
190 info!("Starting passive monitor");
191
192 let mut adapter = loop {
196 match get_adapter().await {
197 Ok(a) => break a,
198 Err(e) => {
199 warn!("Passive monitor failed to get adapter: {e} — retrying in 10s");
200 tokio::select! {
201 _ = cancel_token.cancelled() => {
202 info!("Passive monitor cancelled while waiting for adapter");
203 return;
204 }
205 _ = sleep(Duration::from_secs(10)) => {}
206 }
207 }
208 }
209 };
210 let mut consecutive_errors: u32 = 0;
211
212 loop {
213 tokio::select! {
214 _ = cancel_token.cancelled() => {
215 info!("Passive monitor cancelled");
216 break;
217 }
218 result = monitor.scan_cycle_with_adapter(&adapter) => {
219 match result {
220 Ok(()) => {
221 consecutive_errors = 0;
222 }
223 Err(e) => {
224 consecutive_errors += 1;
225 warn!(
226 "Passive monitor scan error ({consecutive_errors} consecutive): {e}"
227 );
228 if consecutive_errors >= 5 {
232 warn!("Passive monitor: re-acquiring adapter after {} consecutive errors", consecutive_errors);
233 match get_adapter().await {
234 Ok(a) => {
235 adapter = a;
236 info!("Passive monitor: adapter re-acquired");
237 consecutive_errors = 0;
238 }
239 Err(e2) => {
240 warn!("Passive monitor: failed to re-acquire adapter: {}. Backing off.", e2);
244 let backoff = std::cmp::min(
245 monitor.options.scan_interval.saturating_mul(consecutive_errors),
246 std::time::Duration::from_secs(300),
247 );
248 sleep(backoff).await;
249 }
250 }
251 }
252 }
253 }
254 sleep(monitor.options.scan_interval).await;
256 }
257 }
258 }
259 })
260 }
261
262 async fn scan_cycle_with_adapter(&self, adapter: &btleplug::platform::Adapter) -> Result<()> {
264 adapter.start_scan(ScanFilter::default()).await?;
266 sleep(self.options.scan_duration).await;
267 adapter.stop_scan().await?;
268
269 let peripherals = adapter.peripherals().await?;
271
272 for peripheral in peripherals {
273 if let Ok(Some(props)) = peripheral.properties().await {
274 if let Some(data) = props.manufacturer_data.get(&MANUFACTURER_ID) {
276 let device_id = crate::util::create_identifier(
277 &props.address.to_string(),
278 &peripheral.id(),
279 );
280
281 if !self.options.device_filter.is_empty()
283 && !self.options.device_filter.contains(&device_id)
284 {
285 continue;
286 }
287
288 match parse_advertisement_with_name(data, props.local_name.as_deref()) {
290 Ok(adv_data) => {
291 let should_emit = if self.options.deduplicate {
293 self.should_emit(&device_id, &adv_data).await
294 } else {
295 true
296 };
297
298 if should_emit {
299 let reading = PassiveReading {
300 device_id: device_id.clone(),
301 device_name: props.local_name.clone(),
302 rssi: props.rssi,
303 data: adv_data.clone(),
304 received_at: std::time::Instant::now(),
305 };
306
307 self.cache.write().await.insert(
309 device_id,
310 CachedReading {
311 data: adv_data,
312 received_at: std::time::Instant::now(),
313 },
314 );
315
316 let _ = self.sender.send(reading);
318 }
319 }
320 Err(e) => {
321 debug!("Failed to parse advertisement from {}: {}", device_id, e);
322 }
323 }
324 }
325 }
326 }
327
328 Ok(())
329 }
330
331 async fn should_emit(&self, device_id: &str, data: &AdvertisementData) -> bool {
333 let cache = self.cache.read().await;
334
335 if let Some(cached) = cache.get(device_id) {
336 if cached.received_at.elapsed() > self.options.max_reading_age {
338 return true;
339 }
340
341 if cached.data.co2 != data.co2
343 || !opt_f32_eq(cached.data.temperature, data.temperature)
344 || cached.data.humidity != data.humidity
345 || !opt_f32_eq(cached.data.pressure, data.pressure)
346 || cached.data.radon != data.radon
347 || !opt_f32_eq(cached.data.radiation_dose_rate, data.radiation_dose_rate)
348 || cached.data.battery != data.battery
349 {
350 return true;
351 }
352
353 if cached.data.counter != data.counter {
355 return true;
356 }
357
358 false
359 } else {
360 true
362 }
363 }
364
365 pub async fn get_last_reading(&self, device_id: &str) -> Option<AdvertisementData> {
367 let cache = self.cache.read().await;
368 cache.get(device_id).map(|c| c.data.clone())
369 }
370
371 pub async fn known_devices(&self) -> Vec<String> {
373 let cache = self.cache.read().await;
374 cache.keys().cloned().collect()
375 }
376
377 pub async fn clear_cache(&self) {
379 self.cache.write().await.clear();
380 }
381}
382
383impl Default for PassiveMonitor {
384 fn default() -> Self {
385 Self::new(PassiveMonitorOptions::default())
386 }
387}
388
#[cfg(test)]
mod tests {
    use super::*;

    /// Identifier under which `seed_cache` stores its reading.
    const CACHED_DEVICE: &str = "device-1";

    /// Representative Aranet4 advertisement shared by the deduplication tests.
    fn make_adv_data() -> AdvertisementData {
        AdvertisementData {
            device_type: aranet_types::DeviceType::Aranet4,
            co2: Some(800),
            temperature: Some(22.5),
            pressure: Some(1013.2),
            humidity: Some(45),
            battery: 85,
            status: aranet_types::Status::Green,
            interval: 300,
            age: 120,
            radon: None,
            radiation_dose_rate: None,
            counter: Some(5),
            flags: 0x22,
        }
    }

    /// Stores a copy of `data` for `CACHED_DEVICE` in the monitor's cache,
    /// stamped with `received_at`.
    async fn seed_cache(
        monitor: &PassiveMonitor,
        data: &AdvertisementData,
        received_at: std::time::Instant,
    ) {
        monitor.cache.write().await.insert(
            CACHED_DEVICE.to_string(),
            CachedReading {
                data: data.clone(),
                received_at,
            },
        );
    }

    #[test]
    fn test_passive_monitor_options_default() {
        let defaults = PassiveMonitorOptions::default();

        assert_eq!(defaults.scan_duration, Duration::from_secs(5));
        assert!(defaults.deduplicate);
        assert!(defaults.device_filter.is_empty());
    }

    #[test]
    fn test_passive_monitor_options_builder() {
        let built = PassiveMonitorOptions::new()
            .scan_duration(Duration::from_secs(10))
            .deduplicate(false)
            .filter_devices(vec!["device1".to_string()]);

        assert_eq!(built.scan_duration, Duration::from_secs(10));
        assert!(!built.deduplicate);
        assert_eq!(built.device_filter, vec!["device1"]);
    }

    #[test]
    fn test_passive_monitor_subscribe() {
        let monitor = Arc::new(PassiveMonitor::default());

        // Keep both receivers alive so they are counted.
        let _first = monitor.subscribe();
        let _second = monitor.subscribe();

        assert_eq!(monitor.subscriber_count(), 2);
    }

    #[tokio::test]
    async fn test_should_emit_first_reading() {
        let monitor = PassiveMonitor::default();
        let reading = make_adv_data();

        // Nothing is cached yet, so the very first reading must go out.
        assert!(monitor.should_emit(CACHED_DEVICE, &reading).await);
    }

    #[tokio::test]
    async fn test_should_emit_duplicate_suppressed() {
        let monitor = PassiveMonitor::default();
        let reading = make_adv_data();
        seed_cache(&monitor, &reading, std::time::Instant::now()).await;

        // An identical, fresh reading is a duplicate.
        assert!(!monitor.should_emit(CACHED_DEVICE, &reading).await);
    }

    #[tokio::test]
    async fn test_should_emit_on_value_change() {
        let monitor = PassiveMonitor::default();
        let baseline = make_adv_data();
        seed_cache(&monitor, &baseline, std::time::Instant::now()).await;

        let co2_changed = AdvertisementData {
            co2: Some(900),
            ..baseline.clone()
        };
        assert!(monitor.should_emit(CACHED_DEVICE, &co2_changed).await);

        let battery_changed = AdvertisementData {
            battery: 50,
            ..baseline.clone()
        };
        assert!(monitor.should_emit(CACHED_DEVICE, &battery_changed).await);

        let temperature_changed = AdvertisementData {
            temperature: Some(23.0),
            ..baseline
        };
        assert!(
            monitor
                .should_emit(CACHED_DEVICE, &temperature_changed)
                .await
        );
    }

    #[tokio::test]
    async fn test_should_emit_on_counter_change() {
        let monitor = PassiveMonitor::default();
        let baseline = make_adv_data();
        seed_cache(&monitor, &baseline, std::time::Instant::now()).await;

        let counter_changed = AdvertisementData {
            counter: Some(6),
            ..baseline
        };
        assert!(monitor.should_emit(CACHED_DEVICE, &counter_changed).await);
    }

    #[tokio::test]
    async fn test_should_emit_on_stale_cache() {
        let monitor = PassiveMonitor::new(PassiveMonitorOptions {
            max_reading_age: Duration::from_millis(10),
            ..Default::default()
        });
        let reading = make_adv_data();

        // Back-date the cached entry well past the 10 ms maximum age.
        let stale_stamp = std::time::Instant::now() - Duration::from_millis(50);
        seed_cache(&monitor, &reading, stale_stamp).await;

        assert!(monitor.should_emit(CACHED_DEVICE, &reading).await);
    }

    #[tokio::test]
    async fn test_should_emit_different_device() {
        let monitor = PassiveMonitor::default();
        let reading = make_adv_data();
        seed_cache(&monitor, &reading, std::time::Instant::now()).await;

        // The cache entry belongs to another device, so this one is new.
        assert!(monitor.should_emit("device-2", &reading).await);
    }
}