use crate::core::error::{Error, Result};
use crate::series::base::Series;
use std::fmt::Debug;
/// Fixed-size moving window over a [`Series`], created by [`Rolling::new`] or
/// [`WindowExt::rolling`].
///
/// Configure it with the `min_periods`, `center` and `closed` builder methods, then
/// aggregate through the [`WindowOps`] implementation.
#[derive(Debug, Clone)]
pub struct Rolling<T: Debug + Clone> {
    /// Source data the windows are drawn from.
    series: Series<T>,
    /// Number of positions per window; `Rolling::new` rejects 0.
    window_size: usize,
    /// Minimum observations for a result; `None` falls back to `window_size`.
    min_periods: Option<usize>,
    /// Whether windows are centred on their output position instead of trailing it.
    center: bool,
    /// Endpoint-inclusivity setting stored by the `closed` builder.
    closed: WindowClosed,
}
/// Expanding (cumulative) window over a [`Series`]: output position `i`
/// aggregates every observation from the start of the series through `i`.
#[derive(Debug, Clone)]
pub struct Expanding<T: Debug + Clone> {
    /// Source data.
    series: Series<T>,
    /// Observations required before a value (rather than a placeholder) is produced.
    min_periods: usize,
}
/// Exponentially weighted window over a [`Series`].
///
/// The decay is given by exactly one of `alpha`, `span` or `halflife`; each
/// builder method that sets one of them clears the other two.
#[derive(Debug, Clone)]
pub struct EWM<T: Debug + Clone> {
    /// Source data.
    series: Series<T>,
    /// Smoothing factor supplied directly.
    alpha: Option<f64>,
    /// Span from which the smoothing factor is derived.
    span: Option<usize>,
    /// Half-life from which the smoothing factor is derived.
    halflife: Option<f64>,
    /// `adjust` flag; `EWM::new` sets it to `true`.
    adjust: bool,
    /// `ignore_na` flag; `EWM::new` sets it to `false`.
    ignore_na: bool,
}
/// Endpoint-inclusivity option for rolling windows.
///
/// The variant names mirror the `closed` options of pandas' rolling windows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowClosed {
    /// Both endpoints are meant to be inclusive.
    Both,
    /// Only the left endpoint is meant to be inclusive.
    Left,
    /// Only the right endpoint is meant to be inclusive.
    Right,
    /// Neither endpoint is meant to be inclusive.
    Neither,
}
impl Default for WindowClosed {
fn default() -> Self {
WindowClosed::Right
}
}
/// Aggregations shared by [`Rolling`] and [`Expanding`] windows.
///
/// In this module's implementations, float-valued results hold `NaN` at positions
/// whose window does not meet the minimum-observation requirement.
pub trait WindowOps<T: Debug + Clone> {
    /// Arithmetic mean of each window.
    fn mean(&self) -> Result<Series<f64>>;
    /// Sum of each window.
    fn sum(&self) -> Result<Series<f64>>;
    /// Standard deviation of each window, using `ddof` delta degrees of freedom.
    fn std(&self, ddof: usize) -> Result<Series<f64>>;
    /// Variance of each window, using `ddof` delta degrees of freedom.
    fn var(&self, ddof: usize) -> Result<Series<f64>>;
    /// Minimum of each window.
    fn min(&self) -> Result<Series<f64>>;
    /// Maximum of each window.
    fn max(&self) -> Result<Series<f64>>;
    /// Number of observations in each window.
    fn count(&self) -> Result<Series<usize>>;
    /// Median of each window.
    fn median(&self) -> Result<Series<f64>>;
    /// `q`-th quantile of each window; `q` is expected to lie in `[0, 1]`.
    fn quantile(&self, q: f64) -> Result<Series<f64>>;
    /// Applies the custom aggregation `func` to each window's values.
    fn apply<F, R>(&self, func: F) -> Result<Series<R>>
    where
        F: Fn(&[f64]) -> R + Copy,
        R: Debug + Clone;
}
impl<T> Rolling<T>
where
    T: Debug + Clone,
{
    /// Creates a rolling window of `window_size` positions over `series`.
    ///
    /// Defaults: `min_periods = window_size`, not centred, right-closed.
    ///
    /// # Errors
    /// Returns [`Error::InvalidValue`] when `window_size` is 0.
    pub fn new(series: Series<T>, window_size: usize) -> Result<Self> {
        if window_size == 0 {
            return Err(Error::InvalidValue(
                "Window size must be greater than 0".to_string(),
            ));
        }
        Ok(Self {
            series,
            window_size,
            min_periods: None,
            center: false,
            closed: WindowClosed::default(),
        })
    }

    /// Sets the minimum number of observations a window needs to produce a value
    /// (defaults to `window_size`).
    pub fn min_periods(mut self, min_periods: usize) -> Self {
        self.min_periods = Some(min_periods);
        self
    }

    /// Centres each window on its output position instead of ending it there.
    pub fn center(mut self, center: bool) -> Self {
        self.center = center;
        self
    }

    /// Selects which window endpoints are inclusive (see [`WindowClosed`]).
    pub fn closed(mut self, closed: WindowClosed) -> Self {
        self.closed = closed;
        self
    }

    /// Minimum observations per window, falling back to `window_size` when unset.
    fn effective_min_periods(&self) -> usize {
        self.min_periods.unwrap_or(self.window_size)
    }

    /// Converts every element to `f64`; every entry is currently `Some`.
    fn values_as_f64(&self) -> Result<Vec<Option<f64>>>
    where
        T: Into<f64> + Copy,
    {
        let converted: Vec<Option<f64>> = self
            .series
            .values()
            .iter()
            .map(|&value| Some(value.into()))
            .collect();
        Ok(converted)
    }

    /// Half-open index range `[start, end)` of the window for output position `i`
    /// in a series of `len` elements.
    ///
    /// Mirrors pandas' fixed-window bounds. The right-closed window ends just after
    /// `i`, or `(window_size - 1) / 2` positions later when centred, and spans
    /// `window_size` positions. `closed` then widens the left edge by one
    /// (`Left`/`Both`) and/or drops the right edge (`Left`/`Neither`). Both bounds are
    /// clamped to `0..=len` independently, so a centred window near the start is
    /// truncated instead of being shifted right.
    fn window_bounds(&self, i: usize, len: usize) -> (usize, usize) {
        let offset = if self.center {
            (self.window_size - 1) / 2
        } else {
            0
        };
        // Exclusive right edge of the default (right-closed) window.
        let right = i.saturating_add(1).saturating_add(offset);
        let (extend_left, drop_right): (usize, usize) = match self.closed {
            WindowClosed::Right => (0, 0),
            WindowClosed::Both => (1, 0),
            WindowClosed::Left => (1, 1),
            WindowClosed::Neither => (0, 1),
        };
        let start = right
            .saturating_sub(self.window_size)
            .saturating_sub(extend_left)
            .min(len);
        // `right >= 1`, so this cannot underflow.
        let end = (right - drop_right).min(len);
        (start, end)
    }

    /// Applies `func` to the observed values of every window.
    ///
    /// Position `i` yields `Some(func(window))` when the window is non-empty and holds
    /// at least `effective_min_periods()` values, otherwise `None`. The result has one
    /// entry per input element and keeps the series' name.
    fn apply_window_op<F, R>(&self, mut func: F) -> Result<Series<Option<R>>>
    where
        T: Into<f64> + Copy,
        F: FnMut(&[f64]) -> R,
        R: Debug + Clone,
    {
        let values = self.values_as_f64()?;
        let len = values.len();
        let min_periods = self.effective_min_periods();
        let mut result = Vec::with_capacity(len);
        for i in 0..len {
            let (start, end) = self.window_bounds(i, len);
            let window: Vec<f64> = values[start..end].iter().filter_map(|&v| v).collect();
            // Windows can be empty (e.g. `closed = Left` at i = 0). They are never passed
            // to `func`, so aggregators that index into the slice cannot panic.
            if !window.is_empty() && window.len() >= min_periods {
                result.push(Some(func(&window)));
            } else {
                result.push(None);
            }
        }
        Series::new(result, self.series.name().cloned())
    }
}
impl<T> WindowOps<T> for Rolling<T>
where
    T: Debug + Clone + Into<f64> + Copy,
{
    /// Mean of each window; `NaN` where the window has too few observations.
    fn mean(&self) -> Result<Series<f64>> {
        let result =
            self.apply_window_op(|values| values.iter().sum::<f64>() / values.len() as f64)?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Sum of each window; `NaN` where the window has too few observations.
    fn sum(&self) -> Result<Series<f64>> {
        let result = self.apply_window_op(|values| values.iter().sum::<f64>())?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Standard deviation with divisor `n - ddof`; `NaN` when `n <= ddof` or the
    /// window has too few observations.
    fn std(&self, ddof: usize) -> Result<Series<f64>> {
        let result = self.apply_window_op(|values| {
            if values.len() <= ddof {
                f64::NAN
            } else {
                let mean = values.iter().sum::<f64>() / values.len() as f64;
                let variance = values.iter().map(|&x| (x - mean).powi(2)).sum::<f64>()
                    / (values.len() - ddof) as f64;
                variance.sqrt()
            }
        })?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Variance with divisor `n - ddof`; `NaN` when `n <= ddof` or the window has
    /// too few observations.
    fn var(&self, ddof: usize) -> Result<Series<f64>> {
        let result = self.apply_window_op(|values| {
            if values.len() <= ddof {
                f64::NAN
            } else {
                let mean = values.iter().sum::<f64>() / values.len() as f64;
                values.iter().map(|&x| (x - mean).powi(2)).sum::<f64>()
                    / (values.len() - ddof) as f64
            }
        })?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Minimum of each window; `NaN` where the window has too few observations.
    fn min(&self) -> Result<Series<f64>> {
        let result =
            self.apply_window_op(|values| values.iter().fold(f64::INFINITY, |a, &b| a.min(b)))?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Maximum of each window; `NaN` where the window has too few observations.
    fn max(&self) -> Result<Series<f64>> {
        let result =
            self.apply_window_op(|values| values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b)))?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Observation count of each window; 0 where the window has too few observations.
    fn count(&self) -> Result<Series<usize>> {
        let result = self.apply_window_op(|values| values.len())?;
        let values: Vec<usize> = result.values().iter().map(|&v| v.unwrap_or(0)).collect();
        Series::new(values, result.name().cloned())
    }

    /// Median of each window: the middle value, or the mean of the two middle values
    /// for an even count.
    fn median(&self) -> Result<Series<f64>> {
        let result = self.apply_window_op(|values| {
            if values.is_empty() {
                return f64::NAN;
            }
            let mut sorted = values.to_vec();
            sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
            let mid = sorted.len() / 2;
            if sorted.len() % 2 == 0 {
                (sorted[mid - 1] + sorted[mid]) / 2.0
            } else {
                sorted[mid]
            }
        })?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// `q`-quantile of each window, linearly interpolated between the two nearest
    /// ranks at position `q * (n - 1)`, so `quantile(0.5)` agrees with `median()`.
    ///
    /// # Errors
    /// Returns [`Error::InvalidValue`] unless `0 <= q <= 1` (NaN is rejected).
    fn quantile(&self, q: f64) -> Result<Series<f64>> {
        // A negated range check also rejects NaN, which slips past `q < 0.0 || q > 1.0`.
        if !(0.0..=1.0).contains(&q) {
            return Err(Error::InvalidValue(
                "Quantile must be between 0 and 1".to_string(),
            ));
        }
        let result = self.apply_window_op(|values| {
            if values.is_empty() {
                return f64::NAN;
            }
            let mut sorted = values.to_vec();
            sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
            let last = sorted.len() - 1;
            let pos = q * last as f64;
            let lo = (pos.floor() as usize).min(last);
            let hi = (pos.ceil() as usize).min(last);
            let frac = pos - lo as f64;
            // Exact ranks skip interpolation, which also keeps infinities intact.
            if frac == 0.0 {
                sorted[lo]
            } else {
                sorted[lo] + (sorted[hi] - sorted[lo]) * frac
            }
        })?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Applies `func` to each window that meets the minimum-observation requirement.
    ///
    /// NOTE(review): skipped positions are dropped, not filled, because there is no
    /// placeholder value for an arbitrary `R`. The returned series can therefore be
    /// shorter than the input and is not positionally aligned with it.
    fn apply<F, R>(&self, func: F) -> Result<Series<R>>
    where
        F: Fn(&[f64]) -> R + Copy,
        R: Debug + Clone,
    {
        let result = self.apply_window_op(func)?;
        let values: Vec<R> = result
            .values()
            .iter()
            .filter_map(|v| v.as_ref().cloned())
            .collect();
        Series::new(values, result.name().cloned())
    }
}
impl<T> Expanding<T>
where
    T: Debug + Clone,
{
    /// Creates an expanding window over `series`. Each position produces a value once at
    /// least `min_periods` observations have been seen. This constructor never fails;
    /// the `Result` keeps the signature parallel to `Rolling::new`.
    pub fn new(series: Series<T>, min_periods: usize) -> Result<Self> {
        Ok(Self {
            series,
            min_periods,
        })
    }

    /// Converts every element to `f64`; every entry is currently `Some`.
    fn values_as_f64(&self) -> Result<Vec<Option<f64>>>
    where
        T: Into<f64> + Copy,
    {
        let converted: Vec<Option<f64>> = self
            .series
            .values()
            .iter()
            .map(|&value| Some(value.into()))
            .collect();
        Ok(converted)
    }

    /// Applies `func` to the observed prefix ending at every position.
    ///
    /// Position `i` yields `Some(func(observations[0..=i]))` once the prefix is non-empty
    /// and holds at least `min_periods` values, otherwise `None`. The prefix grows in one
    /// reusable buffer instead of being re-collected at every position, which removes
    /// an O(n²) copy.
    fn apply_expanding_op<F, R>(&self, mut func: F) -> Result<Series<Option<R>>>
    where
        T: Into<f64> + Copy,
        F: FnMut(&[f64]) -> R,
        R: Debug + Clone,
    {
        let values = self.values_as_f64()?;
        let mut observed: Vec<f64> = Vec::with_capacity(values.len());
        let mut result = Vec::with_capacity(values.len());
        for value in values {
            if let Some(v) = value {
                observed.push(v);
            }
            if !observed.is_empty() && observed.len() >= self.min_periods {
                result.push(Some(func(&observed)));
            } else {
                result.push(None);
            }
        }
        Series::new(result, self.series.name().cloned())
    }
}
impl<T> WindowOps<T> for Expanding<T>
where
    T: Debug + Clone + Into<f64> + Copy,
{
    /// Running mean; `NaN` until `min_periods` observations have been seen.
    fn mean(&self) -> Result<Series<f64>> {
        let result =
            self.apply_expanding_op(|values| values.iter().sum::<f64>() / values.len() as f64)?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Running sum; `NaN` until `min_periods` observations have been seen.
    fn sum(&self) -> Result<Series<f64>> {
        let result = self.apply_expanding_op(|values| values.iter().sum::<f64>())?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Running standard deviation with divisor `n - ddof`; `NaN` when `n <= ddof`.
    fn std(&self, ddof: usize) -> Result<Series<f64>> {
        let result = self.apply_expanding_op(|values| {
            if values.len() <= ddof {
                f64::NAN
            } else {
                let mean = values.iter().sum::<f64>() / values.len() as f64;
                let variance = values.iter().map(|&x| (x - mean).powi(2)).sum::<f64>()
                    / (values.len() - ddof) as f64;
                variance.sqrt()
            }
        })?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Running variance with divisor `n - ddof`; `NaN` when `n <= ddof`.
    fn var(&self, ddof: usize) -> Result<Series<f64>> {
        let result = self.apply_expanding_op(|values| {
            if values.len() <= ddof {
                f64::NAN
            } else {
                let mean = values.iter().sum::<f64>() / values.len() as f64;
                values.iter().map(|&x| (x - mean).powi(2)).sum::<f64>()
                    / (values.len() - ddof) as f64
            }
        })?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Running minimum; `NaN` until `min_periods` observations have been seen.
    fn min(&self) -> Result<Series<f64>> {
        let result =
            self.apply_expanding_op(|values| values.iter().fold(f64::INFINITY, |a, &b| a.min(b)))?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Running maximum; `NaN` until `min_periods` observations have been seen.
    fn max(&self) -> Result<Series<f64>> {
        let result = self
            .apply_expanding_op(|values| values.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b)))?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Running observation count; 0 until `min_periods` observations have been seen.
    fn count(&self) -> Result<Series<usize>> {
        let result = self.apply_expanding_op(|values| values.len())?;
        let values: Vec<usize> = result.values().iter().map(|&v| v.unwrap_or(0)).collect();
        Series::new(values, result.name().cloned())
    }

    /// Running median: the middle value, or the mean of the two middle values for an
    /// even count.
    fn median(&self) -> Result<Series<f64>> {
        let result = self.apply_expanding_op(|values| {
            if values.is_empty() {
                return f64::NAN;
            }
            let mut sorted = values.to_vec();
            sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
            let mid = sorted.len() / 2;
            if sorted.len() % 2 == 0 {
                (sorted[mid - 1] + sorted[mid]) / 2.0
            } else {
                sorted[mid]
            }
        })?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Running `q`-quantile, linearly interpolated between the two nearest ranks at
    /// position `q * (n - 1)`, so `quantile(0.5)` agrees with `median()`.
    ///
    /// # Errors
    /// Returns [`Error::InvalidValue`] unless `0 <= q <= 1` (NaN is rejected).
    fn quantile(&self, q: f64) -> Result<Series<f64>> {
        // A negated range check also rejects NaN, which slips past `q < 0.0 || q > 1.0`.
        if !(0.0..=1.0).contains(&q) {
            return Err(Error::InvalidValue(
                "Quantile must be between 0 and 1".to_string(),
            ));
        }
        let result = self.apply_expanding_op(|values| {
            if values.is_empty() {
                return f64::NAN;
            }
            let mut sorted = values.to_vec();
            sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
            let last = sorted.len() - 1;
            let pos = q * last as f64;
            let lo = (pos.floor() as usize).min(last);
            let hi = (pos.ceil() as usize).min(last);
            let frac = pos - lo as f64;
            // Exact ranks skip interpolation, which also keeps infinities intact.
            if frac == 0.0 {
                sorted[lo]
            } else {
                sorted[lo] + (sorted[hi] - sorted[lo]) * frac
            }
        })?;
        let values: Vec<f64> = result
            .values()
            .iter()
            .map(|&v| v.unwrap_or(f64::NAN))
            .collect();
        Series::new(values, result.name().cloned())
    }

    /// Applies `func` to each prefix that meets `min_periods`.
    ///
    /// NOTE(review): skipped positions are dropped, not filled, because there is no
    /// placeholder value for an arbitrary `R`. The returned series can therefore be
    /// shorter than the input and is not positionally aligned with it.
    fn apply<F, R>(&self, func: F) -> Result<Series<R>>
    where
        F: Fn(&[f64]) -> R + Copy,
        R: Debug + Clone,
    {
        let result = self.apply_expanding_op(func)?;
        let values: Vec<R> = result
            .values()
            .iter()
            .filter_map(|v| v.as_ref().cloned())
            .collect();
        Series::new(values, result.name().cloned())
    }
}
impl<T> EWM<T>
where
T: Debug + Clone,
{
pub fn new(series: Series<T>) -> Self {
Self {
series,
alpha: None,
span: None,
halflife: None,
adjust: true,
ignore_na: false,
}
}
pub fn alpha(mut self, alpha: f64) -> Result<Self> {
if alpha <= 0.0 || alpha > 1.0 {
return Err(Error::InvalidValue(
"Alpha must be between 0 and 1".to_string(),
));
}
self.alpha = Some(alpha);
self.span = None;
self.halflife = None;
Ok(self)
}
pub fn span(mut self, span: usize) -> Self {
self.span = Some(span);
self.alpha = None;
self.halflife = None;
self
}
pub fn halflife(mut self, halflife: f64) -> Self {
self.halflife = Some(halflife);
self.alpha = None;
self.span = None;
self
}
pub fn adjust(mut self, adjust: bool) -> Self {
self.adjust = adjust;
self
}
pub fn ignore_na(mut self, ignore_na: bool) -> Self {
self.ignore_na = ignore_na;
self
}
fn get_alpha(&self) -> Result<f64> {
if let Some(alpha) = self.alpha {
Ok(alpha)
} else if let Some(span) = self.span {
Ok(2.0 / (span as f64 + 1.0))
} else if let Some(halflife) = self.halflife {
Ok(1.0 - (-std::f64::consts::LN_2 / halflife).exp())
} else {
Err(Error::InvalidValue(
"Must specify either alpha, span, or halflife".to_string(),
))
}
}
fn values_as_f64(&self) -> Result<Vec<Option<f64>>>
where
T: Into<f64> + Copy,
{
let mut result = Vec::with_capacity(self.series.len());
for value in self.series.values() {
result.push(Some((*value).into()));
}
Ok(result)
}
}
impl<T> EWM<T>
where
    T: Debug + Clone + Into<f64> + Copy,
{
    /// Exponentially weighted moving mean.
    ///
    /// - With `adjust = true`, position `t` is `Σ wᵢ·x_{t-i} / Σ wᵢ`, where
    ///   `wᵢ = (1 - alpha)^i`.
    /// - With `adjust = false`, it is the recursion `y_t = alpha·x_t + (1 - alpha)·y_{t-1}`.
    ///
    /// `NaN` inputs are treated as missing: the previous mean is carried forward, and
    /// `ignore_na = false` still ages the earlier weights across the gap. Positions
    /// before the first observation are `NaN`.
    ///
    /// # Errors
    /// Fails when no valid `alpha`/`span`/`halflife` is configured.
    pub fn mean(&self) -> Result<Series<f64>> {
        let (means, _) = self.weighted_moments(0)?;
        Series::new(means, self.series.name().cloned())
    }

    /// Exponentially weighted standard deviation: the square root of the variance
    /// returned by [`EWM::var`] for the same `ddof`.
    ///
    /// # Errors
    /// Fails when no valid `alpha`/`span`/`halflife` is configured.
    pub fn std(&self, ddof: usize) -> Result<Series<f64>> {
        let (_, variances) = self.weighted_moments(ddof)?;
        let stds: Vec<f64> = variances.into_iter().map(f64::sqrt).collect();
        Series::new(stds, self.series.name().cloned())
    }

    /// Exponentially weighted variance.
    ///
    /// The biased weighted variance is scaled by `(Σw)² / ((Σw)² - ddof·Σw²)`, which
    /// reduces to `n / (n - ddof)` for equal weights. `ddof = 1` matches pandas'
    /// `bias=False`, and `ddof = 0` returns the biased estimate. Positions where the
    /// correction's denominator is not positive are `NaN`; with `ddof >= 1` this
    /// includes the first observation.
    ///
    /// # Errors
    /// Fails when no valid `alpha`/`span`/`halflife` is configured.
    pub fn var(&self, ddof: usize) -> Result<Series<f64>> {
        let (_, variances) = self.weighted_moments(ddof)?;
        Series::new(variances, self.series.name().cloned())
    }

    /// Single pass producing the EW mean and the `ddof`-corrected EW variance at
    /// every position. Both vectors have one entry per input element.
    ///
    /// The weight bookkeeping follows pandas' `ewma`/`ewmcov` kernels:
    /// - `old_wt` is the total weight of past observations;
    /// - `sum_wt` and `sum_wt2` are the sums of weights and of squared weights used for
    ///   the bias correction.
    fn weighted_moments(&self, ddof: usize) -> Result<(Vec<f64>, Vec<f64>)> {
        let alpha = self.get_alpha()?;
        let values = self.values_as_f64()?;
        let decay = 1.0 - alpha;
        // Adjusted form weights the newest value 1; recursive form weights it alpha.
        let new_wt = if self.adjust { 1.0 } else { alpha };

        let mut means = Vec::with_capacity(values.len());
        let mut variances = Vec::with_capacity(values.len());
        let mut started = false;
        let mut mean = f64::NAN;
        let mut cov = 0.0;
        let mut old_wt = 1.0;
        let mut sum_wt = 1.0;
        let mut sum_wt2 = 1.0;

        for value in values {
            // NaN counts as missing instead of poisoning every later output.
            let observation = value.filter(|v| !v.is_nan());
            if !started {
                match observation {
                    Some(x) => {
                        mean = x;
                        started = true;
                    }
                    None => {
                        means.push(f64::NAN);
                        variances.push(f64::NAN);
                        continue;
                    }
                }
            } else if observation.is_some() || !self.ignore_na {
                // One step passes: every earlier weight decays.
                old_wt *= decay;
                sum_wt *= decay;
                sum_wt2 *= decay * decay;
                if let Some(x) = observation {
                    let prev_mean = mean;
                    // Skipping the update on constant input avoids rounding drift.
                    if mean != x {
                        mean = (old_wt * prev_mean + new_wt * x) / (old_wt + new_wt);
                    }
                    cov = (old_wt * (cov + (prev_mean - mean).powi(2))
                        + new_wt * (x - mean).powi(2))
                        / (old_wt + new_wt);
                    sum_wt += new_wt;
                    sum_wt2 += new_wt * new_wt;
                    old_wt += new_wt;
                    if !self.adjust {
                        // The recursive form renormalises so the total weight stays 1.
                        sum_wt /= old_wt;
                        sum_wt2 /= old_wt * old_wt;
                        old_wt = 1.0;
                    }
                }
            }

            means.push(mean);
            let variance = if ddof == 0 {
                cov
            } else {
                let numerator = sum_wt * sum_wt;
                let denominator = numerator - ddof as f64 * sum_wt2;
                if denominator > 0.0 {
                    numerator / denominator * cov
                } else {
                    f64::NAN
                }
            };
            variances.push(variance);
        }
        Ok((means, variances))
    }
}
/// Entry points for building window objects from a [`Series`].
pub trait WindowExt<T: Debug + Clone> {
    /// Starts a fixed-size rolling window covering `window_size` positions.
    fn rolling(&self, window_size: usize) -> Result<Rolling<T>>;
    /// Starts an expanding window that needs `min_periods` observations per result.
    fn expanding(&self, min_periods: usize) -> Result<Expanding<T>>;
    /// Starts an exponentially weighted window; choose its decay before aggregating.
    fn ewm(&self) -> EWM<T>;
}
impl<T: Debug + Clone> WindowExt<T> for Series<T> {
    /// Copies the series into a new [`Rolling`]; errors when `window_size` is 0.
    fn rolling(&self, window_size: usize) -> Result<Rolling<T>> {
        let source = self.clone();
        Rolling::new(source, window_size)
    }

    /// Copies the series into a new [`Expanding`] window.
    fn expanding(&self, min_periods: usize) -> Result<Expanding<T>> {
        let source = self.clone();
        Expanding::new(source, min_periods)
    }

    /// Copies the series into a new, unconfigured [`EWM`].
    fn ewm(&self) -> EWM<T> {
        let source = self.clone();
        EWM::new(source)
    }
}