1use crate::UtilsError;
7use chrono::{DateTime, Local, TimeZone, Utc};
8use scirs2_core::ndarray::{Array1, Array2, Axis};
9use std::collections::{BTreeMap, HashMap, VecDeque};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
/// A point in time stored as milliseconds since the Unix epoch (UTC).
///
/// Ordering and equality derive directly from the millisecond value, so
/// `Timestamp` can serve as a `BTreeMap` key (see `TimeSeries` / `TemporalIndex`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Timestamp {
    /// Milliseconds since 1970-01-01T00:00:00Z.
    pub timestamp: i64,
}
17
18impl Timestamp {
19 pub fn from_millis(millis: i64) -> Self {
21 Self { timestamp: millis }
22 }
23
24 pub fn from_secs(secs: i64) -> Self {
26 Self {
27 timestamp: secs * 1000,
28 }
29 }
30
31 pub fn now() -> Self {
33 let duration = SystemTime::now()
34 .duration_since(UNIX_EPOCH)
35 .unwrap_or(Duration::from_secs(0));
36 Self::from_millis(duration.as_millis() as i64)
37 }
38
39 pub fn as_millis(&self) -> i64 {
41 self.timestamp
42 }
43
44 pub fn as_secs(&self) -> i64 {
46 self.timestamp / 1000
47 }
48
49 pub fn to_datetime_utc(&self) -> DateTime<Utc> {
51 let naive = DateTime::from_timestamp_millis(self.timestamp)
52 .unwrap_or_default()
53 .naive_utc();
54 DateTime::from_naive_utc_and_offset(naive, Utc)
55 }
56
57 pub fn add_duration(&self, duration: Duration) -> Self {
59 Self::from_millis(self.timestamp + duration.as_millis() as i64)
60 }
61
62 pub fn sub_duration(&self, duration: Duration) -> Self {
64 Self::from_millis(self.timestamp - duration.as_millis() as i64)
65 }
66}
67
/// A single observation: a value paired with the time it was recorded.
#[derive(Debug, Clone)]
pub struct TimeSeriesPoint<T> {
    /// When the value was observed.
    pub timestamp: Timestamp,
    /// The observed value.
    pub value: T,
}
74
75impl<T> TimeSeriesPoint<T> {
76 pub fn new(timestamp: Timestamp, value: T) -> Self {
77 Self { timestamp, value }
78 }
79}
80
/// An ordered time series: values keyed by timestamp, plus free-form
/// string metadata.
#[derive(Debug, Clone)]
pub struct TimeSeries<T> {
    // BTreeMap keeps entries sorted by timestamp, enabling range queries.
    data: BTreeMap<Timestamp, T>,
    // Arbitrary string key/value annotations attached to the series.
    metadata: HashMap<String, String>,
}
87
impl<T: Clone> Default for TimeSeries<T> {
    /// Equivalent to [`TimeSeries::new`]: an empty series with no metadata.
    fn default() -> Self {
        Self::new()
    }
}
93
94impl<T: Clone> TimeSeries<T> {
95 pub fn new() -> Self {
97 Self {
98 data: BTreeMap::new(),
99 metadata: HashMap::new(),
100 }
101 }
102
103 pub fn from_vecs(timestamps: Vec<Timestamp>, values: Vec<T>) -> Result<Self, UtilsError> {
105 if timestamps.len() != values.len() {
106 return Err(UtilsError::ShapeMismatch {
107 expected: vec![timestamps.len()],
108 actual: vec![values.len()],
109 });
110 }
111
112 let mut ts = Self::new();
113 for (timestamp, value) in timestamps.into_iter().zip(values.into_iter()) {
114 ts.insert(timestamp, value);
115 }
116 Ok(ts)
117 }
118
119 pub fn insert(&mut self, timestamp: Timestamp, value: T) {
121 self.data.insert(timestamp, value);
122 }
123
124 pub fn get(&self, timestamp: &Timestamp) -> Option<&T> {
126 self.data.get(timestamp)
127 }
128
129 pub fn len(&self) -> usize {
131 self.data.len()
132 }
133
134 pub fn is_empty(&self) -> bool {
136 self.data.is_empty()
137 }
138
139 pub fn first_timestamp(&self) -> Option<Timestamp> {
141 self.data.keys().next().copied()
142 }
143
144 pub fn last_timestamp(&self) -> Option<Timestamp> {
146 self.data.keys().next_back().copied()
147 }
148
149 pub fn range(&self, start: Timestamp, end: Timestamp) -> Vec<TimeSeriesPoint<T>> {
151 self.data
152 .range(start..=end)
153 .map(|(×tamp, value)| TimeSeriesPoint::new(timestamp, value.clone()))
154 .collect()
155 }
156
157 pub fn timestamps(&self) -> Vec<Timestamp> {
159 self.data.keys().copied().collect()
160 }
161
162 pub fn values(&self) -> Vec<T> {
164 self.data.values().cloned().collect()
165 }
166
167 pub fn set_metadata(&mut self, key: String, value: String) {
169 self.metadata.insert(key, value);
170 }
171
172 pub fn get_metadata(&self, key: &str) -> Option<&String> {
174 self.metadata.get(key)
175 }
176
177 pub fn resample(
179 &self,
180 interval: Duration,
181 aggregation: AggregationMethod,
182 ) -> Result<TimeSeries<f64>, UtilsError>
183 where
184 T: Into<f64> + Copy,
185 {
186 if self.is_empty() {
187 return Ok(TimeSeries::new());
188 }
189
190 let start = self.first_timestamp().unwrap();
191 let end = self.last_timestamp().unwrap();
192 let mut resampled = TimeSeries::new();
193
194 let mut current = start;
195 while current.timestamp <= end.timestamp {
196 let window_end = current.add_duration(interval);
197 let window_data: Vec<f64> = self
198 .range(current, window_end)
199 .into_iter()
200 .map(|point| point.value.into())
201 .collect();
202
203 if !window_data.is_empty() {
204 let aggregated = match aggregation {
205 AggregationMethod::Mean => {
206 window_data.iter().sum::<f64>() / window_data.len() as f64
207 }
208 AggregationMethod::Sum => window_data.iter().sum(),
209 AggregationMethod::Min => {
210 window_data.iter().fold(f64::INFINITY, |a, &b| a.min(b))
211 }
212 AggregationMethod::Max => {
213 window_data.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b))
214 }
215 AggregationMethod::First => window_data[0],
216 AggregationMethod::Last => window_data[window_data.len() - 1],
217 };
218 resampled.insert(current, aggregated);
219 }
220
221 current = current.add_duration(interval);
222 }
223
224 Ok(resampled)
225 }
226}
227
/// How the values falling in one resampling window are reduced to a single f64.
#[derive(Debug, Clone, Copy)]
pub enum AggregationMethod {
    /// Arithmetic mean of the window's values.
    Mean,
    /// Sum of the window's values.
    Sum,
    /// Minimum value in the window.
    Min,
    /// Maximum value in the window.
    Max,
    /// Earliest value in the window.
    First,
    /// Latest value in the window.
    Last,
}
238
/// A time-based sliding window: retains only points whose timestamps lie
/// within `window_size` of the most recently added point.
#[derive(Debug, Clone)]
pub struct SlidingWindow<T> {
    // Width of the retained time span.
    window_size: Duration,
    // Points in insertion order; expired points are popped from the front.
    data: VecDeque<TimeSeriesPoint<T>>,
}
245
246impl<T: Clone> SlidingWindow<T> {
247 pub fn new(window_size: Duration) -> Self {
249 Self {
250 window_size,
251 data: VecDeque::new(),
252 }
253 }
254
255 pub fn add(&mut self, point: TimeSeriesPoint<T>) {
257 self.data.push_back(point.clone());
258
259 let cutoff = point.timestamp.sub_duration(self.window_size);
261 while let Some(front) = self.data.front() {
262 if front.timestamp < cutoff {
263 self.data.pop_front();
264 } else {
265 break;
266 }
267 }
268 }
269
270 pub fn current_window(&self) -> Vec<TimeSeriesPoint<T>> {
272 self.data.iter().cloned().collect()
273 }
274
275 pub fn len(&self) -> usize {
277 self.data.len()
278 }
279
280 pub fn is_empty(&self) -> bool {
282 self.data.is_empty()
283 }
284
285 pub fn compute_stats(&self) -> WindowStats
287 where
288 T: Into<f64> + Copy,
289 {
290 if self.is_empty() {
291 return WindowStats::default();
292 }
293
294 let values: Vec<f64> = self.data.iter().map(|p| p.value.into()).collect();
295 let n = values.len() as f64;
296 let sum = values.iter().sum::<f64>();
297 let mean = sum / n;
298 let variance = values.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n;
299 let std_dev = variance.sqrt();
300 let min = values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
301 let max = values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
302
303 WindowStats {
304 count: values.len(),
305 mean,
306 std_dev,
307 min,
308 max,
309 sum,
310 }
311 }
312}
313
/// Summary statistics for the points currently inside a sliding window.
#[derive(Debug, Clone, Default)]
pub struct WindowStats {
    /// Number of points in the window.
    pub count: usize,
    /// Arithmetic mean of the values.
    pub mean: f64,
    /// Population standard deviation (divides by `count`, not `count - 1`).
    pub std_dev: f64,
    /// Smallest value in the window.
    pub min: f64,
    /// Largest value in the window.
    pub max: f64,
    /// Sum of all values.
    pub sum: f64,
}
324
/// Secondary index mapping timestamps to the ids of records recorded at that
/// instant; supports range / before / after lookups in timestamp order.
pub struct TemporalIndex {
    // Sorted map: timestamp -> ids added at that exact timestamp.
    index: BTreeMap<Timestamp, Vec<usize>>,
}
329
impl Default for TemporalIndex {
    /// Equivalent to [`TemporalIndex::new`]: an empty index.
    fn default() -> Self {
        Self::new()
    }
}
335
336impl TemporalIndex {
337 pub fn new() -> Self {
339 Self {
340 index: BTreeMap::new(),
341 }
342 }
343
344 pub fn add_entry(&mut self, timestamp: Timestamp, id: usize) {
346 self.index.entry(timestamp).or_default().push(id);
347 }
348
349 pub fn find_range(&self, start: Timestamp, end: Timestamp) -> Vec<usize> {
351 self.index
352 .range(start..=end)
353 .flat_map(|(_, ids)| ids.iter().copied())
354 .collect()
355 }
356
357 pub fn find_before(&self, timestamp: Timestamp) -> Vec<usize> {
359 self.index
360 .range(..timestamp)
361 .flat_map(|(_, ids)| ids.iter().copied())
362 .collect()
363 }
364
365 pub fn find_after(&self, timestamp: Timestamp) -> Vec<usize> {
367 self.index
368 .range((
369 std::ops::Bound::Excluded(timestamp),
370 std::ops::Bound::Unbounded,
371 ))
372 .flat_map(|(_, ids)| ids.iter().copied())
373 .collect()
374 }
375}
376
/// Namespace for timezone-related helpers operating on [`Timestamp`].
pub struct TimeZoneUtils;
379
380impl TimeZoneUtils {
381 pub fn convert_timezone(
383 timestamp: Timestamp,
384 from_tz: &str,
385 to_tz: &str,
386 ) -> Result<Timestamp, UtilsError> {
387 let datetime_utc = timestamp.to_datetime_utc();
389
390 match (from_tz, to_tz) {
393 ("UTC", "Local") => {
394 let local = Local.from_utc_datetime(&datetime_utc.naive_utc());
395 Ok(Timestamp::from_millis(local.timestamp_millis()))
396 }
397 ("Local", "UTC") => {
398 Ok(timestamp) }
401 _ => Ok(timestamp), }
403 }
404
405 pub fn start_of_day(timestamp: Timestamp) -> Timestamp {
407 let datetime = timestamp.to_datetime_utc();
408 let start_of_day = datetime.date_naive().and_hms_opt(0, 0, 0).unwrap();
409 let start_of_day_utc: DateTime<Utc> =
410 DateTime::from_naive_utc_and_offset(start_of_day, Utc);
411 Timestamp::from_millis(start_of_day_utc.timestamp_millis())
412 }
413
414 pub fn end_of_day(timestamp: Timestamp) -> Timestamp {
416 let datetime = timestamp.to_datetime_utc();
417 let end_of_day = datetime.date_naive().and_hms_opt(23, 59, 59).unwrap();
418 let end_of_day_utc: DateTime<Utc> = DateTime::from_naive_utc_and_offset(end_of_day, Utc);
419 Timestamp::from_millis(end_of_day_utc.timestamp_millis())
420 }
421}
422
/// Namespace for aggregation and trend helpers over time-ordered data.
pub struct TemporalAggregator;
425
426impl TemporalAggregator {
427 pub fn aggregate_by_period<T>(
429 time_series: &TimeSeries<T>,
430 period: Duration,
431 aggregation: AggregationMethod,
432 ) -> Result<TimeSeries<f64>, UtilsError>
433 where
434 T: Into<f64> + Copy,
435 {
436 time_series.resample(period, aggregation)
437 }
438
439 pub fn rolling_statistics<T>(
441 data: &[TimeSeriesPoint<T>],
442 window_size: Duration,
443 ) -> Vec<WindowStats>
444 where
445 T: Into<f64> + Copy + Clone,
446 {
447 let mut results = Vec::new();
448 let mut window = SlidingWindow::new(window_size);
449
450 for point in data {
451 window.add(point.clone());
452 results.push(window.compute_stats());
453 }
454
455 results
456 }
457
458 pub fn detect_trend<T>(data: &[TimeSeriesPoint<T>], window_size: usize) -> Vec<TrendDirection>
460 where
461 T: Into<f64> + Copy,
462 {
463 if data.len() < window_size * 2 {
464 return vec![TrendDirection::Stable; data.len()];
465 }
466
467 let mut trends = Vec::new();
468 let values: Vec<f64> = data.iter().map(|p| p.value.into()).collect();
469
470 for i in window_size..(values.len() - window_size) {
471 let before: f64 = values[(i - window_size)..i].iter().sum::<f64>() / window_size as f64;
472 let after: f64 =
473 values[(i + 1)..(i + 1 + window_size)].iter().sum::<f64>() / window_size as f64;
474
475 let trend = if after > before * 1.05 {
476 TrendDirection::Increasing
477 } else if after < before * 0.95 {
478 TrendDirection::Decreasing
479 } else {
480 TrendDirection::Stable
481 };
482
483 trends.push(trend);
484 }
485
486 let mut result = vec![TrendDirection::Stable; window_size];
488 result.extend(trends);
489 result.extend(vec![TrendDirection::Stable; window_size]);
490 result
491 }
492}
493
/// Direction of a local trend as classified by `TemporalAggregator::detect_trend`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TrendDirection {
    /// The later mean exceeds the earlier mean by more than 5%.
    Increasing,
    /// The later mean falls short of the earlier mean by more than 5%.
    Decreasing,
    /// The change stays within the ±5% tolerance band.
    Stable,
}
501
/// Namespace for building lag and difference feature matrices from 1-D series.
pub struct LagFeatureGenerator;
504
impl LagFeatureGenerator {
    /// Builds a lag-feature matrix from a 1-D series.
    ///
    /// Output shape is `(data.len() - max_lag, lags.len() + 1)`. Row `i`
    /// describes series index `t = i + max_lag`: column 0 holds `data[t]`
    /// (the current/target value) and column `j + 1` holds `data[t - lags[j]]`.
    ///
    /// # Errors
    /// * `UtilsError::EmptyInput` when `data` or `lags` is empty.
    /// * `UtilsError::InsufficientData` when `data.len() <= max(lags)`.
    pub fn generate_lag_features(
        data: &Array1<f64>,
        lags: &[usize],
    ) -> Result<Array2<f64>, UtilsError> {
        if data.is_empty() || lags.is_empty() {
            return Err(UtilsError::EmptyInput);
        }

        // `lags` is non-empty here, so `max()` cannot fail.
        let max_lag = *lags.iter().max().unwrap();
        if data.len() <= max_lag {
            return Err(UtilsError::InsufficientData {
                min: max_lag + 1,
                actual: data.len(),
            });
        }

        let n_samples = data.len() - max_lag;
        // One column for the current value plus one per requested lag.
        let n_features = lags.len() + 1;
        let mut features = Array2::zeros((n_samples, n_features));

        for (i, mut row) in features.axis_iter_mut(Axis(0)).enumerate() {
            // Skip the first `max_lag` indices so every lag stays in range.
            let idx = i + max_lag;

            // Column 0: the current (unlagged) value.
            row[0] = data[idx];

            for (j, &lag) in lags.iter().enumerate() {
                row[j + 1] = data[idx - lag];
            }
        }

        Ok(features)
    }

