1use crate::common::string;
7use anyhow::Result;
8use chrono::{Datelike, Duration, NaiveDate, NaiveDateTime, Timelike};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
/// Bucket width used by `TimeSeriesProcessor::resample`.
///
/// Every bucket is identified by its starting timestamp.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ResampleInterval {
    /// Calendar day, keyed on midnight.
    Daily,
    /// Calendar week starting on Sunday, keyed on Sunday midnight.
    Weekly,
    /// Calendar month, keyed on the 1st at midnight.
    Monthly,
    /// Calendar quarter (Jan/Apr/Jul/Oct), keyed on the quarter's first day.
    Quarterly,
    /// Calendar year, keyed on 1 January at midnight.
    Yearly,
    /// Clock hour, keyed on HH:00:00.
    Hourly,
    /// Clock minute, keyed on HH:MM:00.
    Minute,
    /// Fixed-width buckets aligned to 1970-01-01 00:00:00; only the
    /// whole-second part of the duration is used.
    Custom(Duration),
}
24
/// Reduction applied to the values that fall into one resample bucket.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TimeSeriesAgg {
    /// Sum of the values.
    Sum,
    /// Arithmetic mean of the values.
    Mean,
    /// Middle value; the average of the two middle values for even counts.
    Median,
    /// Smallest value (computed with `f64::min`, which skips NaN operands).
    Min,
    /// Largest value (computed with `f64::max`, which skips NaN operands).
    Max,
    /// First value of the bucket in input order.
    First,
    /// Last value of the bucket in input order.
    Last,
    /// Number of values in the bucket, as a float.
    Count,
}
37
/// Parameters for `TimeSeriesProcessor::rolling_mean`.
#[derive(Debug, Clone)]
pub struct RollingWindow {
    /// Look-back span: for a point at time `t`, points with timestamps in
    /// `[t - window_size, t]` (inclusive) are averaged. Only the whole-second
    /// part of the duration is used.
    pub window_size: Duration,
    /// Minimum number of in-window points required to emit a value.
    pub min_periods: usize,
    /// NOTE(review): not read by `rolling_mean`, which always uses a
    /// trailing window; confirm the intended semantics before relying on it.
    pub center: bool,
}
45
/// A single observation: a timezone-naive timestamp and its numeric value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeriesPoint {
    /// When the observation was taken (no timezone attached).
    pub timestamp: NaiveDateTime,
    /// The observed value.
    pub value: f64,
}
52
/// Summary statistics produced by `TimeSeriesProcessor::calculate_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeriesStats {
    /// Start of the series (the earliest timestamp for time-ordered input).
    pub start_date: NaiveDateTime,
    /// End of the series (the latest timestamp for time-ordered input).
    pub end_date: NaiveDateTime,
    /// Number of points in the series.
    pub total_points: usize,
    /// Calendar days spanned (inclusive) minus `total_points`, floored at
    /// zero; this assumes one observation per day.
    pub missing_points: usize,
    /// Direction of the least-squares slope of value against sample index.
    pub trend_direction: TrendDirection,
    /// Result of a month-of-year variance heuristic.
    pub seasonality_detected: bool,
    /// Lag-1 sample autocorrelation of the values.
    pub autocorrelation: f64,
}
64
/// Coarse classification of a series' linear trend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TrendDirection {
    /// Slope is positive and at least 0.001 per sample.
    Increasing,
    /// Slope is negative and at least 0.001 per sample in magnitude.
    Decreasing,
    /// Slope magnitude is below 0.001 per sample.
    Stationary,
    /// Trend could not be determined (e.g. fewer than two points).
    Unknown,
}
73
/// Parses, resamples and summarises univariate time series.
pub struct TimeSeriesProcessor {
    /// chrono `strftime` pattern tried first by `parse_date`, before its
    /// built-in fallback formats.
    date_format: String,
}
78
79impl TimeSeriesProcessor {
80 pub fn new(date_format: &str) -> Self {
82 Self {
83 date_format: date_format.to_string(),
84 }
85 }
86
87 pub fn parse_date(&self, date_str: &str) -> Result<NaiveDateTime> {
89 if let Ok(date) = NaiveDate::parse_from_str(date_str, &self.date_format) {
90 Ok(date.and_hms_opt(0, 0, 0).unwrap())
91 } else if let Ok(datetime) = NaiveDateTime::parse_from_str(date_str, &self.date_format) {
92 Ok(datetime)
93 } else {
94 let common_formats = vec![
96 "%Y-%m-%d",
97 "%Y-%m-%d %H:%M:%S",
98 "%d/%m/%Y",
99 "%d/%m/%Y %H:%M:%S",
100 "%m/%d/%Y",
101 "%m/%d/%Y %H:%M:%S",
102 ];
103
104 for format in common_formats {
105 if let Ok(date) = NaiveDate::parse_from_str(date_str, format) {
106 return Ok(date.and_hms_opt(0, 0, 0).unwrap());
107 }
108 if let Ok(datetime) = NaiveDateTime::parse_from_str(date_str, format) {
109 return Ok(datetime);
110 }
111 }
112
113 anyhow::bail!("Unable to parse date: {}", date_str);
114 }
115 }
116
117 pub fn csv_to_timeseries(
119 &self,
120 data: &[Vec<String>],
121 date_col: usize,
122 value_col: usize,
123 ) -> Result<Vec<TimeSeriesPoint>> {
124 if data.is_empty() {
125 return Ok(Vec::new());
126 }
127
128 let mut points = Vec::new();
129
130 for (i, row) in data.iter().enumerate().skip(1) {
131 if let (Some(date_str), Some(value_str)) = (row.get(date_col), row.get(value_col)) {
133 let timestamp = self.parse_date(date_str)?;
134 let value = string::to_number(value_str).ok_or_else(|| {
135 anyhow::anyhow!("Invalid number at row {}: {}", i + 1, value_str)
136 })?;
137
138 points.push(TimeSeriesPoint { timestamp, value });
139 }
140 }
141
142 points.sort_by_key(|p| p.timestamp);
144
145 Ok(points)
146 }
147
148 pub fn resample(
150 &self,
151 data: &[TimeSeriesPoint],
152 interval: &ResampleInterval,
153 agg: &TimeSeriesAgg,
154 ) -> Result<Vec<TimeSeriesPoint>> {
155 if data.is_empty() {
156 return Ok(Vec::new());
157 }
158
159 let grouped = self.group_by_interval(data, interval)?;
160 let mut resampled = Vec::new();
161
162 for (timestamp, values) in grouped {
163 let aggregated_value = self.aggregate_values(&values, agg)?;
164 resampled.push(TimeSeriesPoint {
165 timestamp,
166 value: aggregated_value,
167 });
168 }
169
170 resampled.sort_by_key(|p| p.timestamp);
171 Ok(resampled)
172 }
173
174 fn group_by_interval(
176 &self,
177 data: &[TimeSeriesPoint],
178 interval: &ResampleInterval,
179 ) -> Result<HashMap<NaiveDateTime, Vec<f64>>> {
180 let mut groups: HashMap<NaiveDateTime, Vec<f64>> = HashMap::new();
181
182 for point in data {
183 let key = self.get_interval_key(point.timestamp, interval);
184 groups.entry(key).or_insert_with(Vec::new).push(point.value);
185 }
186
187 Ok(groups)
188 }
189
190 fn get_interval_key(
192 &self,
193 timestamp: NaiveDateTime,
194 interval: &ResampleInterval,
195 ) -> NaiveDateTime {
196 match interval {
197 ResampleInterval::Daily => timestamp.date().and_hms_opt(0, 0, 0).unwrap(),
198 ResampleInterval::Weekly => {
199 let week_start = timestamp.date()
200 - Duration::days(timestamp.weekday().num_days_from_sunday() as i64);
201 week_start.and_hms_opt(0, 0, 0).unwrap()
202 }
203 ResampleInterval::Monthly => {
204 NaiveDate::from_ymd_opt(timestamp.year(), timestamp.month(), 1)
205 .unwrap()
206 .and_hms_opt(0, 0, 0)
207 .unwrap()
208 }
209 ResampleInterval::Quarterly => {
210 let quarter = ((timestamp.month() - 1) / 3) + 1;
211 let month = (quarter - 1) * 3 + 1;
212 NaiveDate::from_ymd_opt(timestamp.year(), month, 1)
213 .unwrap()
214 .and_hms_opt(0, 0, 0)
215 .unwrap()
216 }
217 ResampleInterval::Yearly => NaiveDate::from_ymd_opt(timestamp.year(), 1, 1)
218 .unwrap()
219 .and_hms_opt(0, 0, 0)
220 .unwrap(),
221 ResampleInterval::Hourly => timestamp
222 .date()
223 .and_hms_opt(timestamp.hour(), 0, 0)
224 .unwrap(),
225 ResampleInterval::Minute => timestamp
226 .date()
227 .and_hms_opt(timestamp.hour(), timestamp.minute(), 0)
228 .unwrap(),
229 ResampleInterval::Custom(duration) => {
230 let epoch = NaiveDateTime::new(
231 NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(),
232 chrono::NaiveTime::from_hms_opt(0, 0, 0).unwrap(),
233 );
234 let duration_since_epoch = timestamp.signed_duration_since(epoch);
235 let rounded_duration = (duration_since_epoch.num_seconds() as i64
236 / duration.num_seconds())
237 * duration.num_seconds();
238 epoch + Duration::seconds(rounded_duration)
239 }
240 }
241 }
242
243 fn aggregate_values(&self, values: &[f64], agg: &TimeSeriesAgg) -> Result<f64> {
245 if values.is_empty() {
246 return Err(anyhow::anyhow!("Cannot aggregate empty values"));
247 }
248
249 match agg {
250 TimeSeriesAgg::Sum => Ok(values.iter().sum()),
251 TimeSeriesAgg::Mean => Ok(values.iter().sum::<f64>() / values.len() as f64),
252 TimeSeriesAgg::Median => {
253 let mut sorted = values.to_vec();
254 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
255 let mid = sorted.len() / 2;
256 if sorted.len() % 2 == 0 {
257 Ok((sorted[mid - 1] + sorted[mid]) / 2.0)
258 } else {
259 Ok(sorted[mid])
260 }
261 }
262 TimeSeriesAgg::Min => Ok(values.iter().fold(f64::INFINITY, |a, &b| a.min(b))),
263 TimeSeriesAgg::Max => Ok(values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b))),
264 TimeSeriesAgg::First => Ok(values[0]),
265 TimeSeriesAgg::Last => Ok(values[values.len() - 1]),
266 TimeSeriesAgg::Count => Ok(values.len() as f64),
267 }
268 }
269
270 pub fn rolling_mean(
272 &self,
273 data: &[TimeSeriesPoint],
274 window: &RollingWindow,
275 ) -> Result<Vec<TimeSeriesPoint>> {
276 if data.len() < window.min_periods {
277 return Ok(Vec::new());
278 }
279
280 let mut result = Vec::new();
281 let _window_size_secs = window.window_size.num_seconds() as usize;
282
283 for i in 0..data.len() {
284 let current_time = data[i].timestamp;
285 let window_start = current_time - Duration::seconds(window.window_size.num_seconds());
286
287 let window_values: Vec<f64> = data
289 .iter()
290 .filter(|p| p.timestamp >= window_start && p.timestamp <= current_time)
291 .map(|p| p.value)
292 .collect();
293
294 if window_values.len() >= window.min_periods {
295 let mean = window_values.iter().sum::<f64>() / window_values.len() as f64;
296 result.push(TimeSeriesPoint {
297 timestamp: current_time,
298 value: mean,
299 });
300 }
301 }
302
303 Ok(result)
304 }
305
306 pub fn detect_trend(&self, data: &[TimeSeriesPoint]) -> TrendDirection {
308 if data.len() < 2 {
309 return TrendDirection::Unknown;
310 }
311
312 let n = data.len() as f64;
314 let x_sum: f64 = (0..data.len()).map(|i| i as f64).sum();
315 let y_sum: f64 = data.iter().map(|p| p.value).sum();
316 let xy_sum: f64 = data
317 .iter()
318 .enumerate()
319 .map(|(i, p)| i as f64 * p.value)
320 .sum();
321 let x2_sum: f64 = (0..data.len()).map(|i| (i as f64).powi(2)).sum();
322
323 let slope = (n * xy_sum - x_sum * y_sum) / (n * x2_sum - x_sum.powi(2));
324
325 if slope.abs() < 0.001 {
326 TrendDirection::Stationary
327 } else if slope > 0.0 {
328 TrendDirection::Increasing
329 } else {
330 TrendDirection::Decreasing
331 }
332 }
333
334 pub fn calculate_stats(&self, data: &[TimeSeriesPoint]) -> Result<TimeSeriesStats> {
336 if data.is_empty() {
337 return Err(anyhow::anyhow!("Empty time series"));
338 }
339
340 let start_date = data[0].timestamp;
341 let end_date = data[data.len() - 1].timestamp;
342 let total_points = data.len();
343
344 let expected_points = (end_date - start_date).num_days() + 1;
346 let missing_points = (expected_points as usize).saturating_sub(total_points);
347
348 let trend_direction = self.detect_trend(data);
349 let seasonality_detected = self.detect_seasonality(data);
350 let autocorrelation = self.calculate_autocorrelation(data, 1);
351
352 Ok(TimeSeriesStats {
353 start_date,
354 end_date,
355 total_points,
356 missing_points,
357 trend_direction,
358 seasonality_detected,
359 autocorrelation,
360 })
361 }
362
363 fn detect_seasonality(&self, data: &[TimeSeriesPoint]) -> bool {
365 if data.len() < 12 {
366 return false;
367 }
368
369 let mut monthly_data: HashMap<u32, Vec<f64>> = HashMap::new();
371
372 for point in data {
373 let month = point.timestamp.month();
374 monthly_data
375 .entry(month)
376 .or_insert_with(Vec::new)
377 .push(point.value);
378 }
379
380 let monthly_means: Vec<f64> = monthly_data
382 .values()
383 .map(|values| values.iter().sum::<f64>() / values.len() as f64)
384 .collect();
385
386 if monthly_means.len() < 2 {
387 return false;
388 }
389
390 let mean = monthly_means.iter().sum::<f64>() / monthly_means.len() as f64;
391 let variance = monthly_means
392 .iter()
393 .map(|m| (m - mean).powi(2))
394 .sum::<f64>()
395 / monthly_means.len() as f64;
396
397 variance > mean * 0.1
399 }
400
401 fn calculate_autocorrelation(&self, data: &[TimeSeriesPoint], lag: usize) -> f64 {
403 if data.len() <= lag {
404 return 0.0;
405 }
406
407 let values: Vec<f64> = data.iter().map(|p| p.value).collect();
408 let n = values.len() - lag;
409
410 let mean = values.iter().sum::<f64>() / values.len() as f64;
411
412 let mut numerator = 0.0;
413 let mut denominator = 0.0;
414
415 for i in 0..n {
416 numerator += (values[i] - mean) * (values[i + lag] - mean);
417 }
418
419 for i in 0..values.len() {
420 denominator += (values[i] - mean).powi(2);
421 }
422
423 if denominator == 0.0 {
424 0.0
425 } else {
426 numerator / denominator
427 }
428 }
429
430 pub fn timeseries_to_csv(&self, data: &[TimeSeriesPoint]) -> Vec<Vec<String>> {
432 let mut result = vec![vec!["timestamp".to_string(), "value".to_string()]];
433
434 for point in data {
435 result.push(vec![
436 point.timestamp.format("%Y-%m-%d %H:%M:%S").to_string(),
437 point.value.to_string(),
438 ]);
439 }
440
441 result
442 }
443}
444
445impl Default for TimeSeriesProcessor {
446 fn default() -> Self {
447 Self::new("%Y-%m-%d")
448 }
449}
450
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::NaiveDate;

    /// Midnight on 2023-01-01, which is a Sunday.
    fn jan_first_2023() -> NaiveDateTime {
        NaiveDate::from_ymd_opt(2023, 1, 1)
            .unwrap()
            .and_hms_opt(0, 0, 0)
            .unwrap()
    }

    /// Daily series starting on 2023-01-01 with the given values.
    fn daily_series(values: &[f64]) -> Vec<TimeSeriesPoint> {
        values
            .iter()
            .enumerate()
            .map(|(i, &value)| TimeSeriesPoint {
                timestamp: jan_first_2023() + Duration::days(i as i64),
                value,
            })
            .collect()
    }

    #[test]
    fn test_parse_date() {
        let processor = TimeSeriesProcessor::new("%Y-%m-%d");

        assert!(processor.parse_date("2023-01-01").is_ok());
        assert!(processor.parse_date("2023-01-01 12:00:00").is_ok());
        assert_eq!(processor.parse_date("2023-01-01").unwrap(), jan_first_2023());
        assert!(processor.parse_date("not a date").is_err());
    }

    #[test]
    fn test_detect_trend() {
        let processor = TimeSeriesProcessor::new("%Y-%m-%d");

        let increasing: Vec<f64> = (0..10).map(|i| i as f64).collect();
        assert!(matches!(
            processor.detect_trend(&daily_series(&increasing)),
            TrendDirection::Increasing
        ));

        let decreasing: Vec<f64> = (0..10).map(|i| (10 - i) as f64).collect();
        assert!(matches!(
            processor.detect_trend(&daily_series(&decreasing)),
            TrendDirection::Decreasing
        ));

        assert!(matches!(
            processor.detect_trend(&daily_series(&[5.0; 10])),
            TrendDirection::Stationary
        ));
        assert!(matches!(
            processor.detect_trend(&daily_series(&[1.0])),
            TrendDirection::Unknown
        ));
    }

    #[test]
    fn test_resample() {
        let processor = TimeSeriesProcessor::new("%Y-%m-%d");

        let values: Vec<f64> = (0..30).map(|i| (i % 7) as f64).collect();
        let data = daily_series(&values);

        let resampled = processor
            .resample(&data, &ResampleInterval::Weekly, &TimeSeriesAgg::Mean)
            .unwrap();

        // Sunday-based weeks: Jan 1, 8, 15, 22 are full; Jan 29 holds two days.
        assert_eq!(resampled.len(), 5);
        assert_eq!(resampled[0].timestamp, jan_first_2023());
        assert_eq!(resampled[0].value, 3.0);
        assert_eq!(resampled[4].timestamp, jan_first_2023() + Duration::days(28));
        assert_eq!(resampled[4].value, 0.5);
        assert!(resampled
            .windows(2)
            .all(|pair| pair[0].timestamp < pair[1].timestamp));
    }

    #[test]
    fn test_resample_custom_interval() {
        let processor = TimeSeriesProcessor::default();
        let data = daily_series(&[1.0, 2.0, 3.0, 4.0]);

        let resampled = processor
            .resample(
                &data,
                &ResampleInterval::Custom(Duration::days(2)),
                &TimeSeriesAgg::Sum,
            )
            .unwrap();

        // 2023-01-01 is an even number of days after the Unix epoch, so the
        // epoch-aligned two-day buckets start on Jan 1 and Jan 3.
        assert_eq!(resampled.len(), 2);
        assert_eq!(resampled[0].timestamp, jan_first_2023());
        assert_eq!(resampled[0].value, 3.0);
        assert_eq!(resampled[1].timestamp, jan_first_2023() + Duration::days(2));
        assert_eq!(resampled[1].value, 7.0);
    }

    #[test]
    fn test_aggregate_values() {
        let processor = TimeSeriesProcessor::default();
        let values = [3.0, 1.0, 2.0];

        assert_eq!(processor.aggregate_values(&values, &TimeSeriesAgg::Sum).unwrap(), 6.0);
        assert_eq!(processor.aggregate_values(&values, &TimeSeriesAgg::Mean).unwrap(), 2.0);
        assert_eq!(processor.aggregate_values(&values, &TimeSeriesAgg::Median).unwrap(), 2.0);
        assert_eq!(
            processor
                .aggregate_values(&[4.0, 1.0, 3.0, 2.0], &TimeSeriesAgg::Median)
                .unwrap(),
            2.5
        );
        assert_eq!(processor.aggregate_values(&values, &TimeSeriesAgg::Min).unwrap(), 1.0);
        assert_eq!(processor.aggregate_values(&values, &TimeSeriesAgg::Max).unwrap(), 3.0);
        assert_eq!(processor.aggregate_values(&values, &TimeSeriesAgg::First).unwrap(), 3.0);
        assert_eq!(processor.aggregate_values(&values, &TimeSeriesAgg::Last).unwrap(), 2.0);
        assert_eq!(processor.aggregate_values(&values, &TimeSeriesAgg::Count).unwrap(), 3.0);
        assert!(processor.aggregate_values(&[], &TimeSeriesAgg::Sum).is_err());
    }

    #[test]
    fn test_rolling_mean() {
        let processor = TimeSeriesProcessor::default();
        let data = daily_series(&[1.0, 2.0, 3.0]);

        let one_period = RollingWindow {
            window_size: Duration::days(1),
            min_periods: 1,
            center: false,
        };
        let means: Vec<f64> = processor
            .rolling_mean(&data, &one_period)
            .unwrap()
            .iter()
            .map(|p| p.value)
            .collect();
        assert_eq!(means, vec![1.0, 1.5, 2.5]);

        let two_periods = RollingWindow {
            min_periods: 2,
            ..one_period
        };
        let result = processor.rolling_mean(&data, &two_periods).unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].timestamp, jan_first_2023() + Duration::days(1));
        assert_eq!(result[0].value, 1.5);
    }

    #[test]
    fn test_timeseries_to_csv() {
        let processor = TimeSeriesProcessor::default();
        let rows = processor.timeseries_to_csv(&daily_series(&[1.0]));

        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0], vec!["timestamp", "value"]);
        assert_eq!(rows[1], vec!["2023-01-01 00:00:00", "1"]);
    }
}