1use super::macros::{batch_fetch_cached, define_batch_response};
6use crate::adapters::yahoo::client::ClientConfig;
7#[cfg(feature = "backtesting")]
8use crate::backtesting;
9use crate::constants::{Frequency, Interval, Region, StatementType, TimeRange, ValueFormat};
10use crate::error::{FinanceError, Result};
11#[cfg(any(feature = "backtesting", feature = "indicators"))]
12use crate::indicators;
13use crate::models::chart::events::ChartEvents;
14use crate::models::chart::spark::Spark;
15use crate::models::chart::spark::response::SparkResponse;
16use crate::models::chart::{CapitalGain, Chart, Dividend, Split};
17use crate::models::corporate::news::News;
18use crate::models::corporate::recommendation::Recommendation;
19use crate::models::fundamentals::FinancialStatement;
20use crate::models::options::Options;
21use crate::models::quote::{Quote, QuoteSummaryResponse};
22use crate::providers::types::recommendation_from_similar;
23use crate::providers::yahoo::YahooProvider;
24use crate::providers::{
25 Capability, Fetch, Provider, ProviderAdapter, ProviderSet, Routes, build_providers,
26};
27use crate::ticker::ClientHandle;
28use crate::utils::{CacheEntry, EVICTION_THRESHOLD, filter_by_range};
29use futures::stream::{self, StreamExt};
30use std::collections::HashMap;
31use std::sync::Arc;
32use std::time::Duration;
33use tokio::sync::RwLock;
34
/// Shared, async-lockable map from a cache key to a [`CacheEntry`] holding the cached value.
type MapCache<K, V> = Arc<RwLock<HashMap<K, CacheEntry<V>>>>;
/// Chart cache key: symbol plus the interval and range the chart was requested with.
type ChartCacheKey = (Arc<str>, Interval, TimeRange);
/// Quotes, keyed by symbol.
type QuoteCache = MapCache<Arc<str>, Quote>;
/// Charts, keyed by [`ChartCacheKey`].
type ChartCache = MapCache<ChartCacheKey, Chart>;
/// Chart events (the source for dividends, splits and capital gains), keyed by symbol.
type EventsCache = MapCache<Arc<str>, ChartEvents>;
/// Financial statements, keyed by symbol, statement type and frequency.
type FinancialsCache = MapCache<(Arc<str>, StatementType, Frequency), FinancialStatement>;
/// News lists, keyed by symbol.
type NewsCache = MapCache<Arc<str>, Vec<News>>;
/// Recommendations, keyed by symbol and the requested limit.
type RecommendationsCache = MapCache<(Arc<str>, u32), Recommendation>;
/// Option chains, keyed by symbol and the optional date argument.
type OptionsCache = MapCache<(Arc<str>, Option<i64>), Options>;
/// Spark cache key: symbol plus the interval and range the spark was requested with.
type SparkCacheKey = (Arc<str>, Interval, TimeRange);
/// Sparks, keyed by [`SparkCacheKey`].
type SparkCache = MapCache<SparkCacheKey, Spark>;
/// Indicator summaries, keyed by symbol, interval and range.
#[cfg(feature = "indicators")]
type IndicatorsCache = MapCache<(Arc<str>, Interval, TimeRange), indicators::IndicatorsSummary>;

/// Async mutex taken before a batch fetch so that concurrent callers wait for the
/// in-flight fetch and then re-check the cache instead of fetching again.
type FetchGuard = Arc<tokio::sync::Mutex<()>>;
/// One lazily created [`FetchGuard`] per request-parameter key `K`.
type FetchGuardMap<K> = Arc<RwLock<HashMap<K, FetchGuard>>>;
53
// Batch response types, one per endpoint. As used in this file, each generated type
// exposes a `with_capacity(n)` constructor, a map named after the field on the right of
// `=>` (keyed by symbol string) and an `errors` map from symbol string to error message.
// NOTE(review): the macro definition is not visible here; confirm the exact expansion.

// Quotes for every symbol.
define_batch_response! {
    BatchQuotesResponse => quotes: Quote
}

// Charts for every symbol.
define_batch_response! {
    BatchChartsResponse => charts: Chart
}

// Sparks for every symbol.
define_batch_response! {
    BatchSparksResponse => sparks: Spark
}

// Dividend lists for every symbol.
define_batch_response! {
    BatchDividendsResponse => dividends: Vec<Dividend>
}

// Split lists for every symbol.
define_batch_response! {
    BatchSplitsResponse => splits: Vec<Split>
}

// Capital-gain lists for every symbol.
define_batch_response! {
    BatchCapitalGainsResponse => capital_gains: Vec<CapitalGain>
}

// Financial statements for every symbol.
define_batch_response! {
    BatchFinancialsResponse => financials: FinancialStatement
}

// News lists for every symbol.
define_batch_response! {
    BatchNewsResponse => news: Vec<News>
}

// Recommendations for every symbol.
define_batch_response! {
    BatchRecommendationsResponse => recommendations: Recommendation
}

// Option chains for every symbol.
define_batch_response! {
    BatchOptionsResponse => options: Options
}

