1use std::collections::BTreeMap;
43use std::sync::Arc;
44use std::time::Duration;
45
46use bytes::Buf;
47use time::OffsetDateTime;
48use tokio::time::sleep;
49use tracing::{debug, info, warn};
50
51use crate::commands::{HISTORY_V1_REQUEST, HISTORY_V2_REQUEST};
52use crate::device::Device;
53use crate::error::{Error, Result};
54use crate::uuid::{COMMAND, HISTORY_V2, READ_INTERVAL, SECONDS_SINCE_UPDATE, TOTAL_READINGS};
55use aranet_types::HistoryRecord;
56
/// Snapshot of how far a multi-parameter history download has progressed.
///
/// A value of this type is cloned and handed to the progress callback each
/// time it is reported.
#[derive(Debug, Clone)]
pub struct HistoryProgress {
    /// Parameter currently being downloaded.
    pub current_param: HistoryParam,
    /// 1-based position of `current_param` among the parameters of this
    /// download (the progress computation subtracts one from it).
    pub param_index: usize,
    /// Number of parameters downloaded in total for this device.
    pub total_params: usize,
    /// Values received so far for `current_param`.
    pub values_downloaded: usize,
    /// Values expected for `current_param`.
    pub total_values: usize,
    /// Overall completion as a fraction; starts at `0.0` and is recomputed
    /// on every update.
    pub overall_progress: f32,
}
73
74impl HistoryProgress {
75 pub fn new(
77 param: HistoryParam,
78 param_idx: usize,
79 total_params: usize,
80 total_values: usize,
81 ) -> Self {
82 Self {
83 current_param: param,
84 param_index: param_idx,
85 total_params,
86 values_downloaded: 0,
87 total_values,
88 overall_progress: 0.0,
89 }
90 }
91
92 fn update(&mut self, values_downloaded: usize) {
93 self.values_downloaded = values_downloaded;
94 let param_progress = if self.total_values > 0 {
95 values_downloaded as f32 / self.total_values as f32
96 } else {
97 1.0
98 };
99 if self.total_params == 0 {
101 self.overall_progress = 1.0;
102 return;
103 }
104 let base_progress = (self.param_index - 1) as f32 / self.total_params as f32;
105 let param_contribution = param_progress / self.total_params as f32;
106 self.overall_progress = base_progress + param_contribution;
107 }
108}
109
/// Shared, thread-safe callback that receives a [`HistoryProgress`] snapshot
/// by value each time progress is reported.
pub type ProgressCallback = Arc<dyn Fn(HistoryProgress) + Send + Sync>;
112
/// Shared, thread-safe callback that receives a [`HistoryCheckpoint`] by
/// value each time a checkpoint is reported.
pub type CheckpointCallback = Arc<dyn Fn(HistoryCheckpoint) + Send + Sync>;
115
/// Serializable record of a history download in progress.
///
/// It is handed to the checkpoint callback after each parameter completes;
/// `HistoryOptions::resume_from` reads `resume_index` back from it.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HistoryCheckpoint {
    /// Identifier of the device the checkpoint belongs to.
    pub device_id: String,
    /// Parameter that is (or will next be) downloaded.
    pub current_param: HistoryParamCheckpoint,
    /// History index at which the download of `current_param` should start.
    pub resume_index: u16,
    /// Number of readings the device reported when the checkpoint was
    /// created; compared by `is_valid` against the current count.
    pub total_readings: u16,
    /// Parameters whose download has finished, in completion order.
    pub completed_params: Vec<HistoryParamCheckpoint>,
    /// Creation time of the checkpoint.
    pub created_at: time::OffsetDateTime,
    /// Raw values of the parameters completed so far, if they are retained.
    pub downloaded_data: Option<PartialHistoryData>,
}
137
/// Serializable mirror of [`HistoryParam`], used inside [`HistoryCheckpoint`].
///
/// The two enums have the same variants and convert into each other via
/// `From`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum HistoryParamCheckpoint {
    Temperature,
    Humidity,
    Pressure,
    Co2,
    Humidity2,
    Radon,
}
148
149impl From<HistoryParam> for HistoryParamCheckpoint {
150 fn from(param: HistoryParam) -> Self {
151 match param {
152 HistoryParam::Temperature => HistoryParamCheckpoint::Temperature,
153 HistoryParam::Humidity => HistoryParamCheckpoint::Humidity,
154 HistoryParam::Pressure => HistoryParamCheckpoint::Pressure,
155 HistoryParam::Co2 => HistoryParamCheckpoint::Co2,
156 HistoryParam::Humidity2 => HistoryParamCheckpoint::Humidity2,
157 HistoryParam::Radon => HistoryParamCheckpoint::Radon,
158 }
159 }
160}
161
162impl From<HistoryParamCheckpoint> for HistoryParam {
163 fn from(param: HistoryParamCheckpoint) -> Self {
164 match param {
165 HistoryParamCheckpoint::Temperature => HistoryParam::Temperature,
166 HistoryParamCheckpoint::Humidity => HistoryParam::Humidity,
167 HistoryParamCheckpoint::Pressure => HistoryParam::Pressure,
168 HistoryParamCheckpoint::Co2 => HistoryParam::Co2,
169 HistoryParamCheckpoint::Humidity2 => HistoryParam::Humidity2,
170 HistoryParamCheckpoint::Radon => HistoryParam::Radon,
171 }
172 }
173}
174
/// Raw (unconverted) values of the parameters that have been fully
/// downloaded so far; each vector stays empty until its parameter completes.
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PartialHistoryData {
    /// Raw CO2 values.
    pub co2_values: Vec<u16>,
    /// Raw temperature values.
    pub temp_values: Vec<u16>,
    /// Raw pressure values.
    pub pressure_values: Vec<u16>,
    /// Raw humidity values (filled from either `Humidity` or `Humidity2`).
    pub humidity_values: Vec<u16>,
    /// Raw radon values.
    pub radon_values: Vec<u32>,
}
184
185impl HistoryCheckpoint {
186 pub fn new(device_id: &str, total_readings: u16, first_param: HistoryParam) -> Self {
188 Self {
189 device_id: device_id.to_string(),
190 current_param: first_param.into(),
191 resume_index: 1,
192 total_readings,
193 completed_params: Vec::new(),
194 created_at: time::OffsetDateTime::now_utc(),
195 downloaded_data: Some(PartialHistoryData::default()),
196 }
197 }
198
199 pub fn is_valid(&self, current_total_readings: u16) -> bool {
201 self.total_readings == current_total_readings
204 }
205
206 pub fn complete_param(&mut self, param: HistoryParam, values: Vec<u16>) {
208 self.completed_params.push(param.into());
209 if let Some(ref mut data) = self.downloaded_data {
210 match param {
211 HistoryParam::Co2 => data.co2_values = values,
212 HistoryParam::Temperature => data.temp_values = values,
213 HistoryParam::Pressure => data.pressure_values = values,
214 HistoryParam::Humidity | HistoryParam::Humidity2 => data.humidity_values = values,
215 HistoryParam::Radon => {} }
217 }
218 }
219
220 pub fn complete_radon_param(&mut self, values: Vec<u32>) {
222 self.completed_params.push(HistoryParamCheckpoint::Radon);
223 if let Some(ref mut data) = self.downloaded_data {
224 data.radon_values = values;
225 }
226 }
227}
228
/// History parameter selector.
///
/// The `u8` discriminant is the value placed in the parameter byte of the
/// history request commands and compared against the first byte of the
/// responses.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum HistoryParam {
    Temperature = 1,
    /// Humidity read as one byte per value.
    Humidity = 2,
    Pressure = 3,
    Co2 = 4,
    /// Humidity read as two bytes per value; used by the radon download,
    /// which divides the raw value by 10.
    Humidity2 = 5,
    /// Radon, read as four bytes per value.
    Radon = 10,
}
242
/// Options controlling a history download.
///
/// `Debug` is implemented by hand because the callbacks are trait objects.
#[derive(Clone)]
pub struct HistoryOptions {
    /// First history index to download; `None` means index 1.
    pub start_index: Option<u16>,
    /// Last history index to download; `None` means the device's total
    /// reading count.
    pub end_index: Option<u16>,
    /// Pause between writing a history request and reading its response.
    pub read_delay: Duration,
    /// Callback invoked with progress snapshots, if any.
    pub progress_callback: Option<ProgressCallback>,
    /// When `true`, the delay recommended for the measured signal quality
    /// replaces `read_delay` (if the signal quality could be read).
    pub use_adaptive_delay: bool,
    /// Callback invoked with checkpoints, if any.
    pub checkpoint_callback: Option<CheckpointCallback>,
    /// Checkpoint interval. NOTE(review): presumably a number of values
    /// between checkpoints; it is not read anywhere in this part of the
    /// file - confirm against its consumer.
    pub checkpoint_interval: usize,
}
293
294impl std::fmt::Debug for HistoryOptions {
295 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
296 f.debug_struct("HistoryOptions")
297 .field("start_index", &self.start_index)
298 .field("end_index", &self.end_index)
299 .field("read_delay", &self.read_delay)
300 .field("progress_callback", &self.progress_callback.is_some())
301 .field("use_adaptive_delay", &self.use_adaptive_delay)
302 .field("checkpoint_callback", &self.checkpoint_callback.is_some())
303 .field("checkpoint_interval", &self.checkpoint_interval)
304 .finish()
305 }
306}
307
308impl Default for HistoryOptions {
309 fn default() -> Self {
310 Self {
311 start_index: None,
312 end_index: None,
313 read_delay: Duration::from_millis(50),
314 progress_callback: None,
315 use_adaptive_delay: false,
316 checkpoint_callback: None,
317 checkpoint_interval: 100, }
319 }
320}
321
322impl HistoryOptions {
323 #[must_use]
325 pub fn new() -> Self {
326 Self::default()
327 }
328
329 #[must_use]
331 pub fn start_index(mut self, index: u16) -> Self {
332 self.start_index = Some(index);
333 self
334 }
335
336 #[must_use]
338 pub fn end_index(mut self, index: u16) -> Self {
339 self.end_index = Some(index);
340 self
341 }
342
343 #[must_use]
345 pub fn read_delay(mut self, delay: Duration) -> Self {
346 self.read_delay = delay;
347 self
348 }
349
350 #[must_use]
352 pub fn with_progress<F>(mut self, callback: F) -> Self
353 where
354 F: Fn(HistoryProgress) + Send + Sync + 'static,
355 {
356 self.progress_callback = Some(Arc::new(callback));
357 self
358 }
359
360 pub fn report_progress(&self, progress: &HistoryProgress) {
362 if let Some(cb) = &self.progress_callback {
363 cb(progress.clone());
364 }
365 }
366
367 #[must_use]
376 pub fn adaptive_delay(mut self, enable: bool) -> Self {
377 self.use_adaptive_delay = enable;
378 self
379 }
380
381 #[must_use]
386 pub fn with_checkpoint<F>(mut self, callback: F) -> Self
387 where
388 F: Fn(HistoryCheckpoint) + Send + Sync + 'static,
389 {
390 self.checkpoint_callback = Some(Arc::new(callback));
391 self
392 }
393
394 #[must_use]
398 pub fn checkpoint_interval(mut self, interval: usize) -> Self {
399 self.checkpoint_interval = interval;
400 self
401 }
402
403 #[must_use]
407 pub fn resume_from(mut self, checkpoint: &HistoryCheckpoint) -> Self {
408 self.start_index = Some(checkpoint.resume_index);
409 self
410 }
411
412 pub fn report_checkpoint(&self, checkpoint: &HistoryCheckpoint) {
414 if let Some(cb) = &self.checkpoint_callback {
415 cb(checkpoint.clone());
416 }
417 }
418
419 pub fn effective_read_delay(
421 &self,
422 signal_quality: Option<crate::device::SignalQuality>,
423 ) -> Duration {
424 if self.use_adaptive_delay
425 && let Some(quality) = signal_quality
426 {
427 return quality.recommended_read_delay();
428 }
429 self.read_delay
430 }
431}
432
/// History metadata read from the device before a download.
#[derive(Debug, Clone)]
pub struct HistoryInfo {
    /// Number of readings stored on the device.
    pub total_readings: u16,
    /// Seconds between two consecutive readings.
    pub interval_seconds: u16,
    /// Seconds elapsed since the most recent reading was taken.
    pub seconds_since_update: u16,
}
443
444impl Device {
445 pub async fn get_history_info(&self) -> Result<HistoryInfo> {
447 let total_data = self.read_characteristic(TOTAL_READINGS).await?;
449 let total_readings = if total_data.len() >= 2 {
450 u16::from_le_bytes([total_data[0], total_data[1]])
451 } else {
452 return Err(Error::InvalidData(
453 "Invalid total readings data".to_string(),
454 ));
455 };
456
457 let interval_data = self.read_characteristic(READ_INTERVAL).await?;
459 let interval_seconds = if interval_data.len() >= 2 {
460 u16::from_le_bytes([interval_data[0], interval_data[1]])
461 } else {
462 return Err(Error::InvalidData("Invalid interval data".to_string()));
463 };
464
465 let age_data = self.read_characteristic(SECONDS_SINCE_UPDATE).await?;
467 let seconds_since_update = if age_data.len() >= 2 {
468 u16::from_le_bytes([age_data[0], age_data[1]])
469 } else {
470 0
471 };
472
473 Ok(HistoryInfo {
474 total_readings,
475 interval_seconds,
476 seconds_since_update,
477 })
478 }
479
480 pub async fn download_history(&self) -> Result<Vec<HistoryRecord>> {
482 self.download_history_with_options(HistoryOptions::default())
483 .await
484 }
485
486 pub async fn download_history_with_options(
508 &self,
509 options: HistoryOptions,
510 ) -> Result<Vec<HistoryRecord>> {
511 use aranet_types::DeviceType;
512
513 let info = self.get_history_info().await?;
514 info!(
515 "Device has {} readings, interval {}s, last update {}s ago",
516 info.total_readings, info.interval_seconds, info.seconds_since_update
517 );
518
519 if info.total_readings == 0 {
520 return Ok(Vec::new());
521 }
522
523 let start_idx = options.start_index.unwrap_or(1);
524 let end_idx = options.end_index.unwrap_or(info.total_readings);
525
526 let signal_quality = if options.use_adaptive_delay {
528 match self.signal_quality().await {
529 Some(quality) => {
530 info!(
531 "Signal quality: {:?} - using {} ms read delay",
532 quality,
533 quality.recommended_read_delay().as_millis()
534 );
535 Some(quality)
536 }
537 None => {
538 debug!("Could not read signal quality, using default delay");
539 None
540 }
541 }
542 } else {
543 None
544 };
545
546 let effective_delay = options.effective_read_delay(signal_quality);
548
549 match self.device_type() {
551 Some(DeviceType::AranetRadiation) => {
552 Err(Error::InvalidData(
556 "History download for Aranet Radiation devices is not yet implemented. \
557 Current readings are available via read_current()."
558 .to_string(),
559 ))
560 }
561 Some(DeviceType::AranetRadon) => {
562 self.download_radon_history_internal(
564 &info,
565 start_idx,
566 end_idx,
567 &options,
568 effective_delay,
569 )
570 .await
571 }
572 _ => {
573 self.download_aranet4_history_internal(
575 &info,
576 start_idx,
577 end_idx,
578 &options,
579 effective_delay,
580 )
581 .await
582 }
583 }
584 }
585
586 async fn download_aranet4_history_internal(
588 &self,
589 info: &HistoryInfo,
590 start_idx: u16,
591 end_idx: u16,
592 options: &HistoryOptions,
593 effective_delay: Duration,
594 ) -> Result<Vec<HistoryRecord>> {
595 let total_values = (end_idx - start_idx + 1) as usize;
596
597 let device_id = self.address().to_string();
599 let mut checkpoint = if options.checkpoint_callback.is_some() {
600 Some(HistoryCheckpoint::new(
601 &device_id,
602 info.total_readings,
603 HistoryParam::Co2,
604 ))
605 } else {
606 None
607 };
608
609 let mut progress = HistoryProgress::new(HistoryParam::Co2, 1, 4, total_values);
611 options.report_progress(&progress);
612
613 let co2_values = self
614 .download_param_history_with_progress(
615 HistoryParam::Co2,
616 start_idx,
617 end_idx,
618 effective_delay,
619 |downloaded| {
620 progress.update(downloaded);
621 options.report_progress(&progress);
622 },
623 )
624 .await?;
625
626 if let Some(ref mut cp) = checkpoint {
628 cp.complete_param(HistoryParam::Co2, co2_values.clone());
629 cp.current_param = HistoryParamCheckpoint::Temperature;
630 cp.resume_index = start_idx;
631 options.report_checkpoint(cp);
632 }
633
634 progress = HistoryProgress::new(HistoryParam::Temperature, 2, 4, total_values);
635 options.report_progress(&progress);
636
637 let temp_values = self
638 .download_param_history_with_progress(
639 HistoryParam::Temperature,
640 start_idx,
641 end_idx,
642 effective_delay,
643 |downloaded| {
644 progress.update(downloaded);
645 options.report_progress(&progress);
646 },
647 )
648 .await?;
649
650 if let Some(ref mut cp) = checkpoint {
652 cp.complete_param(HistoryParam::Temperature, temp_values.clone());
653 cp.current_param = HistoryParamCheckpoint::Pressure;
654 cp.resume_index = start_idx;
655 options.report_checkpoint(cp);
656 }
657
658 progress = HistoryProgress::new(HistoryParam::Pressure, 3, 4, total_values);
659 options.report_progress(&progress);
660
661 let pressure_values = self
662 .download_param_history_with_progress(
663 HistoryParam::Pressure,
664 start_idx,
665 end_idx,
666 effective_delay,
667 |downloaded| {
668 progress.update(downloaded);
669 options.report_progress(&progress);
670 },
671 )
672 .await?;
673
674 if let Some(ref mut cp) = checkpoint {
676 cp.complete_param(HistoryParam::Pressure, pressure_values.clone());
677 cp.current_param = HistoryParamCheckpoint::Humidity;
678 cp.resume_index = start_idx;
679 options.report_checkpoint(cp);
680 }
681
682 progress = HistoryProgress::new(HistoryParam::Humidity, 4, 4, total_values);
683 options.report_progress(&progress);
684
685 let humidity_values = self
686 .download_param_history_with_progress(
687 HistoryParam::Humidity,
688 start_idx,
689 end_idx,
690 effective_delay,
691 |downloaded| {
692 progress.update(downloaded);
693 options.report_progress(&progress);
694 },
695 )
696 .await?;
697
698 if let Some(ref mut cp) = checkpoint {
700 cp.complete_param(HistoryParam::Humidity, humidity_values.clone());
701 options.report_checkpoint(cp);
702 }
703
704 let now = OffsetDateTime::now_utc();
706 let latest_reading_time = now - time::Duration::seconds(info.seconds_since_update as i64);
707
708 let mut records = Vec::new();
710 let count = co2_values.len();
711
712 for i in 0..count {
713 let readings_ago = (count - 1 - i) as i64;
715 let timestamp = latest_reading_time
716 - time::Duration::seconds(readings_ago * info.interval_seconds as i64);
717
718 let record = HistoryRecord {
719 timestamp,
720 co2: co2_values.get(i).copied().unwrap_or(0),
721 temperature: raw_to_temperature(temp_values.get(i).copied().unwrap_or(0)),
722 pressure: raw_to_pressure(pressure_values.get(i).copied().unwrap_or(0)),
723 humidity: humidity_values.get(i).copied().unwrap_or(0) as u8,
724 radon: None,
725 radiation_rate: None,
726 radiation_total: None,
727 };
728 records.push(record);
729 }
730
731 info!("Downloaded {} history records", records.len());
732 Ok(records)
733 }
734
735 async fn download_radon_history_internal(
737 &self,
738 info: &HistoryInfo,
739 start_idx: u16,
740 end_idx: u16,
741 options: &HistoryOptions,
742 effective_delay: Duration,
743 ) -> Result<Vec<HistoryRecord>> {
744 let total_values = (end_idx - start_idx + 1) as usize;
745
746 let device_id = self.address().to_string();
748 let mut checkpoint = if options.checkpoint_callback.is_some() {
749 Some(HistoryCheckpoint::new(
750 &device_id,
751 info.total_readings,
752 HistoryParam::Radon,
753 ))
754 } else {
755 None
756 };
757
758 let mut progress = HistoryProgress::new(HistoryParam::Radon, 1, 4, total_values);
760 options.report_progress(&progress);
761
762 let radon_values = self
763 .download_param_history_u32_with_progress(
764 HistoryParam::Radon,
765 start_idx,
766 end_idx,
767 effective_delay,
768 |downloaded| {
769 progress.update(downloaded);
770 options.report_progress(&progress);
771 },
772 )
773 .await?;
774
775 if let Some(ref mut cp) = checkpoint {
777 cp.complete_radon_param(radon_values.clone());
778 cp.current_param = HistoryParamCheckpoint::Temperature;
779 cp.resume_index = start_idx;
780 options.report_checkpoint(cp);
781 }
782
783 progress = HistoryProgress::new(HistoryParam::Temperature, 2, 4, total_values);
784 options.report_progress(&progress);
785
786 let temp_values = self
787 .download_param_history_with_progress(
788 HistoryParam::Temperature,
789 start_idx,
790 end_idx,
791 effective_delay,
792 |downloaded| {
793 progress.update(downloaded);
794 options.report_progress(&progress);
795 },
796 )
797 .await?;
798
799 if let Some(ref mut cp) = checkpoint {
801 cp.complete_param(HistoryParam::Temperature, temp_values.clone());
802 cp.current_param = HistoryParamCheckpoint::Pressure;
803 options.report_checkpoint(cp);
804 }
805
806 progress = HistoryProgress::new(HistoryParam::Pressure, 3, 4, total_values);
807 options.report_progress(&progress);
808
809 let pressure_values = self
810 .download_param_history_with_progress(
811 HistoryParam::Pressure,
812 start_idx,
813 end_idx,
814 effective_delay,
815 |downloaded| {
816 progress.update(downloaded);
817 options.report_progress(&progress);
818 },
819 )
820 .await?;
821
822 if let Some(ref mut cp) = checkpoint {
824 cp.complete_param(HistoryParam::Pressure, pressure_values.clone());
825 cp.current_param = HistoryParamCheckpoint::Humidity2;
826 options.report_checkpoint(cp);
827 }
828
829 progress = HistoryProgress::new(HistoryParam::Humidity2, 4, 4, total_values);
831 options.report_progress(&progress);
832
833 let humidity_values = self
834 .download_param_history_with_progress(
835 HistoryParam::Humidity2,
836 start_idx,
837 end_idx,
838 effective_delay,
839 |downloaded| {
840 progress.update(downloaded);
841 options.report_progress(&progress);
842 },
843 )
844 .await?;
845
846 if let Some(ref mut cp) = checkpoint {
848 cp.complete_param(HistoryParam::Humidity2, humidity_values.clone());
849 options.report_checkpoint(cp);
850 }
851
852 let now = OffsetDateTime::now_utc();
854 let latest_reading_time = now - time::Duration::seconds(info.seconds_since_update as i64);
855
856 let mut records = Vec::new();
858 let count = radon_values.len();
859
860 for i in 0..count {
861 let readings_ago = (count - 1 - i) as i64;
863 let timestamp = latest_reading_time
864 - time::Duration::seconds(readings_ago * info.interval_seconds as i64);
865
866 let humidity_raw = humidity_values.get(i).copied().unwrap_or(0);
868 let humidity = (humidity_raw / 10).min(100) as u8;
869
870 let record = HistoryRecord {
871 timestamp,
872 co2: 0, temperature: raw_to_temperature(temp_values.get(i).copied().unwrap_or(0)),
874 pressure: raw_to_pressure(pressure_values.get(i).copied().unwrap_or(0)),
875 humidity,
876 radon: Some(radon_values.get(i).copied().unwrap_or(0)),
877 radiation_rate: None,
878 radiation_total: None,
879 };
880 records.push(record);
881 }
882
883 info!("Downloaded {} radon history records", records.len());
884 Ok(records)
885 }
886
887 #[allow(clippy::too_many_arguments)]
894 async fn download_param_history_generic_with_progress<T, F>(
895 &self,
896 param: HistoryParam,
897 start_idx: u16,
898 end_idx: u16,
899 read_delay: Duration,
900 value_parser: impl Fn(&[u8], usize) -> Option<T>,
901 value_size: usize,
902 mut on_progress: F,
903 ) -> Result<Vec<T>>
904 where
905 T: Default + Clone,
906 F: FnMut(usize),
907 {
908 debug!(
909 "Downloading {:?} history from {} to {} (value_size={})",
910 param, start_idx, end_idx, value_size
911 );
912
913 let mut values: BTreeMap<u16, T> = BTreeMap::new();
914 let mut current_idx = start_idx;
915
916 while current_idx <= end_idx {
917 let cmd = [
919 HISTORY_V2_REQUEST,
920 param as u8,
921 (current_idx & 0xFF) as u8,
922 ((current_idx >> 8) & 0xFF) as u8,
923 ];
924
925 self.write_characteristic(COMMAND, &cmd).await?;
926 sleep(read_delay).await;
927
928 let response = self.read_characteristic(HISTORY_V2).await?;
930
931 if response.len() < 10 {
940 warn!(
941 "Invalid history response: too short ({} bytes)",
942 response.len()
943 );
944 break;
945 }
946
947 let resp_param = response[0];
948 if resp_param != param as u8 {
949 warn!("Unexpected parameter in response: {}", resp_param);
950 sleep(read_delay).await;
952 continue;
953 }
954
955 let resp_start = u16::from_le_bytes([response[7], response[8]]);
957 let resp_count = response[9] as usize;
958
959 debug!(
960 "History response: param={}, start={}, count={}",
961 resp_param, resp_start, resp_count
962 );
963
964 if resp_count == 0 {
966 debug!("Reached end of history (count=0)");
967 break;
968 }
969
970 let data = &response[10..];
972 let num_values = (data.len() / value_size).min(resp_count);
973
974 for i in 0..num_values {
975 let idx = resp_start + i as u16;
976 if idx > end_idx {
977 break;
978 }
979 if let Some(value) = value_parser(data, i) {
980 values.insert(idx, value);
981 }
982 }
983
984 current_idx = resp_start + num_values as u16;
985 debug!(
986 "Downloaded {} values, next index: {}",
987 num_values, current_idx
988 );
989
990 on_progress(values.len());
992
993 if (resp_start as usize + resp_count) >= end_idx as usize {
995 debug!("Reached end of requested range");
996 break;
997 }
998 }
999
1000 Ok(values.into_values().collect())
1002 }
1003
1004 async fn download_param_history_with_progress<F>(
1006 &self,
1007 param: HistoryParam,
1008 start_idx: u16,
1009 end_idx: u16,
1010 read_delay: Duration,
1011 on_progress: F,
1012 ) -> Result<Vec<u16>>
1013 where
1014 F: FnMut(usize),
1015 {
1016 let value_size = if param == HistoryParam::Humidity {
1017 1
1018 } else {
1019 2
1020 };
1021
1022 self.download_param_history_generic_with_progress(
1023 param,
1024 start_idx,
1025 end_idx,
1026 read_delay,
1027 |data, i| {
1028 if param == HistoryParam::Humidity {
1029 data.get(i).map(|&b| b as u16)
1030 } else {
1031 let offset = i * 2;
1032 if offset + 1 < data.len() {
1033 Some(u16::from_le_bytes([data[offset], data[offset + 1]]))
1034 } else {
1035 None
1036 }
1037 }
1038 },
1039 value_size,
1040 on_progress,
1041 )
1042 .await
1043 }
1044
1045 async fn download_param_history_u32_with_progress<F>(
1047 &self,
1048 param: HistoryParam,
1049 start_idx: u16,
1050 end_idx: u16,
1051 read_delay: Duration,
1052 on_progress: F,
1053 ) -> Result<Vec<u32>>
1054 where
1055 F: FnMut(usize),
1056 {
1057 self.download_param_history_generic_with_progress(
1058 param,
1059 start_idx,
1060 end_idx,
1061 read_delay,
1062 |data, i| {
1063 let offset = i * 4;
1064 if offset + 3 < data.len() {
1065 Some(u32::from_le_bytes([
1066 data[offset],
1067 data[offset + 1],
1068 data[offset + 2],
1069 data[offset + 3],
1070 ]))
1071 } else {
1072 None
1073 }
1074 },
1075 4,
1076 on_progress,
1077 )
1078 .await
1079 }
1080
1081 pub async fn download_history_v1(&self) -> Result<Vec<HistoryRecord>> {
1086 use crate::uuid::HISTORY_V1;
1087 use tokio::sync::mpsc;
1088
1089 let info = self.get_history_info().await?;
1090 info!(
1091 "V1 download: {} readings, interval {}s",
1092 info.total_readings, info.interval_seconds
1093 );
1094
1095 if info.total_readings == 0 {
1096 return Ok(Vec::new());
1097 }
1098
1099 let (tx, mut rx) = mpsc::channel::<Vec<u8>>(256);
1101
1102 self.subscribe_to_notifications(HISTORY_V1, move |data| {
1104 if let Err(e) = tx.try_send(data.to_vec()) {
1105 warn!(
1106 "V1 history notification channel full or closed, data may be lost: {}",
1107 e
1108 );
1109 }
1110 })
1111 .await?;
1112
1113 let mut co2_values = Vec::new();
1115 let mut temp_values = Vec::new();
1116 let mut pressure_values = Vec::new();
1117 let mut humidity_values = Vec::new();
1118
1119 for param in [
1120 HistoryParam::Co2,
1121 HistoryParam::Temperature,
1122 HistoryParam::Pressure,
1123 HistoryParam::Humidity,
1124 ] {
1125 let cmd = [
1127 HISTORY_V1_REQUEST,
1128 param as u8,
1129 0x01,
1130 0x00,
1131 (info.total_readings & 0xFF) as u8,
1132 ((info.total_readings >> 8) & 0xFF) as u8,
1133 ];
1134
1135 self.write_characteristic(COMMAND, &cmd).await?;
1136
1137 let mut values = Vec::new();
1139 let expected = info.total_readings as usize;
1140
1141 let mut consecutive_timeouts = 0;
1142 const MAX_CONSECUTIVE_TIMEOUTS: u32 = 3;
1143
1144 while values.len() < expected {
1145 match tokio::time::timeout(Duration::from_secs(5), rx.recv()).await {
1146 Ok(Some(data)) => {
1147 consecutive_timeouts = 0; if data.len() >= 3 {
1150 let resp_param = data[0];
1151 if resp_param == param as u8 {
1152 let mut buf = &data[3..];
1153 while buf.len() >= 2 && values.len() < expected {
1154 values.push(buf.get_u16_le());
1155 }
1156 }
1157 }
1158 }
1159 Ok(None) => {
1160 warn!(
1161 "V1 history channel closed for {:?}: got {}/{} values",
1162 param,
1163 values.len(),
1164 expected
1165 );
1166 break;
1167 }
1168 Err(_) => {
1169 consecutive_timeouts += 1;
1170 warn!(
1171 "Timeout waiting for V1 history notification ({}/{}), {:?}: {}/{} values",
1172 consecutive_timeouts,
1173 MAX_CONSECUTIVE_TIMEOUTS,
1174 param,
1175 values.len(),
1176 expected
1177 );
1178 if consecutive_timeouts >= MAX_CONSECUTIVE_TIMEOUTS {
1179 warn!(
1180 "Too many consecutive timeouts for {:?}, proceeding with partial data",
1181 param
1182 );
1183 break;
1184 }
1185 }
1186 }
1187 }
1188
1189 if values.len() < expected {
1191 warn!(
1192 "V1 history download incomplete for {:?}: got {}/{} values ({:.1}%)",
1193 param,
1194 values.len(),
1195 expected,
1196 (values.len() as f64 / expected as f64) * 100.0
1197 );
1198 }
1199
1200 match param {
1201 HistoryParam::Co2 => co2_values = values,
1202 HistoryParam::Temperature => temp_values = values,
1203 HistoryParam::Pressure => pressure_values = values,
1204 HistoryParam::Humidity => humidity_values = values,
1205 HistoryParam::Humidity2 | HistoryParam::Radon => {}
1207 }
1208 }
1209
1210 self.unsubscribe_from_notifications(HISTORY_V1).await?;
1212
1213 let now = OffsetDateTime::now_utc();
1215 let latest_reading_time = now - time::Duration::seconds(info.seconds_since_update as i64);
1216
1217 let mut records = Vec::new();
1218 let count = co2_values.len();
1219
1220 for i in 0..count {
1221 let readings_ago = (count - 1 - i) as i64;
1222 let timestamp = latest_reading_time
1223 - time::Duration::seconds(readings_ago * info.interval_seconds as i64);
1224
1225 let record = HistoryRecord {
1226 timestamp,
1227 co2: co2_values.get(i).copied().unwrap_or(0),
1228 temperature: raw_to_temperature(temp_values.get(i).copied().unwrap_or(0)),
1229 pressure: raw_to_pressure(pressure_values.get(i).copied().unwrap_or(0)),
1230 humidity: humidity_values.get(i).copied().unwrap_or(0) as u8,
1231 radon: None,
1232 radiation_rate: None,
1233 radiation_total: None,
1234 };
1235 records.push(record);
1236 }
1237
1238 info!("V1 download complete: {} records", records.len());
1239 Ok(records)
1240 }
1241}
1242
/// Converts a raw temperature reading by dividing it by 20
/// (presumably yielding degrees Celsius - confirm against the device spec).
pub fn raw_to_temperature(raw: u16) -> f32 {
    const RAW_UNITS_PER_DEGREE: f32 = 20.0;
    f32::from(raw) / RAW_UNITS_PER_DEGREE
}
1247
/// Converts a raw pressure reading by dividing it by 10
/// (presumably yielding hPa - confirm against the device spec).
pub fn raw_to_pressure(raw: u16) -> f32 {
    const RAW_UNITS_PER_HPA: f32 = 10.0;
    f32::from(raw) / RAW_UNITS_PER_HPA
}
1252
// Unit tests for the pure parts of this module: raw-value conversions,
// parameter discriminants, option builders and the metadata struct.
// Nothing here talks to a device.
#[cfg(test)]
mod tests {
    use super::*;

