use std::error::Error;
use polars::prelude::*;
use futures::future::join_all;
use indicatif::{ProgressBar, ProgressStyle};
use crate::data::yahoo::config::TickerSummaryStats;
use crate::analytics::performance::TickerPerformanceStats;
use crate::prelude::{StatementFrequency, StatementType, TickerData, TickerPerformance, Tickers};
macro_rules! fetch_all {
($tickers:expr, $method:ident, $idx:expr $(, $param:expr)*) => {{
let mut futures = Vec::new();
let tickers = $tickers.clone();
let total_tickers = tickers.len();
let pb = ProgressBar::new(total_tickers as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{msg} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")?
.progress_chars("#>-"),
);
for ticker in tickers.into_iter() {
let ticker = ticker.clone();
let fut = tokio::task::spawn(async move {
let result = ticker.$method($($param), *).await;
match result {
Ok(mut df) => {
let symbol_series = Series::new("symbol".into(), vec![ticker.ticker.clone(); df.height()]);
if df.width() > $idx {
let _ = df.insert_column($idx, symbol_series);
Ok(df)
} else {
eprintln!("No Data for {}", &ticker.ticker);
Err(format!("No Data for {}", &ticker.ticker))
}
}
Err(e) => {
eprintln!("Error Fetching Data for {}: {}", &ticker.ticker, e);
Err(format!("Error Fetching Data for {}: {}", &ticker.ticker, e))
}
}
});
futures.push(fut);
}
let results = join_all(futures).await;
let mut joint_df = DataFrame::default();
for result in results {
match result {
Ok(Ok(df)) => {
match joint_df.vstack(&df) {
Ok(jdf) => joint_df = jdf,
Err(e) => eprintln!("Unable to stack {:?}: {}", &df, e),
}
}
Ok(Err(_)) => continue,
Err(e) => eprintln!("Error in task: {}", e),
}
}
pb.finish_with_message(format!("Done"));
Ok(joint_df)
}};
}
pub trait TickersData {
fn get_chart(&self) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
fn get_news(&self) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
fn get_financials(&self, statement_type: StatementType, frequency: StatementFrequency, formatted: Option<bool>) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
fn get_ticker_stats(&self) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
fn get_options(&self) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
fn returns(&self) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
fn performance_stats(&self) -> impl std::future::Future<Output = Result<DataFrame, Box<dyn Error>>>;
}
impl TickersData for Tickers {
async fn get_chart(&self) -> Result<DataFrame, Box<dyn Error>> {
fetch_all!(self.tickers.clone(), get_chart, 1)
}
async fn get_news(&self) -> Result<DataFrame, Box<dyn Error>> {
fetch_all!(self.tickers.clone(), get_news, 1)
}
async fn get_financials(&self, statement_type: StatementType, frequency: StatementFrequency, formatted: Option<bool>) -> Result<DataFrame, Box<dyn Error>> {
fetch_all!(self.tickers.clone(), get_financials, 1, statement_type, frequency, formatted)
}
async fn get_ticker_stats(&self) -> Result<DataFrame, Box<dyn Error>> {
let mut futures = Vec::new();
let total_tickers = self.tickers.len();
let pb = ProgressBar::new(total_tickers as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{msg} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")?
.progress_chars("#>-"),
);
for ticker in self.tickers.clone().into_iter() {
let fut = tokio::task::spawn(async move {
match ticker.get_ticker_stats().await {
Ok(stats) => {
Ok((ticker.ticker.clone(), stats))
}
Err(e) => {
eprintln!("Error Fetching Ticker Stats for {}: {}", &ticker.ticker, e);
Err(format!("Error Fetching Ticker Stats for {}: {}", &ticker.ticker, e))
}
}
});
futures.push(fut);
}
let results = join_all(futures).await;
let mut all_stats: Vec<(String, TickerSummaryStats)> = Vec::new();
for result in results {
match result {
Ok(Ok(stats)) => {
all_stats.push(stats);
}
Ok(Err(_)) => continue,
Err(e) => eprintln!("Error in task: {e}"),
}
}
let mut formatted_dfs = Vec::new();
for (ticker, stats) in all_stats {
let mut fmt_df = stats.to_dataframe()?;
fmt_df.rename("Value", ticker.as_str().into())?;
formatted_dfs.push(fmt_df);
}
let combine = |dfs: Vec<DataFrame>| -> Result<DataFrame, Box<dyn Error>> {
let mut combined = dfs.first().ok_or("No data")?.clone();
for df in dfs.iter().skip(1) {
combined = combined.left_join(df, ["Metric"], ["Metric"])?;
}
Ok(combined)
};
let mut stats = combine(formatted_dfs)?;
let columns = stats.column("Metric")?.str()?.into_no_null_iter()
.map(|x| x.to_string()).collect::<Vec<String>>();
stats = stats.drop("Metric")?;
let column_names = stats.get_column_names().iter()
.map(|&name| name.to_string()).collect::<Vec<String>>();
let symbols = Series::new("Symbol".into(), column_names);
let mut stats_df = stats.transpose(None, None)?;
stats_df.set_column_names(&columns)?;
let _ = stats_df.insert_column(0, symbols)?;
let columns_to_drop: Vec<String> = stats_df
.get_column_names()
.iter()
.filter(|&&name| name != "Symbol")
.filter_map(|&col_name| {
let col = stats_df.column(col_name).ok()?;
let utf8 = col.str().ok()?;
if utf8.into_no_null_iter().all(|val| val.trim().is_empty()) {
Some(col_name.to_string())
} else {
None
}
})
.collect();
for col_name in columns_to_drop {
stats_df = stats_df.drop(&col_name)?;
}
pb.finish_with_message("Done");
Ok(stats_df)
}
async fn get_options(&self) -> Result<DataFrame, Box<dyn Error>> {
let mut futures = Vec::new();
let total_tickers = self.tickers.len();
let pb = ProgressBar::new(total_tickers as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{msg} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")?
.progress_chars("#>-"),
);
for ticker in self.tickers.clone().into_iter() {
let fut = tokio::task::spawn(async move {
match ticker.get_options().await {
Ok(options) => {
let mut df = options.chain;
let symbol_series = Series::new("symbol".into(), vec![ticker.ticker.clone(); df.height()]);
if df.width() > 3 {
let _ = df.insert_column(3, symbol_series);
Ok(df)
} else {
eprintln!("No Options Data for {}", &ticker.ticker);
Err(format!("No Options Data for {}", &ticker.ticker))
}
}
Err(e) => {
eprintln!("Error Fetching Options Data for {}: {}", &ticker.ticker, e);
Err(format!("Error Fetching Options Data for {}: {}", &ticker.ticker, e))
}
}
});
futures.push(fut);
}
let results = join_all(futures).await;
let mut joint_df = DataFrame::default();
for result in results {
match result {
Ok(Ok(df)) => {
joint_df = joint_df.vstack(&df)?;
}
Ok(Err(_)) => continue,
Err(e) => eprintln!("Error in task: {e}"),
}
}
pb.finish_with_message("Done");
Ok(joint_df)
}
async fn returns(&self) -> Result<DataFrame, Box<dyn Error>> {
let mut futures = Vec::new();
let total_tickers = self.tickers.len();
let pb = ProgressBar::new(total_tickers as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{msg} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")?
.progress_chars("#>-"),
);
for ticker in self.tickers.clone().into_iter() {
let fut = tokio::task::spawn(async move {
match ticker.performance_stats().await {
Ok(stats) => {
let date_series = Column::new("timestamp".into(), stats.dates_array);
let returns_series = Column::new(ticker.ticker.as_str().into(), stats.security_returns);
if let Ok(df) = DataFrame::new(vec![date_series, returns_series]) {
Ok(df)
} else {
eprintln!("No Returns Data for {}", &ticker.ticker);
Err(format!("No Returns Data for {}", &ticker.ticker))
}
}
Err(e) => {
eprintln!("No Returns Data for {}: {}", &ticker.ticker, e);
Err(format!("No Returns Data for {}: {}", &ticker.ticker, e))
}
}
});
futures.push(fut);
}
let results = join_all(futures).await;
let mut joint_df = DataFrame::default();
for result in results {
match result {
Ok(Ok(df)) => {
if joint_df.width() == 0 {
joint_df = df;
} else {
joint_df = joint_df
.join(
&df,
["timestamp"],
["timestamp"],
JoinArgs::new(JoinType::Full).with_coalesce(JoinCoalesce::CoalesceColumns),
None
)?;
}
}
Ok(Err(_)) => continue,
Err(e) => eprintln!("Error in task: {e}"),
}
}
pb.finish_with_message("Done");
joint_df = joint_df.fill_null(FillNullStrategy::Zero)?;
Ok(joint_df)
}
async fn performance_stats(&self) -> Result<DataFrame, Box<dyn Error>> {
let mut futures = Vec::new();
let total_tickers = self.tickers.len();
let pb = ProgressBar::new(total_tickers as u64);
pb.set_style(
ProgressStyle::default_bar()
.template("{msg} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")?
.progress_chars("#>-"),
);
for ticker in self.tickers.clone().into_iter() {
let fut = tokio::task::spawn(async move {
match ticker.performance_stats().await {
Ok(stats) => {
Ok(stats)
}
Err(e) => {
eprintln!("No Returns Data for {}: {}", &ticker.ticker, e);
Err(format!("No Returns Data for {}: {}", &ticker.ticker, e))
}
}
});
futures.push(fut);
}
let results = join_all(futures).await;
let mut all_stats: Vec<TickerPerformanceStats> = Vec::new();
for result in results {
match result {
Ok(Ok(stats)) => {
all_stats.push(stats);
}
Ok(Err(_)) => continue,
Err(e) => eprintln!("Error in task: {e}"),
}
}
let mut ticker_symbols: Vec<String> = vec![];
let mut numeric_fields: Vec<Vec<f64>> = vec![vec![]; 16];
for stat in &all_stats {
ticker_symbols.push(stat.ticker_symbol.clone());
numeric_fields[0].push(stat.performance_stats.daily_return);
numeric_fields[1].push(stat.performance_stats.daily_volatility);
numeric_fields[2].push(stat.performance_stats.cumulative_return);
numeric_fields[3].push(stat.performance_stats.annualized_return);
numeric_fields[4].push(stat.performance_stats.annualized_volatility);
numeric_fields[5].push(stat.performance_stats.alpha);
numeric_fields[6].push(stat.performance_stats.beta);
numeric_fields[7].push(stat.performance_stats.sharpe_ratio);
numeric_fields[8].push(stat.performance_stats.sortino_ratio);
numeric_fields[9].push(stat.performance_stats.active_return);
numeric_fields[10].push(stat.performance_stats.active_risk);
numeric_fields[11].push(stat.performance_stats.information_ratio);
numeric_fields[12].push(stat.performance_stats.calmar_ratio);
numeric_fields[13].push(stat.performance_stats.maximum_drawdown);
numeric_fields[14].push(stat.performance_stats.value_at_risk);
numeric_fields[15].push(stat.performance_stats.expected_shortfall);
}
let df = DataFrame::new(vec![
Column::new("Symbol".into(), ticker_symbols),
Column::new("Daily Return".into(), numeric_fields[0].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Daily Volatility".into(), numeric_fields[1].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Cumulative Return".into(), numeric_fields[2].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Annualized Return".into(), numeric_fields[3].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Annualized Volatility".into(), numeric_fields[4].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Alpha".into(), numeric_fields[5].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Beta".into(), numeric_fields[6].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Sharpe Ratio".into(), numeric_fields[7].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Sortino Ratio".into(), numeric_fields[8].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Active Return".into(), numeric_fields[9].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Active Risk".into(), numeric_fields[10].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Information Ratio".into(), numeric_fields[11].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Calmar Ratio".into(), numeric_fields[12].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Maximum Drawdown".into(), numeric_fields[13].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Value at Risk".into(), numeric_fields[14].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
Column::new("Expected Shortfall".into(), numeric_fields[15].iter().map(|x| x.to_string()).collect::<Vec<String>>()),
])?;
pb.finish_with_message("Done");
Ok(df)
}
}