use polars::prelude::*;
pub fn calculate_mfi(df: &DataFrame, window: usize) -> PolarsResult<Series> {
if !df.schema().contains("high")
|| !df.schema().contains("low")
|| !df.schema().contains("close")
|| !df.schema().contains("volume")
{
return Err(PolarsError::ShapeMismatch(
"Missing required columns for MFI calculation. Required: high, low, close, volume"
.to_string()
.into(),
));
}
let high = df.column("high")?.f64()?;
let low = df.column("low")?.f64()?;
let close = df.column("close")?.f64()?;
let volume = df.column("volume")?.f64()?;
let mut typical_prices = Vec::with_capacity(df.height());
let mut money_flows = Vec::with_capacity(df.height());
for i in 0..df.height() {
let high_val = high.get(i).unwrap_or(f64::NAN);
let low_val = low.get(i).unwrap_or(f64::NAN);
let close_val = close.get(i).unwrap_or(f64::NAN);
let vol = volume.get(i).unwrap_or(f64::NAN);
if !high_val.is_nan() && !low_val.is_nan() && !close_val.is_nan() && !vol.is_nan() {
let typical_price = (high_val + low_val + close_val) / 3.0;
typical_prices.push(typical_price);
let money_flow = typical_price * vol;
money_flows.push(money_flow);
} else {
typical_prices.push(f64::NAN);
money_flows.push(f64::NAN);
}
}
let mut positive_money_flows = Vec::with_capacity(df.height());
let mut negative_money_flows = Vec::with_capacity(df.height());
positive_money_flows.push(0.0);
negative_money_flows.push(0.0);
for i in 1..df.height() {
let current_tp = typical_prices[i];
let prev_tp = typical_prices[i - 1];
let current_mf = money_flows[i];
if current_tp.is_nan() || prev_tp.is_nan() || current_mf.is_nan() {
positive_money_flows.push(0.0);
negative_money_flows.push(0.0);
} else if current_tp > prev_tp {
positive_money_flows.push(current_mf);
negative_money_flows.push(0.0);
} else if current_tp < prev_tp {
positive_money_flows.push(0.0);
negative_money_flows.push(current_mf);
} else {
positive_money_flows.push(0.0);
negative_money_flows.push(0.0);
}
}
let mut mfi_values = Vec::with_capacity(df.height());
for _ in 0..window {
mfi_values.push(f64::NAN);
}
for i in window..df.height() {
let mut positive_flow_sum = 0.0;
let mut negative_flow_sum = 0.0;
for j in (i - window + 1)..=i {
positive_flow_sum += positive_money_flows[j];
negative_flow_sum += negative_money_flows[j];
}
if negative_flow_sum.abs() < 1e-10 {
if positive_flow_sum.abs() < 1e-10 {
mfi_values.push(50.0); } else {
mfi_values.push(100.0); }
} else {
let money_ratio = positive_flow_sum / negative_flow_sum;
let mfi = 100.0 - (100.0 / (1.0 + money_ratio));
mfi_values.push(mfi);
}
}
let name = format!("mfi_{}", window);
Ok(Series::new(name.into(), mfi_values))
}