use crate::core::error::{Error, Result};
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeriesData {
values: Vec<f64>,
}
impl TimeSeriesData {
pub fn from_vec(values: Vec<f64>) -> Self {
Self { values }
}
pub fn len(&self) -> usize {
self.values.len()
}
pub fn is_empty(&self) -> bool {
self.values.is_empty()
}
pub fn get_f64(&self, index: usize) -> Option<f64> {
self.values.get(index).copied()
}
pub fn slice(&self, start: usize, end: usize) -> Result<Self> {
if start >= self.values.len() || end > self.values.len() || start >= end {
return Err(Error::InvalidInput("Invalid slice bounds".to_string()));
}
Ok(Self::from_vec(self.values[start..end].to_vec()))
}
pub fn iter(&self) -> impl Iterator<Item = f64> + '_ {
self.values.iter().copied()
}
}
/// Spacing between consecutive timestamps of a regular index.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Frequency {
    Second,
    Minute,
    Hour,
    Daily,
    Weekly,
    Monthly,
    Quarterly,
    Yearly,
    /// Any other fixed spacing.
    Custom(Duration),
}

impl Frequency {
    /// Converts the frequency into a fixed-length `Duration`.
    ///
    /// Calendar-based variants use fixed approximations: `Monthly` is 30 days,
    /// `Quarterly` is 90 days and `Yearly` is 365 days.
    pub fn to_duration(&self) -> Duration {
        match *self {
            Self::Second => Duration::seconds(1),
            Self::Minute => Duration::minutes(1),
            Self::Hour => Duration::hours(1),
            Self::Daily => Duration::days(1),
            Self::Weekly => Duration::weeks(1),
            Self::Monthly => Duration::days(30),
            Self::Quarterly => Duration::days(90),
            Self::Yearly => Duration::days(365),
            Self::Custom(span) => span,
        }
    }

    /// Short code for the frequency (e.g. `"D"` for daily, `"T"` for minute).
    pub fn name(&self) -> &'static str {
        match self {
            Self::Second => "S",
            Self::Minute => "T",
            Self::Hour => "H",
            Self::Daily => "D",
            Self::Weekly => "W",
            Self::Monthly => "M",
            Self::Quarterly => "Q",
            Self::Yearly => "Y",
            Self::Custom(_) => "C",
        }
    }
}