// Indicator summaries for every symbol (only with the `indicators` feature).
#[cfg(feature = "indicators")]
define_batch_response! {
    BatchIndicatorsResponse => indicators: indicators::IndicatorsSummary
}
113
/// Concurrency limit a new [`TickersBuilder`] starts with until
/// [`TickersBuilder::max_concurrency`] overrides it.
const DEFAULT_MAX_CONCURRENCY: usize = 10;
116
/// Builder for [`Tickers`]: collects the symbol list and client options, then
/// [`build`](TickersBuilder::build) resolves the provider set and creates empty caches.
pub struct TickersBuilder {
    /// Symbols the resulting `Tickers` will operate on.
    symbols: Vec<Arc<str>>,
    /// Client configuration, used only when neither a provider set nor a shared client
    /// was supplied.
    config: ClientConfig,
    /// Existing client handle to reuse instead of building a new Yahoo session.
    shared_client: Option<ClientHandle>,
    /// Pre-built provider set; takes precedence over `shared_client` and `config`.
    injected_providers: Option<Arc<ProviderSet>>,
    /// Upper bound on per-symbol requests in flight at once (always at least 1).
    max_concurrency: usize,
    /// Time-to-live for cached results; `None` disables the TTL-based caches.
    cache_ttl: Option<Duration>,
    /// Whether `quotes` additionally requests logo URLs.
    include_logo: bool,
    /// Value format applied to quotes.
    value_format: ValueFormat,
}
128
129impl TickersBuilder {
130 fn new<S, I>(symbols: I) -> Self
131 where
132 S: Into<String>,
133 I: IntoIterator<Item = S>,
134 {
135 Self {
136 symbols: symbols.into_iter().map(|s| s.into().into()).collect(),
137 config: ClientConfig::default(),
138 shared_client: None,
139 injected_providers: None,
140 max_concurrency: DEFAULT_MAX_CONCURRENCY,
141 cache_ttl: None,
142 include_logo: false,
143 value_format: ValueFormat::default(),
144 }
145 }
146
147 pub fn region(mut self, region: Region) -> Self {
149 self.config.lang = region.lang().to_string();
150 self.config.region = region.region().to_string();
151 self
152 }
153
154 pub fn lang(mut self, lang: impl Into<String>) -> Self {
156 self.config.lang = lang.into();
157 self
158 }
159
160 pub fn region_code(mut self, region: impl Into<String>) -> Self {
162 self.config.region = region.into();
163 self
164 }
165
166 pub fn timeout(mut self, timeout: Duration) -> Self {
168 self.config.timeout = timeout;
169 self
170 }
171
172 pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
174 self.config.proxy = Some(proxy.into());
175 self
176 }
177
178 pub fn config(mut self, config: ClientConfig) -> Self {
180 self.config = config;
181 self
182 }
183
184 pub fn max_concurrency(mut self, n: usize) -> Self {
193 self.max_concurrency = n.max(1);
194 self
195 }
196
197 pub fn cache(mut self, ttl: Duration) -> Self {
218 self.cache_ttl = Some(ttl);
219 self
220 }
221
222 pub fn logo(mut self) -> Self {
227 self.include_logo = true;
228 self
229 }
230
231 pub fn format(mut self, format: ValueFormat) -> Self {
240 self.value_format = format;
241 self
242 }
243 pub(crate) fn with_provider_set(mut self, set: Arc<ProviderSet>) -> Self {
245 self.injected_providers = Some(set);
246 self
247 }
248
249 pub fn client(mut self, handle: ClientHandle) -> Self {
255 self.shared_client = Some(handle);
256 self
257 }
258
259 pub async fn build(self) -> Result<Tickers> {
261 let providers = if let Some(set) = self.injected_providers {
262 set
263 } else if let Some(handle) = self.shared_client {
264 let yahoo = YahooProvider::from_client(handle.0);
265 let client = yahoo.client_arc();
266 Arc::new(ProviderSet::new(
267 vec![Arc::new(yahoo) as Arc<dyn ProviderAdapter>],
268 Some(client),
269 Routes::new(Fetch::Sequential),
270 ))
271 } else {
272 Arc::new(
273 build_providers(
274 &[Provider::Yahoo],
275 &self.config,
276 Routes::new(Fetch::Sequential),
277 )
278 .await?,
279 )
280 };
281
282 Ok(Tickers {
283 symbols: self.symbols,
284 providers,
285 max_concurrency: self.max_concurrency,
286 cache_ttl: self.cache_ttl,
287 include_logo: self.include_logo,
288 value_format: self.value_format,
289 quote_cache: Default::default(),
290 chart_cache: Default::default(),
291 events_cache: Default::default(),
292 financials_cache: Default::default(),
293 news_cache: Default::default(),
294 recommendations_cache: Default::default(),
295 options_cache: Default::default(),
296 spark_cache: Default::default(),
297 #[cfg(feature = "indicators")]
298 indicators_cache: Default::default(),
299
300 quotes_fetch: Arc::new(tokio::sync::Mutex::new(())),
302 charts_fetch: Default::default(),
303 financials_fetch: Default::default(),
304 news_fetch: Arc::new(tokio::sync::Mutex::new(())),
305 recommendations_fetch: Default::default(),
306 options_fetch: Default::default(),
307 spark_fetch: Default::default(),
308 #[cfg(feature = "indicators")]
309 indicators_fetch: Default::default(),
310 })
311 }
312}
313
/// Batch client for a fixed list of symbols, with optional TTL-based caching of every
/// endpoint it exposes. Constructed through [`Tickers::new`] or [`Tickers::builder`].
pub struct Tickers {
    /// Symbols every batch call operates on.
    symbols: Vec<Arc<str>>,
    /// Providers used to fetch data.
    providers: Arc<ProviderSet>,
    /// Upper bound on per-symbol requests in flight at once.
    max_concurrency: usize,
    /// Time-to-live for cached results; `None` disables the TTL-based caches.
    cache_ttl: Option<Duration>,
    /// Whether `quotes` additionally requests logo URLs.
    include_logo: bool,
    /// Value format applied to quotes.
    value_format: ValueFormat,
    /// Cached quotes, keyed by symbol.
    quote_cache: QuoteCache,
    /// Cached charts, keyed by symbol, interval and range.
    chart_cache: ChartCache,
    /// Loaded chart events, keyed by symbol; filled by `ensure_events_loaded`.
    events_cache: EventsCache,
    /// Cached financial statements.
    financials_cache: FinancialsCache,
    /// Cached news lists.
    news_cache: NewsCache,
    /// Cached recommendations.
    recommendations_cache: RecommendationsCache,
    /// Cached option chains.
    options_cache: OptionsCache,
    /// Cached sparks.
    spark_cache: SparkCache,
    /// Cached indicator summaries.
    #[cfg(feature = "indicators")]
    indicators_cache: IndicatorsCache,