    // --- raw_to_temperature: raw / 20 ---

    #[test]
    fn test_raw_to_temperature_typical_values() {
        assert!((raw_to_temperature(450) - 22.5).abs() < 0.001);

        assert!((raw_to_temperature(400) - 20.0).abs() < 0.001);

        assert!((raw_to_temperature(500) - 25.0).abs() < 0.001);
    }

    #[test]
    fn test_raw_to_temperature_edge_cases() {
        // Zero input.
        assert!((raw_to_temperature(0) - 0.0).abs() < 0.001);

        assert!((raw_to_temperature(1000) - 50.0).abs() < 0.001);

        // Largest representable raw value; looser tolerance for f32 rounding.
        assert!((raw_to_temperature(u16::MAX) - 3276.75).abs() < 0.01);
    }

    #[test]
    fn test_raw_to_temperature_precision() {
        // One raw step corresponds to 0.05.
        assert!((raw_to_temperature(451) - 22.55).abs() < 0.001);

        assert!((raw_to_temperature(441) - 22.05).abs() < 0.001);
    }

    // --- raw_to_pressure: raw / 10 ---

    #[test]
    fn test_raw_to_pressure_typical_values() {
        assert!((raw_to_pressure(10132) - 1013.2).abs() < 0.01);

        assert!((raw_to_pressure(10000) - 1000.0).abs() < 0.01);

        assert!((raw_to_pressure(10500) - 1050.0).abs() < 0.01);
    }

