use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use btleplug::api::{Characteristic, Peripheral as _, WriteType};
use btleplug::platform::{Adapter, Peripheral};
use tokio::sync::RwLock;
use tokio::time::timeout;
use tracing::{debug, info, warn};
use uuid::Uuid;
use crate::error::{Error, Result};
use crate::scan::{ScanOptions, find_device};
use crate::traits::AranetDevice;
use crate::util::{create_identifier, format_peripheral_id};
use crate::uuid::{
BATTERY_LEVEL, BATTERY_SERVICE, CURRENT_READINGS_DETAIL, CURRENT_READINGS_DETAIL_ALT,
DEVICE_INFO_SERVICE, DEVICE_NAME, FIRMWARE_REVISION, GAP_SERVICE, HARDWARE_REVISION,
MANUFACTURER_NAME, MODEL_NUMBER, SAF_TEHNIKA_SERVICE_NEW, SAF_TEHNIKA_SERVICE_OLD,
SERIAL_NUMBER, SOFTWARE_REVISION,
};
use aranet_types::{CurrentReading, DeviceInfo, DeviceType};
/// A connected BLE peripheral plus the state captured about it while connecting.
///
/// Instances are produced by `Device::from_peripheral_with_config` (directly or via the
/// `connect*` / `from_peripheral*` helpers), which connects, discovers services and
/// fills in every field below.
pub struct Device {
// NOTE(review): this field is never read in this file; presumably it is held so the
// adapter outlives the peripheral connection — confirm before removing the allow.
#[allow(dead_code)]
adapter: Adapter,
/// Handle used for every BLE operation (read, write, subscribe, disconnect).
peripheral: Peripheral,
/// `local_name` from the peripheral properties at connection time, if one was reported.
name: Option<String>,
/// Identifier built from the peripheral's address and id (or from the id alone when
/// no properties were available).
address: String,
/// Model derived from `name` via `DeviceType::from_name`; `None` when there is no
/// name or the name is not recognised.
device_type: Option<DeviceType>,
/// Set to `true` by the constructor after service discovery has run.
services_discovered: bool,
/// Characteristics found during discovery, keyed by characteristic UUID.
characteristics_cache: RwLock<HashMap<Uuid, Characteristic>>,
/// Tasks spawned by `subscribe_to_notifications`; aborted by `disconnect()` and `Drop`.
notification_handles: tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>,
/// Set by `disconnect()` and by `Drop`; `Drop` skips its cleanup when this is already `true`.
disconnected: AtomicBool,
/// Timeouts applied to connect, discovery, read, write and validation.
config: ConnectionConfig,
}
/// Debug output restricted to the identifying fields.
///
/// The adapter, peripheral handle, caches, task handles and config are left out; the
/// non-exhaustive finish marks the omission with a trailing `..`.
impl std::fmt::Debug for Device {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Device");
        out.field("name", &self.name);
        out.field("address", &self.address);
        out.field("device_type", &self.device_type);
        out.field("services_discovered", &self.services_discovered);
        out.finish_non_exhaustive()
    }
}
/// Default limit for a single characteristic read.
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
/// Default limit for a single characteristic write.
const DEFAULT_WRITE_TIMEOUT: Duration = Duration::from_secs(10);
/// Default limit for establishing the BLE connection.
const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
/// Default limit for GATT service discovery.
const DEFAULT_DISCOVERY_TIMEOUT: Duration = Duration::from_secs(10);
/// Default limit for the connection validation probe.
const DEFAULT_VALIDATION_TIMEOUT: Duration = Duration::from_secs(3);

/// Timeouts applied to the individual phases of talking to a device.
///
/// All fields are public; the same-named builder methods allow chained overrides.
#[derive(Debug, Clone)]
pub struct ConnectionConfig {
    /// Limit for `peripheral.connect()`; the `connect*` helpers also use it as the
    /// duration of the extended device scan.
    pub connection_timeout: Duration,
    /// Limit for reading one characteristic.
    pub read_timeout: Duration,
    /// Limit for writing one characteristic.
    pub write_timeout: Duration,
    /// Limit for one service-discovery pass.
    pub discovery_timeout: Duration,
    /// Limit for the battery read used by `Device::validate_connection`.
    pub validation_timeout: Duration,
}

