use polars::prelude::*;
use crate::indicators::moving_averages::{calculate_ema, calculate_sma};
use crate::indicators::oscillators::calculate_rsi;
/// Aggregates OHLCV rows into a coarser timeframe by grouping every
/// `aggregation_factor` consecutive rows into one bar.
///
/// For each complete group of rows:
/// * `open`   – value of the first row (NaN if null),
/// * `high`   – maximum of the non-null, non-NaN highs,
/// * `low`    – minimum of the non-null, non-NaN lows,
/// * `close`  – value of the last row (NaN if null),
/// * `volume` – sum of the non-null, non-NaN volumes (0.0 if there are none).
///
/// Trailing rows that do not fill a complete group are dropped, so the result
/// has `df.height() / aggregation_factor` rows.
///
/// When a group contains no usable `high` (or `low`) value the aggregated
/// value is NaN rather than a sentinel such as `f64::MIN` / `f64::MAX`, so a
/// missing window can never be mistaken for a real price downstream.
///
/// # Errors
/// Returns `PolarsError::ComputeError` if `aggregation_factor <= 1` or if any
/// of the columns `open`, `high`, `low`, `close`, `volume` is missing.
/// Propagates the polars error if one of those columns is not `Float64`.
fn create_higher_timeframe(
    df: &DataFrame,
    aggregation_factor: usize,
) -> PolarsResult<DataFrame> {
    if aggregation_factor <= 1 {
        return Err(PolarsError::ComputeError(
            "Aggregation factor must be greater than 1".into(),
        ));
    }

    for col in ["open", "high", "low", "close", "volume"].iter() {
        if !df.schema().contains(*col) {
            return Err(PolarsError::ComputeError(
                format!("Required column '{}' not found", col).into(),
            ));
        }
    }

    let open = df.column("open")?.f64()?;
    let high = df.column("high")?.f64()?;
    let low = df.column("low")?.f64()?;
    let close = df.column("close")?.f64()?;
    let volume = df.column("volume")?.f64()?;

    // Integer division: only complete groups are aggregated.
    let num_periods = df.height() / aggregation_factor;

    let mut agg_open = Vec::with_capacity(num_periods);
    let mut agg_high = Vec::with_capacity(num_periods);
    let mut agg_low = Vec::with_capacity(num_periods);
    let mut agg_close = Vec::with_capacity(num_periods);
    let mut agg_volume = Vec::with_capacity(num_periods);

    // Treat nulls and NaNs alike: both are skipped by the aggregations below.
    let usable = |value: Option<f64>| value.filter(|v| !v.is_nan());

    for period_idx in 0..num_periods {
        let start = period_idx * aggregation_factor;
        // Always <= df.height() because `num_periods` was floored above.
        let end = start + aggregation_factor;

        agg_open.push(open.get(start).unwrap_or(f64::NAN));

        // Folding from NaN: `f64::max`/`f64::min` return the non-NaN operand,
        // so the result is the true extreme, or NaN when the window is empty.
        agg_high.push(
            (start..end)
                .filter_map(|row| usable(high.get(row)))
                .fold(f64::NAN, f64::max),
        );
        agg_low.push(
            (start..end)
                .filter_map(|row| usable(low.get(row)))
                .fold(f64::NAN, f64::min),
        );

        agg_close.push(close.get(end - 1).unwrap_or(f64::NAN));

        agg_volume.push(
            (start..end)
                .filter_map(|row| usable(volume.get(row)))
                .fold(0.0, |total, v| total + v),
        );
    }

    DataFrame::new(vec![
        Series::new("open".into(), agg_open).into(),
        Series::new("high".into(), agg_high).into(),
        Series::new("low".into(), agg_low).into(),
        Series::new("close".into(), agg_close).into(),
        Series::new("volume".into(), agg_volume).into(),
    ])
}
/// Scores how well the base-timeframe trend agrees with the trend on two
/// higher timeframes built from the same data.
///
/// # Arguments
/// * `df` – OHLCV frame; must satisfy the requirements of
///   `create_higher_timeframe` (it is called for both factors).
/// * `agg_factor1`, `agg_factor2` – number of base rows per higher-timeframe bar.
/// * `ma_period` – EMA period for the base-timeframe trend and length of the
///   zero-filled warm-up prefix; defaults to 20.
///
/// # Returns
/// An integer series named `multi_timeframe_alignment` with one value per row
/// of `df`:
/// * `0`  – warm-up rows (the first `ma_period` rows) or no base trend,
/// * `±2` – base trend (sign) matches both higher timeframes,
/// * `±1` – base trend (sign) matches exactly one higher timeframe,
/// * opposite sign `1` – base trend matches neither higher timeframe.
///
/// # Errors
/// Propagates any error from `create_higher_timeframe`, `calculate_ema`,
/// column lookup/downcast, or `expand_higher_timeframe_data`.
pub fn calculate_multi_timeframe_alignment(
    df: &DataFrame,
    agg_factor1: usize,
    agg_factor2: usize,
    ma_period: Option<usize>,
) -> PolarsResult<Series> {
    let period = ma_period.unwrap_or(20);
    let height = df.height();

    let higher_tf1 = create_higher_timeframe(df, agg_factor1)?;
    let higher_tf2 = create_higher_timeframe(df, agg_factor2)?;

    // Trend on the base timeframe: close relative to its EMA.
    let current_ma = calculate_ema(df, "close", period)?;
    let close = df.column("close")?.f64()?;
    let current_trend = detect_trend(close, current_ma.f64()?, height)?;

    // NOTE(review): the higher-timeframe trends come from
    // `expand_higher_timeframe_data`, which derives its own EMA internally, so
    // `ma_period` does not currently influence them — confirm whether the
    // helper should accept the period instead.
    let higher_close1 = higher_tf1.column("close")?.f64()?;
    let higher_close2 = higher_tf2.column("close")?.f64()?;
    let expanded_trend1 = expand_higher_timeframe_data(
        &current_trend,
        higher_close1,
        &higher_tf1,
        agg_factor1,
        height,
    )?;
    let expanded_trend2 = expand_higher_timeframe_data(
        &current_trend,
        higher_close2,
        &higher_tf2,
        agg_factor2,
        height,
    )?;

    // Rows before the EMA warm-up completes carry no signal.
    let warmup = period.min(height);
    let mut alignment: Vec<i32> = vec![0; warmup];

    alignment.extend((warmup..height).map(|i| {
        let current = current_trend[i];
        let agreement = i32::from(current == expanded_trend1[i])
            + i32::from(current == expanded_trend2[i]);

        match (current.signum(), agreement) {
            // No base trend: nothing to align.
            (0, _) => 0,
            // Full agreement: strong signal in the base direction.
            (direction, 2) => 2 * direction,
            // Partial agreement: weak signal in the base direction.
            (direction, 1) => direction,
            // Both higher timeframes disagree: weak signal against the base.
            (direction, _) => -direction,
        }
    }));

    Ok(Series::new("multi_timeframe_alignment".into(), alignment))
}
/// Classifies each of the first `length` rows as up-trend (`1`), down-trend
/// (`-1`) or neutral (`0`) by comparing `price` with a 1% band around `ma`.
///
/// A row is `1` when the price is more than 1% above the moving average,
/// `-1` when it is more than 1% below, and `0` otherwise — including when
/// either input is null or NaN at that row.
///
/// Never returns `Err` itself; the `PolarsResult` wrapper only keeps the
/// signature uniform with the other helpers in this file.
fn detect_trend(
    price: &ChunkedArray<Float64Type>,
    ma: &ChunkedArray<Float64Type>,
    length: usize,
) -> PolarsResult<Vec<i32>> {
    let classify = |row: usize| -> i32 {
        let price_now = price.get(row).unwrap_or(f64::NAN);
        let average_now = ma.get(row).unwrap_or(f64::NAN);

        // Missing data on either side means no trend can be determined.
        if price_now.is_nan() || average_now.is_nan() {
            return 0;
        }

        let upper_band = average_now * 1.01;
        let lower_band = average_now * 0.99;

        if price_now > upper_band {
            1
        } else if price_now < lower_band {
            -1
        } else {
            0
        }
    };

    Ok((0..length).map(classify).collect())
}
fn expand_higher_timeframe_data(
base_trend: &[i32],
higher_data: &ChunkedArray<Float64Type>,
higher_df: &DataFrame,
agg_factor: usize,
original_length: usize,
) -> PolarsResult<Vec<i32>> {
let mut expanded = Vec::with_capacity(original_length);
let higher_ma = calculate_ema(higher_df, "close", 20)?;
let higher_ma_vals = higher_ma.f64()?;
let higher_trend = detect_trend(higher_data, higher_ma_vals, higher_df.height())?;
for i in 0..original_length {
let higher_idx = i / agg_factor;
if higher_idx < higher_trend.len() {
expanded.push(higher_trend[higher_idx]);
} else {
expanded.push(base_trend[i.min(base_trend.len() - 1)]);
}
}
Ok(expanded)
}
/// Flags rows where a local price extreme on the base timeframe coincides with
/// a price/RSI divergence on a higher timeframe.
///
/// # Arguments
/// * `df` – OHLCV frame; must satisfy the requirements of
///   `create_higher_timeframe`.
/// * `rsi_period` – RSI period on the higher timeframe; defaults to 14.
/// * `agg_factor` – number of base rows per higher-timeframe bar; defaults to 4.
///
/// # Returns
/// An integer series named `multi_tf_rsi_divergence` with one value per row:
/// * `-1` – the close is strictly above each of the previous 5 closes, while
///   the higher-timeframe close rose and its RSI fell versus the previous
///   higher bar;
/// * `1`  – the close is strictly below each of the previous 5 closes, while
///   the higher-timeframe close fell and its RSI rose;
/// * `0`  – otherwise, including the first `max(rsi_period, 5)` rows and rows
///   not covered by a higher-timeframe bar.
///
/// Rows whose close (or any of the 5 compared closes) is null or NaN are never
/// treated as a peak or trough.
///
/// # Errors
/// Propagates errors from `create_higher_timeframe`, `calculate_rsi` and the
/// column lookups/downcasts.
pub fn calculate_multi_timeframe_rsi_divergence(
    df: &DataFrame,
    rsi_period: Option<usize>,
    agg_factor: Option<usize>,
) -> PolarsResult<Series> {
    // Number of preceding closes a bar must exceed to count as a peak/trough.
    const LOOKBACK: usize = 5;

    let period = rsi_period.unwrap_or(14);
    let agg = agg_factor.unwrap_or(4);

    // `create_higher_timeframe` rejects factors <= 1, so `i / agg` is safe below.
    let higher_tf = create_higher_timeframe(df, agg)?;
    let higher_rsi = calculate_rsi(&higher_tf, period, "close")?;
    let higher_rsi_vals = higher_rsi.f64()?;
    let close = df.column("close")?.f64()?;
    let higher_close = higher_tf.column("close")?.f64()?;

    // NOTE(review): assumes `calculate_rsi` returns one value per input row,
    // so indices valid for `higher_tf` are valid for `higher_rsi` — confirm.
    let higher_close_at = |bar: usize| higher_close.get(bar).unwrap_or(f64::NAN);
    let higher_rsi_at = |bar: usize| higher_rsi_vals.get(bar).unwrap_or(f64::NAN);

    let height = df.height();
    let warmup = period.max(LOOKBACK).min(height);
    let mut divergence_signals: Vec<i32> = vec![0; warmup];
    divergence_signals.reserve(height - warmup);

    for i in warmup..height {
        let current = close.get(i).unwrap_or(f64::NAN);

        // Positive comparisons on purpose: any NaN makes them false, so a
        // missing close can never qualify as a peak or a trough.
        let mut price_peak = true;
        let mut price_trough = true;
        for back in 1..=LOOKBACK {
            let previous = close.get(i - back).unwrap_or(f64::NAN);
            price_peak &= current > previous;
            price_trough &= current < previous;
        }

        let higher_idx = i / agg;

        // A divergence needs both the covering higher bar and its predecessor.
        let signal = if higher_idx == 0 || higher_idx >= higher_tf.height() {
            0
        } else if price_peak
            && higher_close_at(higher_idx) > higher_close_at(higher_idx - 1)
            && higher_rsi_at(higher_idx) < higher_rsi_at(higher_idx - 1)
        {
            // Price makes a higher high while RSI weakens.
            -1
        } else if price_trough
            && higher_close_at(higher_idx) < higher_close_at(higher_idx - 1)
            && higher_rsi_at(higher_idx) > higher_rsi_at(higher_idx - 1)
        {
            // Price makes a lower low while RSI strengthens.
            1
        } else {
            0
        };

        divergence_signals.push(signal);
    }

    Ok(Series::new("multi_tf_rsi_divergence".into(), divergence_signals))
}
/// Computes the multi-timeframe alignment and RSI-divergence indicators and
/// appends both as columns to `df`.
///
/// # Arguments
/// * `df` – frame to analyse and extend in place.
/// * `daily_to_weekly` – rows per bar of the first higher timeframe; defaults to 5.
/// * `daily_to_monthly` – rows per bar of the second higher timeframe; defaults to 20.
///
/// Both indicators are computed before the frame is modified, so `df` is left
/// untouched if either calculation fails.
///
/// # Errors
/// Propagates errors from the two indicator calculations and from
/// `DataFrame::with_column`.
pub fn add_multi_timeframe_analysis(
    df: &mut DataFrame,
    daily_to_weekly: Option<usize>,
    daily_to_monthly: Option<usize>,
) -> PolarsResult<()> {
    let (weekly_factor, monthly_factor) = (
        daily_to_weekly.unwrap_or(5),
        daily_to_monthly.unwrap_or(20),
    );

    // Alignment uses both higher timeframes; divergence uses only the first.
    let new_columns = [
        calculate_multi_timeframe_alignment(df, weekly_factor, monthly_factor, None)?,
        calculate_multi_timeframe_rsi_divergence(df, None, Some(weekly_factor))?,
    ];

    for column in new_columns {
        df.with_column(column)?;
    }

    Ok(())
}