    #[test]
    fn test_raw_to_pressure_edge_cases() {
        // Zero input.
        assert!((raw_to_pressure(0) - 0.0).abs() < 0.01);

        assert!((raw_to_pressure(9500) - 950.0).abs() < 0.01);

        assert!((raw_to_pressure(11000) - 1100.0).abs() < 0.01);

        // Largest representable raw value; looser tolerance for f32 rounding.
        assert!((raw_to_pressure(u16::MAX) - 6553.5).abs() < 0.1);
    }

    // --- HistoryParam ---

    // The discriminants are sent to the device, so they must not drift.
    #[test]
    fn test_history_param_values() {
        assert_eq!(HistoryParam::Temperature as u8, 1);
        assert_eq!(HistoryParam::Humidity as u8, 2);
        assert_eq!(HistoryParam::Pressure as u8, 3);
        assert_eq!(HistoryParam::Co2 as u8, 4);
    }

    #[test]
    fn test_history_param_debug() {
        assert_eq!(format!("{:?}", HistoryParam::Temperature), "Temperature");
        assert_eq!(format!("{:?}", HistoryParam::Co2), "Co2");
    }

    // --- HistoryOptions ---

    #[test]
    fn test_history_options_default() {
        let options = HistoryOptions::default();

        assert!(options.start_index.is_none());
        assert!(options.end_index.is_none());
        assert_eq!(options.read_delay, Duration::from_millis(50));
    }

