use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use bytes::Buf;
use time::OffsetDateTime;
use tokio::time::sleep;
use tracing::{debug, info, warn};
use crate::commands::{HISTORY_V1_REQUEST, HISTORY_V2_REQUEST};
use crate::device::Device;
use crate::error::{Error, Result};
use crate::uuid::{COMMAND, HISTORY_V2, READ_INTERVAL, SECONDS_SINCE_UPDATE, TOTAL_READINGS};
use aranet_types::HistoryRecord;
#[derive(Debug, Clone)]
pub struct HistoryProgress {
pub current_param: HistoryParam,
pub param_index: usize,
pub total_params: usize,
pub values_downloaded: usize,
pub total_values: usize,
pub overall_progress: f32,
}
impl HistoryProgress {
pub fn new(
param: HistoryParam,
param_idx: usize,
total_params: usize,
total_values: usize,
) -> Self {
Self {
current_param: param,
param_index: param_idx,
total_params,
values_downloaded: 0,
total_values,
overall_progress: 0.0,
}
}
fn update(&mut self, values_downloaded: usize) {
self.values_downloaded = values_downloaded;
let param_progress = if self.total_values > 0 {
values_downloaded as f32 / self.total_values as f32
} else {
1.0
};
if self.total_params == 0 {
self.overall_progress = 1.0;
return;
}
let base_progress = (self.param_index - 1) as f32 / self.total_params as f32;
let param_contribution = param_progress / self.total_params as f32;
self.overall_progress = base_progress + param_contribution;
}
}
pub type ProgressCallback = Arc<dyn Fn(HistoryProgress) + Send + Sync>;
pub type CheckpointCallback = Arc<dyn Fn(HistoryCheckpoint) + Send + Sync>;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HistoryCheckpoint {
pub device_id: String,
pub current_param: HistoryParamCheckpoint,
pub resume_index: u16,
pub total_readings: u16,
pub completed_params: Vec<HistoryParamCheckpoint>,
pub created_at: time::OffsetDateTime,
pub downloaded_data: Option<PartialHistoryData>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum HistoryParamCheckpoint {
Temperature,
Humidity,
Pressure,
Co2,
Humidity2,
Radon,
}
impl From<HistoryParam> for HistoryParamCheckpoint {
fn from(param: HistoryParam) -> Self {
match param {
HistoryParam::Temperature => HistoryParamCheckpoint::Temperature,
HistoryParam::Humidity => HistoryParamCheckpoint::Humidity,
HistoryParam::Pressure => HistoryParamCheckpoint::Pressure,
HistoryParam::Co2 => HistoryParamCheckpoint::Co2,
HistoryParam::Humidity2 => HistoryParamCheckpoint::Humidity2,
HistoryParam::Radon => HistoryParamCheckpoint::Radon,
}
}
}
impl From<HistoryParamCheckpoint> for HistoryParam {
fn from(param: HistoryParamCheckpoint) -> Self {
match param {
HistoryParamCheckpoint::Temperature => HistoryParam::Temperature,
HistoryParamCheckpoint::Humidity => HistoryParam::Humidity,
HistoryParamCheckpoint::Pressure => HistoryParam::Pressure,
HistoryParamCheckpoint::Co2 => HistoryParam::Co2,
HistoryParamCheckpoint::Humidity2 => HistoryParam::Humidity2,
HistoryParamCheckpoint::Radon => HistoryParam::Radon,
}
}
}
#[derive(Debug, Clone, Copy)]
struct U16HistoryStep {
param: HistoryParam,
step: usize,
total_steps: usize,
next_param: Option<HistoryParamCheckpoint>,
}
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PartialHistoryData {
pub co2_values: Vec<u16>,
pub temp_values: Vec<u16>,
pub pressure_values: Vec<u16>,
pub humidity_values: Vec<u16>,
pub radon_values: Vec<u32>,
}
impl HistoryCheckpoint {
pub fn new(device_id: &str, total_readings: u16, first_param: HistoryParam) -> Self {
Self {
device_id: device_id.to_string(),
current_param: first_param.into(),
resume_index: 1,
total_readings,
completed_params: Vec::new(),
created_at: time::OffsetDateTime::now_utc(),
downloaded_data: Some(PartialHistoryData::default()),
}
}
pub fn is_valid(&self, current_total_readings: u16) -> bool {
self.total_readings == current_total_readings
}
pub fn complete_param(&mut self, param: HistoryParam, values: Vec<u16>) {
self.completed_params.push(param.into());
if let Some(ref mut data) = self.downloaded_data {
match param {
HistoryParam::Co2 => data.co2_values = values,
HistoryParam::Temperature => data.temp_values = values,
HistoryParam::Pressure => data.pressure_values = values,
HistoryParam::Humidity | HistoryParam::Humidity2 => data.humidity_values = values,
HistoryParam::Radon => {} }
}
}
pub fn complete_radon_param(&mut self, values: Vec<u32>) {
self.completed_params.push(HistoryParamCheckpoint::Radon);
if let Some(ref mut data) = self.downloaded_data {
data.radon_values = values;
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum HistoryParam {
Temperature = 1,
Humidity = 2,
Pressure = 3,
Co2 = 4,
Humidity2 = 5,
Radon = 10,
}
#[derive(Clone)]
pub struct HistoryOptions {
pub start_index: Option<u16>,
pub end_index: Option<u16>,
pub read_delay: Duration,
pub progress_callback: Option<ProgressCallback>,
pub use_adaptive_delay: bool,
pub checkpoint_callback: Option<CheckpointCallback>,
pub checkpoint_interval: usize,
}
impl std::fmt::Debug for HistoryOptions {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HistoryOptions")
.field("start_index", &self.start_index)
.field("end_index", &self.end_index)
.field("read_delay", &self.read_delay)
.field("progress_callback", &self.progress_callback.is_some())
.field("use_adaptive_delay", &self.use_adaptive_delay)
.field("checkpoint_callback", &self.checkpoint_callback.is_some())
.field("checkpoint_interval", &self.checkpoint_interval)
.finish()
}
}
impl Default for HistoryOptions {
fn default() -> Self {
Self {
start_index: None,
end_index: None,
read_delay: Duration::from_millis(50),
progress_callback: None,
use_adaptive_delay: false,
checkpoint_callback: None,
checkpoint_interval: 100, }
}
}
impl HistoryOptions {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn start_index(mut self, index: u16) -> Self {
self.start_index = Some(index);
self
}
#[must_use]
pub fn end_index(mut self, index: u16) -> Self {
self.end_index = Some(index);
self
}
#[must_use]
pub fn read_delay(mut self, delay: Duration) -> Self {
self.read_delay = delay;
self
}
#[must_use]
pub fn with_progress<F>(mut self, callback: F) -> Self
where
F: Fn(HistoryProgress) + Send + Sync + 'static,
{
self.progress_callback = Some(Arc::new(callback));
self
}
pub fn report_progress(&self, progress: &HistoryProgress) {
if let Some(cb) = &self.progress_callback {
cb(progress.clone());
}
}
#[must_use]
pub fn adaptive_delay(mut self, enable: bool) -> Self {
self.use_adaptive_delay = enable;
self
}
#[must_use]
pub fn with_checkpoint<F>(mut self, callback: F) -> Self
where
F: Fn(HistoryCheckpoint) + Send + Sync + 'static,
{
self.checkpoint_callback = Some(Arc::new(callback));
self
}
#[must_use]
pub fn checkpoint_interval(mut self, interval: usize) -> Self {
self.checkpoint_interval = interval;
self
}
#[must_use]
pub fn resume_from(mut self, checkpoint: &HistoryCheckpoint) -> Self {
self.start_index = Some(checkpoint.resume_index);
self
}
pub fn report_checkpoint(&self, checkpoint: &HistoryCheckpoint) {
if let Some(cb) = &self.checkpoint_callback {
cb(checkpoint.clone());
}
}
pub fn effective_read_delay(
&self,
signal_quality: Option<crate::device::SignalQuality>,
) -> Duration {
if self.use_adaptive_delay
&& let Some(quality) = signal_quality
{
return quality.recommended_read_delay();
}
self.read_delay
}
}
#[derive(Debug, Clone)]
pub struct HistoryInfo {
pub total_readings: u16,
pub interval_seconds: u16,
pub seconds_since_update: u16,
}
impl Device {
pub async fn get_history_info(&self) -> Result<HistoryInfo> {
let total_data = self.read_characteristic(TOTAL_READINGS).await?;
let total_readings = if total_data.len() >= 2 {
u16::from_le_bytes([total_data[0], total_data[1]])
} else {
return Err(Error::InvalidData(
"Invalid total readings data".to_string(),
));
};
let interval_data = self.read_characteristic(READ_INTERVAL).await?;
let interval_seconds = if interval_data.len() >= 2 {
u16::from_le_bytes([interval_data[0], interval_data[1]])
} else {
return Err(Error::InvalidData("Invalid interval data".to_string()));
};
let age_data = self.read_characteristic(SECONDS_SINCE_UPDATE).await?;
let seconds_since_update = if age_data.len() >= 2 {
u16::from_le_bytes([age_data[0], age_data[1]])
} else {
0
};
Ok(HistoryInfo {
total_readings,
interval_seconds,
seconds_since_update,
})
}
pub async fn download_history(&self) -> Result<Vec<HistoryRecord>> {
self.download_history_with_options(HistoryOptions::default())
.await
}
pub async fn download_history_with_options(
&self,
options: HistoryOptions,
) -> Result<Vec<HistoryRecord>> {
use aranet_types::DeviceType;
let info = self.get_history_info().await?;
info!(
"Device has {} readings, interval {}s, last update {}s ago",
info.total_readings, info.interval_seconds, info.seconds_since_update
);
if info.total_readings == 0 {
return Ok(Vec::new());
}
let start_idx = options.start_index.unwrap_or(1);
let end_idx = options.end_index.unwrap_or(info.total_readings);
if start_idx > end_idx {
return Err(Error::InvalidConfig(format!(
"start_index ({start_idx}) must be <= end_index ({end_idx})"
)));
}
if start_idx == 0 {
return Err(Error::InvalidConfig(
"start_index must be >= 1 (indices are 1-based)".into(),
));
}
let signal_quality = if options.use_adaptive_delay {
match self.signal_quality().await {
Some(quality) => {
info!(
"Signal quality: {:?} - using {} ms read delay",
quality,
quality.recommended_read_delay().as_millis()
);
Some(quality)
}
None => {
debug!("Could not read signal quality, using default delay");
None
}
}
} else {
None
};
let effective_delay = options.effective_read_delay(signal_quality);
match self.device_type() {
Some(DeviceType::AranetRadiation) => {
Err(Error::Unsupported(
"History download is not available for Aranet Radiation devices. \
The radiation history protocol is not documented. \
Use read_current() for current radiation readings."
.to_string(),
))
}
Some(DeviceType::AranetRadon) => {
self.download_radon_history_internal(
&info,
start_idx,
end_idx,
&options,
effective_delay,
)
.await
}
Some(DeviceType::Aranet2) => {
self.download_aranet2_history_internal(
&info,
start_idx,
end_idx,
&options,
effective_delay,
)
.await
}
_ => {
self.download_aranet4_history_internal(
&info,
start_idx,
end_idx,
&options,
effective_delay,
)
.await
}
}
}
async fn download_u16_param_with_checkpoint(
&self,
step_info: U16HistoryStep,
start_idx: u16,
end_idx: u16,
effective_delay: Duration,
options: &HistoryOptions,
checkpoint: &mut Option<HistoryCheckpoint>,
) -> Result<Vec<u16>> {
let total_values = (end_idx - start_idx + 1) as usize;
let mut progress = HistoryProgress::new(
step_info.param,
step_info.step,
step_info.total_steps,
total_values,
);
options.report_progress(&progress);
let values = self
.download_param_history_with_progress(
step_info.param,
start_idx,
end_idx,
effective_delay,
|downloaded| {
progress.update(downloaded);
options.report_progress(&progress);
},
)
.await?;
if let Some(cp) = checkpoint {
cp.complete_param(step_info.param, values.clone());
if let Some(next) = step_info.next_param {
cp.current_param = next;
cp.resume_index = start_idx;
}
options.report_checkpoint(cp);
}
Ok(values)
}
async fn download_aranet4_history_internal(
&self,
info: &HistoryInfo,
start_idx: u16,
end_idx: u16,
options: &HistoryOptions,
effective_delay: Duration,
) -> Result<Vec<HistoryRecord>> {
if start_idx > end_idx {
return Ok(Vec::new());
}
let device_id = self.address().to_string();
let mut checkpoint = if options.checkpoint_callback.is_some() {
Some(HistoryCheckpoint::new(
&device_id,
info.total_readings,
HistoryParam::Co2,
))
} else {
None
};
let co2_values = self
.download_u16_param_with_checkpoint(
U16HistoryStep {
param: HistoryParam::Co2,
step: 1,
total_steps: 4,
next_param: Some(HistoryParamCheckpoint::Temperature),
},
start_idx,
end_idx,
effective_delay,
options,
&mut checkpoint,
)
.await?;
let temp_values = self
.download_u16_param_with_checkpoint(
U16HistoryStep {
param: HistoryParam::Temperature,
step: 2,
total_steps: 4,
next_param: Some(HistoryParamCheckpoint::Pressure),
},
start_idx,
end_idx,
effective_delay,
options,
&mut checkpoint,
)
.await?;
let pressure_values = self
.download_u16_param_with_checkpoint(
U16HistoryStep {
param: HistoryParam::Pressure,
step: 3,
total_steps: 4,
next_param: Some(HistoryParamCheckpoint::Humidity),
},
start_idx,
end_idx,
effective_delay,
options,
&mut checkpoint,
)
.await?;
let humidity_values = self
.download_u16_param_with_checkpoint(
U16HistoryStep {
param: HistoryParam::Humidity,
step: 4,
total_steps: 4,
next_param: None,
},
start_idx,
end_idx,
effective_delay,
options,
&mut checkpoint,
)
.await?;
let records = build_history_records(
info,
&co2_values,
&temp_values,
&pressure_values,
&humidity_values,
&[],
);
info!("Downloaded {} history records", records.len());
Ok(records)
}
async fn download_aranet2_history_internal(
&self,
info: &HistoryInfo,
start_idx: u16,
end_idx: u16,
options: &HistoryOptions,
effective_delay: Duration,
) -> Result<Vec<HistoryRecord>> {
if start_idx > end_idx {
return Ok(Vec::new());
}
let device_id = self.address().to_string();
let mut checkpoint = if options.checkpoint_callback.is_some() {
Some(HistoryCheckpoint::new(
&device_id,
info.total_readings,
HistoryParam::Temperature,
))
} else {
None
};
let temp_values = self
.download_u16_param_with_checkpoint(
U16HistoryStep {
param: HistoryParam::Temperature,
step: 1,
total_steps: 2,
next_param: Some(HistoryParamCheckpoint::Humidity2),
},
start_idx,
end_idx,
effective_delay,
options,
&mut checkpoint,
)
.await?;
let humidity_values = self
.download_u16_param_with_checkpoint(
U16HistoryStep {
param: HistoryParam::Humidity2,
step: 2,
total_steps: 2,
next_param: None,
},
start_idx,
end_idx,
effective_delay,
options,
&mut checkpoint,
)
.await?;
let records = build_history_records(info, &[], &temp_values, &[], &humidity_values, &[]);
info!("Downloaded {} Aranet2 history records", records.len());
Ok(records)
}
async fn download_radon_history_internal(
&self,
info: &HistoryInfo,
start_idx: u16,
end_idx: u16,
options: &HistoryOptions,
effective_delay: Duration,
) -> Result<Vec<HistoryRecord>> {
if start_idx > end_idx {
return Ok(Vec::new());
}
let total_values = (end_idx - start_idx + 1) as usize;
let device_id = self.address().to_string();
let mut checkpoint = if options.checkpoint_callback.is_some() {
Some(HistoryCheckpoint::new(
&device_id,
info.total_readings,
HistoryParam::Radon,
))
} else {
None
};
let mut progress = HistoryProgress::new(HistoryParam::Radon, 1, 4, total_values);
options.report_progress(&progress);
let radon_values = self
.download_param_history_u32_with_progress(
HistoryParam::Radon,
start_idx,
end_idx,
effective_delay,
|downloaded| {
progress.update(downloaded);
options.report_progress(&progress);
},
)
.await?;
if let Some(ref mut cp) = checkpoint {
cp.complete_radon_param(radon_values.clone());
cp.current_param = HistoryParamCheckpoint::Temperature;
cp.resume_index = start_idx;
options.report_checkpoint(cp);
}
let temp_values = self
.download_u16_param_with_checkpoint(
U16HistoryStep {
param: HistoryParam::Temperature,
step: 2,
total_steps: 4,
next_param: Some(HistoryParamCheckpoint::Pressure),
},
start_idx,
end_idx,
effective_delay,
options,
&mut checkpoint,
)
.await?;
let pressure_values = self
.download_u16_param_with_checkpoint(
U16HistoryStep {
param: HistoryParam::Pressure,
step: 3,
total_steps: 4,
next_param: Some(HistoryParamCheckpoint::Humidity2),
},
start_idx,
end_idx,
effective_delay,
options,
&mut checkpoint,
)
.await?;
let humidity_values = self
.download_u16_param_with_checkpoint(
U16HistoryStep {
param: HistoryParam::Humidity2,
step: 4,
total_steps: 4,
next_param: None,
},
start_idx,
end_idx,
effective_delay,
options,
&mut checkpoint,
)
.await?;
let records = build_history_records(
info,
&[],
&temp_values,
&pressure_values,
&humidity_values,
&radon_values,
);
info!("Downloaded {} radon history records", records.len());
Ok(records)
}
#[allow(clippy::too_many_arguments)]
async fn download_param_history_generic_with_progress<T, F>(
&self,
param: HistoryParam,
start_idx: u16,
end_idx: u16,
read_delay: Duration,
value_parser: impl Fn(&[u8], usize) -> Option<T>,
value_size: usize,
mut on_progress: F,
) -> Result<Vec<T>>
where
T: Default + Clone,
F: FnMut(usize),
{
debug!(
"Downloading {:?} history from {} to {} (value_size={})",
param, start_idx, end_idx, value_size
);
let mut values: BTreeMap<u16, T> = BTreeMap::new();
let mut current_idx = start_idx;
let mut consecutive_wrong_param = 0u32;
const MAX_WRONG_PARAM_RETRIES: u32 = 5;
while current_idx <= end_idx {
let cmd = [
HISTORY_V2_REQUEST,
param as u8,
(current_idx & 0xFF) as u8,
((current_idx >> 8) & 0xFF) as u8,
];
self.write_characteristic(COMMAND, &cmd).await?;
sleep(read_delay).await;
let response = self.read_characteristic(HISTORY_V2).await?;
if response.len() < 10 {
warn!(
"Invalid history response: too short ({} bytes)",
response.len()
);
break;
}
let resp_param = response[0];
if resp_param != param as u8 {
consecutive_wrong_param += 1;
warn!(
"Unexpected parameter in response: {} (retry {}/{})",
resp_param, consecutive_wrong_param, MAX_WRONG_PARAM_RETRIES
);
if consecutive_wrong_param >= MAX_WRONG_PARAM_RETRIES {
warn!("Too many wrong parameter responses, aborting download");
break;
}
sleep(read_delay).await;
continue;
}
consecutive_wrong_param = 0;
let resp_start = u16::from_le_bytes([response[7], response[8]]);
let resp_count = response[9] as usize;
debug!(
"History response: param={}, start={}, count={}",
resp_param, resp_start, resp_count
);
if resp_count == 0 {
debug!("Reached end of history (count=0)");
break;
}
let data = &response[10..];
let num_values = (data.len() / value_size).min(resp_count);
for i in 0..num_values {
let idx = resp_start + i as u16;
if idx > end_idx {
break;
}
if let Some(value) = value_parser(data, i) {
values.insert(idx, value);
}
}
current_idx = resp_start + num_values as u16;
debug!(
"Downloaded {} values, next index: {}",
num_values, current_idx
);
on_progress(values.len());
if (resp_start as usize + resp_count) >= end_idx as usize {
debug!("Reached end of requested range");
break;
}
}
Ok(values.into_values().collect())
}
async fn download_param_history_with_progress<F>(
&self,
param: HistoryParam,
start_idx: u16,
end_idx: u16,
read_delay: Duration,
on_progress: F,
) -> Result<Vec<u16>>
where
F: FnMut(usize),
{
let value_size = if param == HistoryParam::Humidity {
1
} else {
2
};
self.download_param_history_generic_with_progress(
param,
start_idx,
end_idx,
read_delay,
|data, i| {
if param == HistoryParam::Humidity {
data.get(i).map(|&b| b as u16)
} else {
let offset = i * 2;
if offset + 1 < data.len() {
Some(u16::from_le_bytes([data[offset], data[offset + 1]]))
} else {
None
}
}
},
value_size,
on_progress,
)
.await
}
async fn download_param_history_u32_with_progress<F>(
&self,
param: HistoryParam,
start_idx: u16,
end_idx: u16,
read_delay: Duration,
on_progress: F,
) -> Result<Vec<u32>>
where
F: FnMut(usize),
{
self.download_param_history_generic_with_progress(
param,
start_idx,
end_idx,
read_delay,
|data, i| {
let offset = i * 4;
if offset + 3 < data.len() {
Some(u32::from_le_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]))
} else {
None
}
},
4,
on_progress,
)
.await
}
pub async fn download_history_v1(&self) -> Result<Vec<HistoryRecord>> {
use crate::uuid::HISTORY_V1;
use tokio::sync::mpsc;
let info = self.get_history_info().await?;
info!(
"V1 download: {} readings, interval {}s",
info.total_readings, info.interval_seconds
);
if info.total_readings == 0 {
return Ok(Vec::new());
}
let (tx, mut rx) = mpsc::channel::<Vec<u8>>(256);
self.subscribe_to_notifications(HISTORY_V1, move |data| {
if let Err(e) = tx.try_send(data.to_vec()) {
warn!(
"V1 history notification channel full or closed, data may be lost: {}",
e
);
}
})
.await?;
let mut co2_values = Vec::new();
let mut temp_values = Vec::new();
let mut pressure_values = Vec::new();
let mut humidity_values = Vec::new();
for param in [
HistoryParam::Co2,
HistoryParam::Temperature,
HistoryParam::Pressure,
HistoryParam::Humidity,
] {
let cmd = [
HISTORY_V1_REQUEST,
param as u8,
0x01,
0x00,
(info.total_readings & 0xFF) as u8,
((info.total_readings >> 8) & 0xFF) as u8,
];
self.write_characteristic(COMMAND, &cmd).await?;
let mut values = Vec::new();
let expected = info.total_readings as usize;
let mut consecutive_timeouts = 0;
const MAX_CONSECUTIVE_TIMEOUTS: u32 = 3;
while values.len() < expected {
match tokio::time::timeout(Duration::from_secs(5), rx.recv()).await {
Ok(Some(data)) => {
consecutive_timeouts = 0; if data.len() >= 3 {
let resp_param = data[0];
if resp_param == param as u8 {
let mut buf = &data[3..];
while buf.len() >= 2 && values.len() < expected {
values.push(buf.get_u16_le());
}
}
}
}
Ok(None) => {
warn!(
"V1 history channel closed for {:?}: got {}/{} values",
param,
values.len(),
expected
);
break;
}
Err(_) => {
consecutive_timeouts += 1;
warn!(
"Timeout waiting for V1 history notification ({}/{}), {:?}: {}/{} values",
consecutive_timeouts,
MAX_CONSECUTIVE_TIMEOUTS,
param,
values.len(),
expected
);
if consecutive_timeouts >= MAX_CONSECUTIVE_TIMEOUTS {
warn!(
"Too many consecutive timeouts for {:?}, proceeding with partial data",
param
);
break;
}
}
}
}
if values.len() < expected {
warn!(
"V1 history download incomplete for {:?}: got {}/{} values ({:.1}%)",
param,
values.len(),
expected,
(values.len() as f64 / expected as f64) * 100.0
);
}
match param {
HistoryParam::Co2 => co2_values = values,
HistoryParam::Temperature => temp_values = values,
HistoryParam::Pressure => pressure_values = values,
HistoryParam::Humidity => humidity_values = values,
HistoryParam::Humidity2 | HistoryParam::Radon => {}
}
}
self.unsubscribe_from_notifications(HISTORY_V1).await?;
let now = OffsetDateTime::now_utc();
let latest_reading_time = now - time::Duration::seconds(info.seconds_since_update as i64);
let mut records = Vec::new();
let count = co2_values.len();
if temp_values.len() != count
|| pressure_values.len() != count
|| humidity_values.len() != count
{
warn!(
"V1 history arrays have mismatched lengths: co2={}, temp={}, pressure={}, humidity={} — \
records with missing values will use defaults",
count,
temp_values.len(),
pressure_values.len(),
humidity_values.len()
);
}
for i in 0..count {
let readings_ago = (count - 1 - i) as i64;
let timestamp = latest_reading_time
- time::Duration::seconds(readings_ago * info.interval_seconds as i64);
let record = HistoryRecord {
timestamp,
co2: co2_values.get(i).copied().unwrap_or(0),
temperature: raw_to_temperature(temp_values.get(i).copied().unwrap_or(0)),
pressure: raw_to_pressure(pressure_values.get(i).copied().unwrap_or(0)),
humidity: humidity_values.get(i).copied().unwrap_or(0) as u8,
radon: None,
radiation_rate: None,
radiation_total: None,
};
records.push(record);
}
info!("V1 download complete: {} records", records.len());
Ok(records)
}
}
fn build_history_records(
info: &HistoryInfo,
co2_values: &[u16],
temp_values: &[u16],
pressure_values: &[u16],
humidity_values: &[u16],
radon_values: &[u32],
) -> Vec<HistoryRecord> {
let is_radon = !radon_values.is_empty();
let is_aranet2 = co2_values.is_empty() && radon_values.is_empty();
let count = if is_radon {
radon_values.len()
} else if is_aranet2 {
temp_values.len()
} else {
co2_values.len()
};
let expected = count;
if temp_values.len() != expected
|| pressure_values.len() != expected
|| humidity_values.len() != expected
{
warn!(
"History arrays have mismatched lengths: primary={expected}, temp={}, pressure={}, humidity={} — \
records with missing values will use defaults",
temp_values.len(),
pressure_values.len(),
humidity_values.len()
);
}
let now = OffsetDateTime::now_utc();
let latest_reading_time = now - time::Duration::seconds(info.seconds_since_update as i64);
(0..count)
.map(|i| {
let readings_ago = (count - 1 - i) as i64;
let timestamp = latest_reading_time
- time::Duration::seconds(readings_ago * info.interval_seconds as i64);
let humidity = if is_radon || is_aranet2 {
let raw = humidity_values.get(i).copied().unwrap_or(0);
(raw / 10).min(100) as u8
} else {
humidity_values.get(i).copied().unwrap_or(0) as u8
};
HistoryRecord {
timestamp,
co2: if is_radon {
0
} else {
co2_values.get(i).copied().unwrap_or(0)
},
temperature: raw_to_temperature(temp_values.get(i).copied().unwrap_or(0)),
pressure: raw_to_pressure(pressure_values.get(i).copied().unwrap_or(0)),
humidity,
radon: if is_radon {
Some(radon_values.get(i).copied().unwrap_or(0))
} else {
None
},
radiation_rate: None,
radiation_total: None,
}
})
.collect()
}
pub fn raw_to_temperature(raw: u16) -> f32 {
raw as f32 / 20.0
}
pub fn raw_to_pressure(raw: u16) -> f32 {
raw as f32 / 10.0
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_raw_to_temperature_typical_values() {
assert!((raw_to_temperature(450) - 22.5).abs() < 0.001);
assert!((raw_to_temperature(400) - 20.0).abs() < 0.001);
assert!((raw_to_temperature(500) - 25.0).abs() < 0.001);
}
#[test]
fn test_raw_to_temperature_edge_cases() {
assert!((raw_to_temperature(0) - 0.0).abs() < 0.001);
assert!((raw_to_temperature(1000) - 50.0).abs() < 0.001);
assert!((raw_to_temperature(u16::MAX) - 3276.75).abs() < 0.01);
}
#[test]
fn test_raw_to_temperature_precision() {
assert!((raw_to_temperature(451) - 22.55).abs() < 0.001);
assert!((raw_to_temperature(441) - 22.05).abs() < 0.001);
}
#[test]
fn test_raw_to_pressure_typical_values() {
assert!((raw_to_pressure(10132) - 1013.2).abs() < 0.01);
assert!((raw_to_pressure(10000) - 1000.0).abs() < 0.01);
assert!((raw_to_pressure(10500) - 1050.0).abs() < 0.01);
}
#[test]
fn test_raw_to_pressure_edge_cases() {
assert!((raw_to_pressure(0) - 0.0).abs() < 0.01);
assert!((raw_to_pressure(9500) - 950.0).abs() < 0.01);
assert!((raw_to_pressure(11000) - 1100.0).abs() < 0.01);
assert!((raw_to_pressure(u16::MAX) - 6553.5).abs() < 0.1);
}
#[test]
fn test_history_param_values() {
assert_eq!(HistoryParam::Temperature as u8, 1);
assert_eq!(HistoryParam::Humidity as u8, 2);
assert_eq!(HistoryParam::Pressure as u8, 3);
assert_eq!(HistoryParam::Co2 as u8, 4);
}
#[test]
fn test_history_param_debug() {
assert_eq!(format!("{:?}", HistoryParam::Temperature), "Temperature");
assert_eq!(format!("{:?}", HistoryParam::Co2), "Co2");
}
#[test]
fn test_history_options_default() {
let options = HistoryOptions::default();
assert!(options.start_index.is_none());
assert!(options.end_index.is_none());
assert_eq!(options.read_delay, Duration::from_millis(50));
}
#[test]
fn test_history_options_custom() {
let options = HistoryOptions::new()
.start_index(10)
.end_index(100)
.read_delay(Duration::from_millis(100));
assert_eq!(options.start_index, Some(10));
assert_eq!(options.end_index, Some(100));
assert_eq!(options.read_delay, Duration::from_millis(100));
}
#[test]
fn test_history_options_with_progress() {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
let call_count = Arc::new(AtomicUsize::new(0));
let call_count_clone = Arc::clone(&call_count);
let options = HistoryOptions::new().with_progress(move |_progress| {
call_count_clone.fetch_add(1, Ordering::SeqCst);
});
assert!(options.progress_callback.is_some());
let progress = HistoryProgress::new(HistoryParam::Co2, 1, 4, 100);
options.report_progress(&progress);
assert_eq!(call_count.load(Ordering::SeqCst), 1);
}
#[test]
fn test_history_info_creation() {
let info = HistoryInfo {
total_readings: 1000,
interval_seconds: 300,
seconds_since_update: 120,
};
assert_eq!(info.total_readings, 1000);
assert_eq!(info.interval_seconds, 300);
assert_eq!(info.seconds_since_update, 120);
}
#[test]
fn test_history_info_debug() {
let info = HistoryInfo {
total_readings: 500,
interval_seconds: 60,
seconds_since_update: 30,
};
let debug_str = format!("{:?}", info);
assert!(debug_str.contains("total_readings"));
assert!(debug_str.contains("500"));
}
}