impl fmt::Display for Frequency {
    /// Writes the short code returned by [`Frequency::name`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}
/// A sequence of UTC timestamps with an optional spacing and label.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DateTimeIndex {
/// Timestamps in the order they were supplied; no sorting is applied here.
pub values: Vec<DateTime<Utc>>,
/// Spacing between timestamps: inferred by `new`, or supplied by the caller
/// via `with_frequency` / `date_range`. `None` when no spacing is known.
pub frequency: Option<Frequency>,
/// Optional label for the index.
pub name: Option<String>,
}
impl DateTimeIndex {
/// Builds an unnamed index from `values`, inferring the frequency from the
/// spacing of the timestamps (`None` when the spacing is not uniform).
pub fn new(values: Vec<DateTime<Utc>>) -> Self {
    Self {
        // Evaluated before `values` is moved into the struct.
        frequency: Self::infer_frequency(&values),
        values,
        name: None,
    }
}
/// Builds an unnamed index that records `frequency` as given, without
/// checking it against the actual spacing of `values`.
pub fn with_frequency(values: Vec<DateTime<Utc>>, frequency: Frequency) -> Self {
    let frequency = Some(frequency);
    Self {
        values,
        frequency,
        name: None,
    }
}
pub fn date_range(
start: DateTime<Utc>,
end: DateTime<Utc>,
frequency: Frequency,
) -> Result<Self> {
let mut dates = Vec::new();
let mut current = start;
let duration = frequency.to_duration();
while current <= end {
dates.push(current);
current = current + duration;
}
if dates.is_empty() {
return Err(Error::InvalidInput("Invalid date range".to_string()));
}
Ok(Self {
values: dates,
frequency: Some(frequency),
name: None,
})
}
fn infer_frequency(values: &[DateTime<Utc>]) -> Option<Frequency> {
if values.len() < 2 {
return None;
}
let diff = values[1] - values[0];
for i in 2..values.len() {
if (values[i] - values[i - 1]) != diff {
return None; }
}
match diff.num_seconds() {
1 => Some(Frequency::Second),
60 => Some(Frequency::Minute),
3600 => Some(Frequency::Hour),
86400 => Some(Frequency::Daily),
604800 => Some(Frequency::Weekly),
_ => Some(Frequency::Custom(diff)),
}
}
/// Number of timestamps in the index.
pub fn len(&self) -> usize {
    let timestamps = &self.values;
    timestamps.len()
}
/// Returns `true` when the index holds no timestamps.
pub fn is_empty(&self) -> bool {
    let timestamps = &self.values;
    timestamps.is_empty()
}
/// Borrows the timestamp at position `index`, or `None` when out of range.
pub fn get(&self, index: usize) -> Option<&DateTime<Utc>> {
    let timestamps = &self.values;
    timestamps.get(index)
}
/// Returns `true` when a frequency is recorded for this index.
pub fn is_regular(&self) -> bool {
    let recorded = self.frequency.as_ref();
    recorded.is_some()
}
/// Borrows the first timestamp, or `None` when the index is empty.
pub fn start(&self) -> Option<&DateTime<Utc>> {
    let timestamps = &self.values;
    timestamps.first()
}
/// Borrows the last timestamp, or `None` when the index is empty.
pub fn end(&self) -> Option<&DateTime<Utc>> {
    let timestamps = &self.values;
    timestamps.last()
}
pub fn slice(&self, start: usize, end: usize) -> Result<Self> {
if start >= self.values.len() || end > self.values.len() || start >= end {
return Err(Error::InvalidInput("Invalid slice bounds".to_string()));
}
Ok(Self {
values: self.values[start..end].to_vec(),
frequency: self.frequency.clone(),
name: self.name.clone(),
})
}
}
/// A single timestamped observation with free-form string metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimePoint<T> {
    /// Moment the observation refers to.
    pub timestamp: DateTime<Utc>,
    /// The observed value.
    pub value: T,
    /// Arbitrary key/value annotations.
    pub metadata: HashMap<String, String>,
}

impl<T> TimePoint<T> {
    /// Creates a point with no metadata.
    pub fn new(timestamp: DateTime<Utc>, value: T) -> Self {
        Self::with_metadata(timestamp, value, HashMap::new())
    }

