use crate::analytics::performance::TickerPerformanceStats;
use crate::data::yahoo::config::TickerSummaryStats;
use crate::prelude::{
Column as OhlcvColumn, StatementFrequency, StatementType, TickerData, TickerPerformance,
Tickers,
};
use futures::future::join_all;
use indicatif::{ProgressBar, ProgressStyle};
use polars::prelude::*;
use std::error::Error;
macro_rules! fetch_all {
($tickers:expr, $method:ident, $idx:expr $(, $param:expr)*) => {{
let mut futures = Vec::new();
let tickers = $tickers.clone();
let total_tickers = tickers.len();
let pb = ProgressBar::new(total_tickers as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{msg} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")?
.progress_chars("#>-"),
);
for ticker in tickers.into_iter() {
let ticker = ticker.clone();
let fut = tokio::task::spawn(async move {
let result = ticker.$method($($param), *).await;
match result {
Ok(mut df) => {
let symbol_series = Series::new("symbol".into(), vec![ticker.ticker.clone(); df.height()]);
if df.width() > $idx {
let _ = df.insert_column($idx, symbol_series);
Ok(df)
} else {
eprintln!("No Data for {}", &ticker.ticker);
Err(format!("No Data for {}", &ticker.ticker))
}
}
Err(e) => {
eprintln!("Error Fetching Data for {}: {}", &ticker.ticker, e);
Err(format!("Error Fetching Data for {}: {}", &ticker.ticker, e))
}
}
});
futures.push(fut);
}
let results = join_all(futures).await;
let mut joint_df = DataFrame::default();
for result in results {
match result {
Ok(Ok(df)) => {
match joint_df.vstack(&df) {
Ok(jdf) => joint_df = jdf,
Err(e) => eprintln!("Unable to stack {:?}: {}", &df, e),
}
}
Ok(Err(_)) => continue,
Err(e) => eprintln!("Error in task: {}", e),
}
}
pb.finish_with_message(format!("Done"));
Ok(joint_df)
}};
}
pub trait TickersData {
fn get_chart(&self) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
fn get_news(&self) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
fn get_financials(
&self,
statement_type: StatementType,
frequency: StatementFrequency,
formatted: Option<bool>,
) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
fn get_ticker_stats(
&self,
) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
fn get_options(&self) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
fn returns(&self) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
fn performance_stats(
&self,
) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
}
impl TickersData for Tickers {
async fn get_chart(&self) -> Result<DataFrame, Box<dyn Error>> {
fetch_all!(self.tickers.clone(), get_chart, 1)
}
async fn get_news(&self) -> Result<DataFrame, Box<dyn Error>> {
fetch_all!(self.tickers.clone(), get_news, 1)
}
async fn get_financials(
&self,
statement_type: StatementType,
frequency: StatementFrequency,
formatted: Option<bool>,
) -> Result<DataFrame, Box<dyn Error>> {
fetch_all!(
self.tickers.clone(),
get_financials,
1,
statement_type,
frequency,
formatted
)
}
async fn get_ticker_stats(&self) -> Result<DataFrame, Box<dyn Error>> {
let mut futures = Vec::new();
let total_tickers = self.tickers.len();
let pb = ProgressBar::new(total_tickers as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{msg} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")?
.progress_chars("#>-"),
);
for ticker in self.tickers.clone().into_iter() {
let fut = tokio::task::spawn(async move {
match ticker.get_ticker_stats().await {
Ok(stats) => Ok((ticker.ticker.clone(), stats)),
Err(e) => {
eprintln!("Error Fetching Ticker Stats for {}: {}", &ticker.ticker, e);
Err(format!(
"Error Fetching Ticker Stats for {}: {}",
&ticker.ticker, e
))
}
}
});
futures.push(fut);
}
let results = join_all(futures).await;
let mut all_stats: Vec<(String, TickerSummaryStats)> = Vec::new();
for result in results {
match result {
Ok(Ok(stats)) => {
all_stats.push(stats);
}
Ok(Err(_)) => continue,
Err(e) => eprintln!("Error in task: {e}"),
}
}
let mut formatted_dfs = Vec::new();
for (ticker, stats) in all_stats {
let mut fmt_df = stats.to_dataframe()?;
fmt_df.rename("Value", ticker.as_str().into())?;
formatted_dfs.push(fmt_df);
}
let combine = |dfs: Vec<DataFrame>| -> Result<DataFrame, Box<dyn Error>> {
let mut combined = dfs.first().ok_or("No data")?.clone();
for df in dfs.iter().skip(1) {
combined = combined.left_join(df, ["Metric"], ["Metric"])?;
}
Ok(combined)
};
let mut stats = combine(formatted_dfs)?;
let columns = stats
.column("Metric")?
.str()?
.into_no_null_iter()
.map(|x| x.to_string())
.collect::<Vec<String>>();
stats = stats.drop("Metric")?;
let column_names = stats
.get_column_names()
.iter()
.map(|&name| name.to_string())
.collect::<Vec<String>>();
let symbols = Series::new("Symbol".into(), column_names);
let mut stats_df = stats.transpose(None, None)?;
stats_df.set_column_names(&columns)?;
let _ = stats_df.insert_column(0, symbols)?;
let columns_to_drop: Vec<String> = stats_df
.get_column_names()
.iter()
.filter(|&&name| name != "Symbol")
.filter_map(|&col_name| {
let col = stats_df.column(col_name).ok()?;
let utf8 = col.str().ok()?;
if utf8.into_no_null_iter().all(|val| val.trim().is_empty()) {
Some(col_name.to_string())
} else {
None
}
})
.collect();
for col_name in columns_to_drop {
stats_df = stats_df.drop(&col_name)?;
}
pb.finish_with_message("Done");
Ok(stats_df)
}
async fn get_options(&self) -> Result<DataFrame, Box<dyn Error>> {
let mut futures = Vec::new();
let total_tickers = self.tickers.len();
let pb = ProgressBar::new(total_tickers as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{msg} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")?
.progress_chars("#>-"),
);
for ticker in self.tickers.clone().into_iter() {
let fut = tokio::task::spawn(async move {
match ticker.get_options().await {
Ok(options) => {
let mut df = options.chain;
let symbol_series =
Series::new("symbol".into(), vec![ticker.ticker.clone(); df.height()]);
if df.width() > 3 {
let _ = df.insert_column(3, symbol_series);
Ok(df)
} else {
eprintln!("No Options Data for {}", &ticker.ticker);
Err(format!("No Options Data for {}", &ticker.ticker))
}
}
Err(e) => {
eprintln!("Error Fetching Options Data for {}: {}", &ticker.ticker, e);
Err(format!(
"Error Fetching Options Data for {}: {}",
&ticker.ticker, e
))
}
}
});
futures.push(fut);
}
let results = join_all(futures).await;
let mut joint_df = DataFrame::default();
for result in results {
match result {
Ok(Ok(df)) => {
joint_df = joint_df.vstack(&df)?;
}
Ok(Err(_)) => continue,
Err(e) => eprintln!("Error in task: {e}"),
}
}
pb.finish_with_message("Done");
Ok(joint_df)
}
async fn returns(&self) -> Result<DataFrame, Box<dyn Error>> {
let mut futures = Vec::new();
let total_tickers = self.tickers.len();
let pb = ProgressBar::new(total_tickers as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{msg} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")?
.progress_chars("#>-"),
);
for ticker in self.tickers.clone().into_iter() {
let fut = tokio::task::spawn(async move {
match ticker.get_chart().await {
Ok(chart) => {
let df = DataFrame::new(vec![
chart.column("timestamp").unwrap().clone(),
chart
.column(OhlcvColumn::AdjClose.as_str())
.unwrap()
.clone()
.with_name(ticker.ticker.as_str().into()),
]);
match df {
Ok(df) => Ok((ticker.ticker.clone(), df)),
Err(_) => {
eprintln!("No Price Data for {}", &ticker.ticker);
Err(format!("No Price Data for {}", &ticker.ticker))
}
}
}
Err(e) => {
eprintln!("No Price Data for {}: {}", &ticker.ticker, e);
Err(format!("No Price Data for {}: {}", &ticker.ticker, e))
}
}
});
futures.push(fut);
}
let results = join_all(futures).await;
let mut price_frames: Vec<(String, DataFrame)> = Vec::new();
for result in results {
match result {
Ok(Ok(item)) => price_frames.push(item),
Ok(Err(_)) => continue,
Err(e) => eprintln!("Error in task: {e}"),
}
}
pb.finish_with_message("Done");
if price_frames.is_empty() {
return Err("No price data available for any ticker".into());
}
let mut joined = price_frames
.into_iter()
.map(|(_, df)| df)
.reduce(|acc, df| {
acc.join(
&df,
["timestamp"],
["timestamp"],
JoinArgs::new(JoinType::Full).with_coalesce(JoinCoalesce::CoalesceColumns),
None,
)
.map_err(|e| format!("Failed to join price frames: {e}"))
.unwrap_or(acc)
})
.ok_or("No price data to join")?;
joined = joined.sort(
["timestamp"],
SortMultipleOptions::new().with_order_descending(false),
)?;
joined = joined.fill_null(FillNullStrategy::Forward(None))?;
joined = joined.fill_null(FillNullStrategy::Backward(None))?;
let ticker_symbols: Vec<String> = self.tickers.iter().map(|t| t.ticker.clone()).collect();
let mut returns_cols: Vec<polars::prelude::Column> = Vec::new();
let dates = joined
.column("timestamp")?
.datetime()?
.into_no_null_iter()
.map(|x| {
chrono::DateTime::from_timestamp_millis(x)
.unwrap_or_default()
.naive_local()
.to_string()
})
.collect::<Vec<String>>();
let dates = &dates[1..];
returns_cols.push(polars::prelude::Column::new("timestamp".into(), dates));
for sym in &ticker_symbols {
let prices = joined.column(sym)?.f64()?.to_vec();
let rets: Vec<f64> = prices
.windows(2)
.map(|w| match (w[0], w[1]) {
(Some(prev), Some(curr)) if prev.abs() > 1e-12 => (curr - prev) / prev,
_ => 0.0,
})
.collect();
returns_cols.push(polars::prelude::Column::new(sym.as_str().into(), &rets));
}
let joint_df = DataFrame::new(returns_cols)?;
Ok(joint_df)
}
async fn performance_stats(&self) -> Result<DataFrame, Box<dyn Error>> {
let mut futures = Vec::new();
let total_tickers = self.tickers.len();
let pb = ProgressBar::new(total_tickers as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{msg} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")?
.progress_chars("#>-"),
);
for ticker in self.tickers.clone().into_iter() {
let fut = tokio::task::spawn(async move {
match ticker.performance_stats().await {
Ok(stats) => Ok(stats),
Err(e) => {
eprintln!("No Returns Data for {}: {}", &ticker.ticker, e);
Err(format!("No Returns Data for {}: {}", &ticker.ticker, e))
}
}
});
futures.push(fut);
}
let results = join_all(futures).await;
let mut all_stats: Vec<TickerPerformanceStats> = Vec::new();
for result in results {
match result {
Ok(Ok(stats)) => {
all_stats.push(stats);
}
Ok(Err(_)) => continue,
Err(e) => eprintln!("Error in task: {e}"),
}
}
let mut ticker_symbols: Vec<String> = vec![];
let mut numeric_fields: Vec<Vec<f64>> = vec![vec![]; 16];
for stat in &all_stats {
ticker_symbols.push(stat.ticker_symbol.clone());
numeric_fields[0].push(stat.performance_stats.daily_return);
numeric_fields[1].push(stat.performance_stats.daily_volatility);
numeric_fields[2].push(stat.performance_stats.cumulative_return);
numeric_fields[3].push(stat.performance_stats.annualized_return);
numeric_fields[4].push(stat.performance_stats.annualized_volatility);
numeric_fields[5].push(stat.performance_stats.alpha.unwrap_or(f64::NAN));
numeric_fields[6].push(stat.performance_stats.beta.unwrap_or(f64::NAN));
numeric_fields[7].push(stat.performance_stats.sharpe_ratio);
numeric_fields[8].push(stat.performance_stats.sortino_ratio);
numeric_fields[9].push(stat.performance_stats.active_return.unwrap_or(f64::NAN));
numeric_fields[10].push(stat.performance_stats.active_risk.unwrap_or(f64::NAN));
numeric_fields[11].push(stat.performance_stats.information_ratio.unwrap_or(f64::NAN));
numeric_fields[12].push(stat.performance_stats.calmar_ratio);
numeric_fields[13].push(stat.performance_stats.maximum_drawdown);
numeric_fields[14].push(stat.performance_stats.value_at_risk);
numeric_fields[15].push(stat.performance_stats.expected_shortfall);
}
let df = DataFrame::new(vec![
Column::new("Symbol".into(), ticker_symbols),
Column::new(
"Daily Return".into(),
numeric_fields[0]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Daily Volatility".into(),
numeric_fields[1]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Cumulative Return".into(),
numeric_fields[2]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Annualized Return".into(),
numeric_fields[3]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Annualized Volatility".into(),
numeric_fields[4]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Alpha".into(),
numeric_fields[5]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Beta".into(),
numeric_fields[6]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Sharpe Ratio".into(),
numeric_fields[7]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Sortino Ratio".into(),
numeric_fields[8]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Active Return".into(),
numeric_fields[9]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Active Risk".into(),
numeric_fields[10]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Information Ratio".into(),
numeric_fields[11]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Calmar Ratio".into(),
numeric_fields[12]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Maximum Drawdown".into(),
numeric_fields[13]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Value at Risk".into(),
numeric_fields[14]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
Column::new(
"Expected Shortfall".into(),
numeric_fields[15]
.iter()
.map(|x| x.to_string())
.collect::<Vec<String>>(),
),
])?;
pb.finish_with_message("Done");
Ok(df)
}
}