1use std::collections::BTreeMap;
43use std::sync::Arc;
44use std::time::Duration;
45
46use bytes::Buf;
47use time::OffsetDateTime;
48use tokio::time::sleep;
49use tracing::{debug, info, warn};
50
51use crate::commands::{HISTORY_V1_REQUEST, HISTORY_V2_REQUEST};
52use crate::device::Device;
53use crate::error::{Error, Result};
54use crate::uuid::{COMMAND, HISTORY_V2, READ_INTERVAL, SECONDS_SINCE_UPDATE, TOTAL_READINGS};
55use aranet_types::HistoryRecord;
56
/// Snapshot of how far a history download has progressed.
///
/// A clone of this value is handed to the [`ProgressCallback`] every time
/// progress is reported.
#[derive(Debug, Clone)]
pub struct HistoryProgress {
    /// Parameter currently being downloaded.
    pub current_param: HistoryParam,
    /// Position of the current parameter in the download sequence. The
    /// download routines in this file pass 1-based values (1, 2, ...).
    pub param_index: usize,
    /// Total number of parameters in the download sequence.
    pub total_params: usize,
    /// Number of values downloaded so far for the current parameter.
    pub values_downloaded: usize,
    /// Number of values expected for the current parameter.
    pub total_values: usize,
    /// Combined progress over all parameters as a fraction; starts at 0.0.
    pub overall_progress: f32,
}
73
74impl HistoryProgress {
75 pub fn new(
77 param: HistoryParam,
78 param_idx: usize,
79 total_params: usize,
80 total_values: usize,
81 ) -> Self {
82 Self {
83 current_param: param,
84 param_index: param_idx,
85 total_params,
86 values_downloaded: 0,
87 total_values,
88 overall_progress: 0.0,
89 }
90 }
91
92 fn update(&mut self, values_downloaded: usize) {
93 self.values_downloaded = values_downloaded;
94 let param_progress = if self.total_values > 0 {
95 values_downloaded as f32 / self.total_values as f32
96 } else {
97 1.0
98 };
99 if self.total_params == 0 {
101 self.overall_progress = 1.0;
102 return;
103 }
104 let base_progress = (self.param_index - 1) as f32 / self.total_params as f32;
105 let param_contribution = param_progress / self.total_params as f32;
106 self.overall_progress = base_progress + param_contribution;
107 }
108}
109
/// Shared, thread-safe callback that receives a [`HistoryProgress`] snapshot
/// each time download progress is reported.
pub type ProgressCallback = Arc<dyn Fn(HistoryProgress) + Send + Sync>;
112
/// Shared, thread-safe callback that receives a [`HistoryCheckpoint`] each
/// time a checkpoint is reported during a download.
pub type CheckpointCallback = Arc<dyn Fn(HistoryCheckpoint) + Send + Sync>;
115
/// Serializable snapshot of a partially completed history download.
///
/// The download routines report a clone of this value through the
/// [`CheckpointCallback`] after each parameter finishes.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HistoryCheckpoint {
    /// Identifier of the device the checkpoint belongs to (the download
    /// routines in this file pass the device address as a string).
    pub device_id: String,
    /// Parameter that is being (or is about to be) downloaded.
    pub current_param: HistoryParamCheckpoint,
    /// History index to resume from. Initialised to 1 and set to the start of
    /// the requested range whenever the download moves to the next parameter.
    pub resume_index: u16,
    /// Total number of readings the device reported when the checkpoint was
    /// created; compared by [`HistoryCheckpoint::is_valid`].
    pub total_readings: u16,
    /// Parameters whose download has already finished, in completion order.
    pub completed_params: Vec<HistoryParamCheckpoint>,
    /// UTC time at which the checkpoint was created.
    pub created_at: time::OffsetDateTime,
    /// Raw values downloaded so far, if they are being retained.
    pub downloaded_data: Option<PartialHistoryData>,
}
137
/// Serializable counterpart of [`HistoryParam`] used inside
/// [`HistoryCheckpoint`]; the two enums convert into each other via `From`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum HistoryParamCheckpoint {
    /// Mirrors [`HistoryParam::Temperature`].
    Temperature,
    /// Mirrors [`HistoryParam::Humidity`].
    Humidity,
    /// Mirrors [`HistoryParam::Pressure`].
    Pressure,
    /// Mirrors [`HistoryParam::Co2`].
    Co2,
    /// Mirrors [`HistoryParam::Humidity2`].
    Humidity2,
    /// Mirrors [`HistoryParam::Radon`].
    Radon,
}
148
149impl From<HistoryParam> for HistoryParamCheckpoint {
150 fn from(param: HistoryParam) -> Self {
151 match param {
152 HistoryParam::Temperature => HistoryParamCheckpoint::Temperature,
153 HistoryParam::Humidity => HistoryParamCheckpoint::Humidity,
154 HistoryParam::Pressure => HistoryParamCheckpoint::Pressure,
155 HistoryParam::Co2 => HistoryParamCheckpoint::Co2,
156 HistoryParam::Humidity2 => HistoryParamCheckpoint::Humidity2,
157 HistoryParam::Radon => HistoryParamCheckpoint::Radon,
158 }
159 }
160}
161
162impl From<HistoryParamCheckpoint> for HistoryParam {
163 fn from(param: HistoryParamCheckpoint) -> Self {
164 match param {
165 HistoryParamCheckpoint::Temperature => HistoryParam::Temperature,
166 HistoryParamCheckpoint::Humidity => HistoryParam::Humidity,
167 HistoryParamCheckpoint::Pressure => HistoryParam::Pressure,
168 HistoryParamCheckpoint::Co2 => HistoryParam::Co2,
169 HistoryParamCheckpoint::Humidity2 => HistoryParam::Humidity2,
170 HistoryParamCheckpoint::Radon => HistoryParam::Radon,
171 }
172 }
173}
174
/// Describes one step of a multi-parameter download of 16-bit values.
#[derive(Debug, Clone, Copy)]
struct U16HistoryStep {
    /// Parameter downloaded in this step.
    param: HistoryParam,
    /// Position of this step in the sequence, as used for progress reporting.
    step: usize,
    /// Total number of steps in the sequence.
    total_steps: usize,
    /// Parameter the checkpoint should point at once this step is complete,
    /// or `None` when this is the last step.
    next_param: Option<HistoryParamCheckpoint>,
}
182
/// Values collected so far during a checkpointed download, stored exactly as
/// they were downloaded (no unit conversion is applied when they are stored).
#[derive(Debug, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct PartialHistoryData {
    /// Downloaded CO2 values.
    pub co2_values: Vec<u16>,
    /// Downloaded temperature values.
    pub temp_values: Vec<u16>,
    /// Downloaded pressure values.
    pub pressure_values: Vec<u16>,
    /// Downloaded humidity values (from either `Humidity` or `Humidity2`).
    pub humidity_values: Vec<u16>,
    /// Downloaded radon values.
    pub radon_values: Vec<u32>,
}
192
193impl HistoryCheckpoint {
194 pub fn new(device_id: &str, total_readings: u16, first_param: HistoryParam) -> Self {
196 Self {
197 device_id: device_id.to_string(),
198 current_param: first_param.into(),
199 resume_index: 1,
200 total_readings,
201 completed_params: Vec::new(),
202 created_at: time::OffsetDateTime::now_utc(),
203 downloaded_data: Some(PartialHistoryData::default()),
204 }
205 }
206
207 pub fn is_valid(&self, current_total_readings: u16) -> bool {
209 self.total_readings == current_total_readings
212 }
213
214 pub fn complete_param(&mut self, param: HistoryParam, values: Vec<u16>) {
216 self.completed_params.push(param.into());
217 if let Some(ref mut data) = self.downloaded_data {
218 match param {
219 HistoryParam::Co2 => data.co2_values = values,
220 HistoryParam::Temperature => data.temp_values = values,
221 HistoryParam::Pressure => data.pressure_values = values,
222 HistoryParam::Humidity | HistoryParam::Humidity2 => data.humidity_values = values,
223 HistoryParam::Radon => {} }
225 }
226 }
227
228 pub fn complete_radon_param(&mut self, values: Vec<u32>) {
230 self.completed_params.push(HistoryParamCheckpoint::Radon);
231 if let Some(ref mut data) = self.downloaded_data {
232 data.radon_values = values;
233 }
234 }
235}
236
/// History parameter selector.
///
/// The discriminant is the byte placed in history request commands and
/// compared against the first byte of each response.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum HistoryParam {
    /// Temperature; values are transferred as 16-bit integers.
    Temperature = 1,
    /// Humidity as downloaded for the default (Aranet4) path; the V2 download
    /// reads one byte per value for this parameter.
    Humidity = 2,
    /// Pressure; values are transferred as 16-bit integers.
    Pressure = 3,
    /// CO2; values are transferred as 16-bit integers.
    Co2 = 4,
    /// Humidity as downloaded for Aranet2 and radon devices; values are 16-bit
    /// and are divided by 10 when records are built.
    Humidity2 = 5,
    /// Radon; values are transferred as 32-bit integers.
    Radon = 10,
}
250
/// Options controlling a history download.
///
/// Build with [`HistoryOptions::new`] and the chained setter methods.
#[derive(Clone)]
pub struct HistoryOptions {
    /// First history index to download (1-based). `None` means index 1.
    pub start_index: Option<u16>,
    /// Last history index to download (inclusive). `None` means the total
    /// number of readings reported by the device.
    pub end_index: Option<u16>,
    /// Delay between writing a history request and reading its response.
    pub read_delay: Duration,
    /// Callback invoked with progress snapshots, if any.
    pub progress_callback: Option<ProgressCallback>,
    /// When `true`, the download queries the signal quality and, if one is
    /// available, uses its recommended read delay instead of `read_delay`.
    pub use_adaptive_delay: bool,
    /// Callback invoked with checkpoints, if any. Checkpoints are only
    /// created when this is set.
    pub checkpoint_callback: Option<CheckpointCallback>,
    /// Checkpoint interval (default 100).
    /// NOTE(review): this value is not read by any download routine visible
    /// in this file - confirm where (or whether) it is consumed.
    pub checkpoint_interval: usize,
}
301
302impl std::fmt::Debug for HistoryOptions {
303 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
304 f.debug_struct("HistoryOptions")
305 .field("start_index", &self.start_index)
306 .field("end_index", &self.end_index)
307 .field("read_delay", &self.read_delay)
308 .field("progress_callback", &self.progress_callback.is_some())
309 .field("use_adaptive_delay", &self.use_adaptive_delay)
310 .field("checkpoint_callback", &self.checkpoint_callback.is_some())
311 .field("checkpoint_interval", &self.checkpoint_interval)
312 .finish()
313 }
314}
315
316impl Default for HistoryOptions {
317 fn default() -> Self {
318 Self {
319 start_index: None,
320 end_index: None,
321 read_delay: Duration::from_millis(50),
322 progress_callback: None,
323 use_adaptive_delay: false,
324 checkpoint_callback: None,
325 checkpoint_interval: 100, }
327 }
328}
329
330impl HistoryOptions {
331 #[must_use]
333 pub fn new() -> Self {
334 Self::default()
335 }
336
337 #[must_use]
339 pub fn start_index(mut self, index: u16) -> Self {
340 self.start_index = Some(index);
341 self
342 }
343
344 #[must_use]
346 pub fn end_index(mut self, index: u16) -> Self {
347 self.end_index = Some(index);
348 self
349 }
350
351 #[must_use]
353 pub fn read_delay(mut self, delay: Duration) -> Self {
354 self.read_delay = delay;
355 self
356 }
357
358 #[must_use]
360 pub fn with_progress<F>(mut self, callback: F) -> Self
361 where
362 F: Fn(HistoryProgress) + Send + Sync + 'static,
363 {
364 self.progress_callback = Some(Arc::new(callback));
365 self
366 }
367
368 pub fn report_progress(&self, progress: &HistoryProgress) {
370 if let Some(cb) = &self.progress_callback {
371 cb(progress.clone());
372 }
373 }
374
375 #[must_use]
384 pub fn adaptive_delay(mut self, enable: bool) -> Self {
385 self.use_adaptive_delay = enable;
386 self
387 }
388
389 #[must_use]
394 pub fn with_checkpoint<F>(mut self, callback: F) -> Self
395 where
396 F: Fn(HistoryCheckpoint) + Send + Sync + 'static,
397 {
398 self.checkpoint_callback = Some(Arc::new(callback));
399 self
400 }
401
402 #[must_use]
406 pub fn checkpoint_interval(mut self, interval: usize) -> Self {
407 self.checkpoint_interval = interval;
408 self
409 }
410
411 #[must_use]
415 pub fn resume_from(mut self, checkpoint: &HistoryCheckpoint) -> Self {
416 self.start_index = Some(checkpoint.resume_index);
417 self
418 }
419
420 pub fn report_checkpoint(&self, checkpoint: &HistoryCheckpoint) {
422 if let Some(cb) = &self.checkpoint_callback {
423 cb(checkpoint.clone());
424 }
425 }
426
427 pub fn effective_read_delay(
429 &self,
430 signal_quality: Option<crate::device::SignalQuality>,
431 ) -> Duration {
432 if self.use_adaptive_delay
433 && let Some(quality) = signal_quality
434 {
435 return quality.recommended_read_delay();
436 }
437 self.read_delay
438 }
439}
440
/// Summary of the history stored on a device, as read by
/// `Device::get_history_info`.
#[derive(Debug, Clone)]
pub struct HistoryInfo {
    /// Number of readings stored on the device.
    pub total_readings: u16,
    /// Interval between consecutive readings, in seconds.
    pub interval_seconds: u16,
    /// Seconds elapsed since the most recent reading.
    pub seconds_since_update: u16,
}
451
452impl Device {
453 pub async fn get_history_info(&self) -> Result<HistoryInfo> {
455 let total_data = self.read_characteristic(TOTAL_READINGS).await?;
457 let total_readings = if total_data.len() >= 2 {
458 u16::from_le_bytes([total_data[0], total_data[1]])
459 } else {
460 return Err(Error::InvalidData(
461 "Invalid total readings data".to_string(),
462 ));
463 };
464
465 let interval_data = self.read_characteristic(READ_INTERVAL).await?;
467 let interval_seconds = if interval_data.len() >= 2 {
468 u16::from_le_bytes([interval_data[0], interval_data[1]])
469 } else {
470 return Err(Error::InvalidData("Invalid interval data".to_string()));
471 };
472
473 let age_data = self.read_characteristic(SECONDS_SINCE_UPDATE).await?;
475 let seconds_since_update = if age_data.len() >= 2 {
476 u16::from_le_bytes([age_data[0], age_data[1]])
477 } else {
478 0
479 };
480
481 Ok(HistoryInfo {
482 total_readings,
483 interval_seconds,
484 seconds_since_update,
485 })
486 }
487
488 pub async fn download_history(&self) -> Result<Vec<HistoryRecord>> {
490 self.download_history_with_options(HistoryOptions::default())
491 .await
492 }
493
494 pub async fn download_history_with_options(
516 &self,
517 options: HistoryOptions,
518 ) -> Result<Vec<HistoryRecord>> {
519 use aranet_types::DeviceType;
520
521 let info = self.get_history_info().await?;
522 info!(
523 "Device has {} readings, interval {}s, last update {}s ago",
524 info.total_readings, info.interval_seconds, info.seconds_since_update
525 );
526
527 if info.total_readings == 0 {
528 return Ok(Vec::new());
529 }
530
531 let start_idx = options.start_index.unwrap_or(1);
532 let end_idx = options.end_index.unwrap_or(info.total_readings);
533
534 if start_idx > end_idx {
535 return Err(Error::InvalidConfig(format!(
536 "start_index ({start_idx}) must be <= end_index ({end_idx})"
537 )));
538 }
539 if start_idx == 0 {
540 return Err(Error::InvalidConfig(
541 "start_index must be >= 1 (indices are 1-based)".into(),
542 ));
543 }
544
545 let signal_quality = if options.use_adaptive_delay {
547 match self.signal_quality().await {
548 Some(quality) => {
549 info!(
550 "Signal quality: {:?} - using {} ms read delay",
551 quality,
552 quality.recommended_read_delay().as_millis()
553 );
554 Some(quality)
555 }
556 None => {
557 debug!("Could not read signal quality, using default delay");
558 None
559 }
560 }
561 } else {
562 None
563 };
564
565 let effective_delay = options.effective_read_delay(signal_quality);
567
568 match self.device_type() {
570 Some(DeviceType::AranetRadiation) => {
571 Err(Error::Unsupported(
575 "History download is not available for Aranet Radiation devices. \
576 The radiation history protocol is not documented. \
577 Use read_current() for current radiation readings."
578 .to_string(),
579 ))
580 }
581 Some(DeviceType::AranetRadon) => {
582 self.download_radon_history_internal(
584 &info,
585 start_idx,
586 end_idx,
587 &options,
588 effective_delay,
589 )
590 .await
591 }
592 Some(DeviceType::Aranet2) => {
593 self.download_aranet2_history_internal(
595 &info,
596 start_idx,
597 end_idx,
598 &options,
599 effective_delay,
600 )
601 .await
602 }
603 _ => {
604 self.download_aranet4_history_internal(
606 &info,
607 start_idx,
608 end_idx,
609 &options,
610 effective_delay,
611 )
612 .await
613 }
614 }
615 }
616
617 async fn download_u16_param_with_checkpoint(
622 &self,
623 step_info: U16HistoryStep,
624 start_idx: u16,
625 end_idx: u16,
626 effective_delay: Duration,
627 options: &HistoryOptions,
628 checkpoint: &mut Option<HistoryCheckpoint>,
629 ) -> Result<Vec<u16>> {
630 let total_values = (end_idx - start_idx + 1) as usize;
631 let mut progress = HistoryProgress::new(
632 step_info.param,
633 step_info.step,
634 step_info.total_steps,
635 total_values,
636 );
637 options.report_progress(&progress);
638
639 let values = self
640 .download_param_history_with_progress(
641 step_info.param,
642 start_idx,
643 end_idx,
644 effective_delay,
645 |downloaded| {
646 progress.update(downloaded);
647 options.report_progress(&progress);
648 },
649 )
650 .await?;
651
652 if let Some(cp) = checkpoint {
653 cp.complete_param(step_info.param, values.clone());
654 if let Some(next) = step_info.next_param {
655 cp.current_param = next;
656 cp.resume_index = start_idx;
657 }
658 options.report_checkpoint(cp);
659 }
660
661 Ok(values)
662 }
663
664 async fn download_aranet4_history_internal(
666 &self,
667 info: &HistoryInfo,
668 start_idx: u16,
669 end_idx: u16,
670 options: &HistoryOptions,
671 effective_delay: Duration,
672 ) -> Result<Vec<HistoryRecord>> {
673 if start_idx > end_idx {
674 return Ok(Vec::new());
675 }
676
677 let device_id = self.address().to_string();
678 let mut checkpoint = if options.checkpoint_callback.is_some() {
679 Some(HistoryCheckpoint::new(
680 &device_id,
681 info.total_readings,
682 HistoryParam::Co2,
683 ))
684 } else {
685 None
686 };
687
688 let co2_values = self
689 .download_u16_param_with_checkpoint(
690 U16HistoryStep {
691 param: HistoryParam::Co2,
692 step: 1,
693 total_steps: 4,
694 next_param: Some(HistoryParamCheckpoint::Temperature),
695 },
696 start_idx,
697 end_idx,
698 effective_delay,
699 options,
700 &mut checkpoint,
701 )
702 .await?;
703
704 let temp_values = self
705 .download_u16_param_with_checkpoint(
706 U16HistoryStep {
707 param: HistoryParam::Temperature,
708 step: 2,
709 total_steps: 4,
710 next_param: Some(HistoryParamCheckpoint::Pressure),
711 },
712 start_idx,
713 end_idx,
714 effective_delay,
715 options,
716 &mut checkpoint,
717 )
718 .await?;
719
720 let pressure_values = self
721 .download_u16_param_with_checkpoint(
722 U16HistoryStep {
723 param: HistoryParam::Pressure,
724 step: 3,
725 total_steps: 4,
726 next_param: Some(HistoryParamCheckpoint::Humidity),
727 },
728 start_idx,
729 end_idx,
730 effective_delay,
731 options,
732 &mut checkpoint,
733 )
734 .await?;
735
736 let humidity_values = self
737 .download_u16_param_with_checkpoint(
738 U16HistoryStep {
739 param: HistoryParam::Humidity,
740 step: 4,
741 total_steps: 4,
742 next_param: None,
743 },
744 start_idx,
745 end_idx,
746 effective_delay,
747 options,
748 &mut checkpoint,
749 )
750 .await?;
751
752 let records = build_history_records(
753 info,
754 &co2_values,
755 &temp_values,
756 &pressure_values,
757 &humidity_values,
758 &[],
759 );
760
761 info!("Downloaded {} history records", records.len());
762 Ok(records)
763 }
764
765 async fn download_aranet2_history_internal(
767 &self,
768 info: &HistoryInfo,
769 start_idx: u16,
770 end_idx: u16,
771 options: &HistoryOptions,
772 effective_delay: Duration,
773 ) -> Result<Vec<HistoryRecord>> {
774 if start_idx > end_idx {
775 return Ok(Vec::new());
776 }
777
778 let device_id = self.address().to_string();
779 let mut checkpoint = if options.checkpoint_callback.is_some() {
780 Some(HistoryCheckpoint::new(
781 &device_id,
782 info.total_readings,
783 HistoryParam::Temperature,
784 ))
785 } else {
786 None
787 };
788
789 let temp_values = self
790 .download_u16_param_with_checkpoint(
791 U16HistoryStep {
792 param: HistoryParam::Temperature,
793 step: 1,
794 total_steps: 2,
795 next_param: Some(HistoryParamCheckpoint::Humidity2),
796 },
797 start_idx,
798 end_idx,
799 effective_delay,
800 options,
801 &mut checkpoint,
802 )
803 .await?;
804
805 let humidity_values = self
806 .download_u16_param_with_checkpoint(
807 U16HistoryStep {
808 param: HistoryParam::Humidity2,
809 step: 2,
810 total_steps: 2,
811 next_param: None,
812 },
813 start_idx,
814 end_idx,
815 effective_delay,
816 options,
817 &mut checkpoint,
818 )
819 .await?;
820
821 let records = build_history_records(info, &[], &temp_values, &[], &humidity_values, &[]);
823
824 info!("Downloaded {} Aranet2 history records", records.len());
825 Ok(records)
826 }
827
828 async fn download_radon_history_internal(
830 &self,
831 info: &HistoryInfo,
832 start_idx: u16,
833 end_idx: u16,
834 options: &HistoryOptions,
835 effective_delay: Duration,
836 ) -> Result<Vec<HistoryRecord>> {
837 if start_idx > end_idx {
838 return Ok(Vec::new());
839 }
840 let total_values = (end_idx - start_idx + 1) as usize;
841
842 let device_id = self.address().to_string();
843 let mut checkpoint = if options.checkpoint_callback.is_some() {
844 Some(HistoryCheckpoint::new(
845 &device_id,
846 info.total_readings,
847 HistoryParam::Radon,
848 ))
849 } else {
850 None
851 };
852
853 let mut progress = HistoryProgress::new(HistoryParam::Radon, 1, 4, total_values);
855 options.report_progress(&progress);
856
857 let radon_values = self
858 .download_param_history_u32_with_progress(
859 HistoryParam::Radon,
860 start_idx,
861 end_idx,
862 effective_delay,
863 |downloaded| {
864 progress.update(downloaded);
865 options.report_progress(&progress);
866 },
867 )
868 .await?;
869
870 if let Some(ref mut cp) = checkpoint {
871 cp.complete_radon_param(radon_values.clone());
872 cp.current_param = HistoryParamCheckpoint::Temperature;
873 cp.resume_index = start_idx;
874 options.report_checkpoint(cp);
875 }
876
877 let temp_values = self
878 .download_u16_param_with_checkpoint(
879 U16HistoryStep {
880 param: HistoryParam::Temperature,
881 step: 2,
882 total_steps: 4,
883 next_param: Some(HistoryParamCheckpoint::Pressure),
884 },
885 start_idx,
886 end_idx,
887 effective_delay,
888 options,
889 &mut checkpoint,
890 )
891 .await?;
892
893 let pressure_values = self
894 .download_u16_param_with_checkpoint(
895 U16HistoryStep {
896 param: HistoryParam::Pressure,
897 step: 3,
898 total_steps: 4,
899 next_param: Some(HistoryParamCheckpoint::Humidity2),
900 },
901 start_idx,
902 end_idx,
903 effective_delay,
904 options,
905 &mut checkpoint,
906 )
907 .await?;
908
909 let humidity_values = self
910 .download_u16_param_with_checkpoint(
911 U16HistoryStep {
912 param: HistoryParam::Humidity2,
913 step: 4,
914 total_steps: 4,
915 next_param: None,
916 },
917 start_idx,
918 end_idx,
919 effective_delay,
920 options,
921 &mut checkpoint,
922 )
923 .await?;
924
925 let records = build_history_records(
926 info,
927 &[],
928 &temp_values,
929 &pressure_values,
930 &humidity_values,
931 &radon_values,
932 );
933
934 info!("Downloaded {} radon history records", records.len());
935 Ok(records)
936 }
937
938 #[allow(clippy::too_many_arguments)]
945 async fn download_param_history_generic_with_progress<T, F>(
946 &self,
947 param: HistoryParam,
948 start_idx: u16,
949 end_idx: u16,
950 read_delay: Duration,
951 value_parser: impl Fn(&[u8], usize) -> Option<T>,
952 value_size: usize,
953 mut on_progress: F,
954 ) -> Result<Vec<T>>
955 where
956 T: Default + Clone,
957 F: FnMut(usize),
958 {
959 debug!(
960 "Downloading {:?} history from {} to {} (value_size={})",
961 param, start_idx, end_idx, value_size
962 );
963
964 let mut values: BTreeMap<u16, T> = BTreeMap::new();
965 let mut current_idx = start_idx;
966 let mut consecutive_wrong_param = 0u32;
967 const MAX_WRONG_PARAM_RETRIES: u32 = 5;
968
969 while current_idx <= end_idx {
970 let cmd = [
972 HISTORY_V2_REQUEST,
973 param as u8,
974 (current_idx & 0xFF) as u8,
975 ((current_idx >> 8) & 0xFF) as u8,
976 ];
977
978 self.write_characteristic(COMMAND, &cmd).await?;
979 sleep(read_delay).await;
980
981 let response = self.read_characteristic(HISTORY_V2).await?;
983
984 if response.len() < 10 {
993 warn!(
994 "Invalid history response: too short ({} bytes)",
995 response.len()
996 );
997 break;
998 }
999
1000 let resp_param = response[0];
1001 if resp_param != param as u8 {
1002 consecutive_wrong_param += 1;
1003 warn!(
1004 "Unexpected parameter in response: {} (retry {}/{})",
1005 resp_param, consecutive_wrong_param, MAX_WRONG_PARAM_RETRIES
1006 );
1007 if consecutive_wrong_param >= MAX_WRONG_PARAM_RETRIES {
1008 warn!("Too many wrong parameter responses, aborting download");
1009 break;
1010 }
1011 sleep(read_delay).await;
1013 continue;
1014 }
1015 consecutive_wrong_param = 0;
1016
1017 let resp_start = u16::from_le_bytes([response[7], response[8]]);
1019 let resp_count = response[9] as usize;
1020
1021 debug!(
1022 "History response: param={}, start={}, count={}",
1023 resp_param, resp_start, resp_count
1024 );
1025
1026 if resp_count == 0 {
1028 debug!("Reached end of history (count=0)");
1029 break;
1030 }
1031
1032 let data = &response[10..];
1034 let num_values = (data.len() / value_size).min(resp_count);
1035
1036 for i in 0..num_values {
1037 let idx = resp_start + i as u16;
1038 if idx > end_idx {
1039 break;
1040 }
1041 if let Some(value) = value_parser(data, i) {
1042 values.insert(idx, value);
1043 }
1044 }
1045
1046 current_idx = resp_start + num_values as u16;
1047 debug!(
1048 "Downloaded {} values, next index: {}",
1049 num_values, current_idx
1050 );
1051
1052 on_progress(values.len());
1054
1055 if (resp_start as usize + resp_count) >= end_idx as usize {
1057 debug!("Reached end of requested range");
1058 break;
1059 }
1060 }
1061
1062 Ok(values.into_values().collect())
1064 }
1065
1066 async fn download_param_history_with_progress<F>(
1068 &self,
1069 param: HistoryParam,
1070 start_idx: u16,
1071 end_idx: u16,
1072 read_delay: Duration,
1073 on_progress: F,
1074 ) -> Result<Vec<u16>>
1075 where
1076 F: FnMut(usize),
1077 {
1078 let value_size = if param == HistoryParam::Humidity {
1079 1
1080 } else {
1081 2
1082 };
1083
1084 self.download_param_history_generic_with_progress(
1085 param,
1086 start_idx,
1087 end_idx,
1088 read_delay,
1089 |data, i| {
1090 if param == HistoryParam::Humidity {
1091 data.get(i).map(|&b| b as u16)
1092 } else {
1093 let offset = i * 2;
1094 if offset + 1 < data.len() {
1095 Some(u16::from_le_bytes([data[offset], data[offset + 1]]))
1096 } else {
1097 None
1098 }
1099 }
1100 },
1101 value_size,
1102 on_progress,
1103 )
1104 .await
1105 }
1106
1107 async fn download_param_history_u32_with_progress<F>(
1109 &self,
1110 param: HistoryParam,
1111 start_idx: u16,
1112 end_idx: u16,
1113 read_delay: Duration,
1114 on_progress: F,
1115 ) -> Result<Vec<u32>>
1116 where
1117 F: FnMut(usize),
1118 {
1119 self.download_param_history_generic_with_progress(
1120 param,
1121 start_idx,
1122 end_idx,
1123 read_delay,
1124 |data, i| {
1125 let offset = i * 4;
1126 if offset + 3 < data.len() {
1127 Some(u32::from_le_bytes([
1128 data[offset],
1129 data[offset + 1],
1130 data[offset + 2],
1131 data[offset + 3],
1132 ]))
1133 } else {
1134 None
1135 }
1136 },
1137 4,
1138 on_progress,
1139 )
1140 .await
1141 }
1142
1143 pub async fn download_history_v1(&self) -> Result<Vec<HistoryRecord>> {
1148 use crate::uuid::HISTORY_V1;
1149 use tokio::sync::mpsc;
1150
1151 let info = self.get_history_info().await?;
1152 info!(
1153 "V1 download: {} readings, interval {}s",
1154 info.total_readings, info.interval_seconds
1155 );
1156
1157 if info.total_readings == 0 {
1158 return Ok(Vec::new());
1159 }
1160
1161 let (tx, mut rx) = mpsc::channel::<Vec<u8>>(256);
1163
1164 self.subscribe_to_notifications(HISTORY_V1, move |data| {
1166 if let Err(e) = tx.try_send(data.to_vec()) {
1167 warn!(
1168 "V1 history notification channel full or closed, data may be lost: {}",
1169 e
1170 );
1171 }
1172 })
1173 .await?;
1174
1175 let mut co2_values = Vec::new();
1177 let mut temp_values = Vec::new();
1178 let mut pressure_values = Vec::new();
1179 let mut humidity_values = Vec::new();
1180
1181 for param in [
1182 HistoryParam::Co2,
1183 HistoryParam::Temperature,
1184 HistoryParam::Pressure,
1185 HistoryParam::Humidity,
1186 ] {
1187 let cmd = [
1189 HISTORY_V1_REQUEST,
1190 param as u8,
1191 0x01,
1192 0x00,
1193 (info.total_readings & 0xFF) as u8,
1194 ((info.total_readings >> 8) & 0xFF) as u8,
1195 ];
1196
1197 self.write_characteristic(COMMAND, &cmd).await?;
1198
1199 let mut values = Vec::new();
1201 let expected = info.total_readings as usize;
1202
1203 let mut consecutive_timeouts = 0;
1204 const MAX_CONSECUTIVE_TIMEOUTS: u32 = 3;
1205
1206 while values.len() < expected {
1207 match tokio::time::timeout(Duration::from_secs(5), rx.recv()).await {
1208 Ok(Some(data)) => {
1209 consecutive_timeouts = 0; if data.len() >= 3 {
1212 let resp_param = data[0];
1213 if resp_param == param as u8 {
1214 let mut buf = &data[3..];
1215 while buf.len() >= 2 && values.len() < expected {
1216 values.push(buf.get_u16_le());
1217 }
1218 }
1219 }
1220 }
1221 Ok(None) => {
1222 warn!(
1223 "V1 history channel closed for {:?}: got {}/{} values",
1224 param,
1225 values.len(),
1226 expected
1227 );
1228 break;
1229 }
1230 Err(_) => {
1231 consecutive_timeouts += 1;
1232 warn!(
1233 "Timeout waiting for V1 history notification ({}/{}), {:?}: {}/{} values",
1234 consecutive_timeouts,
1235 MAX_CONSECUTIVE_TIMEOUTS,
1236 param,
1237 values.len(),
1238 expected
1239 );
1240 if consecutive_timeouts >= MAX_CONSECUTIVE_TIMEOUTS {
1241 warn!(
1242 "Too many consecutive timeouts for {:?}, proceeding with partial data",
1243 param
1244 );
1245 break;
1246 }
1247 }
1248 }
1249 }
1250
1251 if values.len() < expected {
1253 warn!(
1254 "V1 history download incomplete for {:?}: got {}/{} values ({:.1}%)",
1255 param,
1256 values.len(),
1257 expected,
1258 (values.len() as f64 / expected as f64) * 100.0
1259 );
1260 }
1261
1262 match param {
1263 HistoryParam::Co2 => co2_values = values,
1264 HistoryParam::Temperature => temp_values = values,
1265 HistoryParam::Pressure => pressure_values = values,
1266 HistoryParam::Humidity => humidity_values = values,
1267 HistoryParam::Humidity2 | HistoryParam::Radon => {}
1269 }
1270 }
1271
1272 self.unsubscribe_from_notifications(HISTORY_V1).await?;
1274
1275 let now = OffsetDateTime::now_utc();
1277 let latest_reading_time = now - time::Duration::seconds(info.seconds_since_update as i64);
1278
1279 let mut records = Vec::new();
1280 let count = co2_values.len();
1281
1282 if temp_values.len() != count
1284 || pressure_values.len() != count
1285 || humidity_values.len() != count
1286 {
1287 warn!(
1288 "V1 history arrays have mismatched lengths: co2={}, temp={}, pressure={}, humidity={} — \
1289 records with missing values will use defaults",
1290 count,
1291 temp_values.len(),
1292 pressure_values.len(),
1293 humidity_values.len()
1294 );
1295 }
1296
1297 for i in 0..count {
1298 let readings_ago = (count - 1 - i) as i64;
1299 let timestamp = latest_reading_time
1300 - time::Duration::seconds(readings_ago * info.interval_seconds as i64);
1301
1302 let record = HistoryRecord {
1303 timestamp,
1304 co2: co2_values.get(i).copied().unwrap_or(0),
1305 temperature: raw_to_temperature(temp_values.get(i).copied().unwrap_or(0)),
1306 pressure: raw_to_pressure(pressure_values.get(i).copied().unwrap_or(0)),
1307 humidity: humidity_values.get(i).copied().unwrap_or(0) as u8,
1308 radon: None,
1309 radiation_rate: None,
1310 radiation_total: None,
1311 };
1312 records.push(record);
1313 }
1314
1315 info!("V1 download complete: {} records", records.len());
1316 Ok(records)
1317 }
1318}
1319
1320fn build_history_records(
1327 info: &HistoryInfo,
1328 co2_values: &[u16],
1329 temp_values: &[u16],
1330 pressure_values: &[u16],
1331 humidity_values: &[u16],
1332 radon_values: &[u32],
1333) -> Vec<HistoryRecord> {
1334 let is_radon = !radon_values.is_empty();
1335 let is_aranet2 = co2_values.is_empty() && radon_values.is_empty();
1336 let count = if is_radon {
1337 radon_values.len()
1338 } else if is_aranet2 {
1339 temp_values.len()
1340 } else {
1341 co2_values.len()
1342 };
1343
1344 let expected = count;
1346 if temp_values.len() != expected
1347 || pressure_values.len() != expected
1348 || humidity_values.len() != expected
1349 {
1350 warn!(
1351 "History arrays have mismatched lengths: primary={expected}, temp={}, pressure={}, humidity={} — \
1352 records with missing values will use defaults",
1353 temp_values.len(),
1354 pressure_values.len(),
1355 humidity_values.len()
1356 );
1357 }
1358
1359 let now = OffsetDateTime::now_utc();
1360 let latest_reading_time = now - time::Duration::seconds(info.seconds_since_update as i64);
1361
1362 (0..count)
1363 .map(|i| {
1364 let readings_ago = (count - 1 - i) as i64;
1365 let timestamp = latest_reading_time
1366 - time::Duration::seconds(readings_ago * info.interval_seconds as i64);
1367
1368 let humidity = if is_radon || is_aranet2 {
1369 let raw = humidity_values.get(i).copied().unwrap_or(0);
1371 (raw / 10).min(100) as u8
1372 } else {
1373 humidity_values.get(i).copied().unwrap_or(0) as u8
1374 };
1375
1376 HistoryRecord {
1377 timestamp,
1378 co2: if is_radon {
1379 0
1380 } else {
1381 co2_values.get(i).copied().unwrap_or(0)
1382 },
1383 temperature: raw_to_temperature(temp_values.get(i).copied().unwrap_or(0)),
1384 pressure: raw_to_pressure(pressure_values.get(i).copied().unwrap_or(0)),
1385 humidity,
1386 radon: if is_radon {
1387 Some(radon_values.get(i).copied().unwrap_or(0))
1388 } else {
1389 None
1390 },
1391 radiation_rate: None,
1392 radiation_total: None,
1393 }
1394 })
1395 .collect()
1396}
1397
/// Converts a raw temperature value to a float by dividing it by 20
/// (for example, 450 becomes 22.5).
pub fn raw_to_temperature(raw: u16) -> f32 {
    const RAW_UNITS_PER_DEGREE: f32 = 20.0;
    f32::from(raw) / RAW_UNITS_PER_DEGREE
}
1402
/// Converts a raw pressure value to a float by dividing it by 10
/// (for example, 10132 becomes 1013.2).
pub fn raw_to_pressure(raw: u16) -> f32 {
    const RAW_UNITS_PER_UNIT: f32 = 10.0;
    f32::from(raw) / RAW_UNITS_PER_UNIT
}
1407
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `actual` lies strictly within `tolerance` of `expected`.
    fn assert_close(actual: f32, expected: f32, tolerance: f32) {
        assert!(
            (actual - expected).abs() < tolerance,
            "expected {} (+/- {}), got {}",
            expected,
            tolerance,
            actual
        );
    }

    // --- raw_to_temperature ---

    #[test]
    fn test_raw_to_temperature_typical_values() {
        for (raw, expected) in [(450u16, 22.5f32), (400, 20.0), (500, 25.0)] {
            assert_close(raw_to_temperature(raw), expected, 0.001);
        }
    }

    #[test]
    fn test_raw_to_temperature_edge_cases() {
        assert_close(raw_to_temperature(0), 0.0, 0.001);
        assert_close(raw_to_temperature(1000), 50.0, 0.001);
        assert_close(raw_to_temperature(u16::MAX), 3276.75, 0.01);
    }

    #[test]
    fn test_raw_to_temperature_precision() {
        for (raw, expected) in [(451u16, 22.55f32), (441, 22.05)] {
            assert_close(raw_to_temperature(raw), expected, 0.001);
        }
    }

    // --- raw_to_pressure ---

    #[test]
    fn test_raw_to_pressure_typical_values() {
        for (raw, expected) in [(10132u16, 1013.2f32), (10000, 1000.0), (10500, 1050.0)] {
            assert_close(raw_to_pressure(raw), expected, 0.01);
        }
    }

    #[test]
    fn test_raw_to_pressure_edge_cases() {
        for (raw, expected) in [(0u16, 0.0f32), (9500, 950.0), (11000, 1100.0)] {
            assert_close(raw_to_pressure(raw), expected, 0.01);
        }
        assert_close(raw_to_pressure(u16::MAX), 6553.5, 0.1);
    }

    // --- HistoryParam ---

    #[test]
    fn test_history_param_values() {
        let wire_values = [
            (HistoryParam::Temperature, 1u8),
            (HistoryParam::Humidity, 2),
            (HistoryParam::Pressure, 3),
            (HistoryParam::Co2, 4),
        ];
        for (param, expected) in wire_values {
            assert_eq!(param as u8, expected);
        }
    }

    #[test]
    fn test_history_param_debug() {
        let temperature = format!("{:?}", HistoryParam::Temperature);
        let co2 = format!("{:?}", HistoryParam::Co2);
        assert_eq!(temperature, "Temperature");
        assert_eq!(co2, "Co2");
    }

    // --- HistoryOptions ---

    #[test]
    fn test_history_options_default() {
        let HistoryOptions {
            start_index,
            end_index,
            read_delay,
            ..
        } = HistoryOptions::default();

        assert!(start_index.is_none());
        assert!(end_index.is_none());
        assert_eq!(read_delay, Duration::from_millis(50));
    }

    #[test]
    fn test_history_options_custom() {
        let delay = Duration::from_millis(100);
        let options = HistoryOptions::new()
            .start_index(10)
            .end_index(100)
            .read_delay(delay);

        assert_eq!(options.start_index, Some(10));
        assert_eq!(options.end_index, Some(100));
        assert_eq!(options.read_delay, delay);
    }

    #[test]
    fn test_history_options_with_progress() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        let calls = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&calls);

        let options = HistoryOptions::new().with_progress(move |_progress| {
            counter.fetch_add(1, Ordering::SeqCst);
        });
        assert!(options.progress_callback.is_some());

        // Reporting once must invoke the callback exactly once.
        let progress = HistoryProgress::new(HistoryParam::Co2, 1, 4, 100);
        options.report_progress(&progress);
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    // --- HistoryInfo ---

    #[test]
    fn test_history_info_creation() {
        let info = HistoryInfo {
            total_readings: 1000,
            interval_seconds: 300,
            seconds_since_update: 120,
        };

        let HistoryInfo {
            total_readings,
            interval_seconds,
            seconds_since_update,
        } = info;
        assert_eq!(total_readings, 1000);
        assert_eq!(interval_seconds, 300);
        assert_eq!(seconds_since_update, 120);
    }

    #[test]
    fn test_history_info_debug() {
        let info = HistoryInfo {
            total_readings: 500,
            interval_seconds: 60,
            seconds_since_update: 30,
        };

        let rendered = format!("{:?}", info);
        for needle in ["total_readings", "500"] {
            assert!(rendered.contains(needle));
        }
    }
}