use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use btleplug::api::{Central, Peripheral as _, ScanFilter};
use tokio::sync::{RwLock, broadcast};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use crate::advertisement::{AdvertisementData, parse_advertisement_with_name};
use crate::error::Result;
use crate::scan::get_adapter;
use crate::uuid::MANUFACTURER_ID;
fn opt_f32_eq(a: Option<f32>, b: Option<f32>) -> bool {
match (a, b) {
(Some(x), Some(y)) => x.to_bits() == y.to_bits(),
(None, None) => true,
_ => false,
}
}
#[derive(Debug, Clone)]
pub struct PassiveReading {
pub device_id: String,
pub device_name: Option<String>,
pub rssi: Option<i16>,
pub data: AdvertisementData,
pub received_at: std::time::Instant,
}
#[derive(Debug, Clone)]
pub struct PassiveMonitorOptions {
pub scan_duration: Duration,
pub scan_interval: Duration,
pub channel_capacity: usize,
pub deduplicate: bool,
pub max_reading_age: Duration,
pub device_filter: Vec<String>,
}
impl Default for PassiveMonitorOptions {
fn default() -> Self {
Self {
scan_duration: Duration::from_secs(5),
scan_interval: Duration::from_secs(1),
channel_capacity: 100,
deduplicate: true,
max_reading_age: Duration::from_secs(60),
device_filter: Vec::new(),
}
}
}
impl PassiveMonitorOptions {
pub fn new() -> Self {
Self::default()
}
pub fn scan_duration(mut self, duration: Duration) -> Self {
self.scan_duration = duration;
self
}
pub fn scan_interval(mut self, interval: Duration) -> Self {
self.scan_interval = interval;
self
}
pub fn deduplicate(mut self, enable: bool) -> Self {
self.deduplicate = enable;
self
}
pub fn filter_devices(mut self, device_ids: Vec<String>) -> Self {
self.device_filter = device_ids;
self
}
}
struct CachedReading {
data: AdvertisementData,
received_at: std::time::Instant,
}
pub struct PassiveMonitor {
options: PassiveMonitorOptions,
sender: broadcast::Sender<PassiveReading>,
cache: Arc<RwLock<HashMap<String, CachedReading>>>,
}
impl PassiveMonitor {
pub fn new(options: PassiveMonitorOptions) -> Self {
let (sender, _) = broadcast::channel(options.channel_capacity);
Self {
options,
sender,
cache: Arc::new(RwLock::new(HashMap::new())),
}
}
pub fn subscribe(&self) -> broadcast::Receiver<PassiveReading> {
self.sender.subscribe()
}
pub fn subscriber_count(&self) -> usize {
self.sender.receiver_count()
}
pub fn start(self: &Arc<Self>, cancel_token: CancellationToken) -> tokio::task::JoinHandle<()> {
let monitor = Arc::clone(self);
tokio::spawn(async move {
info!("Starting passive monitor");
let mut adapter = loop {
match get_adapter().await {
Ok(a) => break a,
Err(e) => {
warn!("Passive monitor failed to get adapter: {e} — retrying in 10s");
tokio::select! {
_ = cancel_token.cancelled() => {
info!("Passive monitor cancelled while waiting for adapter");
return;
}
_ = sleep(Duration::from_secs(10)) => {}
}
}
}
};
let mut consecutive_errors: u32 = 0;
loop {
tokio::select! {
_ = cancel_token.cancelled() => {
info!("Passive monitor cancelled");
break;
}
result = monitor.scan_cycle_with_adapter(&adapter) => {
match result {
Ok(()) => {
consecutive_errors = 0;
}
Err(e) => {
consecutive_errors += 1;
warn!(
"Passive monitor scan error ({consecutive_errors} consecutive): {e}"
);
if consecutive_errors >= 5 {
warn!("Passive monitor: re-acquiring adapter after {} consecutive errors", consecutive_errors);
match get_adapter().await {
Ok(a) => {
adapter = a;
info!("Passive monitor: adapter re-acquired");
consecutive_errors = 0;
}
Err(e2) => {
warn!("Passive monitor: failed to re-acquire adapter: {}. Backing off.", e2);
let backoff = std::cmp::min(
monitor.options.scan_interval.saturating_mul(consecutive_errors),
std::time::Duration::from_secs(300),
);
sleep(backoff).await;
}
}
}
}
}
sleep(monitor.options.scan_interval).await;
}
}
}
})
}
async fn scan_cycle_with_adapter(&self, adapter: &btleplug::platform::Adapter) -> Result<()> {
adapter.start_scan(ScanFilter::default()).await?;
sleep(self.options.scan_duration).await;
adapter.stop_scan().await?;
let peripherals = adapter.peripherals().await?;
for peripheral in peripherals {
if let Ok(Some(props)) = peripheral.properties().await {
if let Some(data) = props.manufacturer_data.get(&MANUFACTURER_ID) {
let device_id = crate::util::create_identifier(
&props.address.to_string(),
&peripheral.id(),
);
if !self.options.device_filter.is_empty()
&& !self.options.device_filter.contains(&device_id)
{
continue;
}
match parse_advertisement_with_name(data, props.local_name.as_deref()) {
Ok(adv_data) => {
let should_emit = if self.options.deduplicate {
self.should_emit(&device_id, &adv_data).await
} else {
true
};
if should_emit {
let reading = PassiveReading {
device_id: device_id.clone(),
device_name: props.local_name.clone(),
rssi: props.rssi,
data: adv_data.clone(),
received_at: std::time::Instant::now(),
};
self.cache.write().await.insert(
device_id,
CachedReading {
data: adv_data,
received_at: std::time::Instant::now(),
},
);
let _ = self.sender.send(reading);
}
}
Err(e) => {
debug!("Failed to parse advertisement from {}: {}", device_id, e);
}
}
}
}
}
Ok(())
}
async fn should_emit(&self, device_id: &str, data: &AdvertisementData) -> bool {
let cache = self.cache.read().await;
if let Some(cached) = cache.get(device_id) {
if cached.received_at.elapsed() > self.options.max_reading_age {
return true;
}
if cached.data.co2 != data.co2
|| !opt_f32_eq(cached.data.temperature, data.temperature)
|| cached.data.humidity != data.humidity
|| !opt_f32_eq(cached.data.pressure, data.pressure)
|| cached.data.radon != data.radon
|| !opt_f32_eq(cached.data.radiation_dose_rate, data.radiation_dose_rate)
|| cached.data.battery != data.battery
{
return true;
}
if cached.data.counter != data.counter {
return true;
}
false
} else {
true
}
}
pub async fn get_last_reading(&self, device_id: &str) -> Option<AdvertisementData> {
let cache = self.cache.read().await;
cache.get(device_id).map(|c| c.data.clone())
}
pub async fn known_devices(&self) -> Vec<String> {
let cache = self.cache.read().await;
cache.keys().cloned().collect()
}
pub async fn clear_cache(&self) {
self.cache.write().await.clear();
}
}
impl Default for PassiveMonitor {
fn default() -> Self {
Self::new(PassiveMonitorOptions::default())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_passive_monitor_options_default() {
let opts = PassiveMonitorOptions::default();
assert_eq!(opts.scan_duration, Duration::from_secs(5));
assert!(opts.deduplicate);
assert!(opts.device_filter.is_empty());
}
#[test]
fn test_passive_monitor_options_builder() {
let opts = PassiveMonitorOptions::new()
.scan_duration(Duration::from_secs(10))
.deduplicate(false)
.filter_devices(vec!["device1".to_string()]);
assert_eq!(opts.scan_duration, Duration::from_secs(10));
assert!(!opts.deduplicate);
assert_eq!(opts.device_filter, vec!["device1"]);
}
#[test]
fn test_passive_monitor_subscribe() {
let monitor = Arc::new(PassiveMonitor::default());
let _rx1 = monitor.subscribe();
let _rx2 = monitor.subscribe();
assert_eq!(monitor.subscriber_count(), 2);
}
fn make_adv_data() -> AdvertisementData {
AdvertisementData {
device_type: aranet_types::DeviceType::Aranet4,
co2: Some(800),
temperature: Some(22.5),
pressure: Some(1013.2),
humidity: Some(45),
battery: 85,
status: aranet_types::Status::Green,
interval: 300,
age: 120,
radon: None,
radiation_dose_rate: None,
counter: Some(5),
flags: 0x22,
}
}
#[tokio::test]
async fn test_should_emit_first_reading() {
let monitor = PassiveMonitor::default();
let data = make_adv_data();
assert!(monitor.should_emit("device-1", &data).await);
}
#[tokio::test]
async fn test_should_emit_duplicate_suppressed() {
let monitor = PassiveMonitor::default();
let data = make_adv_data();
monitor.cache.write().await.insert(
"device-1".to_string(),
CachedReading {
data: data.clone(),
received_at: std::time::Instant::now(),
},
);
assert!(!monitor.should_emit("device-1", &data).await);
}
#[tokio::test]
async fn test_should_emit_on_value_change() {
let monitor = PassiveMonitor::default();
let data = make_adv_data();
monitor.cache.write().await.insert(
"device-1".to_string(),
CachedReading {
data: data.clone(),
received_at: std::time::Instant::now(),
},
);
let mut changed = data.clone();
changed.co2 = Some(900);
assert!(monitor.should_emit("device-1", &changed).await);
let mut changed = data.clone();
changed.battery = 50;
assert!(monitor.should_emit("device-1", &changed).await);
let mut changed = data;
changed.temperature = Some(23.0);
assert!(monitor.should_emit("device-1", &changed).await);
}
#[tokio::test]
async fn test_should_emit_on_counter_change() {
let monitor = PassiveMonitor::default();
let data = make_adv_data();
monitor.cache.write().await.insert(
"device-1".to_string(),
CachedReading {
data: data.clone(),
received_at: std::time::Instant::now(),
},
);
let mut changed = data;
changed.counter = Some(6);
assert!(monitor.should_emit("device-1", &changed).await);
}
#[tokio::test]
async fn test_should_emit_on_stale_cache() {
let opts = PassiveMonitorOptions {
max_reading_age: Duration::from_millis(10),
..Default::default()
};
let monitor = PassiveMonitor::new(opts);
let data = make_adv_data();
monitor.cache.write().await.insert(
"device-1".to_string(),
CachedReading {
data: data.clone(),
received_at: std::time::Instant::now() - Duration::from_millis(50),
},
);
assert!(monitor.should_emit("device-1", &data).await);
}
#[tokio::test]
async fn test_should_emit_different_device() {
let monitor = PassiveMonitor::default();
let data = make_adv_data();
monitor.cache.write().await.insert(
"device-1".to_string(),
CachedReading {
data: data.clone(),
received_at: std::time::Instant::now(),
},
);
assert!(monitor.should_emit("device-2", &data).await);
}
}