use polars::prelude::*;
/// Computes the Chaikin Money Flow (CMF) over a rolling window.
///
/// For every row the money flow multiplier is
/// `((close - low) - (high - close)) / (high - low)` and the money flow
/// volume is that multiplier times `volume`. The CMF at row `i` is the sum of
/// money flow volumes over the last `window` rows (ending at `i`) divided by
/// the sum of the corresponding volumes.
///
/// Handling of unusable data:
/// * A null in `high`, `low`, `close` or `volume` is read as `NaN`.
/// * A row whose money flow volume is `NaN` (any `NaN` input, or
///   `high == low`) is left out of BOTH the numerator and the denominator.
/// * The first `window - 1` rows, and any row whose summed volume is not
///   strictly positive, produce `NaN`.
///
/// # Arguments
/// * `df` - frame holding `high`, `low`, `close` and `volume` columns, each
///   accessed through `f64()`.
/// * `window` - number of rows in the rolling window; must be at least 1.
///
/// # Returns
/// A series named `cmf_{window}` with one value per row of `df`.
///
/// # Errors
/// * `PolarsError::ShapeMismatch` when any required column is absent.
/// * `PolarsError::ComputeError` when `window` is 0.
/// * Any error propagated from the column lookup or the `f64()` downcast.
pub fn calculate_cmf(df: &DataFrame, window: usize) -> PolarsResult<Series> {
    let schema = df.schema();
    let has_all_columns = ["high", "low", "close", "volume"]
        .iter()
        .all(|&name| schema.contains(name));
    if !has_all_columns {
        return Err(PolarsError::ShapeMismatch(
            "Missing required columns for CMF calculation. Required: high, low, close, volume"
                .into(),
        ));
    }
    if window == 0 {
        return Err(PolarsError::ComputeError(
            "Window size must be greater than 0".into(),
        ));
    }

    let high = df.column("high")?.f64()?;
    let low = df.column("low")?.f64()?;
    let close = df.column("close")?.f64()?;
    let volume = df.column("volume")?.f64()?;
    let row_count = df.height();

    // Read the volume column once so the rolling pass works on a plain slice
    // instead of going back to the chunked array for every window element.
    let volumes: Vec<f64> = (0..row_count)
        .map(|row| volume.get(row).unwrap_or(f64::NAN))
        .collect();

    // Per-row money flow volume; NaN marks rows that must be ignored later.
    let money_flow_volumes: Vec<f64> = (0..row_count)
        .map(|row| {
            let high_val = high.get(row).unwrap_or(f64::NAN);
            let low_val = low.get(row).unwrap_or(f64::NAN);
            let close_val = close.get(row).unwrap_or(f64::NAN);
            let usable = !high_val.is_nan()
                && !low_val.is_nan()
                && !close_val.is_nan()
                && high_val != low_val; // a zero range would divide by zero
            if usable {
                let multiplier =
                    ((close_val - low_val) - (high_val - close_val)) / (high_val - low_val);
                multiplier * volumes[row]
            } else {
                f64::NAN
            }
        })
        .collect();

    let cmf_values: Vec<f64> = (0..row_count)
        .map(|row| {
            // Not enough history yet for a full window.
            if row + 1 < window {
                return f64::NAN;
            }
            let start = row + 1 - window;
            let mut sum_money_flow_volume = 0.0_f64;
            let mut sum_volume = 0.0_f64;
            for (&flow, &vol) in money_flow_volumes[start..=row]
                .iter()
                .zip(&volumes[start..=row])
            {
                if !flow.is_nan() && !vol.is_nan() {
                    sum_money_flow_volume += flow;
                    sum_volume += vol;
                }
            }
            // Also covers the case where every row in the window was skipped.
            if sum_volume > 0.0 {
                sum_money_flow_volume / sum_volume
            } else {
                f64::NAN
            }
        })
        .collect();

    Ok(Series::new(format!("cmf_{}", window).into(), cmf_values))
}