use super::macros::{batch_fetch_cached, define_batch_response};
use crate::adapters::yahoo::client::ClientConfig;
#[cfg(feature = "backtesting")]
use crate::backtesting;
use crate::constants::{Frequency, Interval, Region, StatementType, TimeRange};
use crate::error::{FinanceError, Result};
use crate::format::Both;
#[cfg(any(feature = "backtesting", feature = "indicators"))]
use crate::indicators;
use crate::models::chart::events::ChartEvents;
use crate::models::chart::spark::Spark;
use crate::models::chart::spark::response::SparkResponse;
use crate::models::chart::{CapitalGain, Chart, Dividend, Split};
use crate::models::corporate::news::News;
use crate::models::corporate::recommendation::Recommendation;
use crate::models::format::Format;
use crate::models::fundamentals::FinancialStatement;
use crate::models::options::Options;
use crate::models::quote::{Quote, QuoteSummaryResponse};
use crate::providers::types::recommendation_from_similar;
use crate::providers::yahoo::YahooProvider;
use crate::providers::{
Capability, Fetch, Provider, ProviderAdapter, ProviderSet, Routes, build_providers,
};
use crate::ticker::ClientHandle;
use crate::utils::{CacheEntry, EVICTION_THRESHOLD, filter_by_range};
use futures::stream::{self, StreamExt};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
/// Shared cache storage: a map of `CacheEntry` values behind `Arc<RwLock<..>>`.
type MapCache<K, V> = Arc<RwLock<HashMap<K, CacheEntry<V>>>>;
/// Chart cache key: `(symbol, interval, range)`.
type ChartCacheKey = (Arc<str>, Interval, TimeRange);
/// Quotes keyed by symbol.
type QuoteCache = MapCache<Arc<str>, Quote>;
/// Charts keyed by `ChartCacheKey`.
type ChartCache = MapCache<ChartCacheKey, Chart>;
/// Chart events keyed by symbol; read by `dividends`, `splits` and `capital_gains`.
type EventsCache = MapCache<Arc<str>, ChartEvents>;
/// Financial statements keyed by `(symbol, statement type, frequency)`.
type FinancialsCache = MapCache<(Arc<str>, StatementType, Frequency), FinancialStatement>;
/// News lists keyed by symbol.
type NewsCache = MapCache<Arc<str>, Vec<News>>;
/// Recommendations keyed by `(symbol, limit)`.
type RecommendationsCache = MapCache<(Arc<str>, u32), Recommendation>;
/// Options keyed by `(symbol, date)`, where `date` is the optional value passed to `options`.
type OptionsCache = MapCache<(Arc<str>, Option<i64>), Options>;
/// Spark cache key: `(symbol, interval, range)`.
type SparkCacheKey = (Arc<str>, Interval, TimeRange);
/// Sparks keyed by `SparkCacheKey`.
type SparkCache = MapCache<SparkCacheKey, Spark>;
/// Indicator summaries keyed by `(symbol, interval, range)`.
#[cfg(feature = "indicators")]
type IndicatorsCache = MapCache<(Arc<str>, Interval, TimeRange), indicators::IndicatorsSummary>;
/// Async mutex held for the duration of a batch fetch (see its use in `quotes`).
type FetchGuard = Arc<tokio::sync::Mutex<()>>;
/// One `FetchGuard` per request-parameter key, created on demand by `get_fetch_guard`.
type FetchGuardMap<K> = Arc<RwLock<HashMap<K, FetchGuard>>>;
// Batch response types, one per data kind. The generated code lives in
// `super::macros::define_batch_response`; this file relies on each type
// exposing `with_capacity(n)`, the named data map, and an `errors` map.
// NOTE(review): both maps appear to be keyed by symbol `String` — confirm in the macro.

// Quotes: data map `quotes`.
define_batch_response! {
BatchQuotesResponse => quotes: Quote
}
// Charts: data map `charts`.
define_batch_response! {
BatchChartsResponse => charts: Chart
}
// Sparks: data map `sparks`.
define_batch_response! {
BatchSparksResponse => sparks: Spark
}
// Dividends: data map `dividends`.
define_batch_response! {
BatchDividendsResponse => dividends: Vec<Dividend>
}
// Splits: data map `splits`.
define_batch_response! {
BatchSplitsResponse => splits: Vec<Split>
}
// Capital gains: data map `capital_gains`.
define_batch_response! {
BatchCapitalGainsResponse => capital_gains: Vec<CapitalGain>
}
// Financial statements: data map `financials`.
define_batch_response! {
BatchFinancialsResponse => financials: FinancialStatement
}
// News: data map `news`.
define_batch_response! {
BatchNewsResponse => news: Vec<News>
}
// Recommendations: data map `recommendations`.
define_batch_response! {
BatchRecommendationsResponse => recommendations: Recommendation
}
// Options: data map `options`.
define_batch_response! {
BatchOptionsResponse => options: Options
}
// Indicator summaries: data map `indicators` (only with the `indicators` feature).
#[cfg(feature = "indicators")]
define_batch_response! {
BatchIndicatorsResponse => indicators: indicators::IndicatorsSummary
}
/// Concurrency limit a new `TickersBuilder` starts with (see `TickersBuilder::new`).
const DEFAULT_MAX_CONCURRENCY: usize = 10;
/// Builder for [`Tickers`]; obtain one through `Tickers::builder` and finish with `build`.
pub struct TickersBuilder {
/// Symbols the resulting `Tickers` will operate on.
symbols: Vec<Arc<str>>,
/// Client configuration used when `build` has to create its own provider set.
config: ClientConfig,
/// Optional existing client handle to reuse instead of creating a new client.
shared_client: Option<ClientHandle>,
/// Optional pre-built provider set; when present `build` uses it as-is.
injected_providers: Option<Arc<ProviderSet>>,
/// Limit passed to `buffer_unordered` for per-symbol fetches.
max_concurrency: usize,
/// Cache time-to-live; `None` means `cache_insert` stores nothing.
cache_ttl: Option<Duration>,
/// Whether `quotes` also issues the logo request.
include_logo: bool,
}
impl TickersBuilder {
    /// Starts a builder for `symbols` with the default client configuration,
    /// `DEFAULT_MAX_CONCURRENCY`, no cache TTL and logos disabled.
    fn new<S, I>(symbols: I) -> Self
    where
        S: Into<String>,
        I: IntoIterator<Item = S>,
    {
        let symbols: Vec<Arc<str>> = symbols
            .into_iter()
            .map(|raw| {
                let owned: String = raw.into();
                Arc::<str>::from(owned)
            })
            .collect();
        Self {
            symbols,
            config: ClientConfig::default(),
            shared_client: None,
            injected_providers: None,
            max_concurrency: DEFAULT_MAX_CONCURRENCY,
            cache_ttl: None,
            include_logo: false,
        }
    }

    /// Sets both the language and the region code from a [`Region`].
    pub fn region(mut self, region: Region) -> Self {
        let lang = region.lang().to_string();
        let code = region.region().to_string();
        self.config.lang = lang;
        self.config.region = code;
        self
    }

    /// Overrides only the language in the client configuration.
    pub fn lang(mut self, lang: impl Into<String>) -> Self {
        let lang: String = lang.into();
        self.config.lang = lang;
        self
    }

    /// Overrides only the region code in the client configuration.
    pub fn region_code(mut self, region: impl Into<String>) -> Self {
        let code: String = region.into();
        self.config.region = code;
        self
    }