    /// Creates a point carrying the supplied metadata map.
    pub fn with_metadata(
        timestamp: DateTime<Utc>,
        value: T,
        metadata: HashMap<String, String>,
    ) -> Self {
        Self {
            metadata,
            value,
            timestamp,
        }
    }
}
/// A series of `f64` observations paired positionally with a timestamp index.
///
/// `TimeSeries::new` checks that `index` and `values` have the same length;
/// because the fields are public, later mutation can break that pairing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeSeries {
/// Timestamps, one per observation.
pub index: DateTimeIndex,
/// Observations, in the same positional order as `index`.
pub values: TimeSeriesData,
/// Optional label for the series.
pub name: Option<String>,
/// Arbitrary key/value annotations.
pub metadata: HashMap<String, String>,
}
impl TimeSeries {
pub fn new(index: DateTimeIndex, values: TimeSeriesData) -> Result<Self> {
if index.len() != values.len() {
return Err(Error::DimensionMismatch(
"Index and values must have the same length".to_string(),
));
}
Ok(Self {
index,
values,
name: None,
metadata: HashMap::new(),
})
}
/// Builds a series from parallel vectors, inferring the index frequency.
///
/// # Errors
/// Returns `Error::DimensionMismatch` when the vectors differ in length.
pub fn from_vecs(timestamps: Vec<DateTime<Utc>>, values: Vec<f64>) -> Result<Self> {
    Self::new(
        DateTimeIndex::new(timestamps),
        TimeSeriesData::from_vec(values),
    )
}
/// Builds a series from individual points, keeping their order.
/// Per-point metadata is discarded.
pub fn from_points(points: Vec<TimePoint<f64>>) -> Result<Self> {
    let (timestamps, values): (Vec<DateTime<Utc>>, Vec<f64>) = points
        .into_iter()
        .map(|point| (point.timestamp, point.value))
        .unzip();
    Self::from_vecs(timestamps, values)
}
/// Number of entries in the series, as reported by its index.
pub fn len(&self) -> usize {
    let index = &self.index;
    index.len()
}
/// Returns `true` when the index holds no timestamps.
pub fn is_empty(&self) -> bool {
    let index = &self.index;
    index.is_empty()
}
/// Returns the `(timestamp, value)` pair at position `index`, or `None` when
/// either the index or the data has no entry there.
pub fn get(&self, index: usize) -> Option<(&DateTime<Utc>, f64)> {
    let stamp = self.index.get(index)?;
    let observation = self.values.get_f64(index)?;
    Some((stamp, observation))
}
/// Returns the value stored at the first timestamp exactly equal to
/// `timestamp`, or `None` when no such timestamp exists.
pub fn at(&self, timestamp: &DateTime<Utc>) -> Option<f64> {
    let position = self
        .index
        .values
        .iter()
        .position(|candidate| candidate == timestamp)?;
    self.values.get_f64(position)
}
pub fn between(&self, start: &DateTime<Utc>, end: &DateTime<Utc>) -> Result<TimeSeries> {
let start_idx = self
.index
.values
.iter()
.position(|ts| ts >= start)
.unwrap_or(0);
let end_idx = self
.index
.values
.iter()
.rposition(|ts| ts <= end)
.map(|idx| idx + 1)
.unwrap_or(self.len());
if start_idx >= end_idx {
return Err(Error::InvalidInput("Invalid time range".to_string()));
}
let index = self.index.slice(start_idx, end_idx)?;
let values = self.values.slice(start_idx, end_idx)?;
Ok(TimeSeries {
index,
values,
name: self.name.clone(),
metadata: self.metadata.clone(),
})
}
/// Returns the positional sub-series `start..end`, keeping name and metadata.
///
/// # Errors
/// Propagates `Error::InvalidInput` from the index or data slice when the
/// range is empty, reversed, or out of bounds.
pub fn slice(&self, start: usize, end: usize) -> Result<TimeSeries> {
    Ok(TimeSeries {
        index: self.index.slice(start, end)?,
        values: self.values.slice(start, end)?,
        name: self.name.clone(),
        metadata: self.metadata.clone(),
    })
}
/// Re-grids the series onto a regular index running from the first to the
/// last timestamp at the given `frequency`.
///
/// Each new timestamp gets a value from the helper selected by `method`;
/// when the helper yields nothing the slot is filled with NaN. The result
/// has no name and empty metadata.
///
/// # Errors
/// Returns `Error::InvalidInput` for an empty series, and propagates errors
/// from `DateTimeIndex::date_range` and `TimeSeries::new`.
pub fn resample(&self, frequency: Frequency, method: ResampleMethod) -> Result<TimeSeries> {
    if self.is_empty() {
        return Err(Error::InvalidInput(
            "Cannot resample empty time series".to_string(),
        ));
    }
    let first = self.index.start().copied().ok_or_else(|| {
        Error::InvalidInput("Time series index has no start date".to_string())
    })?;
    let last = self
        .index
        .end()
        .copied()
        .ok_or_else(|| Error::InvalidInput("Time series index has no end date".to_string()))?;

    let grid = DateTimeIndex::date_range(first, last, frequency)?;
    let resampled: Vec<f64> = grid
        .values
        .iter()
        .map(|target| {
            let estimate = match &method {
                ResampleMethod::Mean => self.interpolate_mean(target),
                ResampleMethod::Linear => self.interpolate_linear(target),
                ResampleMethod::Nearest => self.interpolate_nearest(target),
                ResampleMethod::Forward => self.forward_fill(target),
                ResampleMethod::Backward => self.backward_fill(target),
            };
            estimate.unwrap_or(f64::NAN)
        })
        .collect();

    TimeSeries::new(grid, TimeSeriesData::from_vec(resampled))
}
/// Linearly interpolates the value at `timestamp` between the neighbouring
/// observations.
///
/// The neighbours are the last observation at or before `timestamp` and the
/// first one after it, found by a forward scan (the index is expected to be
/// in ascending order). When only one side exists, that observation's value
/// is returned unchanged. Returns `None` for an empty series.
fn interpolate_linear(&self, timestamp: &DateTime<Utc>) -> Option<f64> {
    let timestamps = &self.index.values;
    let after_idx = timestamps.iter().position(|ts| ts > timestamp);
    let before_idx = match after_idx {
        Some(after) => after.checked_sub(1),
        None => timestamps.len().checked_sub(1),
    };

    match (before_idx, after_idx) {
        (Some(before), Some(after)) => {
            let val_before = self.values.get_f64(before)?;
            let val_after = self.values.get_f64(after)?;
            let span = timestamps[after].signed_duration_since(timestamps[before]);
            let elapsed = timestamp.signed_duration_since(timestamps[before]);

            // Use nanosecond resolution so sub-second series interpolate
            // correctly; whole-second arithmetic would collapse them to a
            // step function. Fall back to milliseconds for spans too long
            // to be expressed as i64 nanoseconds.
            let (elapsed, span) = match (elapsed.num_nanoseconds(), span.num_nanoseconds()) {
                (Some(e), Some(s)) => (e as f64, s as f64),
                _ => (
                    elapsed.num_milliseconds() as f64,
                    span.num_milliseconds() as f64,
                ),
            };
            if span == 0.0 {
                // Coincident neighbours: avoid dividing by zero.
                return Some(val_before);
            }
            Some(val_before + (val_after - val_before) * (elapsed / span))
        }
        (Some(idx), None) | (None, Some(idx)) => self.values.get_f64(idx),
        (None, None) => None,
    }
}
/// Averages every observation whose timestamp lies within 30 minutes either
/// side of `timestamp` (bounds inclusive). Returns `None` when the window
/// contains no observations.
///
/// NOTE(review): the 30-minute half-window is fixed and does not follow the
/// resampling frequency — confirm this is intended.
fn interpolate_mean(&self, timestamp: &DateTime<Utc>) -> Option<f64> {
    let half_window = Duration::minutes(30);
    let (lower, upper) = (*timestamp - half_window, *timestamp + half_window);

    let in_window: Vec<f64> = self
        .index
        .values
        .iter()
        .zip(self.values.iter())
        .filter(|(ts, _)| lower <= **ts && **ts <= upper)
        .map(|(_, observation)| observation)
        .collect();

    match in_window.len() {
        0 => None,
        count => Some(in_window.iter().sum::<f64>() / count as f64),
    }
}
fn interpolate_nearest(&self, timestamp: &DateTime<Utc>) -> Option<f64> {
let mut closest_idx = 0;
let mut min_diff = Duration::MAX;
for (i, ts) in self.index.values.iter().enumerate() {
let diff = (*timestamp - *ts).abs();
if diff < min_diff {
min_diff = diff;
closest_idx = i;
}
}
self.values.get_f64(closest_idx)
}
/// Returns the value at the last position whose timestamp is at or before
/// `timestamp`, or `None` when there is no such position.
fn forward_fill(&self, timestamp: &DateTime<Utc>) -> Option<f64> {
    let position = self
        .index
        .values
        .iter()
        .rposition(|ts| ts <= timestamp)?;
    self.values.get_f64(position)
}
/// Returns the value at the first position whose timestamp is at or after
/// `timestamp`, or `None` when there is no such position.
fn backward_fill(&self, timestamp: &DateTime<Utc>) -> Option<f64> {
    let position = self
        .index
        .values
        .iter()
        .position(|ts| ts >= timestamp)?;
    self.values.get_f64(position)
}
/// Trailing moving average over `window` consecutive observations.
///
/// The first `window - 1` positions are NaN because their window is
/// incomplete. The result reuses this series' index and has no name or
/// metadata.
///
/// # Errors
/// Returns `Error::InvalidInput` when `window` is zero or exceeds the
/// series length.
pub fn rolling_mean(&self, window: usize) -> Result<TimeSeries> {
    if window == 0 || window > self.len() {
        return Err(Error::InvalidInput("Invalid window size".to_string()));
    }

    let averages: Vec<f64> = (0..self.len())
        .map(|last| {
            if last + 1 < window {
                return f64::NAN;
            }
            let first = last + 1 - window;
            let total: f64 = (first..=last)
                .filter_map(|position| self.values.get_f64(position))
                .sum();
            total / window as f64
        })
        .collect();

    TimeSeries::new(self.index.clone(), TimeSeriesData::from_vec(averages))
}
/// Trailing population standard deviation (divisor `window`, not
/// `window - 1`) over `window` consecutive observations.
///
/// The first `window - 1` positions are NaN, as is any position whose window
/// could not be read in full. The result reuses this series' index and has
/// no name or metadata.
///
/// # Errors
/// Returns `Error::InvalidInput` when `window` is zero or exceeds the
/// series length.
pub fn rolling_std(&self, window: usize) -> Result<TimeSeries> {
    if window == 0 || window > self.len() {
        return Err(Error::InvalidInput("Invalid window size".to_string()));
    }

    let spread_ending_at = |last: usize| -> f64 {
        let first = last + 1 - window;
        let samples: Vec<f64> = (first..=last)
            .filter_map(|position| self.values.get_f64(position))
            .collect();
        if samples.len() != window {
            return f64::NAN;
        }
        let mean = samples.iter().sum::<f64>() / window as f64;
        let variance = samples
            .iter()
            .map(|&sample| (sample - mean).powi(2))
            .sum::<f64>()
            / window as f64;
        variance.sqrt()
    };

    let deviations: Vec<f64> = (0..self.len())
        .map(|last| {
            if last + 1 < window {
                f64::NAN
            } else {
                spread_ending_at(last)
            }
        })
        .collect();

    TimeSeries::new(self.index.clone(), TimeSeriesData::from_vec(deviations))
}
/// Difference between each observation and the one `periods` positions
/// earlier. The first `periods` positions are NaN. The result reuses this
/// series' index and has no name or metadata.
///
/// # Errors
/// Returns `Error::InvalidInput` when `periods` is not smaller than the
/// series length.
pub fn diff(&self, periods: usize) -> Result<TimeSeries> {
    if periods >= self.len() {
        return Err(Error::InvalidInput(
            "Periods must be less than series length".to_string(),
        ));
    }

    let lagged = (periods..self.len()).map(|position| {
        match (
            self.values.get_f64(position),
            self.values.get_f64(position - periods),
        ) {
            (Some(current), Some(prev)) => current - prev,
            _ => f64::NAN,
        }
    });
    let deltas: Vec<f64> = std::iter::repeat(f64::NAN)
        .take(periods)
        .chain(lagged)
        .collect();

    TimeSeries::new(self.index.clone(), TimeSeriesData::from_vec(deltas))
}
/// Relative change versus the observation `periods` positions earlier,
/// expressed as a fraction (`(current - prev) / prev`), not a percentage.
///
/// The first `periods` positions are NaN, as is any position whose earlier
/// observation is exactly zero. The result reuses this series' index and has
/// no name or metadata.
///
/// # Errors
/// Returns `Error::InvalidInput` when `periods` is not smaller than the
/// series length.
pub fn pct_change(&self, periods: usize) -> Result<TimeSeries> {
    if periods >= self.len() {
        return Err(Error::InvalidInput(
            "Periods must be less than series length".to_string(),
        ));
    }

    let lagged = (periods..self.len()).map(|position| {
        match (
            self.values.get_f64(position),
            self.values.get_f64(position - periods),
        ) {
            (Some(current), Some(prev)) if prev != 0.0 => (current - prev) / prev,
            _ => f64::NAN,
        }
    });
    let changes: Vec<f64> = std::iter::repeat(f64::NAN)
        .take(periods)
        .chain(lagged)
        .collect();

    TimeSeries::new(self.index.clone(), TimeSeriesData::from_vec(changes))
}
/// Shifts the observations by `periods` positions while keeping the index.
///
/// A positive `periods` moves values towards later positions (leading slots
/// become NaN); a negative one moves them towards earlier positions
/// (trailing slots become NaN). Shifting by the series length or more, in
/// either direction, yields an all-NaN series. The result has no name or
/// metadata.
///
/// # Errors
/// Propagates `Error::DimensionMismatch` from `TimeSeries::new` if the index
/// and data of this series have been made inconsistent.
pub fn shift(&self, periods: i32) -> Result<TimeSeries> {
    let len = self.len();
    // `unsigned_abs` is safe for `i32::MIN`, and u32 -> usize is lossless on
    // 32/64-bit targets. Clamping to `len` prevents the `len - periods`
    // underflow that previously panicked (or looped) for oversized shifts.
    let offset = (periods.unsigned_abs() as usize).min(len);
    let value_at = |position: usize| self.values.get_f64(position).unwrap_or(f64::NAN);
    let padding = std::iter::repeat(f64::NAN).take(offset);

    let shifted: Vec<f64> = if periods >= 0 {
        padding.chain((0..len - offset).map(value_at)).collect()
    } else {
        (offset..len).map(value_at).chain(padding).collect()
    };

    TimeSeries::new(self.index.clone(), TimeSeriesData::from_vec(shifted))
}
/// Replaces every non-finite observation (NaN or ±infinity) with the most
/// recent finite one before it. Non-finite values that precede the first
/// finite observation are left unchanged. The result reuses this series'
/// index and has no name or metadata.
pub fn fillna_forward(&self) -> Result<TimeSeries> {
    let mut carried: Option<f64> = None;
    let filled: Vec<f64> = (0..self.len())
        .map(|position| match self.values.get_f64(position) {
            Some(observation) if observation.is_finite() => {
                carried = Some(observation);
                observation
            }
            // Non-finite: reuse the carried value, else keep the original.
            Some(observation) => carried.unwrap_or(observation),
            // Index longer than the data: pad with the carried value or NaN.
            None => carried.unwrap_or(f64::NAN),
        })
        .collect();

    TimeSeries::new(self.index.clone(), TimeSeriesData::from_vec(filled))
}
/// Replaces every non-finite observation (NaN or ±infinity) with the next
/// finite one after it. Non-finite values that follow the last finite
/// observation are left unchanged. The result reuses this series' index and
/// has no name or metadata.
pub fn fillna_backward(&self) -> Result<TimeSeries> {
    let mut upcoming: Option<f64> = None;
    // Walk from the end so the "next" finite value is always known, then
    // restore the original order.
    let mut filled: Vec<f64> = (0..self.len())
        .rev()
        .map(|position| match self.values.get_f64(position) {
            Some(observation) if observation.is_finite() => {
                upcoming = Some(observation);
                observation
            }
            Some(observation) => upcoming.unwrap_or(observation),
            None => upcoming.unwrap_or(f64::NAN),
        })
        .collect();
    filled.reverse();

    TimeSeries::new(self.index.clone(), TimeSeriesData::from_vec(filled))
}
}
/// Strategy `TimeSeries::resample` uses to produce a value for each new
/// timestamp.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResampleMethod {
/// Average of the observations within 30 minutes either side of the target.
Mean,
/// Linear interpolation between the neighbouring observations.
Linear,
/// Value of the observation closest in time to the target.
Nearest,
/// Value of the last observation at or before the target.
Forward,
/// Value of the first observation at or after the target.
Backward,
}
pub struct TimeSeriesBuilder {
timestamps: Vec<DateTime<Utc>>,
values: Vec<f64>,
name: Option<String>,
frequency: Option<Frequency>,
metadata: HashMap<String, String>,
}
impl TimeSeriesBuilder {
pub fn new() -> Self {
Self {
timestamps: Vec::new(),
values: Vec::new(),
name: None,
frequency: None,
metadata: HashMap::new(),
}
}
pub fn add_point(mut self, timestamp: DateTime<Utc>, value: f64) -> Self {
self.timestamps.push(timestamp);
self.values.push(value);
self
}
pub fn name(mut self, name: String) -> Self {
self.name = Some(name);
self
}
pub fn frequency(mut self, frequency: Frequency) -> Self {
self.frequency = Some(frequency);
self
}
pub fn metadata(mut self, key: String, value: String) -> Self {
self.metadata.insert(key, value);
self
}
pub fn build(self) -> Result<TimeSeries> {
if self.timestamps.len() != self.values.len() {
return Err(Error::DimensionMismatch(
"Timestamps and values must have the same length".to_string(),
));
}
let index = if let Some(freq) = self.frequency {
DateTimeIndex::with_frequency(self.timestamps, freq)
} else {
DateTimeIndex::new(self.timestamps)
};
let series = TimeSeriesData::from_vec(self.values);
let mut ts = TimeSeries::new(index, series)?;
ts.name = self.name;
ts.metadata = self.metadata;
Ok(ts)
}
}
impl Default for TimeSeriesBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    /// Unix timestamp of 2022-01-01T00:00:00Z, the first fixture point.
    const DAY_ZERO: i64 = 1640995200;
    /// Seconds in one day, the spacing of the fixture points.
    const DAY: i64 = 86400;

    /// Converts whole Unix seconds into a UTC timestamp.
    fn utc_at(secs: i64) -> DateTime<Utc> {
        Utc.timestamp_opt(secs, 0)
            .single()
            .expect("operation should succeed")
    }

    /// Reads the value at `position`, panicking when it is absent.
    fn value_at(series: &TimeSeries, position: usize) -> f64 {
        series
            .values
            .get_f64(position)
            .expect("operation should succeed")
    }

    /// Five daily observations with values 1.0 through 5.0.
    fn create_test_series() -> TimeSeries {
        let timestamps: Vec<DateTime<Utc>> =
            (0..5).map(|day| utc_at(DAY_ZERO + day * DAY)).collect();
        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];
        TimeSeries::from_vecs(timestamps, values).expect("operation should succeed")
    }

    #[test]
    fn test_time_series_creation() {
        let ts = create_test_series();
        assert_eq!(ts.len(), 5);
        assert!(!ts.is_empty());
        assert_eq!(ts.get(0).expect("operation should succeed").1, 1.0);
    }

    #[test]
    fn test_frequency_inference() {
        let timestamps: Vec<DateTime<Utc>> =
            (0..3).map(|day| utc_at(DAY_ZERO + day * DAY)).collect();
        let index = DateTimeIndex::new(timestamps);
        assert_eq!(index.frequency, Some(Frequency::Daily));
    }

    #[test]
    fn test_rolling_mean() {
        let ts = create_test_series();
        let rolling = ts.rolling_mean(2).expect("operation should succeed");
        assert!(value_at(&rolling, 0).is_nan());
        assert_eq!(value_at(&rolling, 1), 1.5);
        assert_eq!(value_at(&rolling, 2), 2.5);
    }

    #[test]
    fn test_diff() {
        let ts = create_test_series();
        let diff = ts.diff(1).expect("operation should succeed");
        assert!(value_at(&diff, 0).is_nan());
        assert_eq!(value_at(&diff, 1), 1.0);
        assert_eq!(value_at(&diff, 2), 1.0);
    }

    #[test]
    fn test_time_series_builder() {
        let ts = TimeSeriesBuilder::new()
            .add_point(utc_at(DAY_ZERO), 1.0)
            .add_point(utc_at(DAY_ZERO + DAY), 2.0)
            .name("test_series".to_string())
            .frequency(Frequency::Daily)
            .metadata("source".to_string(), "test".to_string())
            .build()
            .expect("operation should succeed");
        assert_eq!(ts.len(), 2);
        assert_eq!(ts.name, Some("test_series".to_string()));
        assert_eq!(ts.index.frequency, Some(Frequency::Daily));
    }

    #[test]
    fn test_slice() {
        let ts = create_test_series();
        let sliced = ts.slice(1, 4).expect("operation should succeed");
        assert_eq!(sliced.len(), 3);
        assert_eq!(value_at(&sliced, 0), 2.0);
        assert_eq!(value_at(&sliced, 2), 4.0);
    }

    #[test]
    fn test_shift() {
        let ts = create_test_series();
        let shifted = ts.shift(1).expect("operation should succeed");
        assert!(value_at(&shifted, 0).is_nan());
        assert_eq!(value_at(&shifted, 1), 1.0);
        assert_eq!(value_at(&shifted, 2), 2.0);
    }
}