    /// Builds difference features of the requested `orders`.
    ///
    /// Output shape is `(data.len() - max(orders), orders.len())`; column `j`
    /// holds the order-`orders[j]` first-difference of `data` applied `order`
    /// times.
    ///
    /// NOTE(review): each column is taken from the *front* of its differenced
    /// series (`diff_data[i]`), so columns of different orders are not
    /// end-aligned to the same original index. The unit test in this file
    /// pins this front-aligned behavior; confirm it is intended before
    /// changing the alignment.
    ///
    /// # Errors
    /// * `UtilsError::EmptyInput` when `data` or `orders` is empty.
    /// * `UtilsError::InsufficientData` when `data.len() <= max(orders)`.
    pub fn generate_diff_features(
        data: &Array1<f64>,
        orders: &[usize],
    ) -> Result<Array2<f64>, UtilsError> {
        if data.is_empty() || orders.is_empty() {
            return Err(UtilsError::EmptyInput);
        }

        // `orders` is non-empty here, so `max()` cannot fail.
        let max_order = *orders.iter().max().unwrap();
        if data.len() <= max_order {
            return Err(UtilsError::InsufficientData {
                min: max_order + 1,
                actual: data.len(),
            });
        }

        let n_samples = data.len() - max_order;
        let n_features = orders.len();
        let mut features = Array2::zeros((n_samples, n_features));

        for (j, &order) in orders.iter().enumerate() {
            let mut diff_data = data.to_owned();

            // Apply the first difference `order` times; each pass shortens the
            // series by one element. `data.len() > max_order >= order`
            // guarantees the length never reaches zero mid-loop.
            for _ in 0..order {
                let mut new_diff = Array1::zeros(diff_data.len() - 1);
                for i in 0..new_diff.len() {
                    new_diff[i] = diff_data[i + 1] - diff_data[i];
                }
                diff_data = new_diff;
            }

            // Copy the first `n_samples` differenced values into column `j`
            // (front-aligned; see the review note above).
            for i in 0..n_samples {
                features[(i, j)] = diff_data[i];
            }
        }

        Ok(features)
    }
}
584
// Unit tests for the temporal utilities in this module.
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // from_secs/from_millis agree: 1000 s == 1_000_000 ms.
    #[test]
    fn test_timestamp_creation() {
        let ts1 = Timestamp::from_secs(1000);
        let ts2 = Timestamp::from_millis(1_000_000);

        assert_eq!(ts1.as_secs(), 1000);
        assert_eq!(ts2.as_millis(), 1_000_000);
        assert_eq!(ts1, ts2);
    }

    // Insert, exact-key lookup, and first/last timestamp accessors.
    #[test]
    fn test_time_series_basic_operations() {
        let mut ts = TimeSeries::new();
        let ts1 = Timestamp::from_secs(100);
        let ts2 = Timestamp::from_secs(200);

        ts.insert(ts1, 10.0);
        ts.insert(ts2, 20.0);

        assert_eq!(ts.len(), 2);
        assert_eq!(ts.get(&ts1), Some(&10.0));
        assert_eq!(ts.first_timestamp(), Some(ts1));
        assert_eq!(ts.last_timestamp(), Some(ts2));
    }

    #[test]
    fn test_sliding_window() {
        let mut window = SlidingWindow::new(Duration::from_secs(10));
        let base_time = Timestamp::from_secs(100);

        window.add(TimeSeriesPoint::new(base_time, 1.0));
        window.add(TimeSeriesPoint::new(
            base_time.add_duration(Duration::from_secs(5)),
            2.0,
        ));
        window.add(TimeSeriesPoint::new(
            base_time.add_duration(Duration::from_secs(15)),
            3.0,
        ));

        // The +15s point moves the cutoff to +5s, evicting the base point;
        // the surviving values are 2.0 and 3.0.
        assert_eq!(window.len(), 2);
        let stats = window.compute_stats();
        assert_eq!(stats.count, 2);
        assert_eq!(stats.mean, 2.5);
    }

    // Range query is inclusive of both endpoints; find_before is exclusive.
    #[test]
    fn test_temporal_index() {
        let mut index = TemporalIndex::new();
        let ts1 = Timestamp::from_secs(100);
        let ts2 = Timestamp::from_secs(200);
        let ts3 = Timestamp::from_secs(300);

        index.add_entry(ts1, 1);
        index.add_entry(ts2, 2);
        index.add_entry(ts3, 3);

        let range_results = index.find_range(ts1, ts2);
        assert_eq!(range_results, vec![1, 2]);

        let before_results = index.find_before(ts2);
        assert_eq!(before_results, vec![1]);
    }

    #[test]
    fn test_lag_feature_generation() {
        let data = Array1::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
        let lags = vec![1, 2];

        let features = LagFeatureGenerator::generate_lag_features(&data, &lags).unwrap();

        // max_lag = 2 -> 3 rows; columns = current value + 2 lags.
        assert_eq!(features.shape(), &[3, 3]);
        // Row 0 targets data[2] = 3.0, with lag-1 = 2.0 and lag-2 = 1.0.
        assert_eq!(features[(0, 0)], 3.0);
        assert_eq!(features[(0, 1)], 2.0);
        assert_eq!(features[(0, 2)], 1.0);
    }

    #[test]
    fn test_diff_features() {
        let data = Array1::from(vec![1.0, 3.0, 6.0, 10.0, 15.0]);
        let orders = vec![1, 2];

        let features = LagFeatureGenerator::generate_diff_features(&data, &orders).unwrap();

        // max_order = 2 -> 3 rows; one column per order.
        assert_eq!(features.shape(), &[3, 2]);
        // First-order diffs start [2, 3, 4, ...]; second-order start [1, 1, ...].
        assert_eq!(features[(0, 0)], 2.0);
        assert_eq!(features[(0, 1)], 1.0);
    }

    #[test]
    fn test_time_series_resampling() {
        let timestamps = vec![
            Timestamp::from_secs(0),
            Timestamp::from_secs(1),
            Timestamp::from_secs(2),
            Timestamp::from_secs(3),
        ];
        let values = vec![1.0, 2.0, 3.0, 4.0];

        let ts = TimeSeries::from_vecs(timestamps, values).unwrap();
        let resampled = ts
            .resample(Duration::from_secs(2), AggregationMethod::Mean)
            .unwrap();

        // 4 points over 2s buckets must produce at least one aggregate.
        assert!(resampled.len() >= 1);
    }
}