    /// Serializes `quotes` fetches so waiting callers can reuse the cached result.
    quotes_fetch: FetchGuard,
    /// Per-(interval, range) guards for `charts`.
    charts_fetch: FetchGuardMap<(Interval, TimeRange)>,
    /// Per-(statement type, frequency) guards for `financials`.
    financials_fetch: FetchGuardMap<(StatementType, Frequency)>,
    /// Guard for `news`.
    news_fetch: FetchGuard,
    /// Per-limit guards for `recommendations`.
    recommendations_fetch: FetchGuardMap<u32>,
    /// Per-date guards for `options`.
    options_fetch: FetchGuardMap<Option<i64>>,
    /// Per-(interval, range) guards for `spark`.
    spark_fetch: FetchGuardMap<(Interval, TimeRange)>,
    /// Per-(interval, range) guards for `indicators`.
    #[cfg(feature = "indicators")]
    indicators_fetch: FetchGuardMap<(Interval, TimeRange)>,
}
373
374impl std::fmt::Debug for Tickers {
375 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
376 f.debug_struct("Tickers")
377 .field("symbols", &self.symbols)
378 .field("max_concurrency", &self.max_concurrency)
379 .field("cache_ttl", &self.cache_ttl)
380 .finish_non_exhaustive()
381 }
382}
383
384impl Tickers {
385 pub async fn new<S, I>(symbols: I) -> Result<Self>
402 where
403 S: Into<String>,
404 I: IntoIterator<Item = S>,
405 {
406 Self::builder(symbols).build().await
407 }
408
409 pub fn builder<S, I>(symbols: I) -> TickersBuilder
411 where
412 S: Into<String>,
413 I: IntoIterator<Item = S>,
414 {
415 TickersBuilder::new(symbols)
416 }
417
418 pub fn symbols(&self) -> Vec<&str> {
420 self.symbols.iter().map(|s| &**s).collect()
421 }
422
    /// Returns the number of tracked symbols.
    pub fn len(&self) -> usize {
        self.symbols.len()
    }
427
    /// Returns `true` when no symbols are tracked.
    pub fn is_empty(&self) -> bool {
        self.symbols.is_empty()
    }
432
433 pub fn client_handle(&self) -> ClientHandle {
439 ClientHandle(
440 self.providers
441 .first_yahoo()
442 .expect("Tickers always uses a Yahoo session"),
443 )
444 }
445
    /// Reports whether `entry` is fresh under this instance's cache TTL by delegating
    /// to `CacheEntry::is_fresh_with_ttl`.
    // NOTE(review): presumably `false` when `entry` or the TTL is `None` — confirm in `CacheEntry`.
    #[inline]
    fn is_cache_fresh<T>(&self, entry: Option<&CacheEntry<T>>) -> bool {
        CacheEntry::is_fresh_with_ttl(entry, self.cache_ttl)
    }
451
452 fn all_cached<K: Eq + std::hash::Hash, V>(
454 &self,
455 map: &HashMap<K, CacheEntry<V>>,
456 keys: impl Iterator<Item = K>,
457 ) -> bool {
458 let Some(ttl) = self.cache_ttl else {
459 return false;
460 };
461 keys.into_iter()
462 .all(|k| map.get(&k).map(|e| e.is_fresh(ttl)).unwrap_or(false))
463 }
464
465 #[inline]
470 fn cache_insert<K: Eq + std::hash::Hash, V>(
471 &self,
472 map: &mut HashMap<K, CacheEntry<V>>,
473 key: K,
474 value: V,
475 ) {
476 if let Some(ttl) = self.cache_ttl {
477 if map.len() >= EVICTION_THRESHOLD {
478 map.retain(|_, entry| entry.is_fresh(ttl));
479 }
480 map.insert(key, CacheEntry::new(value));
481 }
482 }
483
    /// Fetches quotes for every tracked symbol.
    ///
    /// Flow:
    /// 1. If every symbol has a fresh cache entry, answer from the cache.
    /// 2. Otherwise take the quotes fetch guard and re-check the cache, since another
    ///    caller may have filled it while this one waited.
    /// 3. Try one batch request; if it fails, fall back to per-symbol requests, which
    ///    record individual failures in `errors`.
    /// 4. When logos are enabled, a logo request runs concurrently with the batch
    ///    request; a failed logo request is ignored.
    /// 5. Cache the parsed quotes (when caching is enabled) and mark every symbol that
    ///    ended up with neither a quote nor an error as "Symbol not found in response".
    ///
    /// # Errors
    ///
    /// Returns `FinanceError::JsonParseError` if the JSON round trip used for
    /// `ValueFormat::Raw` fails. Fetch failures are reported per symbol in the
    /// response's `errors` map rather than as an `Err`.
    pub async fn quotes(&self) -> Result<BatchQuotesResponse> {
        {
            // Fast path: serve entirely from cache under the read lock.
            let cache = self.quote_cache.read().await;
            if self.all_cached(&cache, self.symbols.iter().cloned()) {
                let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
                for symbol in &self.symbols {
                    if let Some(entry) = cache.get(symbol) {
                        response
                            .quotes
                            .insert(symbol.to_string(), entry.value.clone());
                    }
                }
                return Ok(response);
            }
        }

        // Only one quotes fetch at a time; held until this function returns.
        let _fetch_guard = self.quotes_fetch.lock().await;

        {
            // Re-check: a concurrent caller may have populated the cache meanwhile.
            let cache = self.quote_cache.read().await;
            if self.all_cached(&cache, self.symbols.iter().cloned()) {
                let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
                for symbol in &self.symbols {
                    if let Some(entry) = cache.get(symbol) {
                        response
                            .quotes
                            .insert(symbol.to_string(), entry.value.clone());
                    }
                }
                return Ok(response);
            }
        }

        let symbol_strings: Vec<String> = self.symbols.iter().map(|s| s.to_string()).collect();
        let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());

        let (quote_data, logos) = if self.include_logo {
            // Logo request: best effort, any failure collapses to `None`.
            let providers_logo = Arc::clone(&self.providers);
            let syms_logo = symbol_strings.clone();
            let logo_future = async move {
                if let Ok(client) = providers_logo.first_yahoo() {
                    let syms_ref: Vec<&str> = syms_logo.iter().map(String::as_str).collect();
                    crate::adapters::yahoo::quote::quotes::fetch_with_fields(
                        &client,
                        &syms_ref,
                        Some(&["logoUrl", "companyLogoUrl"]),
                        true,
                        true,
                    )
                    .await
                    .ok()
                } else {
                    None
                }
            };

            let providers_quote = Arc::clone(&self.providers);
            let syms_quote = symbol_strings.clone();
            let quote_future = async move {
                providers_quote
                    .fetch(Capability::QUOTE, |p| {
                        let syms = syms_quote.clone();
                        let p = p.clone();
                        async move {
                            let syms_ref: Vec<&str> = syms.iter().map(String::as_str).collect();
                            p.fetch_quotes_batch(&syms_ref).await
                        }
                    })
                    .await
            };

            // Run the quote batch and the logo request concurrently.
            let (batch_result, logo_result) = tokio::join!(quote_future, logo_future);
            let quote_data = match batch_result {
                Ok(data) => data,
                Err(_) => {
                    // Batch failed: retry symbol by symbol, recording failures in `response.errors`.
                    self.fetch_quotes_per_symbol(&symbol_strings, &mut response)
                        .await
                }
            };
            (quote_data, logo_result)
        } else {
            let providers = Arc::clone(&self.providers);
            let syms = symbol_strings.clone();
            let batch_result = providers
                .fetch(Capability::QUOTE, |p| {
                    let syms = syms.clone();
                    let p = p.clone();
                    async move {
                        let syms_ref: Vec<&str> = syms.iter().map(String::as_str).collect();
                        p.fetch_quotes_batch(&syms_ref).await
                    }
                })
                .await;
            let data = match batch_result {
                Ok(data) => data,
                Err(_) => {
                    // Batch failed: retry symbol by symbol, recording failures in `response.errors`.
                    self.fetch_quotes_per_symbol(&symbol_strings, &mut response)
                        .await
                }
            };
            (data, None)
        };

        // symbol -> (logoUrl, companyLogoUrl), read from `quoteResponse.result[]`;
        // empty when logos were not requested or the payload has a different shape.
        let logo_map: HashMap<String, (Option<String>, Option<String>)> = logos
            .and_then(|l| l.get("quoteResponse")?.get("result")?.as_array().cloned())
            .map(|results| {
                results
                    .iter()
                    .filter_map(|r| {
                        let symbol = r.get("symbol")?.as_str()?.to_string();
                        let logo_url = r.get("logoUrl").and_then(|v| v.as_str()).map(String::from);
                        let company_logo_url = r
                            .get("companyLogoUrl")
                            .and_then(|v| v.as_str())
                            .map(String::from);
                        Some((symbol, (logo_url, company_logo_url)))
                    })
                    .collect()
            })
            .unwrap_or_default();

        let mut parsed_quotes: Vec<(String, Quote)> = Vec::new();

        for (symbol, summary) in quote_data {
            let logo_url = logo_map.get(&symbol).and_then(|(l, _)| l.clone());
            let company_logo_url = logo_map.get(&symbol).and_then(|(_, c)| c.clone());
            let quote = Quote::from_response(&summary, logo_url, company_logo_url);
            let quote = match self.value_format {
                // Raw: round-trip through JSON so the format transform is applied to the quote.
                // NOTE(review): what `transform` does is defined by `ValueFormat`, outside this file.
                ValueFormat::Raw => {
                    let json =
                        serde_json::to_value(&quote).map_err(FinanceError::JsonParseError)?;
                    let transformed = self.value_format.transform(json);
                    serde_json::from_value(transformed).map_err(FinanceError::JsonParseError)?
                }
                ValueFormat::Pretty | ValueFormat::Both => quote,
            };
            parsed_quotes.push((symbol, quote));
        }

        if self.cache_ttl.is_some() {
            let mut cache = self.quote_cache.write().await;
            for (symbol, quote) in &parsed_quotes {
                self.cache_insert(&mut cache, symbol.as_str().into(), quote.clone());
            }
        }

        for (symbol, quote) in parsed_quotes {
            response.quotes.insert(symbol, quote);
        }

        // Every requested symbol must end up in either `quotes` or `errors`.
        for symbol in &self.symbols {
            let s = &**symbol;
            if !response.quotes.contains_key(s) && !response.errors.contains_key(s) {
                response.errors.insert(
                    symbol.to_string(),
                    "Symbol not found in response".to_string(),
                );
            }
        }

        Ok(response)
    }
