use polars::prelude::*;
use polars::frame::DataFrame;
use std::collections::{HashMap, BTreeMap};
/// Computes a per-row put-minus-call implied-volatility skew.
///
/// Every row is assigned to a whole-percent moneyness bucket,
/// `round((strike - price) / price * 100)`. Mean IVs are accumulated per bucket
/// separately for puts and calls, and each row then receives:
///
/// * bucket `< 0`: mean put IV in that bucket minus mean call IV in the mirrored
///   (`-bucket`) bucket, when both exist;
/// * bucket `> 0`: mean put IV in the mirrored bucket minus mean call IV in that
///   bucket, when both exist;
/// * otherwise: a fallback of mean put IV at -10% (or -15%) minus mean call IV
///   at +10% (or +15%), when both sides exist.
///
/// Rows with a null/NaN strike or price, or a non-positive price, stay `NaN`, as
/// does any row for which none of the rules above yields a value. Rows with a
/// null/NaN IV do not contribute to the averages but may still receive a skew.
/// A null `is_call` flag is treated as a put.
///
/// # Errors
///
/// Returns an error if any of the named columns is missing, or if the IV, strike
/// and price columns are not `f64` or the flag column is not boolean.
pub fn calculate_strike_skew(
    df: &DataFrame,
    iv_column: &str,
    strike_column: &str,
    price_column: &str,
    is_call_column: &str,
) -> PolarsResult<Series> {
    // Whole-percent moneyness bucket of a row, or `None` when the row is unusable.
    fn moneyness_bucket(strike: Option<f64>, price: Option<f64>) -> Option<i32> {
        let (strike, price) = (strike?, price?);
        if strike.is_nan() || price.is_nan() || price <= 0.0 {
            return None;
        }
        // Float-to-int `as` saturates, so extreme ratios clamp to i32::MIN / i32::MAX.
        Some(((strike - price) / price * 100.0).round() as i32)
    }

    // Arithmetic mean of the IVs collected in each bucket.
    fn mean_by_bucket(groups: &HashMap<i32, Vec<f64>>) -> HashMap<i32, f64> {
        groups
            .iter()
            .filter(|(_, ivs)| !ivs.is_empty())
            .map(|(pct, ivs)| (*pct, ivs.iter().sum::<f64>() / ivs.len() as f64))
            .collect()
    }

    let iv = df.column(iv_column)?.f64()?;
    let strike = df.column(strike_column)?.f64()?;
    let price = df.column(price_column)?.f64()?;
    let is_call = df.column(is_call_column)?.bool()?;
    let len = df.height();

    let mut put_ivs: HashMap<i32, Vec<f64>> = HashMap::new();
    let mut call_ivs: HashMap<i32, Vec<f64>> = HashMap::new();
    // Cache each row's bucket so the second pass does not recompute it.
    let mut buckets: Vec<Option<i32>> = Vec::with_capacity(len);

    for i in 0..len {
        let bucket = moneyness_bucket(strike.get(i), price.get(i));
        buckets.push(bucket);

        if let (Some(pct), Some(iv_val)) = (bucket, iv.get(i)) {
            if iv_val.is_nan() {
                continue;
            }
            let side = if is_call.get(i).unwrap_or(false) {
                &mut call_ivs
            } else {
                &mut put_ivs
            };
            side.entry(pct).or_default().push(iv_val);
        }
    }

    let put_avg_ivs = mean_by_bucket(&put_ivs);
    let call_avg_ivs = mean_by_bucket(&call_ivs);

    // The fallback does not depend on the row, so compute it once.
    let fallback: Option<f64> = put_avg_ivs
        .get(&-10)
        .or_else(|| put_avg_ivs.get(&-15))
        .zip(call_avg_ivs.get(&10).or_else(|| call_avg_ivs.get(&15)))
        .map(|(put_iv, call_iv)| put_iv - call_iv);

    let skew: Vec<f64> = buckets
        .iter()
        .map(|bucket| {
            let pct = match *bucket {
                Some(pct) => pct,
                None => return f64::NAN,
            };
            let mirrored = if pct < 0 {
                // `checked_neg` guards the saturated i32::MIN bucket, whose
                // negation would overflow; such rows fall through to the fallback.
                put_avg_ivs
                    .get(&pct)
                    .zip(pct.checked_neg().and_then(|opp| call_avg_ivs.get(&opp)))
            } else if pct > 0 {
                put_avg_ivs.get(&-pct).zip(call_avg_ivs.get(&pct))
            } else {
                None
            };
            mirrored
                .map(|(put_iv, call_iv)| put_iv - call_iv)
                .or(fallback)
                .unwrap_or(f64::NAN)
        })
        .collect();

    Ok(Series::new("strike_skew".into(), skew))
}
/// Computes a single wing-skew ratio and broadcasts it to every row.
///
/// The ratio is the mean IV of puts whose strike is at least 15% below the price
/// divided by the mean IV of all options (puts and calls) whose strike is within
/// 2.5% of the price. Rows with a null/NaN IV, strike or price, or a non-positive
/// price, are ignored. A null `is_call` flag is treated as a put.
///
/// If either group is empty the returned series is all `NaN`.
///
/// # Errors
///
/// Returns an error if any of the named columns is missing, or if the IV, strike
/// and price columns are not `f64` or the flag column is not boolean.
pub fn calculate_wing_skew(
    df: &DataFrame,
    iv_column: &str,
    strike_column: &str,
    price_column: &str,
    is_call_column: &str,
) -> PolarsResult<Series> {
    let vols = df.column(iv_column)?.f64()?;
    let strikes = df.column(strike_column)?.f64()?;
    let spots = df.column(price_column)?.f64()?;
    let call_flags = df.column(is_call_column)?.bool()?;
    let n_rows = df.height();

    let mut near_money: Vec<f64> = Vec::new();
    let mut deep_puts: Vec<f64> = Vec::new();

    for row in 0..n_rows {
        let (vol, k, spot) = match (vols.get(row), strikes.get(row), spots.get(row)) {
            (Some(vol), Some(k), Some(spot)) => (vol, k, spot),
            _ => continue,
        };
        if vol.is_nan() || k.is_nan() || spot.is_nan() || spot <= 0.0 {
            continue;
        }

        let moneyness = (k - spot) / spot * 100.0;
        if moneyness.abs() < 2.5 {
            near_money.push(vol);
        }
        let is_put = !call_flags.get(row).unwrap_or(false);
        if is_put && moneyness <= -15.0 {
            deep_puts.push(vol);
        }
    }

    // One value for the whole column: NaN unless both groups have data.
    let fill = if near_money.is_empty() || deep_puts.is_empty() {
        f64::NAN
    } else {
        let atm_mean = near_money.iter().sum::<f64>() / near_money.len() as f64;
        let wing_mean = deep_puts.iter().sum::<f64>() / deep_puts.len() as f64;
        wing_mean / atm_mean
    };

    Ok(Series::new("wing_skew".into(), vec![fill; n_rows]))
}
/// Computes the put-minus-call IV skew of each expiry and assigns it to that
/// expiry's rows.
///
/// Rows are grouped by the string rendering of their `expiry_column` value. For
/// each group the skew is the mean IV of puts with moneyness in `(-15%, -10%]`
/// minus the mean IV of calls with moneyness in `[10%, 15%)`, where moneyness is
/// `(strike - price) / price * 100`. Rows with a null/NaN IV, strike or price, or
/// a non-positive price, do not contribute. A null `is_call` flag is treated as
/// a put.
///
/// Values are only emitted when at least two expiries have a defined skew;
/// otherwise, and for rows whose expiry has no defined skew, the result is `NaN`.
///
/// NOTE(review): this function previously also fitted a least-squares slope
/// across the expiries but never used the result. That dead computation is
/// removed here; if the slope was the intended output, it needs to be wired in
/// deliberately.
///
/// # Errors
///
/// Returns an error if any of the named columns is missing, or if the IV, strike
/// and price columns are not `f64` or the flag column is not boolean.
pub fn calculate_skew_term_structure(
    df: &DataFrame,
    iv_column: &str,
    strike_column: &str,
    price_column: &str,
    is_call_column: &str,
    expiry_column: &str,
) -> PolarsResult<Series> {
    // Arithmetic mean of a non-empty slice.
    fn mean(values: &[f64]) -> f64 {
        values.iter().sum::<f64>() / values.len() as f64
    }

    let iv = df.column(iv_column)?.f64()?;
    let strike = df.column(strike_column)?.f64()?;
    let price = df.column(price_column)?.f64()?;
    let is_call = df.column(is_call_column)?.bool()?;
    let expiry = df.column(expiry_column)?;
    let len = df.height();

    // Stringify each row's expiry once; it is needed for grouping and for lookup.
    let row_keys: Vec<Option<String>> = (0..len)
        .map(|i| expiry.get(i).ok().map(|value| value.to_string()))
        .collect();

    // Per expiry: (IVs of 10-15% OTM puts, IVs of 10-15% OTM calls), in row order.
    let mut wings: HashMap<&str, (Vec<f64>, Vec<f64>)> = HashMap::new();
    for (i, key) in row_keys.iter().enumerate() {
        let key = match key.as_deref() {
            Some(key) => key,
            None => continue,
        };
        let (iv_val, strike_val, price_val) = match (iv.get(i), strike.get(i), price.get(i)) {
            (Some(v), Some(k), Some(p)) => (v, k, p),
            _ => continue,
        };
        if iv_val.is_nan() || strike_val.is_nan() || price_val.is_nan() || price_val <= 0.0 {
            continue;
        }

        let otm_pct = (strike_val - price_val) / price_val * 100.0;
        let call = is_call.get(i).unwrap_or(false);
        if !call && otm_pct <= -10.0 && otm_pct > -15.0 {
            wings.entry(key).or_default().0.push(iv_val);
        } else if call && otm_pct >= 10.0 && otm_pct < 15.0 {
            wings.entry(key).or_default().1.push(iv_val);
        }
    }

    // A skew is only defined for expiries that have both wings populated.
    let expiry_skews: HashMap<&str, f64> = wings
        .iter()
        .filter(|(_, (puts, calls))| !puts.is_empty() && !calls.is_empty())
        .map(|(key, (puts, calls))| (*key, mean(puts) - mean(calls)))
        .collect();

    let term_structure: Vec<f64> = if expiry_skews.len() >= 2 {
        row_keys
            .iter()
            .map(|key| {
                key.as_deref()
                    .and_then(|k| expiry_skews.get(k))
                    .copied()
                    .unwrap_or(f64::NAN)
            })
            .collect()
    } else {
        vec![f64::NAN; len]
    };

    Ok(Series::new("skew_term_structure".into(), term_structure))
}
/// Finds strikes at which the slope of the IV-vs-strike curve changes sharply.
///
/// IVs are averaged per distinct strike (rows with a null/NaN strike or IV are
/// ignored) and the strikes are walked in ascending order. For every interior
/// strike, the slope of the segment on its left is compared with the slope of
/// the segment on its right; when the absolute difference exceeds `0.01` the
/// strike is reported.
///
/// The returned frame has three columns:
///
/// * `strike` — the strike at which the slope changes;
/// * `magnitude` — the absolute slope change;
/// * `direction` — `"steepening"` when the right-hand slope is larger than the
///   left-hand slope, `"flattening"` otherwise.
///
/// The frame is empty when fewer than three distinct strikes are available.
///
/// `price_column` and `is_call_column` are accepted for signature parity with
/// the other skew functions in this file but are not used.
///
/// # Errors
///
/// Returns `PolarsError::ComputeError` if `strike_column` or `iv_column` is not
/// present, and propagates the polars error if either column is not `f64`.
pub fn calculate_skew_breakpoints(
    df: &DataFrame,
    iv_column: &str,
    strike_column: &str,
    price_column: &str,
    is_call_column: &str,
) -> PolarsResult<DataFrame> {
    // Minimum absolute slope change for a strike to count as a breakpoint.
    const SLOPE_CHANGE_THRESHOLD: f64 = 0.01;

    // Part of the public signature, intentionally unused here.
    let _ = (price_column, is_call_column);

    for name in [strike_column, iv_column] {
        if !df.schema().contains(name) {
            return Err(PolarsError::ComputeError(
                format!("Required column '{}' not found", name).into(),
            ));
        }
    }
    let strike = df.column(strike_column)?.f64()?;
    let iv = df.column(iv_column)?.f64()?;

    // `f64` is not `Ord`, so it cannot key a `BTreeMap`; instead collect the
    // usable points and sort them by strike using the IEEE total order.
    let mut points: Vec<(f64, f64)> = (0..df.height())
        .filter_map(|i| match (strike.get(i), iv.get(i)) {
            (Some(k), Some(v)) if !k.is_nan() && !v.is_nan() => Some((k, v)),
            _ => None,
        })
        .collect();
    // Stable sort: rows sharing a strike keep their original relative order.
    points.sort_by(|a, b| a.0.total_cmp(&b.0));

    // Collapse each run of equal strikes into (strike, mean IV).
    let mut strike_avg_iv: Vec<(f64, f64)> = Vec::new();
    let mut start = 0;
    while start < points.len() {
        let strike_val = points[start].0;
        let run_len = points[start..]
            .iter()
            .take_while(|point| point.0 == strike_val)
            .count();
        let end = start + run_len;
        let iv_sum: f64 = points[start..end].iter().map(|point| point.1).sum();
        strike_avg_iv.push((strike_val, iv_sum / run_len as f64));
        start = end;
    }

    let mut breakpoint_strikes: Vec<f64> = Vec::new();
    let mut breakpoint_magnitudes: Vec<f64> = Vec::new();
    let mut breakpoint_directions: Vec<&'static str> = Vec::new();

    // Each window is (previous, current, next); yields nothing for < 3 strikes.
    for window in strike_avg_iv.windows(3) {
        let (prev, curr, next) = (window[0], window[1], window[2]);
        // Strikes are distinct after grouping, so the denominators are non-zero.
        let left_slope = (curr.1 - prev.1) / (curr.0 - prev.0);
        let right_slope = (next.1 - curr.1) / (next.0 - curr.0);
        let slope_change = right_slope - left_slope;

        if slope_change.abs() > SLOPE_CHANGE_THRESHOLD {
            breakpoint_strikes.push(curr.0);
            breakpoint_magnitudes.push(slope_change.abs());
            breakpoint_directions.push(if slope_change > 0.0 {
                "steepening"
            } else {
                "flattening"
            });
        }
    }

    let result = DataFrame::new(vec![
        Series::new("strike".into(), breakpoint_strikes).into(),
        Series::new("magnitude".into(), breakpoint_magnitudes).into(),
        Series::new("direction".into(), breakpoint_directions).into(),
    ])?;
    Ok(result)
}
/// Appends the skew indicator columns to `df` in place.
///
/// Requires the columns `iv`, `strike`, `price` and `is_call`. Adds
/// `strike_skew` and `wing_skew`, and additionally `skew_term_structure` when an
/// `expiry` column is present.
///
/// # Errors
///
/// Returns `PolarsError::ComputeError` naming the first missing required column,
/// and propagates any error from the individual calculations or from attaching
/// the resulting columns.
pub fn add_skew_indicators(df: &mut DataFrame) -> PolarsResult<()> {
    const REQUIRED: [&str; 4] = ["iv", "strike", "price", "is_call"];

    // Report the first required column that is absent, in declaration order.
    let missing = REQUIRED
        .iter()
        .copied()
        .find(|name| !df.schema().contains(*name));
    if let Some(col) = missing {
        return Err(PolarsError::ComputeError(
            format!("Required column '{}' not found", col).into(),
        ));
    }

    let strike_skew = calculate_strike_skew(df, "iv", "strike", "price", "is_call")?;
    df.with_column(strike_skew)?;

    let wing_skew = calculate_wing_skew(df, "iv", "strike", "price", "is_call")?;
    df.with_column(wing_skew)?;

    // The term structure needs an expiry to group by; skip it otherwise.
    let has_expiry = df.schema().contains("expiry");
    if has_expiry {
        let term_structure =
            calculate_skew_term_structure(df, "iv", "strike", "price", "is_call", "expiry")?;
        df.with_column(term_structure)?;
    }

    Ok(())
}