    /// Sets the timeout stored in the client configuration.
    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.config.timeout = timeout;
        self
    }

    /// Sets the proxy stored in the client configuration.
    pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
        let proxy: String = proxy.into();
        self.config.proxy = Some(proxy);
        self
    }

    /// Replaces the whole client configuration.
    pub fn config(self, config: ClientConfig) -> Self {
        Self { config, ..self }
    }

    /// Sets the concurrency limit; values below 1 are raised to 1.
    pub fn max_concurrency(self, n: usize) -> Self {
        Self {
            max_concurrency: n.max(1),
            ..self
        }
    }

    /// Enables caching with the given time-to-live.
    pub fn cache(self, ttl: Duration) -> Self {
        Self {
            cache_ttl: Some(ttl),
            ..self
        }
    }

    /// Requests logo URLs alongside quotes.
    pub fn logo(self) -> Self {
        Self {
            include_logo: true,
            ..self
        }
    }

    /// Injects a ready-made provider set; it takes precedence over `client` in `build`.
    pub(crate) fn with_provider_set(self, set: Arc<ProviderSet>) -> Self {
        Self {
            injected_providers: Some(set),
            ..self
        }
    }

    /// Reuses an existing client handle instead of creating a new client.
    pub fn client(self, handle: ClientHandle) -> Self {
        Self {
            shared_client: Some(handle),
            ..self
        }
    }

    /// Builds the [`Tickers`].
    ///
    /// Provider selection, in priority order: the injected provider set, then
    /// a Yahoo provider wrapping the shared client, then a fresh Yahoo
    /// provider built from the configuration.
    ///
    /// # Errors
    /// Propagates the error from `build_providers` when a fresh provider set
    /// has to be created.
    pub async fn build(self) -> Result<Tickers> {
        let Self {
            symbols,
            config,
            shared_client,
            injected_providers,
            max_concurrency,
            cache_ttl,
            include_logo,
        } = self;

        let providers = match (injected_providers, shared_client) {
            (Some(set), _) => set,
            (None, Some(handle)) => {
                let yahoo = YahooProvider::from_client(handle.0);
                let client = yahoo.client_arc();
                let adapters = vec![Arc::new(yahoo) as Arc<dyn ProviderAdapter>];
                Arc::new(ProviderSet::new(
                    adapters,
                    Some(client),
                    Routes::new(Fetch::Sequential),
                ))
            }
            (None, None) => {
                let set = build_providers(
                    &[Provider::Yahoo],
                    &config,
                    Routes::new(Fetch::Sequential),
                )
                .await?;
                Arc::new(set)
            }
        };

        Ok(Tickers {
            symbols,
            providers,
            max_concurrency,
            cache_ttl,
            include_logo,
            quote_cache: Default::default(),
            chart_cache: Default::default(),
            events_cache: Default::default(),
            financials_cache: Default::default(),
            news_cache: Default::default(),
            recommendations_cache: Default::default(),
            options_cache: Default::default(),
            spark_cache: Default::default(),
            #[cfg(feature = "indicators")]
            indicators_cache: Default::default(),
            quotes_fetch: Arc::new(tokio::sync::Mutex::new(())),
            charts_fetch: Default::default(),
            financials_fetch: Default::default(),
            news_fetch: Arc::new(tokio::sync::Mutex::new(())),
            recommendations_fetch: Default::default(),
            options_fetch: Default::default(),
            spark_fetch: Default::default(),
            #[cfg(feature = "indicators")]
            indicators_fetch: Default::default(),
        })
    }
}
/// Batch client for a fixed list of symbols, with per-data-kind caches and
/// fetch guards that serialize concurrent refreshes.
pub struct Tickers {
/// Symbols every batch operation iterates over.
symbols: Vec<Arc<str>>,
/// Provider set all fetches go through.
providers: Arc<ProviderSet>,
/// Limit passed to `buffer_unordered` for per-symbol fetches.
max_concurrency: usize,
/// Cache time-to-live; `None` means `cache_insert` stores nothing.
cache_ttl: Option<Duration>,
/// Whether `quotes` also issues the logo request.
include_logo: bool,
/// Quotes by symbol.
quote_cache: QuoteCache,
/// Charts by `(symbol, interval, range)`.
chart_cache: ChartCache,
/// Chart events by symbol.
events_cache: EventsCache,
/// Financial statements by `(symbol, statement type, frequency)`.
financials_cache: FinancialsCache,
/// News by symbol.
news_cache: NewsCache,
/// Recommendations by `(symbol, limit)`.
recommendations_cache: RecommendationsCache,
/// Options by `(symbol, date)`.
options_cache: OptionsCache,
/// Sparks by `(symbol, interval, range)`.
spark_cache: SparkCache,
/// Indicator summaries by `(symbol, interval, range)`.
#[cfg(feature = "indicators")]
indicators_cache: IndicatorsCache,
/// Single guard for quote refreshes.
quotes_fetch: FetchGuard,
/// Guards for chart refreshes, one per `(interval, range)`.
charts_fetch: FetchGuardMap<(Interval, TimeRange)>,
/// Guards for financials refreshes, one per `(statement type, frequency)`.
financials_fetch: FetchGuardMap<(StatementType, Frequency)>,
/// Single guard for news refreshes.
news_fetch: FetchGuard,
/// Guards for recommendation refreshes, one per `limit`.
recommendations_fetch: FetchGuardMap<u32>,
/// Guards for options refreshes, one per `date`.
options_fetch: FetchGuardMap<Option<i64>>,
/// Guards for spark refreshes, one per `(interval, range)`.
spark_fetch: FetchGuardMap<(Interval, TimeRange)>,
/// Guards for indicator refreshes, one per `(interval, range)`.
#[cfg(feature = "indicators")]
indicators_fetch: FetchGuardMap<(Interval, TimeRange)>,
}
impl std::fmt::Debug for Tickers {
    /// Prints the symbols, the concurrency limit and the cache TTL; all other
    /// fields are elided from the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Tickers");
        out.field("symbols", &self.symbols);
        out.field("max_concurrency", &self.max_concurrency);
        out.field("cache_ttl", &self.cache_ttl);
        out.finish_non_exhaustive()
    }
}
impl Tickers {
pub async fn new<S, I>(symbols: I) -> Result<Self>
where
S: Into<String>,
I: IntoIterator<Item = S>,
{
Self::builder(symbols).build().await
}
/// Returns a [`TickersBuilder`] pre-loaded with `symbols`.
pub fn builder<S, I>(symbols: I) -> TickersBuilder
where
    S: Into<String>,
    I: IntoIterator<Item = S>,
{
    TickersBuilder::new::<S, I>(symbols)
}
/// Returns the tracked symbols as string slices, in stored order.
pub fn symbols(&self) -> Vec<&str> {
    let mut names: Vec<&str> = Vec::with_capacity(self.symbols.len());
    for sym in &self.symbols {
        names.push(sym.as_ref());
    }
    names
}
/// Returns the number of tracked symbols.
pub fn len(&self) -> usize {
self.symbols.len()
}
/// Returns `true` when no symbols are tracked.
pub fn is_empty(&self) -> bool {
self.symbols.is_empty()
}
/// Returns a handle to the Yahoo client held by the provider set.
///
/// # Panics
/// Panics if `first_yahoo` reports that the provider set has no Yahoo session.
pub fn client_handle(&self) -> ClientHandle {
    let session = self
        .providers
        .first_yahoo()
        .expect("Tickers always uses a Yahoo session");
    ClientHandle(session)
}
/// Delegates to `CacheEntry::is_fresh_with_ttl` with this instance's `cache_ttl`.
/// NOTE(review): presumably a `None` entry or `None` TTL counts as not fresh — confirm in `CacheEntry`.
#[inline]
fn is_cache_fresh<T>(&self, entry: Option<&CacheEntry<T>>) -> bool {
CacheEntry::is_fresh_with_ttl(entry, self.cache_ttl)
}
/// Returns `true` only if caching is enabled and every key in `keys` has an
/// entry in `map` that is still fresh for the configured TTL.
///
/// With no TTL configured the answer is `false` and `keys` is not consumed.
fn all_cached<K: Eq + std::hash::Hash, V>(
    &self,
    map: &HashMap<K, CacheEntry<V>>,
    keys: impl Iterator<Item = K>,
) -> bool {
    match self.cache_ttl {
        None => false,
        Some(ttl) => {
            for key in keys {
                let fresh = map.get(&key).is_some_and(|cached| cached.is_fresh(ttl));
                if !fresh {
                    return false;
                }
            }
            true
        }
    }
}
/// Stores `value` under `key` when caching is enabled; does nothing otherwise.
///
/// Once the map has reached `EVICTION_THRESHOLD` entries, entries that are no
/// longer fresh are dropped before the insert.
#[inline]
fn cache_insert<K: Eq + std::hash::Hash, V>(
    &self,
    map: &mut HashMap<K, CacheEntry<V>>,
    key: K,
    value: V,
) {
    let Some(ttl) = self.cache_ttl else {
        return;
    };
    let at_threshold = map.len() >= EVICTION_THRESHOLD;
    if at_threshold {
        map.retain(|_, cached| cached.is_fresh(ttl));
    }
    map.insert(key, CacheEntry::new(value));
}
/// Fetches quotes for every tracked symbol.
///
/// Flow:
/// 1. If every symbol has a fresh cache entry, answer from the cache.
/// 2. Otherwise take `quotes_fetch` and check the cache again, so callers
///    that waited on the guard can reuse what the previous holder stored.
/// 3. Issue one batch request through the providers (together with a logo
///    request when `include_logo` is set). If the batch request fails, fall
///    back to one request per symbol via `fetch_quotes_per_symbol`.
/// 4. Store the parsed quotes in the cache (when a TTL is set) and in the
///    response; any symbol present in neither map gets an error entry.
///
/// Every path visible here returns `Ok`; failures are reported per symbol in
/// the response's `errors` map.
pub async fn quotes(&self) -> Result<BatchQuotesResponse> {
// Fast path: serve entirely from cache under a read lock.
{
let cache = self.quote_cache.read().await;
if self.all_cached(&cache, self.symbols.iter().cloned()) {
let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
for symbol in &self.symbols {
if let Some(entry) = cache.get(symbol) {
response
.quotes
.insert(symbol.to_string(), entry.value.clone());
}
}
return Ok(response);
}
}
// Serialize refreshes; held until this function returns.
let _fetch_guard = self.quotes_fetch.lock().await;
// Re-check: another task may have filled the cache while we waited.
{
let cache = self.quote_cache.read().await;
if self.all_cached(&cache, self.symbols.iter().cloned()) {
let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
for symbol in &self.symbols {
if let Some(entry) = cache.get(symbol) {
response
.quotes
.insert(symbol.to_string(), entry.value.clone());
}
}
return Ok(response);
}
}
let symbol_strings: Vec<String> = self.symbols.iter().map(|s| s.to_string()).collect();
let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
let (quote_data, logos) = if self.include_logo {
// Logo request is best-effort: any failure becomes `None`.
let providers_logo = Arc::clone(&self.providers);
let syms_logo = symbol_strings.clone();
let logo_future = async move {
if let Ok(client) = providers_logo.first_yahoo() {
let syms_ref: Vec<&str> = syms_logo.iter().map(String::as_str).collect();
// NOTE(review): the two `true` flags are opaque from here — see `fetch_with_fields`.
crate::adapters::yahoo::quote::quotes::fetch_with_fields(
&client,
&syms_ref,
Some(&["logoUrl", "companyLogoUrl"]),
true,
true,
)
.await
.ok()
} else {
None
}
};
let providers_quote = Arc::clone(&self.providers);
let syms_quote = symbol_strings.clone();
let quote_future = async move {
providers_quote
.fetch(Capability::QUOTE, |p| {
let syms = syms_quote.clone();
let p = p.clone();
async move {
let syms_ref: Vec<&str> = syms.iter().map(String::as_str).collect();
p.fetch_quotes_batch(&syms_ref).await
}
})
.await
};
// Run the quote batch and the logo request concurrently.
let (batch_result, logo_result) = tokio::join!(quote_future, logo_future);
let quote_data = match batch_result {
Ok(data) => data,
// Batch failed: retry symbol by symbol; per-symbol errors land in `response.errors`.
Err(_) => {
self.fetch_quotes_per_symbol(&symbol_strings, &mut response)
.await
}
};
(quote_data, logo_result)
} else {
let providers = Arc::clone(&self.providers);
let syms = symbol_strings.clone();
let batch_result = providers
.fetch(Capability::QUOTE, |p| {
let syms = syms.clone();
let p = p.clone();
async move {
let syms_ref: Vec<&str> = syms.iter().map(String::as_str).collect();
p.fetch_quotes_batch(&syms_ref).await
}
})
.await;
let data = match batch_result {
Ok(data) => data,
// Same per-symbol fallback as in the logo branch.
Err(_) => {
self.fetch_quotes_per_symbol(&symbol_strings, &mut response)
.await
}
};
(data, None)
};
// symbol -> (logoUrl, companyLogoUrl), read from `quoteResponse.result[]`.
let logo_map: HashMap<String, (Option<String>, Option<String>)> = logos
.and_then(|l| l.get("quoteResponse")?.get("result")?.as_array().cloned())
.map(|results| {
results
.iter()
.filter_map(|r| {
let symbol = r.get("symbol")?.as_str()?.to_string();
let logo_url = r.get("logoUrl").and_then(|v| v.as_str()).map(String::from);
let company_logo_url = r
.get("companyLogoUrl")
.and_then(|v| v.as_str())
.map(String::from);
Some((symbol, (logo_url, company_logo_url)))
})
.collect()
})
.unwrap_or_default();
let mut parsed_quotes: Vec<(String, Quote)> = Vec::new();
for (symbol, summary) in quote_data {
let logo_url = logo_map.get(&symbol).and_then(|(l, _)| l.clone());
let company_logo_url = logo_map.get(&symbol).and_then(|(_, c)| c.clone());
let quote = Quote::from_response(&summary, logo_url, company_logo_url);
parsed_quotes.push((symbol, quote));
}
// Cache clones first, then move the originals into the response.
if self.cache_ttl.is_some() {
let mut cache = self.quote_cache.write().await;
for (symbol, quote) in &parsed_quotes {
self.cache_insert(&mut cache, symbol.as_str().into(), quote.clone());
}
}
for (symbol, quote) in parsed_quotes {
response.quotes.insert(symbol, quote);
}
// Any requested symbol with neither a quote nor an error is reported as missing.
for symbol in &self.symbols {
let s = &**symbol;
if !response.quotes.contains_key(s) && !response.errors.contains_key(s) {
response.errors.insert(
symbol.to_string(),
"Symbol not found in response".to_string(),
);
}
}
Ok(response)
}
/// Fetches each symbol's quote individually, at most `max_concurrency` at a time.
///
/// Successful results are returned as `(symbol, summary)` pairs; failures are
/// written into `response.errors` under the symbol, using the error's
/// `to_string()` text.
async fn fetch_quotes_per_symbol(
    &self,
    symbols: &[String],
    response: &mut BatchQuotesResponse,
) -> Vec<(String, QuoteSummaryResponse)> {
    let mut pending = Vec::with_capacity(symbols.len());
    for ticker in symbols {
        let providers = Arc::clone(&self.providers);
        let ticker = ticker.clone();
        pending.push(async move {
            let outcome = providers
                .fetch(Capability::QUOTE, |p| {
                    let ticker = ticker.clone();
                    let p = p.clone();
                    async move { p.fetch_quote(&ticker).await }
                })
                .await;
            (ticker, outcome)
        });
    }

    let outcomes: Vec<_> = stream::iter(pending)
        .buffer_unordered(self.max_concurrency)
        .collect()
        .await;

    let mut fetched = Vec::new();
    for (ticker, outcome) in outcomes {
        match outcome {
            Ok(summary) => fetched.push((ticker, summary)),
            Err(err) => {
                response.errors.insert(ticker, err.to_string());
            }
        }
    }
    fetched
}
/// Returns the quote for a single symbol in the requested format.
///
/// A fresh cache entry is returned directly; otherwise the whole batch is
/// refreshed through `quotes` and the symbol is looked up in the result.
///
/// # Errors
/// Returns `FinanceError::SymbolNotFound` when the batch has no quote for
/// `symbol`; the context is the batch's error for that symbol if there is
/// one. Errors from `quotes` are propagated.
pub async fn quote<F>(&self, symbol: &str) -> Result<Quote<F>>
where
    F: Format,
    Quote<Both>: Into<Quote<F>>,
{
    {
        let cache = self.quote_cache.read().await;
        match cache.get(symbol) {
            Some(hit) if self.is_cache_fresh(Some(hit)) => {
                return Ok(hit.value.clone().into());
            }
            _ => {}
        }
    }

    let batch = self.quotes().await?;
    match batch.quotes.get(symbol) {
        Some(found) => Ok(found.clone().into()),
        None => {
            let context = match batch.errors.get(symbol) {
                Some(reason) => reason.clone(),
                None => "Symbol not found".to_string(),
            };
            Err(FinanceError::SymbolNotFound {
                symbol: Some(symbol.to_string()),
                context,
            })
        }
    }
}
/// Returns the fetch guard registered for `key`, creating it if necessary.
///
/// The lookup first runs under the read lock; only when the key is missing
/// is the write lock taken, where the entry API ensures a single guard per key.
async fn get_fetch_guard<K: Clone + Eq + std::hash::Hash>(
    guard_map: &FetchGuardMap<K>,
    key: K,
) -> FetchGuard {
    // The read lock is released at the end of this statement.
    let existing = guard_map.read().await.get(&key).cloned();
    if let Some(guard) = existing {
        return guard;
    }

    let mut guards = guard_map.write().await;
    let slot = guards
        .entry(key)
        .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())));
    Arc::clone(slot)
}
/// Fetches charts for every tracked symbol at the given `interval` and `range`.
///
/// Flow:
/// 1. If every symbol has a fresh cache entry for `(interval, range)`, answer
///    from the cache.
/// 2. Otherwise take the fetch guard for `(interval, range)` and check the
///    cache again, so callers that waited can reuse what was just stored.
/// 3. Fetch each symbol through the providers, at most `max_concurrency` at
///    a time.
/// 4. Store clones of the fetched charts in the cache (when a TTL is set) and
///    move the fetched charts themselves into the response.
///
/// Per-symbol failures are reported in the response's `errors` map; every
/// path visible here returns `Ok`.
pub async fn charts(
    &self,
    interval: Interval,
    range: TimeRange,
) -> Result<BatchChartsResponse> {
    // Fast path: serve entirely from cache under a read lock.
    {
        let cache = self.chart_cache.read().await;
        if self.all_cached(
            &cache,
            self.symbols.iter().map(|s| (s.clone(), interval, range)),
        ) {
            let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
            for symbol in &self.symbols {
                if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
                    response
                        .charts
                        .insert(symbol.to_string(), entry.value.clone());
                }
            }
            return Ok(response);
        }
    }

    // Serialize refreshes for this (interval, range); held until return.
    let fetch_guard = Self::get_fetch_guard(&self.charts_fetch, (interval, range)).await;
    let _guard = fetch_guard.lock().await;

    // Re-check: another task may have filled the cache while we waited.
    {
        let cache = self.chart_cache.read().await;
        if self.all_cached(
            &cache,
            self.symbols.iter().map(|s| (s.clone(), interval, range)),
        ) {
            let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
            for symbol in &self.symbols {
                if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
                    response
                        .charts
                        .insert(symbol.to_string(), entry.value.clone());
                }
            }
            return Ok(response);
        }
    }

    let futures: Vec<_> = self
        .symbols
        .iter()
        .map(|symbol| {
            let providers = Arc::clone(&self.providers);
            let symbol = Arc::clone(symbol);
            async move {
                let sym = symbol.to_string();
                let result = providers
                    .fetch(Capability::CHART, |p| {
                        let sym = sym.clone();
                        let p = p.clone();
                        async move { p.fetch_chart(&sym, interval, range).await }
                    })
                    .await;
                (symbol, result)
            }
        })
        .collect();
    let results: Vec<_> = stream::iter(futures)
        .buffer_unordered(self.max_concurrency)
        .collect()
        .await;

    let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
    let mut parsed_charts: Vec<(Arc<str>, Chart)> = Vec::with_capacity(results.len());
    for (symbol, result) in results {
        match result {
            Ok(chart) => parsed_charts.push((symbol, chart)),
            Err(e) => {
                response.errors.insert(symbol.to_string(), e.to_string());
            }
        }
    }

    // Cache clones, then hand the fetched charts to the caller directly.
    // The response must not be rebuilt by reading back from the cache: an
    // eviction triggered by a later insert could drop an entry stored a
    // moment earlier, and that symbol would then vanish from the response
    // without an error.
    if self.cache_ttl.is_some() {
        let mut cache = self.chart_cache.write().await;
        for (symbol, chart) in &parsed_charts {
            self.cache_insert(
                &mut cache,
                (Arc::clone(symbol), interval, range),
                chart.clone(),
            );
        }
    }
    for (symbol, chart) in parsed_charts {
        response.charts.insert(symbol.to_string(), chart);
    }
    Ok(response)
}
/// Returns the chart for a single symbol at the given `interval` and `range`.
///
/// A fresh cache entry is returned directly; otherwise the whole batch is
/// refreshed through `charts` and the symbol is looked up in the result.
///
/// # Errors
/// Returns `FinanceError::SymbolNotFound` when the batch has no chart for
/// `symbol`; the context is the batch's error for that symbol if there is
/// one. Errors from `charts` are propagated.
pub async fn chart(&self, symbol: &str, interval: Interval, range: TimeRange) -> Result<Chart> {
    {
        let cache = self.chart_cache.read().await;
        let key: Arc<str> = Arc::from(symbol);
        let lookup = (key, interval, range);
        match cache.get(&lookup) {
            Some(hit) if self.is_cache_fresh(Some(hit)) => {
                return Ok(hit.value.clone());
            }
            _ => {}
        }
    }

    let batch = self.charts(interval, range).await?;
    match batch.charts.get(symbol) {
        Some(found) => Ok(found.clone()),
        None => {
            let context = match batch.errors.get(symbol) {
                Some(reason) => reason.clone(),
                None => "Symbol not found".to_string(),
            };
            Err(FinanceError::SymbolNotFound {
                symbol: Some(symbol.to_string()),
                context,
            })
        }
    }
}
/// Fetches charts for every tracked symbol between `start` and `end`.
///
/// Nothing is read from or written to the chart cache. Requests run at most
/// `max_concurrency` at a time, and per-symbol failures are reported in the
/// response's `errors` map.
/// NOTE(review): `start`/`end` are passed straight to `fetch_chart_range`;
/// their unit (presumably Unix timestamps) is not visible here.
pub async fn charts_range(
    &self,
    interval: Interval,
    start: i64,
    end: i64,
) -> Result<BatchChartsResponse> {
    let mut pending = Vec::with_capacity(self.symbols.len());
    for symbol in &self.symbols {
        let providers = Arc::clone(&self.providers);
        let symbol = Arc::clone(symbol);
        pending.push(async move {
            let ticker = symbol.to_string();
            let outcome = providers
                .fetch(Capability::CHART, |p| {
                    let ticker = ticker.clone();
                    let p = p.clone();
                    async move { p.fetch_chart_range(&ticker, interval, start, end).await }
                })
                .await;
            (symbol, outcome)
        });
    }

    let outcomes: Vec<_> = stream::iter(pending)
        .buffer_unordered(self.max_concurrency)
        .collect()
        .await;

    let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
    for (symbol, outcome) in outcomes {
        let name = symbol.to_string();
        match outcome {
            Ok(chart) => {
                response.charts.insert(name, chart);
            }
            Err(err) => {
                response.errors.insert(name, err.to_string());
            }
        }
    }
    Ok(response)
}
/// Makes sure the events cache holds a usable entry for every tracked symbol.
///
/// A symbol is (re)fetched when it has no entry or its entry is no longer
/// fresh according to `is_cache_fresh`, so stored events honour `cache_ttl`
/// like the other caches instead of living forever once loaded.
/// NOTE(review): with no TTL configured this presumably refetches on every
/// call — confirm against `CacheEntry::is_fresh_with_ttl`.
///
/// Fetched events are always stored, even with caching disabled, because
/// `dividends`, `splits` and `capital_gains` read their data from this cache.
/// Fetch failures are deliberately ignored here: a symbol that never loaded is
/// reported by those callers as having no events data, while a symbol whose
/// refresh failed keeps serving its previous entry.
async fn ensure_events_loaded(&self) -> Result<()> {
    let symbols_to_fetch: Vec<Arc<str>> = {
        let cache = self.events_cache.read().await;
        self.symbols
            .iter()
            .filter(|sym| !self.is_cache_fresh(cache.get(*sym)))
            .cloned()
            .collect()
    };
    if symbols_to_fetch.is_empty() {
        return Ok(());
    }

    let futures: Vec<_> = symbols_to_fetch
        .iter()
        .map(|symbol| {
            let providers = Arc::clone(&self.providers);
            let symbol = Arc::clone(symbol);
            async move {
                let sym = symbol.to_string();
                let result = providers
                    .fetch(Capability::CORPORATE, |p| {
                        let sym = sym.clone();
                        let p = p.clone();
                        async move { p.fetch_events(&sym).await }
                    })
                    .await;
                (symbol, result)
            }
        })
        .collect();
    let results: Vec<_> = stream::iter(futures)
        .buffer_unordered(self.max_concurrency)
        .collect()
        .await;

    // Keep only the successful fetches; errors are dropped on purpose.
    let parsed_events: Vec<(Arc<str>, ChartEvents)> = results
        .into_iter()
        .filter_map(|(symbol, result)| result.ok().map(|events| (symbol, events)))
        .collect();

    if !parsed_events.is_empty() {
        let mut events_cache = self.events_cache.write().await;
        for (symbol, events) in parsed_events {
            events_cache.insert(symbol, CacheEntry::new(events));
        }
    }
    Ok(())
}
/// Fetches sparks for every tracked symbol in a single Yahoo request.
///
/// Flow:
/// 1. If every symbol has a fresh cache entry for `(interval, range)`, answer
///    from the cache.
/// 2. Otherwise take the fetch guard for `(interval, range)` and check again.
/// 3. Fetch all symbols at once and parse the JSON into a `SparkResponse`.
/// 4. Cache and return the parsed sparks; symbols that failed to parse or are
///    missing from the response get an error entry.
///
/// # Errors
/// Returns an error when no Yahoo client is available or the request itself
/// fails. A JSON parse failure is not an error of the call: it is recorded
/// for every symbol in the response's `errors` map.
pub async fn spark(&self, interval: Interval, range: TimeRange) -> Result<BatchSparksResponse> {
// Fast path: serve entirely from cache under a read lock.
{
let cache = self.spark_cache.read().await;
if self.all_cached(
&cache,
self.symbols.iter().map(|s| (s.clone(), interval, range)),
) {
let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
for symbol in &self.symbols {
if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
response
.sparks
.insert(symbol.to_string(), entry.value.clone());
}
}
return Ok(response);
}
}
// Serialize refreshes for this (interval, range); held until return.
let fetch_guard = Self::get_fetch_guard(&self.spark_fetch, (interval, range)).await;
let _guard = fetch_guard.lock().await;
// Re-check: another task may have filled the cache while we waited.
{
let cache = self.spark_cache.read().await;
if self.all_cached(
&cache,
self.symbols.iter().map(|s| (s.clone(), interval, range)),
) {
let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
for symbol in &self.symbols {
if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
response
.sparks
.insert(symbol.to_string(), entry.value.clone());
}
}
return Ok(response);
}
}
// This call goes to the Yahoo adapter directly rather than through `providers.fetch`.
let client = self.providers.first_yahoo()?;
let symbols_ref: Vec<&str> = self.symbols.iter().map(|s| &**s).collect();
let json =
crate::adapters::yahoo::quote::spark::fetch(&client, &symbols_ref, interval, range)
.await?;
let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
match SparkResponse::from_json(json) {
Ok(spark_response) => {
let mut parsed_sparks: Vec<(Arc<str>, Spark)> = Vec::new();
if let Some(results) = spark_response.spark.result {
for result in &results {
if let Some(spark) = Spark::from_response(
result,
Some(interval.as_str().to_string()),
Some(range.as_str().to_string()),
) {
let sym: Arc<str> = result.symbol.as_str().into();
parsed_sparks.push((sym, spark));
} else {
response.errors.insert(
result.symbol.to_string(),
"Failed to parse spark data".to_string(),
);
}
}
}
// Cache clones first, then move the originals into the response.
if self.cache_ttl.is_some() {
let mut cache = self.spark_cache.write().await;
for (symbol, spark) in &parsed_sparks {
self.cache_insert(
&mut cache,
(symbol.clone(), interval, range),
spark.clone(),
);
}
}
for (symbol, spark) in parsed_sparks {
response.sparks.insert(symbol.to_string(), spark);
}
// Any requested symbol with neither a spark nor an error is reported as missing.
for symbol in &self.symbols {
let symbol_str = &**symbol;
if !response.sparks.contains_key(symbol_str)
&& !response.errors.contains_key(symbol_str)
{
response.errors.insert(
symbol.to_string(),
"Symbol not found in response".to_string(),
);
}
}
}
// The payload could not be parsed at all: every symbol gets the same error text.
Err(e) => {
for symbol in &self.symbols {
response.errors.insert(symbol.to_string(), e.to_string());
}
}
}
Ok(response)
}
/// Returns each symbol's dividends restricted to `range`.
///
/// Events are loaded through `ensure_events_loaded` first; a symbol without
/// an events entry afterwards is reported in the response's `errors` map.
///
/// # Errors
/// Propagates any error returned by `ensure_events_loaded`.
pub async fn dividends(&self, range: TimeRange) -> Result<BatchDividendsResponse> {
    let mut response = BatchDividendsResponse::with_capacity(self.symbols.len());
    self.ensure_events_loaded().await?;

    let events_cache = self.events_cache.read().await;
    for symbol in &self.symbols {
        let name = symbol.to_string();
        match events_cache.get(symbol) {
            Some(entry) => {
                let in_range = filter_by_range(entry.value.to_dividends(), range);
                response.dividends.insert(name, in_range);
            }
            None => {
                response
                    .errors
                    .insert(name, "No events data available".to_string());
            }
        }
    }
    Ok(response)
}
/// Returns each symbol's splits restricted to `range`.
///
/// Events are loaded through `ensure_events_loaded` first; a symbol without
/// an events entry afterwards is reported in the response's `errors` map.
///
/// # Errors
/// Propagates any error returned by `ensure_events_loaded`.
pub async fn splits(&self, range: TimeRange) -> Result<BatchSplitsResponse> {
    let mut response = BatchSplitsResponse::with_capacity(self.symbols.len());
    self.ensure_events_loaded().await?;

    let events_cache = self.events_cache.read().await;
    for symbol in &self.symbols {
        let name = symbol.to_string();
        match events_cache.get(symbol) {
            Some(entry) => {
                let in_range = filter_by_range(entry.value.to_splits(), range);
                response.splits.insert(name, in_range);
            }
            None => {
                response
                    .errors
                    .insert(name, "No events data available".to_string());
            }
        }
    }
    Ok(response)
}
/// Returns each symbol's capital gains restricted to `range`.
///
/// Events are loaded through `ensure_events_loaded` first; a symbol without
/// an events entry afterwards is reported in the response's `errors` map.
///
/// # Errors
/// Propagates any error returned by `ensure_events_loaded`.
pub async fn capital_gains(&self, range: TimeRange) -> Result<BatchCapitalGainsResponse> {
    let mut response = BatchCapitalGainsResponse::with_capacity(self.symbols.len());
    self.ensure_events_loaded().await?;

    let events_cache = self.events_cache.read().await;
    for symbol in &self.symbols {
        let name = symbol.to_string();
        match events_cache.get(symbol) {
            Some(entry) => {
                let in_range = filter_by_range(entry.value.to_capital_gains(), range);
                response.capital_gains.insert(name, in_range);
            }
            None => {
                response
                    .errors
                    .insert(name, "No events data available".to_string());
            }
        }
    }
    Ok(response)
}
/// Fetches the requested financial statement for every tracked symbol.
///
/// The body is generated by `batch_fetch_cached!` with `financials_cache`, a
/// fetch guard keyed by `(statement_type, frequency)`, and a per-symbol fetch
/// through the `FUNDAMENTALS` capability.
/// NOTE(review): the macro presumably follows the same cache-check / guard /
/// fetch flow that `charts` spells out — confirm in `super::macros`.
pub async fn financials(
&self,
statement_type: StatementType,
frequency: Frequency,
) -> Result<BatchFinancialsResponse> {
batch_fetch_cached!(self;
cache: financials_cache,
guard: map(financials_fetch, (statement_type, frequency)),
key: |s| (s.clone(), statement_type, frequency),
response: BatchFinancialsResponse.financials,
fetch: |providers, symbol| {
let sym = symbol.to_string();
providers.fetch(Capability::FUNDAMENTALS, move |p| {
let sym = sym.clone();
let p = p.clone();
async move {
p.fetch_financials(&sym, statement_type, frequency)
.await
}
}).await
},
)
}
/// Fetches news for every tracked symbol.
///
/// The body is generated by `batch_fetch_cached!` with `news_cache`, the
/// single `news_fetch` guard, and a per-symbol fetch through the `CORPORATE`
/// capability whose result is collected into a `Vec<News>`.
/// NOTE(review): the macro presumably follows the same cache-check / guard /
/// fetch flow that `charts` spells out — confirm in `super::macros`.
pub async fn news(&self) -> Result<BatchNewsResponse> {
batch_fetch_cached!(self;
cache: news_cache,
guard: simple(news_fetch),
key: |s| s.clone(),
response: BatchNewsResponse.news,
fetch: |providers, symbol| {
let sym = symbol.to_string();
providers.fetch(Capability::CORPORATE, move |p| {
let sym = sym.clone();
let p = p.clone();
async move {
p.fetch_news(&sym)
.await
.map(|data| data.into_iter().collect::<Vec<News>>())
}
}).await
},
)
}
/// Fetches recommendations (built from similar symbols) for every tracked symbol.
///
/// The body is generated by `batch_fetch_cached!` with
/// `recommendations_cache` and a fetch guard keyed by `limit`. For each
/// symbol the provider's `fetch_similar_symbols` result is converted with
/// `recommendation_from_similar`, tagged with the provider resolved from
/// `p.id()`. An unrecognised provider id fails that fetch with
/// `FinanceError::InternalError`.
/// NOTE(review): the macro presumably follows the same cache-check / guard /
/// fetch flow that `charts` spells out — confirm in `super::macros`.
pub async fn recommendations(&self, limit: u32) -> Result<BatchRecommendationsResponse> {
batch_fetch_cached!(self;
cache: recommendations_cache,
guard: map(recommendations_fetch, limit),
key: |s| (s.clone(), limit),
response: BatchRecommendationsResponse.recommendations,
fetch: |providers, symbol| {
let sym = symbol.to_string();
providers.fetch(Capability::CORPORATE, move |p| {
let sym = sym.clone();
let p = p.clone();
async move {
let items = p.fetch_similar_symbols(&sym, limit).await?;
Ok(recommendation_from_similar(
sym,
Some(Provider::from_id_str(p.id()).ok_or_else(|| {
FinanceError::InternalError(format!("unknown provider id: {}", p.id()))
})?),
items,
Some(limit),
))
}
}).await
},
)
}
/// Fetches options data for every tracked symbol, optionally for a given `date`.
///
/// The body is generated by `batch_fetch_cached!` with `options_cache`, a
/// fetch guard keyed by `date`, and a per-symbol fetch through the `OPTIONS`
/// capability.
/// NOTE(review): `date` is forwarded unchanged to `fetch_options`; its unit
/// (presumably a Unix timestamp) is not visible here. The macro presumably
/// follows the same flow that `charts` spells out — confirm in `super::macros`.
pub async fn options(&self, date: Option<i64>) -> Result<BatchOptionsResponse> {
batch_fetch_cached!(self;
cache: options_cache,
guard: map(options_fetch, date),
key: |s| (s.clone(), date),
response: BatchOptionsResponse.options,
fetch: |providers, symbol| {
let sym = symbol.to_string();
providers.fetch(Capability::OPTIONS, move |p| {
let sym = sym.clone();
let p = p.clone();
async move {
p.fetch_options(&sym, date).await
}
}).await
},
)
}
/// Computes an indicator summary for every tracked symbol from its chart.
///
/// Flow:
/// 1. If every symbol has a fresh cached summary for `(interval, range)`,
///    answer from the cache.
/// 2. Otherwise take the indicators fetch guard for `(interval, range)` and
///    check again.
/// 3. Obtain charts through `charts` (which applies its own caching and
///    guard) and run `calculate_indicators` on each chart's candles.
/// 4. Cache and return the summaries; chart errors are copied into the
///    response's `errors` map.
///
/// # Errors
/// Propagates any error returned by `charts`.
#[cfg(feature = "indicators")]
pub async fn indicators(
&self,
interval: Interval,
range: TimeRange,
) -> Result<BatchIndicatorsResponse> {
// Builds the cache key for a symbol at this (interval, range).
let cache_key_for = |symbol: &Arc<str>| (symbol.clone(), interval, range);
// Fast path: serve entirely from cache under a read lock.
{
let cache = self.indicators_cache.read().await;
if self.all_cached(&cache, self.symbols.iter().map(&cache_key_for)) {
let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
for symbol in &self.symbols {
if let Some(entry) = cache.get(&cache_key_for(symbol)) {
response
.indicators
.insert(symbol.to_string(), entry.value.clone());
}
}
return Ok(response);
}
}
// Serialize refreshes for this (interval, range); held until return.
let fetch_guard = Self::get_fetch_guard(&self.indicators_fetch, (interval, range)).await;
let _guard = fetch_guard.lock().await;
// Re-check: another task may have filled the cache while we waited.
{
let cache = self.indicators_cache.read().await;
if self.all_cached(&cache, self.symbols.iter().map(&cache_key_for)) {
let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
for symbol in &self.symbols {
if let Some(entry) = cache.get(&cache_key_for(symbol)) {
response
.indicators
.insert(symbol.to_string(), entry.value.clone());
}
}
return Ok(response);
}
}
// `charts` takes the separate charts guard, not the indicators guard held here.
let charts_response = self.charts(interval, range).await?;
let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
let mut calculated_indicators: Vec<(String, indicators::IndicatorsSummary)> = Vec::new();
for (symbol, chart) in &charts_response.charts {
let indicators = indicators::summary::calculate_indicators(&chart.candles);
calculated_indicators.push((symbol.to_string(), indicators));
}
// Cache clones first, then move the originals into the response.
if self.cache_ttl.is_some() {
let mut cache = self.indicators_cache.write().await;
for (symbol, indicators) in &calculated_indicators {
let key: Arc<str> = symbol.as_str().into();
self.cache_insert(&mut cache, cache_key_for(&key), indicators.clone());
}
}
for (symbol, indicators) in calculated_indicators {
response.indicators.insert(symbol, indicators);
}
// Symbols whose chart failed carry that error over unchanged.
for (symbol, error) in &charts_response.errors {
response.errors.insert(symbol.to_string(), error.clone());
}
Ok(response)
}
/// Appends `symbols` to the tracked list, preserving input order.
///
/// A symbol is skipped when it is already tracked or when it appeared
/// earlier in the same `symbols` slice, so the tracked list never gains a
/// duplicate from this call. Caches are left untouched.
pub fn add_symbols(&mut self, symbols: &[impl AsRef<str>]) {
    use std::collections::HashSet;

    // Seeded with the tracked symbols and extended as inputs are accepted, so
    // one set rejects both already-tracked and repeated entries.
    let mut seen: HashSet<&str> = self.symbols.iter().map(|s| &**s).collect();
    let to_add: Vec<Arc<str>> = symbols
        .iter()
        .map(|s| s.as_ref())
        // `insert` returns `false` when the value was already present.
        .filter(|s| seen.insert(*s))
        .map(|s| Arc::<str>::from(s))
        .collect();
    self.symbols.extend(to_add);
}
/// Runs a portfolio backtest over the tracked symbols.
///
/// Charts for `interval` / `range` are required; dividends are best-effort
/// and default to empty when they cannot be loaded. Symbols without a chart
/// in the batch are left out of the simulation. `factory` is handed to the
/// engine to create a strategy per symbol.
///
/// # Errors
/// Returns the validation error from `config.validate`, a
/// `BacktestError::ChartError` when `charts` fails, or whatever the engine's
/// `run` returns.
#[cfg(feature = "backtesting")]
pub async fn backtest<S, F>(
    &self,
    interval: Interval,
    range: TimeRange,
    config: Option<backtesting::portfolio::PortfolioConfig>,
    factory: F,
) -> backtesting::Result<backtesting::portfolio::PortfolioResult>
where
    S: backtesting::Strategy,
    F: Fn(&str) -> S,
{
    use crate::backtesting::portfolio::{PortfolioEngine, SymbolData};

    let config = config.unwrap_or_default();
    config.validate(self.symbols.len())?;

    let chart_batch = self.charts(interval, range).await;
    let charts =
        chart_batch.map_err(|e| backtesting::BacktestError::ChartError(e.to_string()))?;

    // A dividend failure must not abort the backtest.
    let dividends_map = match self.dividends(range).await {
        Ok(batch) => batch.dividends,
        Err(_) => Default::default(),
    };

    let mut symbol_data: Vec<SymbolData> = Vec::new();
    for sym in &self.symbols {
        let Some(chart) = charts.charts.get(sym.as_ref()) else {
            continue;
        };
        let divs = dividends_map.get(sym.as_ref()).cloned().unwrap_or_default();
        let data = SymbolData::new(sym.as_ref(), chart.candles.clone()).with_dividends(divs);
        symbol_data.push(data);
    }

    PortfolioEngine::new(config).run(&symbol_data, factory)
}
/// Stops tracking `symbols` and removes their entries from every cache.
///
/// Symbols that are not tracked are ignored for the list, but their cache
/// entries (if any) are still removed.
pub async fn remove_symbols(&mut self, symbols: &[impl AsRef<str>]) {
use std::collections::HashSet;
let to_remove: HashSet<&str> = symbols.iter().map(|s| s.as_ref()).collect();
self.symbols.retain(|s| !to_remove.contains(&**s));
// Acquire all write locks concurrently; each guards a distinct cache.
let (
mut quote_cache,
mut chart_cache,
mut events_cache,
mut financials_cache,
mut news_cache,
mut recommendations_cache,
mut options_cache,
mut spark_cache,
) = tokio::join!(
self.quote_cache.write(),
self.chart_cache.write(),
self.events_cache.write(),
self.financials_cache.write(),
self.news_cache.write(),
self.recommendations_cache.write(),
self.options_cache.write(),
self.spark_cache.write(),
);
// Caches keyed by symbol alone: remove by key.
for symbol in &to_remove {
let key: Arc<str> = (*symbol).into();
quote_cache.remove(&key);
events_cache.remove(&key);
news_cache.remove(&key);
}
// Caches with compound keys: drop every entry whose symbol part matches.
chart_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
financials_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
recommendations_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
options_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
spark_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
// Release all write locks before touching the feature-gated cache.
drop((
quote_cache,
chart_cache,
events_cache,
financials_cache,
news_cache,
recommendations_cache,
options_cache,
spark_cache,
));
#[cfg(feature = "indicators")]
self.indicators_cache
.write()
.await
.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
}
/// Empties every data cache and every per-key fetch-guard map.
///
/// The single guards `quotes_fetch` and `news_fetch` are not maps and are
/// left as they are.
pub async fn clear_cache(&self) {
tokio::join!(
// Data caches.
async { self.quote_cache.write().await.clear() },
async { self.chart_cache.write().await.clear() },
async { self.events_cache.write().await.clear() },
async { self.financials_cache.write().await.clear() },
async { self.news_cache.write().await.clear() },
async { self.recommendations_cache.write().await.clear() },
async { self.options_cache.write().await.clear() },
async { self.spark_cache.write().await.clear() },
// Empty async block when the `indicators` feature is off.
async {
#[cfg(feature = "indicators")]
self.indicators_cache.write().await.clear();
},
// Fetch-guard maps.
async { self.charts_fetch.write().await.clear() },
async { self.financials_fetch.write().await.clear() },
async { self.recommendations_fetch.write().await.clear() },
async { self.options_fetch.write().await.clear() },
async { self.spark_fetch.write().await.clear() },
// Empty async block when the `indicators` feature is off.
async {
#[cfg(feature = "indicators")]
self.indicators_fetch.write().await.clear();
},
);
}
/// Empties only the quote cache.
pub async fn clear_quote_cache(&self) {
self.quote_cache.write().await.clear();
}
/// Empties the chart, events and spark caches, plus the indicators cache
/// when the `indicators` feature is enabled. Fetch-guard maps are not touched.
pub async fn clear_chart_cache(&self) {
tokio::join!(
async { self.chart_cache.write().await.clear() },
async { self.events_cache.write().await.clear() },
async { self.spark_cache.write().await.clear() },
// Empty async block when the `indicators` feature is off.
async {
#[cfg(feature = "indicators")]
self.indicators_cache.write().await.clear();
},
);
}
}
#[cfg(test)]
mod tests {
use super::*;
// Live smoke test: a quotes batch for three symbols yields at least one success.
#[tokio::test]
#[ignore = "requires network access"]
async fn test_tickers_quotes() {
let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
let result = tickers.quotes().await.unwrap();
assert!(result.success_count() > 0);
}
// Live smoke test: a daily / five-day charts batch yields at least one success.
#[tokio::test]
#[ignore = "requires network access"]
async fn test_tickers_charts() {
let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
let result = tickers
.charts(Interval::OneDay, TimeRange::FiveDays)
.await
.unwrap();
assert!(result.success_count() > 0);
}
#[tokio::test]
#[ignore = "requires network access"]
async fn test_tickers_spark() {
let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
let result = tickers
.spark(Interval::FiveMinutes, TimeRange::OneDay)
.await
.unwrap();
assert!(result.success_count() > 0);
if let Some(spark) = result.sparks.get("AAPL") {
assert!(!spark.closes.is_empty());
assert_eq!(spark.symbol, "AAPL");
assert!(spark.percent_change().is_some());
}
}
#[tokio::test]
#[ignore = "requires network access"]
async fn test_tickers_dividends() {
let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
let result = tickers.dividends(TimeRange::OneYear).await.unwrap();
assert!(result.success_count() > 0);
if let Some(dividends) = result.dividends.get("AAPL")
&& !dividends.is_empty()
{
let div = ÷nds[0];
assert!(div.timestamp > 0);
assert!(div.amount > 0.0);
}
}
#[tokio::test]
#[ignore = "requires network access"]
async fn test_tickers_splits() {
let tickers = Tickers::new(["NVDA", "TSLA"]).await.unwrap();
let result = tickers.splits(TimeRange::FiveYears).await.unwrap();
assert!(result.success_count() > 0);
for splits in result.splits.values() {
for split in splits {
assert!(split.timestamp > 0);
assert!(split.numerator > 0.0);
assert!(split.denominator > 0.0);
assert!(!split.ratio.is_empty());
}
}
}
#[tokio::test]
#[ignore = "requires network access"]
async fn test_tickers_capital_gains() {
let tickers = Tickers::new(["VFIAX", "VTI"]).await.unwrap();
let result = tickers.capital_gains(TimeRange::TwoYears).await.unwrap();
assert!(result.success_count() > 0);
for gains in result.capital_gains.values() {
for gain in gains {
assert!(gain.timestamp > 0);
assert!(gain.amount >= 0.0);
}
}
}
#[tokio::test]
#[ignore = "requires network access"]
async fn test_tickers_financials() {
let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
let result = tickers
.financials(StatementType::Income, Frequency::Annual)
.await
.unwrap();
assert!(result.success_count() > 0);
for (symbol, stmt) in &result.financials {
assert_eq!(stmt.symbol, *symbol);
assert_eq!(stmt.statement_type, "income");
assert_eq!(stmt.frequency, "annual");
assert!(!stmt.statement.is_empty());
if let Some(revenue) = stmt.statement.get("TotalRevenue") {
assert!(!revenue.is_empty());
}
}
}
#[tokio::test]
#[ignore = "requires network access"]
async fn test_tickers_news() {
let tickers = Tickers::new(["AAPL", "TSLA"]).await.unwrap();
let result = tickers.news().await.unwrap();
assert!(result.success_count() > 0);
for articles in result.news.values() {
if !articles.is_empty() {
let article = &articles[0];
assert!(!article.title.is_empty());
assert!(!article.link.is_empty());
assert!(!article.source.is_empty());
}
}
}
#[tokio::test]
#[ignore = "requires network access"]
async fn test_tickers_recommendations() {
let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
let result = tickers.recommendations(5).await.unwrap();
assert!(result.success_count() > 0);
for (symbol, rec) in &result.recommendations {
assert_eq!(rec.symbol, *symbol);
assert!(rec.count() > 0);
for similar in &rec.recommendations {
assert!(!similar.symbol.is_empty());
}
}
}
#[tokio::test]
#[ignore = "requires network access"]
async fn test_tickers_options() {
let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
let result = tickers.options(None).await.unwrap();
assert!(result.success_count() > 0);
for opts in result.options.values() {
assert!(!opts.expiration_dates().is_empty());
}
}
#[tokio::test]
#[ignore = "requires network access"]
#[cfg(feature = "indicators")]
async fn test_tickers_indicators() {
let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
let result = tickers
.indicators(Interval::OneDay, TimeRange::ThreeMonths)
.await
.unwrap();
assert!(result.success_count() > 0);
for ind in result.indicators.values() {
assert!(ind.rsi_14.is_some() || ind.sma_20.is_some());
}
}
#[tokio::test]
async fn test_tickers_add_symbols() {
let mut tickers = Tickers::new(["AAPL"]).await.unwrap();
assert_eq!(tickers.len(), 1);
assert_eq!(tickers.symbols(), &["AAPL"]);
tickers.add_symbols(&["MSFT", "GOOGL"]);
assert_eq!(tickers.len(), 3);
assert!(tickers.symbols().contains(&"AAPL"));
assert!(tickers.symbols().contains(&"MSFT"));
assert!(tickers.symbols().contains(&"GOOGL"));
tickers.add_symbols(&["AAPL"]);
assert_eq!(tickers.len(), 3);
}
#[tokio::test]
#[ignore = "requires network access"]
async fn test_tickers_remove_symbols() {
let mut tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
assert_eq!(tickers.len(), 3);
let _ = tickers.quotes().await;
tickers.remove_symbols(&["MSFT"]).await;
assert_eq!(tickers.len(), 2);
assert!(tickers.symbols().contains(&"AAPL"));
assert!(!tickers.symbols().contains(&"MSFT"));
assert!(tickers.symbols().contains(&"GOOGL"));
let quotes = tickers.quotes().await.unwrap();
assert!(!quotes.quotes.contains_key("MSFT"));
assert_eq!(quotes.quotes.len(), 2);
}
}