657
658 async fn fetch_quotes_per_symbol(
661 &self,
662 symbols: &[String],
663 response: &mut BatchQuotesResponse,
664 ) -> Vec<(String, QuoteSummaryResponse)> {
665 let futures: Vec<_> = symbols
666 .iter()
667 .map(|sym| {
668 let providers = Arc::clone(&self.providers);
669 let sym = sym.clone();
670 async move {
671 let result = providers
672 .fetch(Capability::QUOTE, |p| {
673 let sym = sym.clone();
674 let p = p.clone();
675 async move { p.fetch_quote(&sym).await }
676 })
677 .await;
678 (sym, result)
679 }
680 })
681 .collect();
682
683 let results: Vec<_> = stream::iter(futures)
684 .buffer_unordered(self.max_concurrency)
685 .collect()
686 .await;
687
688 let mut successes = Vec::new();
689 for (sym, result) in results {
690 match result {
691 Ok(resp) => successes.push((sym, resp)),
692 Err(e) => {
693 response.errors.insert(sym, e.to_string());
694 }
695 }
696 }
697 successes
698 }
699
700 pub async fn quote(&self, symbol: &str) -> Result<Quote> {
702 {
703 let cache = self.quote_cache.read().await;
704 if let Some(entry) = cache.get(symbol)
705 && self.is_cache_fresh(Some(entry))
706 {
707 return Ok(entry.value.clone());
708 }
709 }
710
711 let response = self.quotes().await?;
712
713 response
714 .quotes
715 .get(symbol)
716 .cloned()
717 .ok_or_else(|| FinanceError::SymbolNotFound {
718 symbol: Some(symbol.to_string()),
719 context: response
720 .errors
721 .get(symbol)
722 .cloned()
723 .unwrap_or_else(|| "Symbol not found".to_string()),
724 })
725 }
726
727 pub async fn quote_value(&self, symbol: &str) -> Result<serde_json::Value> {
731 let quote = self.quote(symbol).await?;
732 let json = serde_json::to_value("e).map_err(FinanceError::JsonParseError)?;
733 Ok(self.value_format.transform(json))
734 }
735
736 async fn get_fetch_guard<K: Clone + Eq + std::hash::Hash>(
741 guard_map: &FetchGuardMap<K>,
742 key: K,
743 ) -> FetchGuard {
744 {
745 let guards = guard_map.read().await;
746 if let Some(guard) = guards.get(&key) {
747 return Arc::clone(guard);
748 }
749 }
750
751 let mut guards = guard_map.write().await;
752 Arc::clone(
753 guards
754 .entry(key)
755 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
756 )
757 }
758
759 pub async fn charts(
764 &self,
765 interval: Interval,
766 range: TimeRange,
767 ) -> Result<BatchChartsResponse> {
768 {
770 let cache = self.chart_cache.read().await;
771 if self.all_cached(
772 &cache,
773 self.symbols.iter().map(|s| (s.clone(), interval, range)),
774 ) {
775 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
776 for symbol in &self.symbols {
777 if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
778 response
779 .charts
780 .insert(symbol.to_string(), entry.value.clone());
781 }
782 }
783 return Ok(response);
784 }
785 }
786
787 let fetch_guard = Self::get_fetch_guard(&self.charts_fetch, (interval, range)).await;
789 let _guard = fetch_guard.lock().await;
790
791 {
793 let cache = self.chart_cache.read().await;
794 if self.all_cached(
795 &cache,
796 self.symbols.iter().map(|s| (s.clone(), interval, range)),
797 ) {
798 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
799 for symbol in &self.symbols {
800 if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
801 response
802 .charts
803 .insert(symbol.to_string(), entry.value.clone());
804 }
805 }
806 return Ok(response);
807 }
808 }
809
810 let futures: Vec<_> = self
812 .symbols
813 .iter()
814 .map(|symbol| {
815 let providers = Arc::clone(&self.providers);
816 let symbol = Arc::clone(symbol);
817 async move {
818 let sym = symbol.to_string();
819 let result = providers
820 .fetch(Capability::CHART, |p| {
821 let sym = sym.clone();
822 let p = p.clone();
823 async move { p.fetch_chart(&sym, interval, range).await }
824 })
825 .await;
826 (symbol, result)
827 }
828 })
829 .collect();
830
831 let results: Vec<_> = stream::iter(futures)
832 .buffer_unordered(self.max_concurrency)
833 .collect()
834 .await;
835
836 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
837 let mut parsed_charts: Vec<(Arc<str>, Chart)> = Vec::new();
838
839 for (symbol, result) in results {
840 match result {
841 Ok(data) => {
842 let chart = data;
843 parsed_charts.push((symbol, chart));
844 }
845 Err(e) => {
846 response.errors.insert(symbol.to_string(), e.to_string());
847 }
848 }
849 }
850
851 if self.cache_ttl.is_some() {
853 let mut cache = self.chart_cache.write().await;
854 let cache_keys: Vec<_> = parsed_charts
855 .into_iter()
856 .map(|(symbol, chart)| {
857 self.cache_insert(&mut cache, (symbol.clone(), interval, range), chart);
858 symbol
859 })
860 .collect();
861 for symbol in cache_keys {
862 if let Some(cached) = cache.get(&(symbol.clone(), interval, range)) {
863 response
864 .charts
865 .insert(symbol.to_string(), cached.value.clone());
866 }
867 }
868 } else {
869 for (symbol, chart) in parsed_charts {
870 response.charts.insert(symbol.to_string(), chart);
871 }
872 }
873
874 Ok(response)
875 }
876
877 pub async fn chart(&self, symbol: &str, interval: Interval, range: TimeRange) -> Result<Chart> {
879 {
880 let cache = self.chart_cache.read().await;
881 let key: Arc<str> = symbol.into();
882 if let Some(entry) = cache.get(&(key, interval, range))
883 && self.is_cache_fresh(Some(entry))
884 {
885 return Ok(entry.value.clone());
886 }
887 }
888
889 let response = self.charts(interval, range).await?;
890
891 response
892 .charts
893 .get(symbol)
894 .cloned()
895 .ok_or_else(|| FinanceError::SymbolNotFound {
896 symbol: Some(symbol.to_string()),
897 context: response
898 .errors
899 .get(symbol)
900 .cloned()
901 .unwrap_or_else(|| "Symbol not found".to_string()),
902 })
903 }
904
905 pub async fn charts_range(
917 &self,
918 interval: Interval,
919 start: i64,
920 end: i64,
921 ) -> Result<BatchChartsResponse> {
922 let futures: Vec<_> = self
923 .symbols
924 .iter()
925 .map(|symbol| {
926 let providers = Arc::clone(&self.providers);
927 let symbol = Arc::clone(symbol);
928 async move {
929 let sym = symbol.to_string();
930 let result = providers
931 .fetch(Capability::CHART, |p| {
932 let sym = sym.clone();
933 let p = p.clone();
934 async move { p.fetch_chart_range(&sym, interval, start, end).await }
935 })
936 .await;
937 (symbol, result)
938 }
939 })
940 .collect();
941
942 let results: Vec<_> = stream::iter(futures)
943 .buffer_unordered(self.max_concurrency)
944 .collect()
945 .await;
946
947 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
948
949 for (symbol, result) in results {
950 match result {
951 Ok(data) => {
952 let chart = data;
953 response.charts.insert(symbol.to_string(), chart);
954 }
955 Err(e) => {
956 response.errors.insert(symbol.to_string(), e.to_string());
957 }
958 }
959 }
960
961 Ok(response)
962 }
963
964 async fn ensure_events_loaded(&self) -> Result<()> {
974 let symbols_to_fetch: Vec<Arc<str>> = {
976 let cache = self.events_cache.read().await;
977 self.symbols
978 .iter()
979 .filter(|sym| !cache.contains_key(*sym))
980 .cloned()
981 .collect()
982 };
983
984 if symbols_to_fetch.is_empty() {
985 return Ok(());
986 }
987
988 let futures: Vec<_> = symbols_to_fetch
990 .iter()
991 .map(|symbol| {
992 let providers = Arc::clone(&self.providers);
993 let symbol = Arc::clone(symbol);
994 async move {
995 let sym = symbol.to_string();
996 let result = providers
997 .fetch(Capability::CORPORATE, |p| {
998 let sym = sym.clone();
999 let p = p.clone();
1000 async move { p.fetch_events(&sym).await }
1001 })
1002 .await;
1003 (symbol, result)
1004 }
1005 })
1006 .collect();
1007
1008 let results: Vec<_> = stream::iter(futures)
1009 .buffer_unordered(self.max_concurrency)
1010 .collect()
1011 .await;
1012
1013 let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
1014
1015 for (symbol, result) in results {
1016 if let Ok(events_data) = result {
1017 parsed_events.push((symbol, events_data));
1018 }
1019 }
1020
1021 if !parsed_events.is_empty() {
1023 let mut events_cache = self.events_cache.write().await;
1024 for (symbol, events) in parsed_events {
1025 events_cache.insert(symbol, CacheEntry::new(events));
1026 }
1027 }
1028
1029 Ok(())
1030 }
1031
1032 pub async fn spark(&self, interval: Interval, range: TimeRange) -> Result<BatchSparksResponse> {
1061 {
1063 let cache = self.spark_cache.read().await;
1064 if self.all_cached(
1065 &cache,
1066 self.symbols.iter().map(|s| (s.clone(), interval, range)),
1067 ) {
1068 let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1069 for symbol in &self.symbols {
1070 if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
1071 response
1072 .sparks
1073 .insert(symbol.to_string(), entry.value.clone());
1074 }
1075 }
1076 return Ok(response);
1077 }
1078 }
1079
1080 let fetch_guard = Self::get_fetch_guard(&self.spark_fetch, (interval, range)).await;
1082 let _guard = fetch_guard.lock().await;
1083
1084 {
1086 let cache = self.spark_cache.read().await;
1087 if self.all_cached(
1088 &cache,
1089 self.symbols.iter().map(|s| (s.clone(), interval, range)),
1090 ) {
1091 let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1092 for symbol in &self.symbols {
1093 if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
1094 response
1095 .sparks
1096 .insert(symbol.to_string(), entry.value.clone());
1097 }
1098 }
1099 return Ok(response);
1100 }
1101 }
1102
1103 let client = self.providers.first_yahoo()?;
1105 let symbols_ref: Vec<&str> = self.symbols.iter().map(|s| &**s).collect();
1106 let json =
1107 crate::adapters::yahoo::quote::spark::fetch(&client, &symbols_ref, interval, range)
1108 .await?;
1109
1110 let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1111
1112 match SparkResponse::from_json(json) {
1113 Ok(spark_response) => {
1114 let mut parsed_sparks: Vec<(Arc<str>, Spark)> = Vec::new();
1115
1116 if let Some(results) = spark_response.spark.result {
1117 for result in &results {
1118 if let Some(spark) = Spark::from_response(
1119 result,
1120 Some(interval.as_str().to_string()),
1121 Some(range.as_str().to_string()),
1122 ) {
1123 let sym: Arc<str> = result.symbol.as_str().into();
1124 parsed_sparks.push((sym, spark));
1125 } else {
1126 response.errors.insert(
1127 result.symbol.to_string(),
1128 "Failed to parse spark data".to_string(),
1129 );
1130 }
1131 }
1132 }
1133
1134 if self.cache_ttl.is_some() {
1136 let mut cache = self.spark_cache.write().await;
1137 for (symbol, spark) in &parsed_sparks {
1138 self.cache_insert(
1139 &mut cache,
1140 (symbol.clone(), interval, range),
1141 spark.clone(),
1142 );
1143 }
1144 }
1145
1146 for (symbol, spark) in parsed_sparks {
1148 response.sparks.insert(symbol.to_string(), spark);
1149 }
1150
1151 for symbol in &self.symbols {
1153 let symbol_str = &**symbol;
1154 if !response.sparks.contains_key(symbol_str)
1155 && !response.errors.contains_key(symbol_str)
1156 {
1157 response.errors.insert(
1158 symbol.to_string(),
1159 "Symbol not found in response".to_string(),
1160 );
1161 }
1162 }
1163 }
1164 Err(e) => {
1165 for symbol in &self.symbols {
1166 response.errors.insert(symbol.to_string(), e.to_string());
1167 }
1168 }
1169 }
1170
1171 Ok(response)
1172 }
1173
1174 pub async fn dividends(&self, range: TimeRange) -> Result<BatchDividendsResponse> {
1199 let mut response = BatchDividendsResponse::with_capacity(self.symbols.len());
1200
1201 self.ensure_events_loaded().await?;
1203
1204 let events_cache = self.events_cache.read().await;
1205
1206 for symbol in &self.symbols {
1207 if let Some(entry) = events_cache.get(symbol) {
1208 let all_dividends = entry.value.to_dividends();
1209 let filtered = filter_by_range(all_dividends, range);
1210 response.dividends.insert(symbol.to_string(), filtered);
1211 } else {
1212 response
1213 .errors
1214 .insert(symbol.to_string(), "No events data available".to_string());
1215 }
1216 }
1217
1218 Ok(response)
1219 }
1220
1221 pub async fn splits(&self, range: TimeRange) -> Result<BatchSplitsResponse> {
1248 let mut response = BatchSplitsResponse::with_capacity(self.symbols.len());
1249
1250 self.ensure_events_loaded().await?;
1252
1253 let events_cache = self.events_cache.read().await;
1254
1255 for symbol in &self.symbols {
1256 if let Some(entry) = events_cache.get(symbol) {
1257 let all_splits = entry.value.to_splits();
1258 let filtered = filter_by_range(all_splits, range);
1259 response.splits.insert(symbol.to_string(), filtered);
1260 } else {
1261 response
1262 .errors
1263 .insert(symbol.to_string(), "No events data available".to_string());
1264 }
1265 }
1266
1267 Ok(response)
1268 }
1269
1270 pub async fn capital_gains(&self, range: TimeRange) -> Result<BatchCapitalGainsResponse> {
1296 let mut response = BatchCapitalGainsResponse::with_capacity(self.symbols.len());
1297
1298 self.ensure_events_loaded().await?;
1300
1301 let events_cache = self.events_cache.read().await;
1302
1303 for symbol in &self.symbols {
1304 if let Some(entry) = events_cache.get(symbol) {
1305 let all_gains = entry.value.to_capital_gains();
1306 let filtered = filter_by_range(all_gains, range);
1307 response.capital_gains.insert(symbol.to_string(), filtered);
1308 } else {
1309 response
1310 .errors
1311 .insert(symbol.to_string(), "No events data available".to_string());
1312 }
1313 }
1314
1315 Ok(response)
1316 }
1317
    /// Fetches the requested financial statement for every tracked symbol.
    ///
    /// Cache entries and the fetch guard are both keyed by `statement_type` and
    /// `frequency`. Each symbol is fetched through the `FUNDAMENTALS` capability.
    // NOTE(review): caching, guarding and concurrency are implemented by the
    // `batch_fetch_cached!` macro, which is not visible here; presumably it follows the
    // same check / guard / re-check / fetch sequence as `charts` — confirm in the macro.
    pub async fn financials(
        &self,
        statement_type: StatementType,
        frequency: Frequency,
    ) -> Result<BatchFinancialsResponse> {
        batch_fetch_cached!(self;
            cache: financials_cache,
            guard: map(financials_fetch, (statement_type, frequency)),
            key: |s| (s.clone(), statement_type, frequency),
            response: BatchFinancialsResponse.financials,
            fetch: |providers, symbol| {
                let sym = symbol.to_string();
                providers.fetch(Capability::FUNDAMENTALS, move |p| {
                    let sym = sym.clone();
                    let p = p.clone();
                    async move {
                        p.fetch_financials(&sym, statement_type, frequency)
                            .await
                    }
                }).await
            },
        )
    }
