1use std::collections::HashMap;
37use std::sync::Arc;
38use std::time::Duration;
39
40use btleplug::api::{Central, Peripheral as _, ScanFilter};
41use tokio::sync::{RwLock, broadcast};
42use tokio::time::sleep;
43use tokio_util::sync::CancellationToken;
44use tracing::{debug, info, warn};
45
46use crate::advertisement::{AdvertisementData, parse_advertisement_with_name};
47use crate::error::Result;
48use crate::scan::get_adapter;
49use crate::uuid::MANUFACTURER_ID;
50
51#[derive(Debug, Clone)]
53pub struct PassiveReading {
54 pub device_id: String,
56 pub device_name: Option<String>,
58 pub rssi: Option<i16>,
60 pub data: AdvertisementData,
62 pub received_at: std::time::Instant,
64}
65
/// Tunable settings for a passive monitor.
///
/// Obtain the stock configuration through `Default` and adjust individual
/// fields as needed.
#[derive(Debug, Clone)]
pub struct PassiveMonitorOptions {
    /// How long the adapter keeps scanning during one scan cycle.
    pub scan_duration: Duration,

    /// Pause inserted after a scan cycle before the next one begins.
    pub scan_interval: Duration,

    /// Capacity requested for the broadcast channel that carries readings.
    pub channel_capacity: usize,

    /// When `true`, a reading is only emitted if it differs from the cached
    /// one for that device or the cached one is older than `max_reading_age`.
    pub deduplicate: bool,

    /// Age after which a cached reading no longer suppresses an unchanged one.
    pub max_reading_age: Duration,

