Skip to main content

xls_rs/
timeseries.rs

1//! Time series operations
2//!
3//! Provides time series analysis capabilities including resampling,
//! rolling windows, trend analysis, and a simple seasonality check.
5
6use crate::common::string;
7use anyhow::Result;
8use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime, Timelike};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12/// Time series resampling intervals
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub enum ResampleInterval {
15    Daily,
16    Weekly,
17    Monthly,
18    Quarterly,
19    Yearly,
20    Hourly,
21    Minute,
22    Custom(Duration),
23}
24
25/// Aggregation functions for resampling
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub enum TimeSeriesAgg {
28    Sum,
29    Mean,
30    Median,
31    Min,
32    Max,
33    First,
34    Last,
35    Count,
36}
37
38/// Rolling window configuration
39#[derive(Debug, Clone)]
40pub struct RollingWindow {
41    pub window_size: Duration,
42    pub min_periods: usize,
43    pub center: bool,
44}
45
46/// Time series data point
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct TimeSeriesPoint {
49    pub timestamp: NaiveDateTime,
50    pub value: f64,
51}
52
/// Summary statistics produced by `TimeSeriesProcessor::calculate_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeriesStats {
    /// Start of the covered time range.
    pub start_date: NaiveDateTime,
    /// End of the covered time range.
    pub end_date: NaiveDateTime,
    /// Number of points in the series.
    pub total_points: usize,
    /// Rough count of missing points, estimated by assuming one point per day.
    pub missing_points: usize,
    /// Direction of a least-squares linear fit over the values.
    pub trend_direction: TrendDirection,
    /// Result of a heuristic month-of-year seasonality check.
    pub seasonality_detected: bool,
    /// Lag-1 autocorrelation of the values.
    pub autocorrelation: f64,
}
64
65/// Trend direction
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub enum TrendDirection {
68    Increasing,
69    Decreasing,
70    Stationary,
71    Unknown,
72}
73
/// Parses, resamples and analyses time series data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeSeriesProcessor {
    /// Primary chrono format string, tried before the built-in fallback
    /// formats when parsing dates.
    date_format: String,
}
78
79impl TimeSeriesProcessor {
80    /// Create a new time series processor
81    pub fn new(date_format: &str) -> Self {
82        Self {
83            date_format: date_format.to_string(),
84        }
85    }
86
87    /// Parse date string to NaiveDateTime
88    pub fn parse_date(&self, date_str: &str) -> Result<NaiveDateTime> {
89        if let Ok(date) = NaiveDate::parse_from_str(date_str, &self.date_format) {
90            Ok(date.and_hms_opt(0, 0, 0).unwrap())
91        } else if let Ok(datetime) = NaiveDateTime::parse_from_str(date_str, &self.date_format) {
92            Ok(datetime)
93        } else {
94            // Try common formats
95            let common_formats = vec![
96                "%Y-%m-%d",
97                "%Y-%m-%d %H:%M:%S",
98                "%d/%m/%Y",
99                "%d/%m/%Y %H:%M:%S",
100                "%m/%d/%Y",
101                "%m/%d/%Y %H:%M:%S",
102            ];
103
104            for format in common_formats {
105                if let Ok(date) = NaiveDate::parse_from_str(date_str, format) {
106                    return Ok(date.and_hms_opt(0, 0, 0).unwrap());
107                }
108                if let Ok(datetime) = NaiveDateTime::parse_from_str(date_str, format) {
109                    return Ok(datetime);
110                }
111            }
112
113            anyhow::bail!("Unable to parse date: {}", date_str);
114        }
115    }
116
117    /// Convert CSV data to time series
118    pub fn csv_to_timeseries(
119        &self,
120        data: &[Vec<String>],
121        date_col: usize,
122        value_col: usize,
123    ) -> Result<Vec<TimeSeriesPoint>> {
124        if data.is_empty() {
125            return Ok(Vec::new());
126        }
127
128        let mut points = Vec::new();
129
130        for (i, row) in data.iter().enumerate().skip(1) {
131            // Skip header
132            if let (Some(date_str), Some(value_str)) = (row.get(date_col), row.get(value_col)) {
133                let timestamp = self.parse_date(date_str)?;
134                let value = string::to_number(value_str).ok_or_else(|| {
135                    anyhow::anyhow!("Invalid number at row {}: {}", i + 1, value_str)
136                })?;
137
138                points.push(TimeSeriesPoint { timestamp, value });
139            }
140        }
141
142        // Sort by timestamp
143        points.sort_by_key(|p| p.timestamp);
144
145        Ok(points)
146    }
147
148    /// Resample time series data
149    pub fn resample(
150        &self,
151        data: &[TimeSeriesPoint],
152        interval: &ResampleInterval,
153        agg: &TimeSeriesAgg,
154    ) -> Result<Vec<TimeSeriesPoint>> {
155        if data.is_empty() {
156            return Ok(Vec::new());
157        }
158
159        let grouped = self.group_by_interval(data, interval)?;
160        let mut resampled = Vec::new();
161
162        for (timestamp, values) in grouped {
163            let aggregated_value = self.aggregate_values(&values, agg)?;
164            resampled.push(TimeSeriesPoint {
165                timestamp,
166                value: aggregated_value,
167            });
168        }
169
170        resampled.sort_by_key(|p| p.timestamp);
171        Ok(resampled)
172    }
173
174    /// Group time series by interval
175    fn group_by_interval(
176        &self,
177        data: &[TimeSeriesPoint],
178        interval: &ResampleInterval,
179    ) -> Result<HashMap<NaiveDateTime, Vec<f64>>> {
180        let mut groups: HashMap<NaiveDateTime, Vec<f64>> = HashMap::new();
181
182        for point in data {
183            let key = self.get_interval_key(point.timestamp, interval);
184            groups.entry(key).or_insert_with(Vec::new).push(point.value);
185        }
186
187        Ok(groups)
188    }
189
190    /// Get interval key for timestamp
191    fn get_interval_key(
192        &self,
193        timestamp: NaiveDateTime,
194        interval: &ResampleInterval,
195    ) -> NaiveDateTime {
196        match interval {
197            ResampleInterval::Daily => timestamp.date().and_hms_opt(0, 0, 0).unwrap(),
198            ResampleInterval::Weekly => {
199                let week_start = timestamp.date()
200                    - Duration::days(timestamp.weekday().num_days_from_sunday() as i64);
201                week_start.and_hms_opt(0, 0, 0).unwrap()
202            }
203            ResampleInterval::Monthly => {
204                NaiveDate::from_ymd_opt(timestamp.year(), timestamp.month(), 1)
205                    .unwrap()
206                    .and_hms_opt(0, 0, 0)
207                    .unwrap()
208            }
209            ResampleInterval::Quarterly => {
210                let quarter = ((timestamp.month() - 1) / 3) + 1;
211                let month = (quarter - 1) * 3 + 1;
212                NaiveDate::from_ymd_opt(timestamp.year(), month, 1)
213                    .unwrap()
214                    .and_hms_opt(0, 0, 0)
215                    .unwrap()
216            }
217            ResampleInterval::Yearly => NaiveDate::from_ymd_opt(timestamp.year(), 1, 1)
218                .unwrap()
219                .and_hms_opt(0, 0, 0)
220                .unwrap(),
221            ResampleInterval::Hourly => timestamp
222                .date()
223                .and_hms_opt(timestamp.hour(), 0, 0)
224                .unwrap(),
225            ResampleInterval::Minute => timestamp
226                .date()
227                .and_hms_opt(timestamp.hour(), timestamp.minute(), 0)
228                .unwrap(),
229            ResampleInterval::Custom(duration) => {
230                let epoch = NaiveDateTime::new(
231                    NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
232                    chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap(),
233                );
234                let duration_since_epoch = timestamp.signed_duration_since(epoch);
235                let rounded_duration = (duration_since_epoch.num_seconds() as i64
236                    / duration.num_seconds())
237                    * duration.num_seconds();
238                epoch + Duration::seconds(rounded_duration)
239            }
240        }
241    }
242
243    /// Aggregate values using specified function
244    fn aggregate_values(&self, values: &[f64], agg: &TimeSeriesAgg) -> Result<f64> {
245        if values.is_empty() {
246            return Err(anyhow::anyhow!("Cannot aggregate empty values"));
247        }
248
249        match agg {
250            TimeSeriesAgg::Sum => Ok(values.iter().sum()),
251            TimeSeriesAgg::Mean => Ok(values.iter().sum::<f64>() / values.len() as f64),
252            TimeSeriesAgg::Median => {
253                let mut sorted = values.to_vec();
254                sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
255                let mid = sorted.len() / 2;
256                if sorted.len() % 2 == 0 {
257                    Ok((sorted[mid - 1] + sorted[mid]) / 2.0)
258                } else {
259                    Ok(sorted[mid])
260                }
261            }
262            TimeSeriesAgg::Min => Ok(values.iter().fold(f64::INFINITY, |a, &b| a.min(b))),
263            TimeSeriesAgg::Max => Ok(values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b))),
264            TimeSeriesAgg::First => Ok(values[0]),
265            TimeSeriesAgg::Last => Ok(values[values.len() - 1]),
266            TimeSeriesAgg::Count => Ok(values.len() as f64),
267        }
268    }
269
270    /// Calculate rolling window statistics
271    pub fn rolling_mean(
272        &self,
273        data: &[TimeSeriesPoint],
274        window: &RollingWindow,
275    ) -> Result<Vec<TimeSeriesPoint>> {
276        if data.len() < window.min_periods {
277            return Ok(Vec::new());
278        }
279
280        let mut result = Vec::new();
281        let _window_size_secs = window.window_size.num_seconds() as usize;
282
283        for i in 0..data.len() {
284            let current_time = data[i].timestamp;
285            let window_start = current_time - Duration::seconds(window.window_size.num_seconds());
286
287            // Find all points within the window
288            let window_values: Vec<f64> = data
289                .iter()
290                .filter(|p| p.timestamp >= window_start && p.timestamp <= current_time)
291                .map(|p| p.value)
292                .collect();
293
294            if window_values.len() >= window.min_periods {
295                let mean = window_values.iter().sum::<f64>() / window_values.len() as f64;
296                result.push(TimeSeriesPoint {
297                    timestamp: current_time,
298                    value: mean,
299                });
300            }
301        }
302
303        Ok(result)
304    }
305
306    /// Detect trend in time series
307    pub fn detect_trend(&self, data: &[TimeSeriesPoint]) -> TrendDirection {
308        if data.len() < 2 {
309            return TrendDirection::Unknown;
310        }
311
312        // Simple linear regression to detect trend
313        let n = data.len() as f64;
314        let x_sum: f64 = (0..data.len()).map(|i| i as f64).sum();
315        let y_sum: f64 = data.iter().map(|p| p.value).sum();
316        let xy_sum: f64 = data
317            .iter()
318            .enumerate()
319            .map(|(i, p)| i as f64 * p.value)
320            .sum();
321        let x2_sum: f64 = (0..data.len()).map(|i| (i as f64).powi(2)).sum();
322
323        let slope = (n * xy_sum - x_sum * y_sum) / (n * x2_sum - x_sum.powi(2));
324
325        if slope.abs() < 0.001 {
326            TrendDirection::Stationary
327        } else if slope > 0.0 {
328            TrendDirection::Increasing
329        } else {
330            TrendDirection::Decreasing
331        }
332    }
333
334    /// Calculate basic statistics
335    pub fn calculate_stats(&self, data: &[TimeSeriesPoint]) -> Result<TimeSeriesStats> {
336        if data.is_empty() {
337            return Err(anyhow::anyhow!("Empty time series"));
338        }
339
340        let start_date = data[0].timestamp;
341        let end_date = data[data.len() - 1].timestamp;
342        let total_points = data.len();
343
344        // Check for missing points (simplified - assumes daily data)
345        let expected_points = (end_date - start_date).num_days() + 1;
346        let missing_points = (expected_points as usize).saturating_sub(total_points);
347
348        let trend_direction = self.detect_trend(data);
349        let seasonality_detected = self.detect_seasonality(data);
350        let autocorrelation = self.calculate_autocorrelation(data, 1);
351
352        Ok(TimeSeriesStats {
353            start_date,
354            end_date,
355            total_points,
356            missing_points,
357            trend_direction,
358            seasonality_detected,
359            autocorrelation,
360        })
361    }
362
363    /// Simple seasonality detection
364    fn detect_seasonality(&self, data: &[TimeSeriesPoint]) -> bool {
365        if data.len() < 12 {
366            return false;
367        }
368
369        // Simple approach: check if there's a pattern in monthly averages
370        let mut monthly_data: HashMap<u32, Vec<f64>> = HashMap::new();
371
372        for point in data {
373            let month = point.timestamp.month();
374            monthly_data
375                .entry(month)
376                .or_insert_with(Vec::new)
377                .push(point.value);
378        }
379
380        // Calculate variance of monthly means
381        let monthly_means: Vec<f64> = monthly_data
382            .values()
383            .map(|values| values.iter().sum::<f64>() / values.len() as f64)
384            .collect();
385
386        if monthly_means.len() < 2 {
387            return false;
388        }
389
390        let mean = monthly_means.iter().sum::<f64>() / monthly_means.len() as f64;
391        let variance = monthly_means
392            .iter()
393            .map(|m| (m - mean).powi(2))
394            .sum::<f64>()
395            / monthly_means.len() as f64;
396
397        // If variance is significant relative to mean, assume seasonality
398        variance > mean * 0.1
399    }
400
401    /// Calculate autocorrelation at given lag
402    fn calculate_autocorrelation(&self, data: &[TimeSeriesPoint], lag: usize) -> f64 {
403        if data.len() <= lag {
404            return 0.0;
405        }
406
407        let values: Vec<f64> = data.iter().map(|p| p.value).collect();
408        let n = values.len() - lag;
409
410        let mean = values.iter().sum::<f64>() / values.len() as f64;
411
412        let mut numerator = 0.0;
413        let mut denominator = 0.0;
414
415        for i in 0..n {
416            numerator += (values[i] - mean) * (values[i + lag] - mean);
417        }
418
419        for i in 0..values.len() {
420            denominator += (values[i] - mean).powi(2);
421        }
422
423        if denominator == 0.0 {
424            0.0
425        } else {
426            numerator / denominator
427        }
428    }
429
430    /// Convert time series back to CSV format
431    pub fn timeseries_to_csv(&self, data: &[TimeSeriesPoint]) -> Vec<Vec<String>> {
432        let mut result = vec![vec!["timestamp".to_string(), "value".to_string()]];
433
434        for point in data {
435            result.push(vec![
436                point.timestamp.format("%Y-%m-%d %H:%M:%S").to_string(),
437                point.value.to_string(),
438            ]);
439        }
440
441        result
442    }
443}
444
445impl Default for TimeSeriesProcessor {
446    fn default() -> Self {
447        Self::new("%Y-%m-%d")
448    }
449}
450
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::NaiveDate;

    /// Midnight on 1 January 2023, shifted forward by `offset` days.
    fn jan_first_plus(offset: i64) -> NaiveDateTime {
        let base = NaiveDate::from_ymd_opt(2023, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap();
        base + Duration::days(offset)
    }

    /// One point per day for `days` days, with values produced by `value_at`.
    fn daily(days: i64, value_at: impl Fn(i64) -> f64) -> Vec<TimeSeriesPoint> {
        (0..days)
            .map(|i| TimeSeriesPoint {
                timestamp: jan_first_plus(i),
                value: value_at(i),
            })
            .collect()
    }

    #[test]
    fn test_parse_date() {
        let processor = TimeSeriesProcessor::new("%Y-%m-%d");

        for input in ["2023-01-01", "2023-01-01 12:00:00"] {
            assert!(processor.parse_date(input).is_ok());
        }
    }

    #[test]
    fn test_detect_trend() {
        let processor = TimeSeriesProcessor::new("%Y-%m-%d");

        let rising = daily(10, |i| i as f64);
        assert!(matches!(
            processor.detect_trend(&rising),
            TrendDirection::Increasing
        ));

        let falling = daily(10, |i| (10 - i) as f64);
        assert!(matches!(
            processor.detect_trend(&falling),
            TrendDirection::Decreasing
        ));
    }

    #[test]
    fn test_resample() {
        let processor = TimeSeriesProcessor::new("%Y-%m-%d");
        let data = daily(30, |i| (i % 7) as f64);

        let weekly = processor
            .resample(&data, &ResampleInterval::Weekly, &TimeSeriesAgg::Mean)
            .expect("weekly resampling of daily data should succeed");

        assert!(!weekly.is_empty());
        assert!(weekly.len() < data.len());
    }
}