1368
    /// Fetches news for every tracked symbol.
    ///
    /// Cache entries are keyed by symbol and a single fetch guard covers the call. Each
    /// symbol is fetched through the `CORPORATE` capability and collected into a `Vec<News>`.
    // NOTE(review): caching, guarding and concurrency are implemented by the
    // `batch_fetch_cached!` macro, which is not visible here — confirm its exact behavior.
    pub async fn news(&self) -> Result<BatchNewsResponse> {
        batch_fetch_cached!(self;
            cache: news_cache,
            guard: simple(news_fetch),
            key: |s| s.clone(),
            response: BatchNewsResponse.news,
            fetch: |providers, symbol| {
                let sym = symbol.to_string();
                providers.fetch(Capability::CORPORATE, move |p| {
                    let sym = sym.clone();
                    let p = p.clone();
                    async move {
                        p.fetch_news(&sym)
                            .await
                            .map(|data| data.into_iter().collect::<Vec<News>>())
                    }
                }).await
            },
        )
    }
1412
    /// Fetches recommendations (similar symbols) for every tracked symbol.
    ///
    /// Cache entries and the fetch guard are both keyed by `limit`. For each symbol the
    /// similar symbols are fetched through the `CORPORATE` capability and converted
    /// with `recommendation_from_similar`, tagged with the provider that answered.
    /// A provider id that `Provider::from_id_str` does not recognize makes that
    /// symbol's fetch fail with `FinanceError::InternalError`.
    // NOTE(review): caching, guarding and concurrency are implemented by the
    // `batch_fetch_cached!` macro, which is not visible here — confirm its exact behavior.
    pub async fn recommendations(&self, limit: u32) -> Result<BatchRecommendationsResponse> {
        batch_fetch_cached!(self;
            cache: recommendations_cache,
            guard: map(recommendations_fetch, limit),
            key: |s| (s.clone(), limit),
            response: BatchRecommendationsResponse.recommendations,
            fetch: |providers, symbol| {
                let sym = symbol.to_string();
                providers.fetch(Capability::CORPORATE, move |p| {
                    let sym = sym.clone();
                    let p = p.clone();
                    async move {
                        let items = p.fetch_similar_symbols(&sym, limit).await?;
                        Ok(recommendation_from_similar(
                            sym,
                            Some(Provider::from_id_str(p.id()).ok_or_else(|| {
                                FinanceError::InternalError(format!("unknown provider id: {}", p.id()))
                            })?),
                            items,
                            Some(limit),
                        ))
                    }
                }).await
            },
        )
    }