impl Default for ConnectionConfig {
    /// Builds a configuration from the module-level `DEFAULT_*` constants.
    fn default() -> Self {
        let connection_timeout = DEFAULT_CONNECT_TIMEOUT;
        let read_timeout = DEFAULT_READ_TIMEOUT;
        let write_timeout = DEFAULT_WRITE_TIMEOUT;
        let discovery_timeout = DEFAULT_DISCOVERY_TIMEOUT;
        let validation_timeout = DEFAULT_VALIDATION_TIMEOUT;
        ConnectionConfig {
            connection_timeout,
            read_timeout,
            write_timeout,
            discovery_timeout,
            validation_timeout,
        }
    }
}
impl ConnectionConfig {
    /// Returns the default configuration; same result as `ConnectionConfig::default()`.
    pub fn new() -> Self {
        Default::default()
    }

    /// Builds a configuration from `crate::platform::PlatformConfig::for_current_platform()`.
    ///
    /// The platform's recommended operation timeout is applied to reads, writes and
    /// service discovery alike; the validation timeout keeps its module default.
    pub fn for_current_platform() -> Self {
        let recommended = crate::platform::PlatformConfig::for_current_platform();
        let operation_timeout = recommended.recommended_operation_timeout;
        Self {
            connection_timeout: recommended.recommended_connection_timeout,
            read_timeout: operation_timeout,
            write_timeout: operation_timeout,
            discovery_timeout: operation_timeout,
            validation_timeout: DEFAULT_VALIDATION_TIMEOUT,
        }
    }

    /// Preset with long limits: 90 s connect, 30 s read, 15 s write, 30 s discovery,
    /// 5 s validation.
    pub fn challenging_environment() -> Self {
        // Every field is overridden, so starting from the defaults changes nothing.
        Self::default()
            .connection_timeout(Duration::from_secs(90))
            .read_timeout(Duration::from_secs(30))
            .write_timeout(Duration::from_secs(15))
            .discovery_timeout(Duration::from_secs(30))
            .validation_timeout(Duration::from_secs(5))
    }

    /// Preset with short limits: 8 s connect, 5 s read/write/discovery, 2 s validation.
    pub fn fast() -> Self {
        // Every field is overridden, so starting from the defaults changes nothing.
        Self::default()
            .connection_timeout(Duration::from_secs(8))
            .read_timeout(Duration::from_secs(5))
            .write_timeout(Duration::from_secs(5))
            .discovery_timeout(Duration::from_secs(5))
            .validation_timeout(Duration::from_secs(2))
    }

    /// Returns `self` with `connection_timeout` replaced.
    #[must_use]
    pub fn connection_timeout(self, timeout: Duration) -> Self {
        Self {
            connection_timeout: timeout,
            ..self
        }
    }

    /// Returns `self` with `read_timeout` replaced.
    #[must_use]
    pub fn read_timeout(self, timeout: Duration) -> Self {
        Self {
            read_timeout: timeout,
            ..self
        }
    }

    /// Returns `self` with `write_timeout` replaced.
    #[must_use]
    pub fn write_timeout(self, timeout: Duration) -> Self {
        Self {
            write_timeout: timeout,
            ..self
        }
    }

    /// Returns `self` with `discovery_timeout` replaced.
    #[must_use]
    pub fn discovery_timeout(self, timeout: Duration) -> Self {
        Self {
            discovery_timeout: timeout,
            ..self
        }
    }

    /// Returns `self` with `validation_timeout` replaced.
    #[must_use]
    pub fn validation_timeout(self, timeout: Duration) -> Self {
        Self {
            validation_timeout: timeout,
            ..self
        }
    }
}
/// Coarse classification of an RSSI reading.
///
/// Variants are declared weakest-first, so the derived ordering ranks
/// `Poor < Fair < Good < Excellent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum SignalQuality {
    /// RSSI of -85 or lower.
    Poor,
    /// RSSI from -84 down to -75.
    Fair,
    /// RSSI from -74 down to -60.
    Good,
    /// RSSI above -60.
    Excellent,
}