    #[test]
    fn test_history_options_custom() {
        let options = HistoryOptions::new()
            .start_index(10)
            .end_index(100)
            .read_delay(Duration::from_millis(100));

        assert_eq!(options.start_index, Some(10));
        assert_eq!(options.end_index, Some(100));
        assert_eq!(options.read_delay, Duration::from_millis(100));
    }

    #[test]
    fn test_history_options_with_progress() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Counts callback invocations from inside the closure.
        let call_count = Arc::new(AtomicUsize::new(0));
        let call_count_clone = Arc::clone(&call_count);

        let options = HistoryOptions::new().with_progress(move |_progress| {
            call_count_clone.fetch_add(1, Ordering::SeqCst);
        });

        assert!(options.progress_callback.is_some());

        // One report must produce exactly one callback invocation.
        let progress = HistoryProgress::new(HistoryParam::Co2, 1, 4, 100);
        options.report_progress(&progress);
        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    // --- HistoryInfo ---

    #[test]
    fn test_history_info_creation() {
        let info = HistoryInfo {
            total_readings: 1000,
            interval_seconds: 300,
            seconds_since_update: 120,
        };

        assert_eq!(info.total_readings, 1000);
        assert_eq!(info.interval_seconds, 300);
        assert_eq!(info.seconds_since_update, 120);
    }

    #[test]
    fn test_history_info_debug() {
        let info = HistoryInfo {
            total_readings: 500,
            interval_seconds: 60,
            seconds_since_update: 30,
        };

        // The derived `Debug` output names the field and shows its value.
        let debug_str = format!("{:?}", info);
        assert!(debug_str.contains("total_readings"));
        assert!(debug_str.contains("500"));
    }
}