sklears_utils/
time_series.rs

1//! Time Series Utilities
2//!
3//! This module provides comprehensive time series data structures and utilities
4//! for temporal data processing in machine learning applications.
5
6use crate::UtilsError;
7use chrono::{DateTime, Local, TimeZone, Utc};
8use scirs2_core::ndarray::{Array1, Array2, Axis};
9use std::collections::{BTreeMap, HashMap, VecDeque};
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12/// Timestamp representation for time series data
13#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
14pub struct Timestamp {
15    pub timestamp: i64, // Unix timestamp in milliseconds
16}
17
18impl Timestamp {
19    /// Create a new timestamp from milliseconds since Unix epoch
20    pub fn from_millis(millis: i64) -> Self {
21        Self { timestamp: millis }
22    }
23
24    /// Create a new timestamp from seconds since Unix epoch
25    pub fn from_secs(secs: i64) -> Self {
26        Self {
27            timestamp: secs * 1000,
28        }
29    }
30
31    /// Create a timestamp from the current time
32    pub fn now() -> Self {
33        let duration = SystemTime::now()
34            .duration_since(UNIX_EPOCH)
35            .unwrap_or(Duration::from_secs(0));
36        Self::from_millis(duration.as_millis() as i64)
37    }
38
39    /// Convert to milliseconds
40    pub fn as_millis(&self) -> i64 {
41        self.timestamp
42    }
43
44    /// Convert to seconds
45    pub fn as_secs(&self) -> i64 {
46        self.timestamp / 1000
47    }
48
49    /// Convert to `DateTime<Utc>`
50    pub fn to_datetime_utc(&self) -> DateTime<Utc> {
51        let naive = DateTime::from_timestamp_millis(self.timestamp)
52            .unwrap_or_default()
53            .naive_utc();
54        DateTime::from_naive_utc_and_offset(naive, Utc)
55    }
56
57    /// Add duration to timestamp
58    pub fn add_duration(&self, duration: Duration) -> Self {
59        Self::from_millis(self.timestamp + duration.as_millis() as i64)
60    }
61
62    /// Subtract duration from timestamp
63    pub fn sub_duration(&self, duration: Duration) -> Self {
64        Self::from_millis(self.timestamp - duration.as_millis() as i64)
65    }
66}
67
68/// Time series data point with timestamp and value
69#[derive(Debug, Clone)]
70pub struct TimeSeriesPoint<T> {
71    pub timestamp: Timestamp,
72    pub value: T,
73}
74
75impl<T> TimeSeriesPoint<T> {
76    pub fn new(timestamp: Timestamp, value: T) -> Self {
77        Self { timestamp, value }
78    }
79}
80
81/// Time series data structure with efficient temporal indexing
82#[derive(Debug, Clone)]
83pub struct TimeSeries<T> {
84    data: BTreeMap<Timestamp, T>,
85    metadata: HashMap<String, String>,
86}
87
88impl<T: Clone> Default for TimeSeries<T> {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl<T: Clone> TimeSeries<T> {
95    /// Create a new empty time series
96    pub fn new() -> Self {
97        Self {
98            data: BTreeMap::new(),
99            metadata: HashMap::new(),
100        }
101    }
102
103    /// Create time series from vectors of timestamps and values
104    pub fn from_vecs(timestamps: Vec<Timestamp>, values: Vec<T>) -> Result<Self, UtilsError> {
105        if timestamps.len() != values.len() {
106            return Err(UtilsError::ShapeMismatch {
107                expected: vec![timestamps.len()],
108                actual: vec![values.len()],
109            });
110        }
111
112        let mut ts = Self::new();
113        for (timestamp, value) in timestamps.into_iter().zip(values.into_iter()) {
114            ts.insert(timestamp, value);
115        }
116        Ok(ts)
117    }
118
119    /// Insert a new data point
120    pub fn insert(&mut self, timestamp: Timestamp, value: T) {
121        self.data.insert(timestamp, value);
122    }
123
124    /// Get value at specific timestamp
125    pub fn get(&self, timestamp: &Timestamp) -> Option<&T> {
126        self.data.get(timestamp)
127    }
128
129    /// Get the number of data points
130    pub fn len(&self) -> usize {
131        self.data.len()
132    }
133
134    /// Check if time series is empty
135    pub fn is_empty(&self) -> bool {
136        self.data.is_empty()
137    }
138
139    /// Get the first timestamp
140    pub fn first_timestamp(&self) -> Option<Timestamp> {
141        self.data.keys().next().copied()
142    }
143
144    /// Get the last timestamp
145    pub fn last_timestamp(&self) -> Option<Timestamp> {
146        self.data.keys().next_back().copied()
147    }
148
149    /// Get data points in a time range
150    pub fn range(&self, start: Timestamp, end: Timestamp) -> Vec<TimeSeriesPoint<T>> {
151        self.data
152            .range(start..=end)
153            .map(|(&timestamp, value)| TimeSeriesPoint::new(timestamp, value.clone()))
154            .collect()
155    }
156
157    /// Get all timestamps
158    pub fn timestamps(&self) -> Vec<Timestamp> {
159        self.data.keys().copied().collect()
160    }
161
162    /// Get all values
163    pub fn values(&self) -> Vec<T> {
164        self.data.values().cloned().collect()
165    }
166
167    /// Set metadata
168    pub fn set_metadata(&mut self, key: String, value: String) {
169        self.metadata.insert(key, value);
170    }
171
172    /// Get metadata
173    pub fn get_metadata(&self, key: &str) -> Option<&String> {
174        self.metadata.get(key)
175    }
176
177    /// Resample time series to regular intervals
178    pub fn resample(
179        &self,
180        interval: Duration,
181        aggregation: AggregationMethod,
182    ) -> Result<TimeSeries<f64>, UtilsError>
183    where
184        T: Into<f64> + Copy,
185    {
186        if self.is_empty() {
187            return Ok(TimeSeries::new());
188        }
189
190        let start = self.first_timestamp().unwrap();
191        let end = self.last_timestamp().unwrap();
192        let mut resampled = TimeSeries::new();
193
194        let mut current = start;
195        while current.timestamp <= end.timestamp {
196            let window_end = current.add_duration(interval);
197            let window_data: Vec<f64> = self
198                .range(current, window_end)
199                .into_iter()
200                .map(|point| point.value.into())
201                .collect();
202
203            if !window_data.is_empty() {
204                let aggregated = match aggregation {
205                    AggregationMethod::Mean => {
206                        window_data.iter().sum::<f64>() / window_data.len() as f64
207                    }
208                    AggregationMethod::Sum => window_data.iter().sum(),
209                    AggregationMethod::Min => {
210                        window_data.iter().fold(f64::INFINITY, |a, &b| a.min(b))
211                    }
212                    AggregationMethod::Max => {
213                        window_data.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b))
214                    }
215                    AggregationMethod::First => window_data[0],
216                    AggregationMethod::Last => window_data[window_data.len() - 1],
217                };
218                resampled.insert(current, aggregated);
219            }
220
221            current = current.add_duration(interval);
222        }
223
224        Ok(resampled)
225    }
226}
227
/// Strategy used to collapse the values of one resampling window into a single
/// number.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AggregationMethod {
    /// Arithmetic mean of the window's values.
    Mean,
    /// Sum of the window's values.
    Sum,
    /// Smallest value in the window.
    Min,
    /// Largest value in the window.
    Max,
    /// Value of the first point in the window.
    First,
    /// Value of the last point in the window.
    Last,
}
238
239/// Sliding window for time series analysis
240#[derive(Debug, Clone)]
241pub struct SlidingWindow<T> {
242    window_size: Duration,
243    data: VecDeque<TimeSeriesPoint<T>>,
244}
245
246impl<T: Clone> SlidingWindow<T> {
247    /// Create a new sliding window with specified size
248    pub fn new(window_size: Duration) -> Self {
249        Self {
250            window_size,
251            data: VecDeque::new(),
252        }
253    }
254
255    /// Add a new data point and maintain window size
256    pub fn add(&mut self, point: TimeSeriesPoint<T>) {
257        self.data.push_back(point.clone());
258
259        // Remove old data points outside the window
260        let cutoff = point.timestamp.sub_duration(self.window_size);
261        while let Some(front) = self.data.front() {
262            if front.timestamp < cutoff {
263                self.data.pop_front();
264            } else {
265                break;
266            }
267        }
268    }
269
270    /// Get current window data
271    pub fn current_window(&self) -> Vec<TimeSeriesPoint<T>> {
272        self.data.iter().cloned().collect()
273    }
274
275    /// Get window size
276    pub fn len(&self) -> usize {
277        self.data.len()
278    }
279
280    /// Check if window is empty
281    pub fn is_empty(&self) -> bool {
282        self.data.is_empty()
283    }
284
285    /// Compute statistics for numeric data in the window
286    pub fn compute_stats(&self) -> WindowStats
287    where
288        T: Into<f64> + Copy,
289    {
290        if self.is_empty() {
291            return WindowStats::default();
292        }
293
294        let values: Vec<f64> = self.data.iter().map(|p| p.value.into()).collect();
295        let n = values.len() as f64;
296        let sum = values.iter().sum::<f64>();
297        let mean = sum / n;
298        let variance = values.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / n;
299        let std_dev = variance.sqrt();
300        let min = values.iter().fold(f64::INFINITY, |a, &b| a.min(b));
301        let max = values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
302
303        WindowStats {
304            count: values.len(),
305            mean,
306            std_dev,
307            min,
308            max,
309            sum,
310        }
311    }
312}
313
/// Summary statistics describing the values held by a sliding window.
///
/// The derived `Default` sets every field to zero.
#[derive(Debug, Clone, Default)]
pub struct WindowStats {
    /// Number of values the statistics were computed from.
    pub count: usize,
    /// Arithmetic mean of the values.
    pub mean: f64,
    /// Standard deviation of the values.
    pub std_dev: f64,
    /// Smallest value.
    pub min: f64,
    /// Largest value.
    pub max: f64,
    /// Sum of the values.
    pub sum: f64,
}
324
325/// Temporal indexing utilities
326pub struct TemporalIndex {
327    index: BTreeMap<Timestamp, Vec<usize>>,
328}
329
330impl Default for TemporalIndex {
331    fn default() -> Self {
332        Self::new()
333    }
334}
335
336impl TemporalIndex {
337    /// Create a new temporal index
338    pub fn new() -> Self {
339        Self {
340            index: BTreeMap::new(),
341        }
342    }
343
344    /// Add an entry to the index
345    pub fn add_entry(&mut self, timestamp: Timestamp, id: usize) {
346        self.index.entry(timestamp).or_default().push(id);
347    }
348
349    /// Find entries in time range
350    pub fn find_range(&self, start: Timestamp, end: Timestamp) -> Vec<usize> {
351        self.index
352            .range(start..=end)
353            .flat_map(|(_, ids)| ids.iter().copied())
354            .collect()
355    }
356
357    /// Find entries before timestamp
358    pub fn find_before(&self, timestamp: Timestamp) -> Vec<usize> {
359        self.index
360            .range(..timestamp)
361            .flat_map(|(_, ids)| ids.iter().copied())
362            .collect()
363    }
364
365    /// Find entries after timestamp
366    pub fn find_after(&self, timestamp: Timestamp) -> Vec<usize> {
367        self.index
368            .range((
369                std::ops::Bound::Excluded(timestamp),
370                std::ops::Bound::Unbounded,
371            ))
372            .flat_map(|(_, ids)| ids.iter().copied())
373            .collect()
374    }
375}
376
377/// Time zone utilities
378pub struct TimeZoneUtils;
379
380impl TimeZoneUtils {
381    /// Convert timestamp from one timezone to another
382    pub fn convert_timezone(
383        timestamp: Timestamp,
384        from_tz: &str,
385        to_tz: &str,
386    ) -> Result<Timestamp, UtilsError> {
387        // Simplified timezone conversion - in practice would use proper timezone libraries
388        let datetime_utc = timestamp.to_datetime_utc();
389
390        // This is a simplified implementation
391        // In practice, you'd want to use proper timezone handling
392        match (from_tz, to_tz) {
393            ("UTC", "Local") => {
394                let local = Local.from_utc_datetime(&datetime_utc.naive_utc());
395                Ok(Timestamp::from_millis(local.timestamp_millis()))
396            }
397            ("Local", "UTC") => {
398                // Assume input is local time, convert to UTC
399                Ok(timestamp) // Simplified
400            }
401            _ => Ok(timestamp), // No conversion for unsupported timezones
402        }
403    }
404
405    /// Get timestamp for start of day
406    pub fn start_of_day(timestamp: Timestamp) -> Timestamp {
407        let datetime = timestamp.to_datetime_utc();
408        let start_of_day = datetime.date_naive().and_hms_opt(0, 0, 0).unwrap();
409        let start_of_day_utc: DateTime<Utc> =
410            DateTime::from_naive_utc_and_offset(start_of_day, Utc);
411        Timestamp::from_millis(start_of_day_utc.timestamp_millis())
412    }
413
414    /// Get timestamp for end of day
415    pub fn end_of_day(timestamp: Timestamp) -> Timestamp {
416        let datetime = timestamp.to_datetime_utc();
417        let end_of_day = datetime.date_naive().and_hms_opt(23, 59, 59).unwrap();
418        let end_of_day_utc: DateTime<Utc> = DateTime::from_naive_utc_and_offset(end_of_day, Utc);
419        Timestamp::from_millis(end_of_day_utc.timestamp_millis())
420    }
421}
422
423/// Temporal aggregation utilities
424pub struct TemporalAggregator;
425
426impl TemporalAggregator {
427    /// Aggregate time series data by time periods
428    pub fn aggregate_by_period<T>(
429        time_series: &TimeSeries<T>,
430        period: Duration,
431        aggregation: AggregationMethod,
432    ) -> Result<TimeSeries<f64>, UtilsError>
433    where
434        T: Into<f64> + Copy,
435    {
436        time_series.resample(period, aggregation)
437    }
438
439    /// Compute rolling statistics
440    pub fn rolling_statistics<T>(
441        data: &[TimeSeriesPoint<T>],
442        window_size: Duration,
443    ) -> Vec<WindowStats>
444    where
445        T: Into<f64> + Copy + Clone,
446    {
447        let mut results = Vec::new();
448        let mut window = SlidingWindow::new(window_size);
449
450        for point in data {
451            window.add(point.clone());
452            results.push(window.compute_stats());
453        }
454
455        results
456    }
457
458    /// Detect trends in time series data
459    pub fn detect_trend<T>(data: &[TimeSeriesPoint<T>], window_size: usize) -> Vec<TrendDirection>
460    where
461        T: Into<f64> + Copy,
462    {
463        if data.len() < window_size * 2 {
464            return vec![TrendDirection::Stable; data.len()];
465        }
466
467        let mut trends = Vec::new();
468        let values: Vec<f64> = data.iter().map(|p| p.value.into()).collect();
469
470        for i in window_size..(values.len() - window_size) {
471            let before: f64 = values[(i - window_size)..i].iter().sum::<f64>() / window_size as f64;
472            let after: f64 =
473                values[(i + 1)..(i + 1 + window_size)].iter().sum::<f64>() / window_size as f64;
474
475            let trend = if after > before * 1.05 {
476                TrendDirection::Increasing
477            } else if after < before * 0.95 {
478                TrendDirection::Decreasing
479            } else {
480                TrendDirection::Stable
481            };
482
483            trends.push(trend);
484        }
485
486        // Pad with stable for edge cases
487        let mut result = vec![TrendDirection::Stable; window_size];
488        result.extend(trends);
489        result.extend(vec![TrendDirection::Stable; window_size]);
490        result
491    }
492}
493
/// Direction of the local trend detected around a data point.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TrendDirection {
    /// The values are rising.
    Increasing,
    /// The values are falling.
    Decreasing,
    /// No significant movement in either direction.
    Stable,
}
501
502/// Lag features generator for time series
503pub struct LagFeatureGenerator;
504
505impl LagFeatureGenerator {
506    /// Generate lag features for time series data
507    pub fn generate_lag_features(
508        data: &Array1<f64>,
509        lags: &[usize],
510    ) -> Result<Array2<f64>, UtilsError> {
511        if data.is_empty() || lags.is_empty() {
512            return Err(UtilsError::EmptyInput);
513        }
514
515        let max_lag = *lags.iter().max().unwrap();
516        if data.len() <= max_lag {
517            return Err(UtilsError::InsufficientData {
518                min: max_lag + 1,
519                actual: data.len(),
520            });
521        }
522
523        let n_samples = data.len() - max_lag;
524        let n_features = lags.len() + 1; // +1 for original feature
525        let mut features = Array2::zeros((n_samples, n_features));
526
527        for (i, mut row) in features.axis_iter_mut(Axis(0)).enumerate() {
528            let idx = i + max_lag;
529
530            // Original value
531            row[0] = data[idx];
532
533            // Lag features
534            for (j, &lag) in lags.iter().enumerate() {
535                row[j + 1] = data[idx - lag];
536            }
537        }
538
539        Ok(features)
540    }
541
542    /// Generate differencing features
543    pub fn generate_diff_features(
544        data: &Array1<f64>,
545        orders: &[usize],
546    ) -> Result<Array2<f64>, UtilsError> {
547        if data.is_empty() || orders.is_empty() {
548            return Err(UtilsError::EmptyInput);
549        }
550
551        let max_order = *orders.iter().max().unwrap();
552        if data.len() <= max_order {
553            return Err(UtilsError::InsufficientData {
554                min: max_order + 1,
555                actual: data.len(),
556            });
557        }
558
559        let n_samples = data.len() - max_order;
560        let n_features = orders.len();
561        let mut features = Array2::zeros((n_samples, n_features));
562
563        for (j, &order) in orders.iter().enumerate() {
564            let mut diff_data = data.to_owned();
565
566            // Apply differencing 'order' times
567            for _ in 0..order {
568                let mut new_diff = Array1::zeros(diff_data.len() - 1);
569                for i in 0..new_diff.len() {
570                    new_diff[i] = diff_data[i + 1] - diff_data[i];
571                }
572                diff_data = new_diff;
573            }
574
575            // Copy to features matrix
576            for i in 0..n_samples {
577                features[(i, j)] = diff_data[i];
578            }
579        }
580
581        Ok(features)
582    }
583}
584
/// Unit tests for the time series utilities.
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    #[test]
    fn test_timestamp_creation() {
        let ts1 = Timestamp::from_secs(1000);
        let ts2 = Timestamp::from_millis(1_000_000);

        assert_eq!(ts1.as_secs(), 1000);
        assert_eq!(ts2.as_millis(), 1_000_000);
        assert_eq!(ts1, ts2);
    }

    #[test]
    fn test_time_series_basic_operations() {
        let mut ts = TimeSeries::new();
        let ts1 = Timestamp::from_secs(100);
        let ts2 = Timestamp::from_secs(200);

        ts.insert(ts1, 10.0);
        ts.insert(ts2, 20.0);

        assert_eq!(ts.len(), 2);
        assert_eq!(ts.get(&ts1), Some(&10.0));
        assert_eq!(ts.first_timestamp(), Some(ts1));
        assert_eq!(ts.last_timestamp(), Some(ts2));
    }

    #[test]
    fn test_sliding_window() {
        let mut window = SlidingWindow::new(Duration::from_secs(10));
        let base_time = Timestamp::from_secs(100);

        window.add(TimeSeriesPoint::new(base_time, 1.0));
        window.add(TimeSeriesPoint::new(
            base_time.add_duration(Duration::from_secs(5)),
            2.0,
        ));
        window.add(TimeSeriesPoint::new(
            base_time.add_duration(Duration::from_secs(15)),
            3.0,
        ));

        // The point at t = 100 s is more than 10 s older than t = 115 s.
        assert_eq!(window.len(), 2);
        let stats = window.compute_stats();
        assert_eq!(stats.count, 2);
        assert_eq!(stats.mean, 2.5);
    }

    #[test]
    fn test_temporal_index() {
        let mut index = TemporalIndex::new();
        let ts1 = Timestamp::from_secs(100);
        let ts2 = Timestamp::from_secs(200);
        let ts3 = Timestamp::from_secs(300);

        index.add_entry(ts1, 1);
        index.add_entry(ts2, 2);
        index.add_entry(ts3, 3);

        // Both range ends are inclusive.
        let range_results = index.find_range(ts1, ts2);
        assert_eq!(range_results, vec![1, 2]);

        // `find_before` / `find_after` exclude the pivot itself.
        let before_results = index.find_before(ts2);
        assert_eq!(before_results, vec![1]);

        let after_results = index.find_after(ts2);
        assert_eq!(after_results, vec![3]);
    }

    #[test]
    fn test_lag_feature_generation() {
        let data = Array1::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);
        let lags = vec![1, 2];

        let features = LagFeatureGenerator::generate_lag_features(&data, &lags).unwrap();

        assert_eq!(features.shape(), &[3, 3]); // 3 samples, 3 features (original + 2 lags)
        assert_eq!(features[(0, 0)], 3.0); // Original value at index 2
        assert_eq!(features[(0, 1)], 2.0); // Lag 1: value at index 1
        assert_eq!(features[(0, 2)], 1.0); // Lag 2: value at index 0
    }

    #[test]
    fn test_diff_features() {
        let data = Array1::from(vec![1.0, 3.0, 6.0, 10.0, 15.0]);
        let orders = vec![1, 2];

        let features = LagFeatureGenerator::generate_diff_features(&data, &orders).unwrap();

        assert_eq!(features.shape(), &[3, 2]); // 3 samples, 2 features (1st and 2nd diff)
        assert_eq!(features[(0, 0)], 2.0); // First difference: 3 - 1
        assert_eq!(features[(0, 1)], 1.0); // Second difference: (6 - 3) - (3 - 1)
    }

    #[test]
    fn test_time_series_resampling() {
        let timestamps = vec![
            Timestamp::from_secs(0),
            Timestamp::from_secs(1),
            Timestamp::from_secs(2),
            Timestamp::from_secs(3),
        ];
        let values = vec![1.0, 2.0, 3.0, 4.0];

        let ts = TimeSeries::from_vecs(timestamps, values).unwrap();
        let resampled = ts
            .resample(Duration::from_secs(2), AggregationMethod::Mean)
            .unwrap();

        // Four points one second apart fall into two 2-second buckets, keyed by
        // the bucket start.
        assert_eq!(resampled.len(), 2);
        assert_eq!(resampled.first_timestamp(), Some(Timestamp::from_secs(0)));
        assert_eq!(resampled.last_timestamp(), Some(Timestamp::from_secs(2)));
    }
}