impl SignalQuality {
    /// Classifies `rssi` using strict lower bounds of -60, -75 and -85.
    pub fn from_rssi(rssi: i16) -> Self {
        if rssi > -60 {
            Self::Excellent
        } else if rssi > -75 {
            Self::Good
        } else if rssi > -85 {
            Self::Fair
        } else {
            Self::Poor
        }
    }

    /// Short human-readable label for this quality level.
    pub fn description(&self) -> &'static str {
        match *self {
            Self::Poor => "Poor signal - consider moving closer",
            Self::Fair => "Fair signal - connection may be unstable",
            Self::Good => "Good signal",
            Self::Excellent => "Excellent signal",
        }
    }

    /// Suggested pause between reads; it grows as the signal gets weaker.
    pub fn recommended_read_delay(&self) -> Duration {
        let millis = match *self {
            Self::Poor => 200,
            Self::Fair => 100,
            Self::Good => 50,
            Self::Excellent => 30,
        };
        Duration::from_millis(millis)
    }

    /// `true` for every level except `Poor`.
    pub fn is_usable(&self) -> bool {
        !matches!(self, Self::Poor)
    }
}
impl Device {
/// Finds the device matching `identifier` and connects to it with
/// `ConnectionConfig::default()`.
///
/// # Errors
/// Propagates any error from `Device::connect_with_config`.
#[tracing::instrument(level = "info", skip_all, fields(identifier = %identifier))]
pub async fn connect(identifier: &str) -> Result<Self> {
    let config = ConnectionConfig::default();
    Self::connect_with_config(identifier, config).await
}
/// Like `Device::connect`, but with `scan_timeout` stored as the configuration's
/// `connection_timeout`.
///
/// `connect_with_config` uses that one value both as the extended scan duration and
/// as the limit for the connect call itself.
///
/// # Errors
/// Propagates any error from `Device::connect_with_config`.
#[tracing::instrument(level = "info", skip_all, fields(identifier = %identifier, timeout_secs = scan_timeout.as_secs()))]
pub async fn connect_with_timeout(identifier: &str, scan_timeout: Duration) -> Result<Self> {
    let mut config = ConnectionConfig::default();
    config.connection_timeout = scan_timeout;
    Self::connect_with_config(identifier, config).await
}
#[tracing::instrument(level = "info", skip_all, fields(identifier = %identifier))]
pub async fn connect_with_config(identifier: &str, config: ConnectionConfig) -> Result<Self> {
let options = ScanOptions {
duration: config.connection_timeout,
filter_aranet_only: false, use_service_filter: false,
};
let (adapter, peripheral) = match find_device(identifier).await {
Ok(result) => result,
Err(_) => crate::scan::find_device_with_options(identifier, options).await?,
};
Self::from_peripheral_with_config(adapter, peripheral, config).await
}
/// Finds the device matching `identifier` on the given `adapter` and connects to it.
///
/// The first lookup uses `ScanOptions::default()`. If it fails, the error is logged at
/// debug level and a second lookup scans for `config.connection_timeout` with both the
/// Aranet-only and the service filters switched off.
///
/// # Errors
/// Returns the error of the second lookup if that also fails, or any error from
/// `Device::from_peripheral_with_config`.
#[tracing::instrument(level = "info", skip_all, fields(identifier = %identifier))]
pub async fn connect_with_adapter(
    adapter: Adapter,
    identifier: &str,
    config: ConnectionConfig,
) -> Result<Self> {
    let fast_scan =
        crate::scan::find_device_with_adapter(&adapter, identifier, ScanOptions::default())
            .await;
    let peripheral = match fast_scan {
        Ok(found) => found,
        Err(e) => {
            debug!("Fast scan failed ({e}), retrying with extended options");
            let extended = ScanOptions {
                duration: config.connection_timeout,
                filter_aranet_only: false,
                use_service_filter: false,
            };
            crate::scan::find_device_with_adapter(&adapter, identifier, extended).await?
        }
    };
    Self::from_peripheral_with_config(adapter, peripheral, config).await
}
/// Connects to an already-located `peripheral` using `ConnectionConfig::default()`.
///
/// # Errors
/// Propagates any error from `Device::from_peripheral_with_config`.
#[tracing::instrument(level = "info", skip_all)]
pub async fn from_peripheral(adapter: Adapter, peripheral: Peripheral) -> Result<Self> {
    let config = ConnectionConfig::default();
    Self::from_peripheral_with_config(adapter, peripheral, config).await
}
/// Connects to an already-located `peripheral`, using the default configuration with
/// `connection_timeout` replaced by `connect_timeout`.
///
/// # Errors
/// Propagates any error from `Device::from_peripheral_with_config`.
#[tracing::instrument(level = "info", skip_all, fields(timeout_secs = connect_timeout.as_secs()))]
pub async fn from_peripheral_with_timeout(
    adapter: Adapter,
    peripheral: Peripheral,
    connect_timeout: Duration,
) -> Result<Self> {
    let mut config = ConnectionConfig::default();
    config.connection_timeout = connect_timeout;
    Self::from_peripheral_with_config(adapter, peripheral, config).await
}
#[tracing::instrument(level = "info", skip_all, fields(connect_timeout = ?config.connection_timeout))]
pub async fn from_peripheral_with_config(
adapter: Adapter,
peripheral: Peripheral,
config: ConnectionConfig,
) -> Result<Self> {
info!("Connecting to device...");
timeout(config.connection_timeout, peripheral.connect())
.await
.map_err(|_| Error::Timeout {
operation: "connect to device".to_string(),
duration: config.connection_timeout,
})??;
info!("Connected!");
info!("Discovering services...");
timeout(config.discovery_timeout, peripheral.discover_services())
.await
.map_err(|_| Error::Timeout {
operation: "discover services".to_string(),
duration: config.discovery_timeout,
})??;
let mut services = peripheral.services();
if services.is_empty() {
warn!("Service discovery returned 0 services — retrying with fresh connection");
let _ = peripheral.disconnect().await;
tokio::time::sleep(Duration::from_secs(2)).await;
timeout(config.connection_timeout, peripheral.connect())
.await
.map_err(|_| Error::Timeout {
operation: "reconnect to device".to_string(),
duration: config.connection_timeout,
})??;
timeout(config.discovery_timeout, peripheral.discover_services())
.await
.map_err(|_| Error::Timeout {
operation: "rediscover services".to_string(),
duration: config.discovery_timeout,
})??;
services = peripheral.services();
}
debug!("Found {} services", services.len());
let mut characteristics_cache = HashMap::new();
for service in &services {
debug!(" Service: {}", service.uuid);
for char in &service.characteristics {
debug!(" Characteristic: {}", char.uuid);
characteristics_cache.insert(char.uuid, char.clone());
}
}
debug!(
"Cached {} characteristics for fast lookup",
characteristics_cache.len()
);
let properties = peripheral.properties().await?;
let name = properties.as_ref().and_then(|p| p.local_name.clone());
let address = properties
.as_ref()
.map(|p| create_identifier(&p.address.to_string(), &peripheral.id()))
.unwrap_or_else(|| format_peripheral_id(&peripheral.id()));
let device_type = name.as_ref().and_then(|n| DeviceType::from_name(n));
Ok(Self {
adapter,
peripheral,
name,
address,
device_type,
services_discovered: true,
characteristics_cache: RwLock::new(characteristics_cache),
notification_handles: tokio::sync::Mutex::new(Vec::new()),
disconnected: AtomicBool::new(false),
config,
})
}
/// Asks the peripheral whether it is connected.
///
/// A failure to query the state is logged as a warning and reported as `false`.
pub async fn is_connected(&self) -> bool {
    self.peripheral.is_connected().await.unwrap_or_else(|e| {
        warn!("Failed to query connection state: {e}");
        false
    })
}
/// Probes the link by reading the battery level within `config.validation_timeout`.
///
/// Returns `true` only when the read finishes in time and succeeds; a timeout or a
/// read error both give `false`.
pub async fn validate_connection(&self) -> bool {
    let probe = timeout(self.config.validation_timeout, self.read_battery()).await;
    matches!(probe, Ok(Ok(_)))
}
/// Alias for `validate_connection`: returns whatever that probe returns.
pub async fn is_connection_alive(&self) -> bool {
self.validate_connection().await
}
/// Returns the connection configuration this device was created with.
pub fn config(&self) -> &ConnectionConfig {
&self.config
}
/// Reads the RSSI and classifies it with `SignalQuality::from_rssi`.
///
/// Returns `None` when the RSSI cannot be read.
pub async fn signal_quality(&self) -> Option<SignalQuality> {
    match self.read_rssi().await {
        Ok(rssi) => Some(SignalQuality::from_rssi(rssi)),
        Err(_) => None,
    }
}
/// Stops all notification tasks and disconnects from the peripheral.
///
/// The `disconnected` flag is raised before the disconnect is attempted, so `Drop`
/// skips its own cleanup afterwards even if the disconnect call returns an error.
///
/// # Errors
/// Returns the error reported by the peripheral's disconnect call.
#[tracing::instrument(level = "info", skip(self), fields(device_name = ?self.name))]
pub async fn disconnect(&self) -> Result<()> {
    info!("Disconnecting from device...");
    self.disconnected.store(true, Ordering::SeqCst);
    {
        // Scoped so the lock is released before the disconnect await below.
        let mut tasks = self.notification_handles.lock().await;
        tasks.drain(..).for_each(|task| task.abort());
    }
    self.peripheral.disconnect().await?;
    Ok(())
}
/// Returns the device name captured at connection time, if there was one.
pub fn name(&self) -> Option<&str> {
self.name.as_deref()
}
/// Returns the identifier string computed for this device at connection time.
pub fn address(&self) -> &str {
    self.address.as_str()
}
/// Returns the device type derived from the name at connection time, if one was recognised.
pub fn device_type(&self) -> Option<DeviceType> {
self.device_type
}
pub async fn read_rssi(&self) -> Result<i16> {
let properties = self.peripheral.properties().await?;
properties
.and_then(|p| p.rssi)
.ok_or_else(|| Error::InvalidData("RSSI not available".to_string()))
}
async fn find_characteristic(&self, uuid: Uuid) -> Result<Characteristic> {
{
let cache = self.characteristics_cache.read().await;
if let Some(char) = cache.get(&uuid) {
return Ok(char.clone());
}
if !cache.is_empty() {
return Err(Error::characteristic_not_found(
uuid.to_string(),
self.peripheral.services().len(),
));
}
}
warn!(
"Characteristics cache empty, falling back to service search for {}",
uuid
);
let services = self.peripheral.services();
let service_count = services.len();
for service in &services {
if service.uuid == SAF_TEHNIKA_SERVICE_NEW || service.uuid == SAF_TEHNIKA_SERVICE_OLD {
for char in &service.characteristics {
if char.uuid == uuid {
return Ok(char.clone());
}
}
}
}
for service in &services {
if service.uuid == GAP_SERVICE
|| service.uuid == DEVICE_INFO_SERVICE
|| service.uuid == BATTERY_SERVICE
{
for char in &service.characteristics {
if char.uuid == uuid {
return Ok(char.clone());
}
}
}
}
for service in &services {
for char in &service.characteristics {
if char.uuid == uuid {
return Ok(char.clone());
}
}
}
Err(Error::characteristic_not_found(
uuid.to_string(),
service_count,
))
}
/// Reads the characteristic `uuid`, bounded by `config.read_timeout`.
///
/// # Errors
/// Fails when the characteristic cannot be resolved, with `Error::Timeout` when the
/// read exceeds the limit, or with the error reported by the read itself.
pub async fn read_characteristic(&self, uuid: Uuid) -> Result<Vec<u8>> {
    let limit = self.config.read_timeout;
    self.read_characteristic_with_timeout(uuid, limit).await
}
pub async fn read_characteristic_with_timeout(
&self,
uuid: Uuid,
read_timeout: Duration,
) -> Result<Vec<u8>> {
let characteristic = self.find_characteristic(uuid).await?;
let data = timeout(read_timeout, self.peripheral.read(&characteristic))
.await
.map_err(|_| Error::Timeout {
operation: format!("read characteristic {}", uuid),
duration: read_timeout,
})??;
Ok(data)
}
/// Writes `data` to the characteristic `uuid` as a write-with-response, bounded by
/// `config.write_timeout`.
///
/// # Errors
/// Fails when the characteristic cannot be resolved, with `Error::Timeout` when the
/// write exceeds the limit, or with the error reported by the write itself.
pub async fn write_characteristic(&self, uuid: Uuid, data: &[u8]) -> Result<()> {
    let limit = self.config.write_timeout;
    self.write_characteristic_with_timeout(uuid, data, limit)
        .await
}
pub async fn write_characteristic_with_timeout(
&self,
uuid: Uuid,
data: &[u8],
write_timeout: Duration,
) -> Result<()> {
let characteristic = self.find_characteristic(uuid).await?;
timeout(
write_timeout,
self.peripheral
.write(&characteristic, data, WriteType::WithResponse),
)
.await
.map_err(|_| Error::Timeout {
operation: format!("write characteristic {}", uuid),
duration: write_timeout,
})??;
Ok(())
}
#[tracing::instrument(level = "debug", skip(self), fields(device_name = ?self.name, device_type = ?self.device_type))]
pub async fn read_current(&self) -> Result<CurrentReading> {
let data = match self.device_type {
Some(DeviceType::Aranet4) => self.read_characteristic(CURRENT_READINGS_DETAIL).await?,
Some(DeviceType::Aranet2 | DeviceType::AranetRadon | DeviceType::AranetRadiation) => {
self.read_characteristic(CURRENT_READINGS_DETAIL_ALT)
.await?
}
None | Some(_) => {
match self.read_characteristic(CURRENT_READINGS_DETAIL).await {
Ok(data) => data,
Err(Error::CharacteristicNotFound { .. }) => {
debug!("Primary reading characteristic not found, trying alternative");
self.read_characteristic(CURRENT_READINGS_DETAIL_ALT)
.await?
}
Err(e) => return Err(e),
}
}
};
let device_type = match self.device_type {
Some(dt) => dt,
None => {
warn!(
"Device type unknown for {}; defaulting to Aranet4 — \
readings may be incorrect if this is a different model",
self.name().unwrap_or("unknown")
);
DeviceType::Aranet4
}
};
crate::readings::parse_reading_for_device(&data, device_type)
}
#[tracing::instrument(level = "debug", skip(self))]
pub async fn read_battery(&self) -> Result<u8> {
let data = self.read_characteristic(BATTERY_LEVEL).await?;
if data.is_empty() {
return Err(Error::InvalidData("Empty battery data".to_string()));
}
Ok(data[0])
}
/// Reads all seven device-information strings concurrently.
///
/// Individual read failures are tolerated: a failed field becomes an empty string,
/// except `name`, which falls back to the name captured at connection time. A payload
/// that is not valid UTF-8 also becomes an empty string. Trailing NUL bytes are
/// stripped. As written, this function always returns `Ok`.
#[tracing::instrument(level = "debug", skip(self))]
pub async fn read_device_info(&self) -> Result<DeviceInfo> {
    /// Decodes a raw payload, dropping trailing NULs; invalid UTF-8 gives `""`.
    fn decode(bytes: Vec<u8>) -> String {
        match String::from_utf8(bytes) {
            Ok(text) => text.trim_end_matches('\0').to_string(),
            Err(_) => String::new(),
        }
    }

    let (name, model, serial, firmware, hardware, software, manufacturer) = tokio::join!(
        self.read_characteristic(DEVICE_NAME),
        self.read_characteristic(MODEL_NUMBER),
        self.read_characteristic(SERIAL_NUMBER),
        self.read_characteristic(FIRMWARE_REVISION),
        self.read_characteristic(HARDWARE_REVISION),
        self.read_characteristic(SOFTWARE_REVISION),
        self.read_characteristic(MANUFACTURER_NAME),
    );

    Ok(DeviceInfo {
        name: match name {
            Ok(bytes) => decode(bytes),
            Err(_) => self.name.clone().unwrap_or_default(),
        },
        model: model.map(decode).unwrap_or_default(),
        serial: serial.map(decode).unwrap_or_default(),
        firmware: firmware.map(decode).unwrap_or_default(),
        hardware: hardware.map(decode).unwrap_or_default(),
        software: software.map(decode).unwrap_or_default(),
        manufacturer: manufacturer.map(decode).unwrap_or_default(),
    })
}
/// Reads only the name, serial number and firmware revision, concurrently.
///
/// The remaining `DeviceInfo` fields are left empty. A failed read gives an empty
/// string, except `name`, which falls back to the name captured at connection time.
/// A payload that is not valid UTF-8 also gives an empty string. Trailing NUL bytes
/// are stripped. As written, this function always returns `Ok`.
#[tracing::instrument(level = "debug", skip(self))]
pub async fn read_device_info_essential(&self) -> Result<DeviceInfo> {
    /// Decodes a raw payload, dropping trailing NULs; invalid UTF-8 gives `""`.
    fn decode(bytes: Vec<u8>) -> String {
        match String::from_utf8(bytes) {
            Ok(text) => text.trim_end_matches('\0').to_string(),
            Err(_) => String::new(),
        }
    }

    let (name, serial, firmware) = tokio::join!(
        self.read_characteristic(DEVICE_NAME),
        self.read_characteristic(SERIAL_NUMBER),
        self.read_characteristic(FIRMWARE_REVISION),
    );

    Ok(DeviceInfo {
        name: match name {
            Ok(bytes) => decode(bytes),
            Err(_) => self.name.clone().unwrap_or_default(),
        },
        model: String::new(),
        serial: serial.map(decode).unwrap_or_default(),
        firmware: firmware.map(decode).unwrap_or_default(),
        hardware: String::new(),
        software: String::new(),
        manufacturer: String::new(),
    })
}
/// Subscribes to notifications for the characteristic `uuid` and calls `callback`
/// with the value of each one received.
///
/// A background task reads the peripheral's notification stream and forwards only
/// the entries whose UUID matches the subscribed characteristic. The task's handle is
/// stored so that `disconnect()` and `Drop` can abort it.
///
/// # Errors
/// Fails when the characteristic cannot be resolved, or when subscribing or opening
/// the notification stream fails.
pub async fn subscribe_to_notifications<F>(&self, uuid: Uuid, callback: F) -> Result<()>
where
    F: Fn(&[u8]) + Send + Sync + 'static,
{
    let characteristic = self.find_characteristic(uuid).await?;
    self.peripheral.subscribe(&characteristic).await?;
    let mut stream = self.peripheral.notifications().await?;
    let char_uuid = characteristic.uuid;
    let handle = tokio::spawn(async move {
        use futures::StreamExt;
        while let Some(notification) = stream.next().await {
            if notification.uuid == char_uuid {
                // Borrow the payload; the callback takes `&[u8]`.
                callback(&notification.value);
            }
        }
    });
    self.notification_handles.lock().await.push(handle);
    Ok(())
}
/// Unsubscribes from notifications for the characteristic `uuid`.
///
/// This only issues the unsubscribe request; it does not touch the stored
/// notification task handles.
///
/// # Errors
/// Fails when the characteristic cannot be resolved or the unsubscribe request fails.
pub async fn unsubscribe_from_notifications(&self, uuid: Uuid) -> Result<()> {
    let target = self.find_characteristic(uuid).await?;
    Ok(self.peripheral.unsubscribe(&target).await?)
}
/// Returns how many characteristics are currently held in the lookup cache.
pub async fn cached_characteristic_count(&self) -> usize {
    let cache = self.characteristics_cache.read().await;
    cache.len()
}
}
/// Best-effort cleanup for a `Device` dropped without a prior `disconnect()` call.
///
/// When the `disconnected` flag is not yet set, this logs a warning, aborts the
/// notification tasks if their lock is free, and — if a Tokio runtime is current —
/// spawns a task that disconnects the peripheral. Nothing happens when the flag is
/// already set.
impl Drop for Device {
    fn drop(&mut self) {
        // `swap` returns the previous value: `true` means cleanup has already been handled.
        if self.disconnected.swap(true, Ordering::SeqCst) {
            return;
        }
        warn!(
            device_name = ?self.name,
            device_address = %self.address,
            "Device dropped without calling disconnect() - performing best-effort cleanup. \
             For reliable cleanup, call device.disconnect().await before dropping."
        );
        // `drop` cannot await, so only a non-blocking lock attempt is possible here.
        if let Ok(mut tasks) = self.notification_handles.try_lock() {
            tasks.drain(..).for_each(|task| task.abort());
        }
        if let Ok(runtime) = tokio::runtime::Handle::try_current() {
            let peripheral = self.peripheral.clone();
            let address = self.address.clone();
            runtime.spawn(async move {
                match peripheral.disconnect().await {
                    Ok(_) => {
                        debug!(
                            device_address = %address,
                            "Best-effort disconnect completed"
                        );
                    }
                    Err(e) => {
                        debug!(
                            device_address = %address,
                            error = %e,
                            "Best-effort disconnect failed (device may already be disconnected)"
                        );
                    }
                }
            });
        }
    }
}
// `AranetDevice` implementation for `Device`.
//
// Every method forwards to the same-named function through the explicit `Device::`
// path, passing arguments through unchanged and returning the result as-is.
// NOTE(review): `get_history_info`, `download_history*`, `get_interval`, `set_interval`
// and `get_calibration` are not defined in this part of the file; presumably they are
// inherent methods declared in other `impl Device` blocks — confirm. If any of them
// were missing, the path would resolve to this trait method and recurse.
impl AranetDevice for Device {
/// Forwards to `Device::is_connected`.
async fn is_connected(&self) -> bool {
Device::is_connected(self).await
}
/// Forwards to `Device::disconnect`.
async fn disconnect(&self) -> Result<()> {
Device::disconnect(self).await
}
/// Forwards to `Device::name`.
fn name(&self) -> Option<&str> {
Device::name(self)
}
/// Forwards to `Device::address`.
fn address(&self) -> &str {
Device::address(self)
}
/// Forwards to `Device::device_type`.
fn device_type(&self) -> Option<DeviceType> {
Device::device_type(self)
}
/// Forwards to `Device::read_current`.
async fn read_current(&self) -> Result<CurrentReading> {
Device::read_current(self).await
}
/// Forwards to `Device::read_device_info`.
async fn read_device_info(&self) -> Result<DeviceInfo> {
Device::read_device_info(self).await
}
/// Forwards to `Device::read_rssi`.
async fn read_rssi(&self) -> Result<i16> {
Device::read_rssi(self).await
}
/// Forwards to `Device::read_battery`.
async fn read_battery(&self) -> Result<u8> {
Device::read_battery(self).await
}
/// Forwards to `Device::get_history_info`.
async fn get_history_info(&self) -> Result<crate::history::HistoryInfo> {
Device::get_history_info(self).await
}
/// Forwards to `Device::download_history`.
async fn download_history(&self) -> Result<Vec<aranet_types::HistoryRecord>> {
Device::download_history(self).await
}
/// Forwards to `Device::download_history_with_options`.
async fn download_history_with_options(
&self,
options: crate::history::HistoryOptions,
) -> Result<Vec<aranet_types::HistoryRecord>> {
Device::download_history_with_options(self, options).await
}
/// Forwards to `Device::get_interval`.
async fn get_interval(&self) -> Result<crate::settings::MeasurementInterval> {
Device::get_interval(self).await
}
/// Forwards to `Device::set_interval`.
async fn set_interval(&self, interval: crate::settings::MeasurementInterval) -> Result<()> {
Device::set_interval(self, interval).await
}
/// Forwards to `Device::get_calibration`.
async fn get_calibration(&self) -> Result<crate::settings::CalibrationData> {
Device::get_calibration(self).await
}
}