1467
    /// Fetches option chains for every tracked symbol.
    ///
    /// Cache entries and the fetch guard are both keyed by `date`, which is forwarded
    /// unchanged to `fetch_options`. Each symbol is fetched through the `OPTIONS` capability.
    // NOTE(review): `date` is presumably an expiration timestamp, with `None` selecting
    // the provider default — confirm against `fetch_options`.
    // NOTE(review): caching, guarding and concurrency are implemented by the
    // `batch_fetch_cached!` macro, which is not visible here — confirm its exact behavior.
    pub async fn options(&self, date: Option<i64>) -> Result<BatchOptionsResponse> {
        batch_fetch_cached!(self;
            cache: options_cache,
            guard: map(options_fetch, date),
            key: |s| (s.clone(), date),
            response: BatchOptionsResponse.options,
            fetch: |providers, symbol| {
                let sym = symbol.to_string();
                providers.fetch(Capability::OPTIONS, move |p| {
                    let sym = sym.clone();
                    let p = p.clone();
                    async move {
                        p.fetch_options(&sym, date).await
                    }
                }).await
            },
        )
    }
1510
/// Computes an indicator summary for every tracked symbol.
///
/// The method first looks in `indicators_cache`; if `all_cached` reports that
/// every symbol has an entry for `(symbol, interval, range)`, the cached
/// summaries are returned directly. Otherwise it takes the fetch guard for
/// `(interval, range)`, repeats the cache check, and only then loads charts
/// via [`Self::charts`] and derives the summaries from their candles.
///
/// Per-symbol chart errors are copied into the response's `errors` map.
///
/// # Errors
///
/// Propagates the error returned by [`Self::charts`].
#[cfg(feature = "indicators")]
pub async fn indicators(
    &self,
    interval: Interval,
    range: TimeRange,
) -> Result<BatchIndicatorsResponse> {
    let key_of = |sym: &Arc<str>| (sym.clone(), interval, range);

    // First look: serve straight from the cache when everything is there.
    {
        let cached = self.indicators_cache.read().await;
        if self.all_cached(&cached, self.symbols.iter().map(&key_of)) {
            let mut hit = BatchIndicatorsResponse::with_capacity(self.symbols.len());
            for sym in &self.symbols {
                let Some(entry) = cached.get(&key_of(sym)) else {
                    continue;
                };
                hit.indicators.insert(sym.to_string(), entry.value.clone());
            }
            return Ok(hit);
        }
    }

    // Hold the per-(interval, range) guard for the rest of the method.
    let fetch_guard = Self::get_fetch_guard(&self.indicators_fetch, (interval, range)).await;
    let _in_flight = fetch_guard.lock().await;

    // Second look: the cache may have been filled while waiting for the guard.
    {
        let cached = self.indicators_cache.read().await;
        if self.all_cached(&cached, self.symbols.iter().map(&key_of)) {
            let mut hit = BatchIndicatorsResponse::with_capacity(self.symbols.len());
            for sym in &self.symbols {
                let Some(entry) = cached.get(&key_of(sym)) else {
                    continue;
                };
                hit.indicators.insert(sym.to_string(), entry.value.clone());
            }
            return Ok(hit);
        }
    }

    let charts_response = self.charts(interval, range).await?;

    // Compute everything before touching the write lock.
    let computed: Vec<(String, indicators::IndicatorsSummary)> = charts_response
        .charts
        .iter()
        .map(|(symbol, chart)| {
            (
                symbol.to_string(),
                indicators::summary::calculate_indicators(&chart.candles),
            )
        })
        .collect();

    if self.cache_ttl.is_some() {
        let mut cache = self.indicators_cache.write().await;
        for (symbol, summary) in &computed {
            let shared: Arc<str> = symbol.as_str().into();
            self.cache_insert(&mut cache, key_of(&shared), summary.clone());
        }
    }

    let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());

    computed.into_iter().for_each(|(symbol, summary)| {
        response.indicators.insert(symbol, summary);
    });

    // Symbols whose chart could not be loaded are reported as errors.
    for (failed_symbol, cause) in &charts_response.errors {
        response
            .errors
            .insert(failed_symbol.to_string(), cause.clone());
    }

    Ok(response)
}
1614
1615 pub fn add_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1636 use std::collections::HashSet;
1638
1639 let existing: HashSet<&str> = self.symbols.iter().map(|s| &**s).collect();
1640 let to_add: Vec<Arc<str>> = symbols
1641 .iter()
1642 .map(|s| s.as_ref())
1643 .filter(|s| !existing.contains(s))
1644 .map(|s| s.into())
1645 .collect();
1646
1647 self.symbols.extend(to_add);
1648 }
1649
/// Runs a portfolio backtest over the tracked symbols.
///
/// Charts are loaded with [`Self::charts`] for `interval`/`range`; dividends
/// are loaded with [`Self::dividends`] for `range`. `factory` is handed to
/// `PortfolioEngine::run` together with the per-symbol data. When `config`
/// is `None` the default `PortfolioConfig` is used.
///
/// Symbols without a chart in the batch response are left out of the run.
/// A failed dividend fetch is not an error: every symbol then simply gets
/// an empty dividend list.
///
/// # Errors
///
/// Returns the error from `config.validate`, a `BacktestError::ChartError`
/// carrying the chart failure message, or whatever `PortfolioEngine::run`
/// returns.
#[cfg(feature = "backtesting")]
pub async fn backtest<S, F>(
    &self,
    interval: Interval,
    range: TimeRange,
    config: Option<backtesting::portfolio::PortfolioConfig>,
    factory: F,
) -> backtesting::Result<backtesting::portfolio::PortfolioResult>
where
    S: backtesting::Strategy,
    F: Fn(&str) -> S,
{
    use crate::backtesting::portfolio::{PortfolioEngine, SymbolData};

    let config = config.unwrap_or_default();
    config.validate(self.symbols.len())?;

    let fetched_charts = self.charts(interval, range).await;
    let charts = fetched_charts
        .map_err(|err| backtesting::BacktestError::ChartError(err.to_string()))?;

    // Best effort: fall back to an empty map when dividends are unavailable.
    let dividends_map = match self.dividends(range).await {
        Ok(batch) => batch.dividends,
        Err(_) => Default::default(),
    };

    let symbol_data: Vec<SymbolData> = self
        .symbols
        .iter()
        .filter_map(|sym| {
            let name: &str = sym.as_ref();
            let chart = charts.charts.get(name)?;
            let payouts = dividends_map.get(name).cloned().unwrap_or_default();
            Some(SymbolData::new(name, chart.candles.clone()).with_dividends(payouts))
        })
        .collect();

    PortfolioEngine::new(config).run(&symbol_data, factory)
}
1734
1735 pub async fn remove_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1752 use std::collections::HashSet;
1753 let to_remove: HashSet<&str> = symbols.iter().map(|s| s.as_ref()).collect();
1754
1755 self.symbols.retain(|s| !to_remove.contains(&**s));
1757
1758 let (
1760 mut quote_cache,
1761 mut chart_cache,
1762 mut events_cache,
1763 mut financials_cache,
1764 mut news_cache,
1765 mut recommendations_cache,
1766 mut options_cache,
1767 mut spark_cache,
1768 ) = tokio::join!(
1769 self.quote_cache.write(),
1770 self.chart_cache.write(),
1771 self.events_cache.write(),
1772 self.financials_cache.write(),
1773 self.news_cache.write(),
1774 self.recommendations_cache.write(),
1775 self.options_cache.write(),
1776 self.spark_cache.write(),
1777 );
1778
1779 for symbol in &to_remove {
1781 let key: Arc<str> = (*symbol).into();
1782 quote_cache.remove(&key);
1783 events_cache.remove(&key);
1784 news_cache.remove(&key);
1785 }
1786
1787 chart_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1789 financials_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1790 recommendations_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1791 options_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1792 spark_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1793
1794 drop((
1796 quote_cache,
1797 chart_cache,
1798 events_cache,
1799 financials_cache,
1800 news_cache,
1801 recommendations_cache,
1802 options_cache,
1803 spark_cache,
1804 ));
1805
1806 #[cfg(feature = "indicators")]
1807 self.indicators_cache
1808 .write()
1809 .await
1810 .retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1811 }
1812
1813 pub async fn clear_cache(&self) {
1818 tokio::join!(
1819 async { self.quote_cache.write().await.clear() },
1821 async { self.chart_cache.write().await.clear() },
1822 async { self.events_cache.write().await.clear() },
1823 async { self.financials_cache.write().await.clear() },
1824 async { self.news_cache.write().await.clear() },
1825 async { self.recommendations_cache.write().await.clear() },
1826 async { self.options_cache.write().await.clear() },
1827 async { self.spark_cache.write().await.clear() },
1828 async {
1829 #[cfg(feature = "indicators")]
1830 self.indicators_cache.write().await.clear();
1831 },
1832 async { self.charts_fetch.write().await.clear() },
1834 async { self.financials_fetch.write().await.clear() },
1835 async { self.recommendations_fetch.write().await.clear() },
1836 async { self.options_fetch.write().await.clear() },
1837 async { self.spark_fetch.write().await.clear() },
1838 async {
1839 #[cfg(feature = "indicators")]
1840 self.indicators_fetch.write().await.clear();
1841 },
1842 );
1843 }
1844
1845 pub async fn clear_quote_cache(&self) {
1849 self.quote_cache.write().await.clear();
1850 }
1851
1852 pub async fn clear_chart_cache(&self) {
1857 tokio::join!(
1858 async { self.chart_cache.write().await.clear() },
1859 async { self.events_cache.write().await.clear() },
1860 async { self.spark_cache.write().await.clear() },
1861 async {
1862 #[cfg(feature = "indicators")]
1863 self.indicators_cache.write().await.clear();
1864 },
1865 );
1866 }
1867}
1868
#[cfg(test)]
mod tests {
    //! Tests for the batch `Tickers` API.
    //!
    //! Tests marked `#[ignore = "requires network access"]` hit live providers
    //! and are skipped by default.