    /// Device identifiers to accept; an empty list accepts every device.
    pub device_filter: Vec<String>,
}
82
83impl Default for PassiveMonitorOptions {
84 fn default() -> Self {
85 Self {
86 scan_duration: Duration::from_secs(5),
87 scan_interval: Duration::from_secs(1),
88 channel_capacity: 100,
89 deduplicate: true,
90 max_reading_age: Duration::from_secs(60),
91 device_filter: Vec::new(),
92 }
93 }
94}
95
96impl PassiveMonitorOptions {
97 pub fn new() -> Self {
99 Self::default()
100 }
101
102 pub fn scan_duration(mut self, duration: Duration) -> Self {
104 self.scan_duration = duration;
105 self
106 }
107
108 pub fn scan_interval(mut self, interval: Duration) -> Self {
110 self.scan_interval = interval;
111 self
112 }
113
114 pub fn deduplicate(mut self, enable: bool) -> Self {
116 self.deduplicate = enable;
117 self
118 }
119
120 pub fn filter_devices(mut self, device_ids: Vec<String>) -> Self {
122 self.device_filter = device_ids;
123 self
124 }
125}
126
127struct CachedReading {
129 data: AdvertisementData,
130 received_at: std::time::Instant,
131}
132
133pub struct PassiveMonitor {
141 options: PassiveMonitorOptions,
142 sender: broadcast::Sender<PassiveReading>,
144 cache: Arc<RwLock<HashMap<String, CachedReading>>>,
146}
147
148impl PassiveMonitor {
149 pub fn new(options: PassiveMonitorOptions) -> Self {
151 let (sender, _) = broadcast::channel(options.channel_capacity);
152 Self {
153 options,
154 sender,
155 cache: Arc::new(RwLock::new(HashMap::new())),
156 }
157 }
158
159 pub fn subscribe(&self) -> broadcast::Receiver<PassiveReading> {
163 self.sender.subscribe()
164 }
165
166 pub fn subscriber_count(&self) -> usize {
168 self.sender.receiver_count()
169 }
170
171 pub fn start(self: &Arc<Self>, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
178 let monitor = Arc::clone(self);
179
180 tokio::spawn(async move {
181 info!("Starting passive monitor");
182
183 loop {
184 tokio::select! {
185 _ = cancel_token.cancelled() => {
186 info!("Passive monitor cancelled");
187 break;
188 }
189 result = monitor.scan_cycle() => {
190 if let Err(e) = result {
191 warn!("Passive monitor scan error: {}", e);
192 }
193 sleep(monitor.options.scan_interval).await;
195 }
196 }
197 }
198 })
199 }
200
201 async fn scan_cycle(&self) -> Result<()> {
203 let adapter = get_adapter().await?;
204
205 adapter.start_scan(ScanFilter::default()).await?;
207 sleep(self.options.scan_duration).await;
208 adapter.stop_scan().await?;
209
210 let peripherals = adapter.peripherals().await?;
212
213 for peripheral in peripherals {
214 if let Ok(Some(props)) = peripheral.properties().await {
215 if let Some(data) = props.manufacturer_data.get(&MANUFACTURER_ID) {
217 let device_id = crate::util::create_identifier(
218 &props.address.to_string(),
219 &peripheral.id(),
220 );
221
222 if !self.options.device_filter.is_empty()
224 && !self.options.device_filter.contains(&device_id)
225 {
226 continue;
227 }
228
229 match parse_advertisement_with_name(data, props.local_name.as_deref()) {
231 Ok(adv_data) => {
232 let should_emit = if self.options.deduplicate {
234 self.should_emit(&device_id, &adv_data).await
235 } else {
236 true
237 };
238
239 if should_emit {
240 let reading = PassiveReading {
241 device_id: device_id.clone(),
242 device_name: props.local_name.clone(),
243 rssi: props.rssi,
244 data: adv_data.clone(),
245 received_at: std::time::Instant::now(),
246 };
247
248 self.cache.write().await.insert(
250 device_id,
251 CachedReading {
252 data: adv_data,
253 received_at: std::time::Instant::now(),
254 },
255 );
256
257 let _ = self.sender.send(reading);
259 }
260 }
261 Err(e) => {
262 debug!("Failed to parse advertisement from {}: {}", device_id, e);
263 }
264 }
265 }
266 }
267 }
268
269 Ok(())
270 }
271
272 async fn should_emit(&self, device_id: &str, data: &AdvertisementData) -> bool {
274 let cache = self.cache.read().await;
275
276 if let Some(cached) = cache.get(device_id) {
277 if cached.received_at.elapsed() > self.options.max_reading_age {
279 return true;
280 }
281
282 if cached.data.co2 != data.co2
284 || cached.data.temperature != data.temperature
285 || cached.data.humidity != data.humidity
286 || cached.data.pressure != data.pressure
287 || cached.data.radon != data.radon
288 || cached.data.radiation_dose_rate != data.radiation_dose_rate
289 || cached.data.battery != data.battery
290 {
291 return true;
292 }
293
294 if cached.data.counter != data.counter {
296 return true;
297 }
298
299 false
300 } else {
301 true
303 }
304 }
305
306 pub async fn get_last_reading(&self, device_id: &str) -> Option<AdvertisementData> {
308 let cache = self.cache.read().await;
309 cache.get(device_id).map(|c| c.data.clone())
310 }
311
312 pub async fn known_devices(&self) -> Vec<String> {
314 let cache = self.cache.read().await;
315 cache.keys().cloned().collect()
316 }
317
318 pub async fn clear_cache(&self) {
320 self.cache.write().await.clear();
321 }
322}
323
324impl Default for PassiveMonitor {
325 fn default() -> Self {
326 Self::new(PassiveMonitorOptions::default())
327 }
328}
329
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins every default so an accidental change to the stock configuration
    /// is caught.
    #[test]
    fn test_passive_monitor_options_default() {
        let opts = PassiveMonitorOptions::default();
        assert_eq!(opts.scan_duration, Duration::from_secs(5));
        assert_eq!(opts.scan_interval, Duration::from_secs(1));
        assert_eq!(opts.channel_capacity, 100);
        assert!(opts.deduplicate);
        assert_eq!(opts.max_reading_age, Duration::from_secs(60));
        assert!(opts.device_filter.is_empty());
    }

    /// Each builder call changes only its own field.
    #[test]
    fn test_passive_monitor_options_builder() {
        let opts = PassiveMonitorOptions::new()
            .scan_duration(Duration::from_secs(10))
            .scan_interval(Duration::from_millis(250))
            .deduplicate(false)
            .filter_devices(vec!["device1".to_string()]);

        assert_eq!(opts.scan_duration, Duration::from_secs(10));
        assert_eq!(opts.scan_interval, Duration::from_millis(250));
        assert!(!opts.deduplicate);
        assert_eq!(opts.device_filter, vec!["device1"]);

        // Fields without a builder call keep their defaults.
        assert_eq!(opts.channel_capacity, 100);
        assert_eq!(opts.max_reading_age, Duration::from_secs(60));
    }

    /// The subscriber count follows receivers being created and dropped.
    #[test]
    fn test_passive_monitor_subscribe() {
        let monitor = Arc::new(PassiveMonitor::default());
        assert_eq!(monitor.subscriber_count(), 0);

        let rx1 = monitor.subscribe();
        let rx2 = monitor.subscribe();
        assert_eq!(monitor.subscriber_count(), 2);

        drop(rx1);
        assert_eq!(monitor.subscriber_count(), 1);
        drop(rx2);
        assert_eq!(monitor.subscriber_count(), 0);
    }
}
361}