    use super::*;

    /// Batch quotes succeed for at least one symbol.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_quotes() {
        let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
        let result = tickers.quotes().await.unwrap();

        assert!(result.success_count() > 0);
    }

    /// Batch charts succeed for at least one symbol.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_charts() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let result = tickers
            .charts(Interval::OneDay, TimeRange::FiveDays)
            .await
            .unwrap();

        assert!(result.success_count() > 0);
    }

    /// Sparks come back populated and labelled with their symbol.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_spark() {
        let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
        let result = tickers
            .spark(Interval::FiveMinutes, TimeRange::OneDay)
            .await
            .unwrap();

        assert!(result.success_count() > 0);

        if let Some(spark) = result.sparks.get("AAPL") {
            assert!(!spark.closes.is_empty());
            assert_eq!(spark.symbol, "AAPL");
            assert!(spark.percent_change().is_some());
        }
    }

    /// Dividends, when present, carry a positive timestamp and amount.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_dividends() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let result = tickers.dividends(TimeRange::OneYear).await.unwrap();

        assert!(result.success_count() > 0);

        // Only inspect the first dividend, and only if AAPL returned any.
        if let Some(div) = result
            .dividends
            .get("AAPL")
            .and_then(|dividends| dividends.first())
        {
            assert!(div.timestamp > 0);
            assert!(div.amount > 0.0);
        }
    }

    /// Every returned split has a positive ratio and a non-empty label.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_splits() {
        let tickers = Tickers::new(["NVDA", "TSLA"]).await.unwrap();
        let result = tickers.splits(TimeRange::FiveYears).await.unwrap();

        assert!(result.success_count() > 0);

        for splits in result.splits.values() {
            for split in splits {
                assert!(split.timestamp > 0);
                assert!(split.numerator > 0.0);
                assert!(split.denominator > 0.0);
                assert!(!split.ratio.is_empty());
            }
        }
    }

    /// Every returned capital gain has a positive timestamp and non-negative amount.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_capital_gains() {
        let tickers = Tickers::new(["VFIAX", "VTI"]).await.unwrap();
        let result = tickers.capital_gains(TimeRange::TwoYears).await.unwrap();

        assert!(result.success_count() > 0);

        for gains in result.capital_gains.values() {
            for gain in gains {
                assert!(gain.timestamp > 0);
                assert!(gain.amount >= 0.0);
            }
        }
    }

    /// Annual income statements are labelled correctly and non-empty.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_financials() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let result = tickers
            .financials(StatementType::Income, Frequency::Annual)
            .await
            .unwrap();

        assert!(result.success_count() > 0);

        for (symbol, stmt) in &result.financials {
            assert_eq!(stmt.symbol, *symbol);
            assert_eq!(stmt.statement_type, "income");
            assert_eq!(stmt.frequency, "annual");
            assert!(!stmt.statement.is_empty());

            if let Some(revenue) = stmt.statement.get("TotalRevenue") {
                assert!(!revenue.is_empty());
            }
        }
    }

    /// The first article of each symbol has a title, link and source.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_news() {
        let tickers = Tickers::new(["AAPL", "TSLA"]).await.unwrap();
        let result = tickers.news().await.unwrap();

        assert!(result.success_count() > 0);

        for articles in result.news.values() {
            if let Some(article) = articles.first() {
                assert!(!article.title.is_empty());
                assert!(!article.link.is_empty());
                assert!(!article.source.is_empty());
            }
        }
    }

    /// Recommendations are keyed by their own symbol and list similar symbols.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_recommendations() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let result = tickers.recommendations(5).await.unwrap();

        assert!(result.success_count() > 0);

        for (symbol, rec) in &result.recommendations {
            assert_eq!(rec.symbol, *symbol);
            assert!(rec.count() > 0);
            for similar in &rec.recommendations {
                assert!(!similar.symbol.is_empty());
            }
        }
    }

    /// Option chains expose at least one expiration date.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_options() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let result = tickers.options(None).await.unwrap();

        assert!(result.success_count() > 0);

        for opts in result.options.values() {
            assert!(!opts.expiration_dates().is_empty());
        }
    }

    /// Three months of daily data yield at least RSI(14) or SMA(20).
    #[tokio::test]
    #[ignore = "requires network access"]
    #[cfg(feature = "indicators")]
    async fn test_tickers_indicators() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let result = tickers
            .indicators(Interval::OneDay, TimeRange::ThreeMonths)
            .await
            .unwrap();

        assert!(result.success_count() > 0);

        for ind in result.indicators.values() {
            assert!(ind.rsi_14.is_some() || ind.sma_20.is_some());
        }
    }

    /// Adding symbols grows the set; re-adding a tracked symbol is a no-op.
    #[tokio::test]
    async fn test_tickers_add_symbols() {
        let mut tickers = Tickers::new(["AAPL"]).await.unwrap();
        assert_eq!(tickers.len(), 1);
        assert_eq!(tickers.symbols(), &["AAPL"]);

        tickers.add_symbols(&["MSFT", "GOOGL"]);
        assert_eq!(tickers.len(), 3);
        assert!(tickers.symbols().contains(&"AAPL"));
        assert!(tickers.symbols().contains(&"MSFT"));
        assert!(tickers.symbols().contains(&"GOOGL"));

        // Already tracked: the count must not change.
        tickers.add_symbols(&["AAPL"]);
        assert_eq!(tickers.len(), 3);
    }

    /// Removing a symbol drops it from the set and from later batch results.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_remove_symbols() {
        let mut tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
        assert_eq!(tickers.len(), 3);

        // Fetch once so the quote cache holds data for every symbol.
        let _ = tickers.quotes().await;

        tickers.remove_symbols(&["MSFT"]).await;
        assert_eq!(tickers.len(), 2);
        assert!(tickers.symbols().contains(&"AAPL"));
        assert!(!tickers.symbols().contains(&"MSFT"));
        assert!(tickers.symbols().contains(&"GOOGL"));

        // The removed symbol must not resurface from the cache.
        let quotes = tickers.quotes().await.unwrap();
        assert!(!quotes.quotes.contains_key("MSFT"));
        assert_eq!(quotes.quotes.len(), 2);
    }
}