1use super::macros::{batch_fetch_cached, define_batch_response};
6use crate::adapters::yahoo::client::ClientConfig;
7#[cfg(feature = "backtesting")]
8use crate::backtesting;
9use crate::constants::{Frequency, Interval, Region, StatementType, TimeRange};
10use crate::error::{FinanceError, Result};
11use crate::format::Both;
12#[cfg(any(feature = "backtesting", feature = "indicators"))]
13use crate::indicators;
14use crate::models::chart::events::ChartEvents;
15use crate::models::chart::spark::Spark;
16use crate::models::chart::spark::response::SparkResponse;
17use crate::models::chart::{CapitalGain, Chart, Dividend, Split};
18use crate::models::corporate::news::News;
19use crate::models::corporate::recommendation::Recommendation;
20use crate::models::format::Format;
21use crate::models::fundamentals::FinancialStatement;
22use crate::models::options::Options;
23use crate::models::quote::{Quote, QuoteSummaryResponse};
24
25use crate::providers::types::recommendation_from_similar;
26use crate::providers::yahoo::YahooProvider;
27use crate::providers::{
28 Capability, Fetch, Provider, ProviderAdapter, ProviderSet, Routes, build_providers,
29};
30use crate::ticker::ClientHandle;
31use crate::utils::{CacheEntry, EVICTION_THRESHOLD, filter_by_range};
32use futures::stream::{self, StreamExt};
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::sync::RwLock;
37
/// Shared, async-lock-protected map from a cache key to a [`CacheEntry`].
type MapCache<K, V> = Arc<RwLock<HashMap<K, CacheEntry<V>>>>;
/// Key of the chart cache: `(symbol, interval, range)`.
type ChartCacheKey = (Arc<str>, Interval, TimeRange);
/// Quotes keyed by symbol.
type QuoteCache = MapCache<Arc<str>, Quote>;
/// Charts keyed by [`ChartCacheKey`].
type ChartCache = MapCache<ChartCacheKey, Chart>;
/// Chart events (source of dividends, splits and capital gains) keyed by symbol.
type EventsCache = MapCache<Arc<str>, ChartEvents>;
/// Financial statements keyed by `(symbol, statement type, frequency)`.
type FinancialsCache = MapCache<(Arc<str>, StatementType, Frequency), FinancialStatement>;
/// News lists keyed by symbol.
type NewsCache = MapCache<Arc<str>, Vec<News>>;
/// Recommendations keyed by `(symbol, limit)`.
type RecommendationsCache = MapCache<(Arc<str>, u32), Recommendation>;
/// Option chains keyed by `(symbol, optional date)`.
type OptionsCache = MapCache<(Arc<str>, Option<i64>), Options>;
/// Key of the spark cache: `(symbol, interval, range)`.
type SparkCacheKey = (Arc<str>, Interval, TimeRange);
/// Sparks keyed by [`SparkCacheKey`].
type SparkCache = MapCache<SparkCacheKey, Spark>;
/// Indicator summaries keyed by `(symbol, interval, range)`.
#[cfg(feature = "indicators")]
type IndicatorsCache = MapCache<(Arc<str>, Interval, TimeRange), indicators::IndicatorsSummary>;

/// Async mutex held for the duration of a fetch so that concurrent callers
/// wait for it and then re-check the cache.
type FetchGuard = Arc<tokio::sync::Mutex<()>>;
/// Lazily populated map of [`FetchGuard`]s, one per distinct request key.
type FetchGuardMap<K> = Arc<RwLock<HashMap<K, FetchGuard>>>;
56
// Batch response types generated by `define_batch_response!`. As used in this
// file, each generated type offers `with_capacity(n)`, the named per-symbol
// map field (keyed by `String`), and an `errors` map of symbol -> message.

// Result of `Tickers::quotes`.
define_batch_response! {
    BatchQuotesResponse => quotes: Quote
}

// Result of `Tickers::charts` and `Tickers::charts_range`.
define_batch_response! {
    BatchChartsResponse => charts: Chart
}

// Result of `Tickers::spark`.
define_batch_response! {
    BatchSparksResponse => sparks: Spark
}

// Result of `Tickers::dividends`.
define_batch_response! {
    BatchDividendsResponse => dividends: Vec<Dividend>
}

// Result of `Tickers::splits`.
define_batch_response! {
    BatchSplitsResponse => splits: Vec<Split>
}

// Result of `Tickers::capital_gains`.
define_batch_response! {
    BatchCapitalGainsResponse => capital_gains: Vec<CapitalGain>
}

// Result of `Tickers::financials`.
define_batch_response! {
    BatchFinancialsResponse => financials: FinancialStatement
}

// Result of `Tickers::news`.
define_batch_response! {
    BatchNewsResponse => news: Vec<News>
}

// Result of `Tickers::recommendations`.
define_batch_response! {
    BatchRecommendationsResponse => recommendations: Recommendation
}

// Result of `Tickers::options`.
define_batch_response! {
    BatchOptionsResponse => options: Options
}

// Result of `Tickers::indicators` (only with the `indicators` feature).
#[cfg(feature = "indicators")]
define_batch_response! {
    BatchIndicatorsResponse => indicators: indicators::IndicatorsSummary
}
116
/// Concurrency limit a new [`TickersBuilder`] starts with; it bounds how many
/// per-symbol requests are polled at once via `buffer_unordered`.
const DEFAULT_MAX_CONCURRENCY: usize = 10;
119
/// Builder for [`Tickers`]; obtained through [`Tickers::builder`].
pub struct TickersBuilder {
    /// Symbols the resulting `Tickers` will operate on.
    symbols: Vec<Arc<str>>,
    /// Client configuration; only consulted by `build` when neither a provider
    /// set nor a shared client handle was supplied.
    config: ClientConfig,
    /// Existing client session to reuse instead of creating a new one.
    shared_client: Option<ClientHandle>,
    /// Pre-built provider set; takes precedence over everything else in `build`.
    injected_providers: Option<Arc<ProviderSet>>,
    /// Upper bound on concurrently polled per-symbol requests (at least 1).
    max_concurrency: usize,
    /// Cache time-to-live; `None` leaves the TTL-based caches unused.
    cache_ttl: Option<Duration>,
    /// Whether `Tickers::quotes` also requests logo URLs.
    include_logo: bool,
}
130
131impl TickersBuilder {
132 fn new<S, I>(symbols: I) -> Self
133 where
134 S: Into<String>,
135 I: IntoIterator<Item = S>,
136 {
137 Self {
138 symbols: symbols.into_iter().map(|s| s.into().into()).collect(),
139 config: ClientConfig::default(),
140 shared_client: None,
141 injected_providers: None,
142 max_concurrency: DEFAULT_MAX_CONCURRENCY,
143 cache_ttl: None,
144 include_logo: false,
145 }
146 }
147
148 pub fn region(mut self, region: Region) -> Self {
150 self.config.lang = region.lang().to_string();
151 self.config.region = region.region().to_string();
152 self
153 }
154
155 pub fn lang(mut self, lang: impl Into<String>) -> Self {
157 self.config.lang = lang.into();
158 self
159 }
160
161 pub fn region_code(mut self, region: impl Into<String>) -> Self {
163 self.config.region = region.into();
164 self
165 }
166
167 pub fn timeout(mut self, timeout: Duration) -> Self {
169 self.config.timeout = timeout;
170 self
171 }
172
173 pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
175 self.config.proxy = Some(proxy.into());
176 self
177 }
178
179 pub fn config(mut self, config: ClientConfig) -> Self {
181 self.config = config;
182 self
183 }
184
185 pub fn max_concurrency(mut self, n: usize) -> Self {
194 self.max_concurrency = n.max(1);
195 self
196 }
197
198 pub fn cache(mut self, ttl: Duration) -> Self {
219 self.cache_ttl = Some(ttl);
220 self
221 }
222
223 pub fn logo(mut self) -> Self {
228 self.include_logo = true;
229 self
230 }
231
232 pub(crate) fn with_provider_set(mut self, set: Arc<ProviderSet>) -> Self {
234 self.injected_providers = Some(set);
235 self
236 }
237
238 pub fn client(mut self, handle: ClientHandle) -> Self {
244 self.shared_client = Some(handle);
245 self
246 }
247
248 pub async fn build(self) -> Result<Tickers> {
250 let providers = if let Some(set) = self.injected_providers {
251 set
252 } else if let Some(handle) = self.shared_client {
253 let yahoo = YahooProvider::from_client(handle.0);
254 let client = yahoo.client_arc();
255 Arc::new(ProviderSet::new(
256 vec![Arc::new(yahoo) as Arc<dyn ProviderAdapter>],
257 Some(client),
258 Routes::new(Fetch::Sequential),
259 ))
260 } else {
261 Arc::new(
262 build_providers(
263 &[Provider::Yahoo],
264 &self.config,
265 Routes::new(Fetch::Sequential),
266 )
267 .await?,
268 )
269 };
270
271 Ok(Tickers {
272 symbols: self.symbols,
273 providers,
274 max_concurrency: self.max_concurrency,
275 cache_ttl: self.cache_ttl,
276 include_logo: self.include_logo,
277 quote_cache: Default::default(),
278 chart_cache: Default::default(),
279 events_cache: Default::default(),
280 financials_cache: Default::default(),
281 news_cache: Default::default(),
282 recommendations_cache: Default::default(),
283 options_cache: Default::default(),
284 spark_cache: Default::default(),
285 #[cfg(feature = "indicators")]
286 indicators_cache: Default::default(),
287
288 quotes_fetch: Arc::new(tokio::sync::Mutex::new(())),
290 charts_fetch: Default::default(),
291 financials_fetch: Default::default(),
292 news_fetch: Arc::new(tokio::sync::Mutex::new(())),
293 recommendations_fetch: Default::default(),
294 options_fetch: Default::default(),
295 spark_fetch: Default::default(),
296 #[cfg(feature = "indicators")]
297 indicators_fetch: Default::default(),
298 })
299 }
300}
301
/// A fixed set of symbols plus the providers, caches and fetch guards used to
/// serve batch requests for them.
pub struct Tickers {
    /// Symbols every batch method operates on.
    symbols: Vec<Arc<str>>,
    /// Provider set all requests are routed through.
    providers: Arc<ProviderSet>,
    /// Upper bound on concurrently polled per-symbol requests.
    max_concurrency: usize,
    /// Cache time-to-live; with `None`, `all_cached` is always false and
    /// `cache_insert` stores nothing.
    cache_ttl: Option<Duration>,
    /// Whether `quotes` also requests logo URLs.
    include_logo: bool,
    /// Cached quotes by symbol.
    quote_cache: QuoteCache,
    /// Cached charts by `(symbol, interval, range)`.
    chart_cache: ChartCache,
    /// Chart events by symbol; filled by `ensure_events_loaded`, which checks
    /// key presence only (no TTL check).
    events_cache: EventsCache,
    /// Cached financial statements.
    financials_cache: FinancialsCache,
    /// Cached news lists.
    news_cache: NewsCache,
    /// Cached recommendations.
    recommendations_cache: RecommendationsCache,
    /// Cached option chains.
    options_cache: OptionsCache,
    /// Cached sparks by `(symbol, interval, range)`.
    spark_cache: SparkCache,
    /// Cached indicator summaries by `(symbol, interval, range)`.
    #[cfg(feature = "indicators")]
    indicators_cache: IndicatorsCache,

    /// Serializes quote fetches.
    quotes_fetch: FetchGuard,
    /// Serializes chart fetches per `(interval, range)`.
    charts_fetch: FetchGuardMap<(Interval, TimeRange)>,
    /// Fetch guards per `(statement type, frequency)`.
    financials_fetch: FetchGuardMap<(StatementType, Frequency)>,
    /// Fetch guard for news.
    news_fetch: FetchGuard,
    /// Fetch guards per recommendation limit.
    recommendations_fetch: FetchGuardMap<u32>,
    /// Fetch guards per option date.
    options_fetch: FetchGuardMap<Option<i64>>,
    /// Serializes spark fetches per `(interval, range)`.
    spark_fetch: FetchGuardMap<(Interval, TimeRange)>,
    /// Serializes indicator computation per `(interval, range)`.
    #[cfg(feature = "indicators")]
    indicators_fetch: FetchGuardMap<(Interval, TimeRange)>,
}
360
361impl std::fmt::Debug for Tickers {
362 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363 f.debug_struct("Tickers")
364 .field("symbols", &self.symbols)
365 .field("max_concurrency", &self.max_concurrency)
366 .field("cache_ttl", &self.cache_ttl)
367 .finish_non_exhaustive()
368 }
369}
370
371impl Tickers {
372 pub async fn new<S, I>(symbols: I) -> Result<Self>
389 where
390 S: Into<String>,
391 I: IntoIterator<Item = S>,
392 {
393 Self::builder(symbols).build().await
394 }
395
396 pub fn builder<S, I>(symbols: I) -> TickersBuilder
398 where
399 S: Into<String>,
400 I: IntoIterator<Item = S>,
401 {
402 TickersBuilder::new(symbols)
403 }
404
405 pub fn symbols(&self) -> Vec<&str> {
407 self.symbols.iter().map(|s| &**s).collect()
408 }
409
410 pub fn len(&self) -> usize {
412 self.symbols.len()
413 }
414
415 pub fn is_empty(&self) -> bool {
417 self.symbols.is_empty()
418 }
419
420 pub fn client_handle(&self) -> ClientHandle {
426 ClientHandle(
427 self.providers
428 .first_yahoo()
429 .expect("Tickers always uses a Yahoo session"),
430 )
431 }
432
433 #[inline]
435 fn is_cache_fresh<T>(&self, entry: Option<&CacheEntry<T>>) -> bool {
436 CacheEntry::is_fresh_with_ttl(entry, self.cache_ttl)
437 }
438
439 fn all_cached<K: Eq + std::hash::Hash, V>(
441 &self,
442 map: &HashMap<K, CacheEntry<V>>,
443 keys: impl Iterator<Item = K>,
444 ) -> bool {
445 let Some(ttl) = self.cache_ttl else {
446 return false;
447 };
448 keys.into_iter()
449 .all(|k| map.get(&k).map(|e| e.is_fresh(ttl)).unwrap_or(false))
450 }
451
452 #[inline]
457 fn cache_insert<K: Eq + std::hash::Hash, V>(
458 &self,
459 map: &mut HashMap<K, CacheEntry<V>>,
460 key: K,
461 value: V,
462 ) {
463 if let Some(ttl) = self.cache_ttl {
464 if map.len() >= EVICTION_THRESHOLD {
465 map.retain(|_, entry| entry.is_fresh(ttl));
466 }
467 map.insert(key, CacheEntry::new(value));
468 }
469 }
470
    /// Fetches quotes for every tracked symbol.
    ///
    /// Flow:
    /// 1. If every symbol has a fresh cached quote, answer from the cache.
    /// 2. Otherwise take the `quotes_fetch` mutex and check the cache again, so
    ///    callers that waited on the mutex can reuse what the holder fetched.
    /// 3. Issue one batch request (`fetch_quotes_batch`); if it fails, fall back
    ///    to `fetch_quotes_per_symbol`, which records per-symbol errors.
    ///    With `include_logo`, a second request for the logo fields runs
    ///    concurrently and its failure is ignored.
    /// 4. Convert, cache (when a TTL is set) and return the quotes. Symbols with
    ///    neither a quote nor an error get a "Symbol not found in response"
    ///    error entry.
    ///
    /// As written, this method has no early-error path: failures are reported
    /// per symbol in `errors` and the outer result is `Ok`.
    pub async fn quotes(&self) -> Result<BatchQuotesResponse> {
        {
            // Fast path: everything is already cached and fresh.
            let cache = self.quote_cache.read().await;
            if self.all_cached(&cache, self.symbols.iter().cloned()) {
                let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
                for symbol in &self.symbols {
                    if let Some(entry) = cache.get(symbol) {
                        response
                            .quotes
                            .insert(symbol.to_string(), entry.value.clone());
                    }
                }
                return Ok(response);
            }
        }

        // Only one quote fetch at a time; held until this function returns.
        let _fetch_guard = self.quotes_fetch.lock().await;

        {
            // Re-check: another caller may have filled the cache while we waited.
            let cache = self.quote_cache.read().await;
            if self.all_cached(&cache, self.symbols.iter().cloned()) {
                let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
                for symbol in &self.symbols {
                    if let Some(entry) = cache.get(symbol) {
                        response
                            .quotes
                            .insert(symbol.to_string(), entry.value.clone());
                    }
                }
                return Ok(response);
            }
        }

        let symbol_strings: Vec<String> = self.symbols.iter().map(|s| s.to_string()).collect();
        let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());

        let (quote_data, logos) = if self.include_logo {
            let providers_logo = Arc::clone(&self.providers);
            let syms_logo = symbol_strings.clone();
            // Best-effort logo lookup: any failure simply yields `None`.
            let logo_future = async move {
                if let Ok(client) = providers_logo.first_yahoo() {
                    let syms_ref: Vec<&str> = syms_logo.iter().map(String::as_str).collect();
                    crate::adapters::yahoo::quote::quotes::fetch_with_fields(
                        &client,
                        &syms_ref,
                        Some(&["logoUrl", "companyLogoUrl"]),
                        true,
                        true,
                    )
                    .await
                    .ok()
                } else {
                    None
                }
            };

            let providers_quote = Arc::clone(&self.providers);
            let syms_quote = symbol_strings.clone();
            let quote_future = async move {
                providers_quote
                    .fetch(Capability::QUOTE, |p| {
                        let syms = syms_quote.clone();
                        let p = p.clone();
                        async move {
                            let syms_ref: Vec<&str> = syms.iter().map(String::as_str).collect();
                            p.fetch_quotes_batch(&syms_ref).await
                        }
                    })
                    .await
            };

            // Run the quote and logo requests concurrently.
            let (batch_result, logo_result) = tokio::join!(quote_future, logo_future);
            let quote_data = match batch_result {
                Ok(data) => data,
                // Batch request failed: retry symbol by symbol.
                Err(_) => {
                    self.fetch_quotes_per_symbol(&symbol_strings, &mut response)
                        .await
                }
            };
            (quote_data, logo_result)
        } else {
            let providers = Arc::clone(&self.providers);
            let syms = symbol_strings.clone();
            let batch_result = providers
                .fetch(Capability::QUOTE, |p| {
                    let syms = syms.clone();
                    let p = p.clone();
                    async move {
                        let syms_ref: Vec<&str> = syms.iter().map(String::as_str).collect();
                        p.fetch_quotes_batch(&syms_ref).await
                    }
                })
                .await;
            let data = match batch_result {
                Ok(data) => data,
                // Batch request failed: retry symbol by symbol.
                Err(_) => {
                    self.fetch_quotes_per_symbol(&symbol_strings, &mut response)
                        .await
                }
            };
            (data, None)
        };

        // symbol -> (logoUrl, companyLogoUrl), read from
        // `quoteResponse.result[]`; empty when no logo data is available.
        let logo_map: HashMap<String, (Option<String>, Option<String>)> = logos
            .and_then(|l| l.get("quoteResponse")?.get("result")?.as_array().cloned())
            .map(|results| {
                results
                    .iter()
                    .filter_map(|r| {
                        let symbol = r.get("symbol")?.as_str()?.to_string();
                        let logo_url = r.get("logoUrl").and_then(|v| v.as_str()).map(String::from);
                        let company_logo_url = r
                            .get("companyLogoUrl")
                            .and_then(|v| v.as_str())
                            .map(String::from);
                        Some((symbol, (logo_url, company_logo_url)))
                    })
                    .collect()
            })
            .unwrap_or_default();

        let mut parsed_quotes: Vec<(String, Quote)> = Vec::new();

        for (symbol, summary) in quote_data {
            let logo_url = logo_map.get(&symbol).and_then(|(l, _)| l.clone());
            let company_logo_url = logo_map.get(&symbol).and_then(|(_, c)| c.clone());
            let quote = Quote::from_response(&summary, logo_url, company_logo_url);
            parsed_quotes.push((symbol, quote));
        }

        if self.cache_ttl.is_some() {
            let mut cache = self.quote_cache.write().await;
            for (symbol, quote) in &parsed_quotes {
                self.cache_insert(&mut cache, symbol.as_str().into(), quote.clone());
            }
        }

        for (symbol, quote) in parsed_quotes {
            response.quotes.insert(symbol, quote);
        }

        // Every requested symbol must end up in either `quotes` or `errors`.
        for symbol in &self.symbols {
            let s = &**symbol;
            if !response.quotes.contains_key(s) && !response.errors.contains_key(s) {
                response.errors.insert(
                    symbol.to_string(),
                    "Symbol not found in response".to_string(),
                );
            }
        }

        Ok(response)
    }
635
636 async fn fetch_quotes_per_symbol(
639 &self,
640 symbols: &[String],
641 response: &mut BatchQuotesResponse,
642 ) -> Vec<(String, QuoteSummaryResponse)> {
643 let futures: Vec<_> = symbols
644 .iter()
645 .map(|sym| {
646 let providers = Arc::clone(&self.providers);
647 let sym = sym.clone();
648 async move {
649 let result = providers
650 .fetch(Capability::QUOTE, |p| {
651 let sym = sym.clone();
652 let p = p.clone();
653 async move { p.fetch_quote(&sym).await }
654 })
655 .await;
656 (sym, result)
657 }
658 })
659 .collect();
660
661 let results: Vec<_> = stream::iter(futures)
662 .buffer_unordered(self.max_concurrency)
663 .collect()
664 .await;
665
666 let mut successes = Vec::new();
667 for (sym, result) in results {
668 match result {
669 Ok(resp) => successes.push((sym, resp)),
670 Err(e) => {
671 response.errors.insert(sym, e.to_string());
672 }
673 }
674 }
675 successes
676 }
677
678 pub async fn quote<F>(&self, symbol: &str) -> Result<Quote<F>>
680 where
681 F: Format,
682 Quote<Both>: Into<Quote<F>>,
683 {
684 {
685 let cache = self.quote_cache.read().await;
686 if let Some(entry) = cache.get(symbol)
687 && self.is_cache_fresh(Some(entry))
688 {
689 return Ok(entry.value.clone().into());
690 }
691 }
692
693 let response = self.quotes().await?;
694
695 response
696 .quotes
697 .get(symbol)
698 .cloned()
699 .map(Into::into)
700 .ok_or_else(|| FinanceError::SymbolNotFound {
701 symbol: Some(symbol.to_string()),
702 context: response
703 .errors
704 .get(symbol)
705 .cloned()
706 .unwrap_or_else(|| "Symbol not found".to_string()),
707 })
708 }
709
710 async fn get_fetch_guard<K: Clone + Eq + std::hash::Hash>(
715 guard_map: &FetchGuardMap<K>,
716 key: K,
717 ) -> FetchGuard {
718 {
719 let guards = guard_map.read().await;
720 if let Some(guard) = guards.get(&key) {
721 return Arc::clone(guard);
722 }
723 }
724
725 let mut guards = guard_map.write().await;
726 Arc::clone(
727 guards
728 .entry(key)
729 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
730 )
731 }
732
733 pub async fn charts(
738 &self,
739 interval: Interval,
740 range: TimeRange,
741 ) -> Result<BatchChartsResponse> {
742 {
744 let cache = self.chart_cache.read().await;
745 if self.all_cached(
746 &cache,
747 self.symbols.iter().map(|s| (s.clone(), interval, range)),
748 ) {
749 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
750 for symbol in &self.symbols {
751 if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
752 response
753 .charts
754 .insert(symbol.to_string(), entry.value.clone());
755 }
756 }
757 return Ok(response);
758 }
759 }
760
761 let fetch_guard = Self::get_fetch_guard(&self.charts_fetch, (interval, range)).await;
763 let _guard = fetch_guard.lock().await;
764
765 {
767 let cache = self.chart_cache.read().await;
768 if self.all_cached(
769 &cache,
770 self.symbols.iter().map(|s| (s.clone(), interval, range)),
771 ) {
772 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
773 for symbol in &self.symbols {
774 if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
775 response
776 .charts
777 .insert(symbol.to_string(), entry.value.clone());
778 }
779 }
780 return Ok(response);
781 }
782 }
783
784 let futures: Vec<_> = self
786 .symbols
787 .iter()
788 .map(|symbol| {
789 let providers = Arc::clone(&self.providers);
790 let symbol = Arc::clone(symbol);
791 async move {
792 let sym = symbol.to_string();
793 let result = providers
794 .fetch(Capability::CHART, |p| {
795 let sym = sym.clone();
796 let p = p.clone();
797 async move { p.fetch_chart(&sym, interval, range).await }
798 })
799 .await;
800 (symbol, result)
801 }
802 })
803 .collect();
804
805 let results: Vec<_> = stream::iter(futures)
806 .buffer_unordered(self.max_concurrency)
807 .collect()
808 .await;
809
810 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
811 let mut parsed_charts: Vec<(Arc<str>, Chart)> = Vec::new();
812
813 for (symbol, result) in results {
814 match result {
815 Ok(data) => {
816 let chart = data;
817 parsed_charts.push((symbol, chart));
818 }
819 Err(e) => {
820 response.errors.insert(symbol.to_string(), e.to_string());
821 }
822 }
823 }
824
825 if self.cache_ttl.is_some() {
827 let mut cache = self.chart_cache.write().await;
828 let cache_keys: Vec<_> = parsed_charts
829 .into_iter()
830 .map(|(symbol, chart)| {
831 self.cache_insert(&mut cache, (symbol.clone(), interval, range), chart);
832 symbol
833 })
834 .collect();
835 for symbol in cache_keys {
836 if let Some(cached) = cache.get(&(symbol.clone(), interval, range)) {
837 response
838 .charts
839 .insert(symbol.to_string(), cached.value.clone());
840 }
841 }
842 } else {
843 for (symbol, chart) in parsed_charts {
844 response.charts.insert(symbol.to_string(), chart);
845 }
846 }
847
848 Ok(response)
849 }
850
851 pub async fn chart(&self, symbol: &str, interval: Interval, range: TimeRange) -> Result<Chart> {
853 {
854 let cache = self.chart_cache.read().await;
855 let key: Arc<str> = symbol.into();
856 if let Some(entry) = cache.get(&(key, interval, range))
857 && self.is_cache_fresh(Some(entry))
858 {
859 return Ok(entry.value.clone());
860 }
861 }
862
863 let response = self.charts(interval, range).await?;
864
865 response
866 .charts
867 .get(symbol)
868 .cloned()
869 .ok_or_else(|| FinanceError::SymbolNotFound {
870 symbol: Some(symbol.to_string()),
871 context: response
872 .errors
873 .get(symbol)
874 .cloned()
875 .unwrap_or_else(|| "Symbol not found".to_string()),
876 })
877 }
878
879 pub async fn charts_range(
891 &self,
892 interval: Interval,
893 start: i64,
894 end: i64,
895 ) -> Result<BatchChartsResponse> {
896 let futures: Vec<_> = self
897 .symbols
898 .iter()
899 .map(|symbol| {
900 let providers = Arc::clone(&self.providers);
901 let symbol = Arc::clone(symbol);
902 async move {
903 let sym = symbol.to_string();
904 let result = providers
905 .fetch(Capability::CHART, |p| {
906 let sym = sym.clone();
907 let p = p.clone();
908 async move { p.fetch_chart_range(&sym, interval, start, end).await }
909 })
910 .await;
911 (symbol, result)
912 }
913 })
914 .collect();
915
916 let results: Vec<_> = stream::iter(futures)
917 .buffer_unordered(self.max_concurrency)
918 .collect()
919 .await;
920
921 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
922
923 for (symbol, result) in results {
924 match result {
925 Ok(data) => {
926 let chart = data;
927 response.charts.insert(symbol.to_string(), chart);
928 }
929 Err(e) => {
930 response.errors.insert(symbol.to_string(), e.to_string());
931 }
932 }
933 }
934
935 Ok(response)
936 }
937
938 async fn ensure_events_loaded(&self) -> Result<()> {
948 let symbols_to_fetch: Vec<Arc<str>> = {
950 let cache = self.events_cache.read().await;
951 self.symbols
952 .iter()
953 .filter(|sym| !cache.contains_key(*sym))
954 .cloned()
955 .collect()
956 };
957
958 if symbols_to_fetch.is_empty() {
959 return Ok(());
960 }
961
962 let futures: Vec<_> = symbols_to_fetch
964 .iter()
965 .map(|symbol| {
966 let providers = Arc::clone(&self.providers);
967 let symbol = Arc::clone(symbol);
968 async move {
969 let sym = symbol.to_string();
970 let result = providers
971 .fetch(Capability::CORPORATE, |p| {
972 let sym = sym.clone();
973 let p = p.clone();
974 async move { p.fetch_events(&sym).await }
975 })
976 .await;
977 (symbol, result)
978 }
979 })
980 .collect();
981
982 let results: Vec<_> = stream::iter(futures)
983 .buffer_unordered(self.max_concurrency)
984 .collect()
985 .await;
986
987 let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
988
989 for (symbol, result) in results {
990 if let Ok(events_data) = result {
991 parsed_events.push((symbol, events_data));
992 }
993 }
994
995 if !parsed_events.is_empty() {
997 let mut events_cache = self.events_cache.write().await;
998 for (symbol, events) in parsed_events {
999 events_cache.insert(symbol, CacheEntry::new(events));
1000 }
1001 }
1002
1003 Ok(())
1004 }
1005
    /// Fetches spark data for every tracked symbol in a single request.
    ///
    /// The cache is consulted first; on a miss the per-`(interval, range)`
    /// fetch guard is taken and the cache re-checked before the request is
    /// sent directly through the Yahoo client.
    ///
    /// # Errors
    ///
    /// Returns an error when no Yahoo client is available or the request
    /// itself fails. A payload that cannot be parsed is not an outer error:
    /// the parse error is recorded for every symbol in `errors` instead.
    pub async fn spark(&self, interval: Interval, range: TimeRange) -> Result<BatchSparksResponse> {
        {
            // Fast path: everything is already cached and fresh.
            let cache = self.spark_cache.read().await;
            if self.all_cached(
                &cache,
                self.symbols.iter().map(|s| (s.clone(), interval, range)),
            ) {
                let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
                for symbol in &self.symbols {
                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
                        response
                            .sparks
                            .insert(symbol.to_string(), entry.value.clone());
                    }
                }
                return Ok(response);
            }
        }

        // Serialize fetches for the same (interval, range).
        let fetch_guard = Self::get_fetch_guard(&self.spark_fetch, (interval, range)).await;
        let _guard = fetch_guard.lock().await;

        {
            // Re-check: another caller may have filled the cache while we waited.
            let cache = self.spark_cache.read().await;
            if self.all_cached(
                &cache,
                self.symbols.iter().map(|s| (s.clone(), interval, range)),
            ) {
                let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
                for symbol in &self.symbols {
                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
                        response
                            .sparks
                            .insert(symbol.to_string(), entry.value.clone());
                    }
                }
                return Ok(response);
            }
        }

        let client = self.providers.first_yahoo()?;
        let symbols_ref: Vec<&str> = self.symbols.iter().map(|s| &**s).collect();
        let json =
            crate::adapters::yahoo::quote::spark::fetch(&client, &symbols_ref, interval, range)
                .await?;

        let mut response = BatchSparksResponse::with_capacity(self.symbols.len());

        match SparkResponse::from_json(json) {
            Ok(spark_response) => {
                let mut parsed_sparks: Vec<(Arc<str>, Spark)> = Vec::new();

                if let Some(results) = spark_response.spark.result {
                    for result in &results {
                        if let Some(spark) = Spark::from_response(
                            result,
                            Some(interval.as_str().to_string()),
                            Some(range.as_str().to_string()),
                        ) {
                            let sym: Arc<str> = result.symbol.as_str().into();
                            parsed_sparks.push((sym, spark));
                        } else {
                            response.errors.insert(
                                result.symbol.to_string(),
                                "Failed to parse spark data".to_string(),
                            );
                        }
                    }
                }

                if self.cache_ttl.is_some() {
                    let mut cache = self.spark_cache.write().await;
                    for (symbol, spark) in &parsed_sparks {
                        self.cache_insert(
                            &mut cache,
                            (symbol.clone(), interval, range),
                            spark.clone(),
                        );
                    }
                }

                for (symbol, spark) in parsed_sparks {
                    response.sparks.insert(symbol.to_string(), spark);
                }

                // Every requested symbol must end up in either `sparks` or `errors`.
                for symbol in &self.symbols {
                    let symbol_str = &**symbol;
                    if !response.sparks.contains_key(symbol_str)
                        && !response.errors.contains_key(symbol_str)
                    {
                        response.errors.insert(
                            symbol.to_string(),
                            "Symbol not found in response".to_string(),
                        );
                    }
                }
            }
            // The whole payload was unparseable: report it for every symbol.
            Err(e) => {
                for symbol in &self.symbols {
                    response.errors.insert(symbol.to_string(), e.to_string());
                }
            }
        }

        Ok(response)
    }
1147
1148 pub async fn dividends(&self, range: TimeRange) -> Result<BatchDividendsResponse> {
1173 let mut response = BatchDividendsResponse::with_capacity(self.symbols.len());
1174
1175 self.ensure_events_loaded().await?;
1177
1178 let events_cache = self.events_cache.read().await;
1179
1180 for symbol in &self.symbols {
1181 if let Some(entry) = events_cache.get(symbol) {
1182 let all_dividends = entry.value.to_dividends();
1183 let filtered = filter_by_range(all_dividends, range);
1184 response.dividends.insert(symbol.to_string(), filtered);
1185 } else {
1186 response
1187 .errors
1188 .insert(symbol.to_string(), "No events data available".to_string());
1189 }
1190 }
1191
1192 Ok(response)
1193 }
1194
1195 pub async fn splits(&self, range: TimeRange) -> Result<BatchSplitsResponse> {
1222 let mut response = BatchSplitsResponse::with_capacity(self.symbols.len());
1223
1224 self.ensure_events_loaded().await?;
1226
1227 let events_cache = self.events_cache.read().await;
1228
1229 for symbol in &self.symbols {
1230 if let Some(entry) = events_cache.get(symbol) {
1231 let all_splits = entry.value.to_splits();
1232 let filtered = filter_by_range(all_splits, range);
1233 response.splits.insert(symbol.to_string(), filtered);
1234 } else {
1235 response
1236 .errors
1237 .insert(symbol.to_string(), "No events data available".to_string());
1238 }
1239 }
1240
1241 Ok(response)
1242 }
1243
1244 pub async fn capital_gains(&self, range: TimeRange) -> Result<BatchCapitalGainsResponse> {
1270 let mut response = BatchCapitalGainsResponse::with_capacity(self.symbols.len());
1271
1272 self.ensure_events_loaded().await?;
1274
1275 let events_cache = self.events_cache.read().await;
1276
1277 for symbol in &self.symbols {
1278 if let Some(entry) = events_cache.get(symbol) {
1279 let all_gains = entry.value.to_capital_gains();
1280 let filtered = filter_by_range(all_gains, range);
1281 response.capital_gains.insert(symbol.to_string(), filtered);
1282 } else {
1283 response
1284 .errors
1285 .insert(symbol.to_string(), "No events data available".to_string());
1286 }
1287 }
1288
1289 Ok(response)
1290 }
1291
    /// Fetches a financial statement of the given type and frequency for every
    /// tracked symbol.
    ///
    /// The body is generated by `batch_fetch_cached!` from the pieces below:
    /// the cache field, a fetch guard keyed by `(statement_type, frequency)`,
    /// the per-symbol cache key, the response type/field, and the per-symbol
    /// fetch routed through `Capability::FUNDAMENTALS`.
    // NOTE(review): the macro presumably performs the same cache check / guard /
    // per-symbol fetch sequence as `charts` — confirm against its definition.
    pub async fn financials(
        &self,
        statement_type: StatementType,
        frequency: Frequency,
    ) -> Result<BatchFinancialsResponse> {
        batch_fetch_cached!(self;
            cache: financials_cache,
            guard: map(financials_fetch, (statement_type, frequency)),
            key: |s| (s.clone(), statement_type, frequency),
            response: BatchFinancialsResponse.financials,
            fetch: |providers, symbol| {
                let sym = symbol.to_string();
                providers.fetch(Capability::FUNDAMENTALS, move |p| {
                    let sym = sym.clone();
                    let p = p.clone();
                    async move {
                        p.fetch_financials(&sym, statement_type, frequency)
                            .await
                    }
                }).await
            },
        )
    }
1342
    /// Fetches news for every tracked symbol.
    ///
    /// The body is generated by `batch_fetch_cached!` from the pieces below:
    /// the cache field, the single `news_fetch` guard, the symbol as cache key,
    /// the response type/field, and the per-symbol fetch routed through
    /// `Capability::CORPORATE`, whose result is collected into a `Vec<News>`.
    // NOTE(review): the macro presumably performs the same cache check / guard /
    // per-symbol fetch sequence as `charts` — confirm against its definition.
    pub async fn news(&self) -> Result<BatchNewsResponse> {
        batch_fetch_cached!(self;
            cache: news_cache,
            guard: simple(news_fetch),
            key: |s| s.clone(),
            response: BatchNewsResponse.news,
            fetch: |providers, symbol| {
                let sym = symbol.to_string();
                providers.fetch(Capability::CORPORATE, move |p| {
                    let sym = sym.clone();
                    let p = p.clone();
                    async move {
                        p.fetch_news(&sym)
                            .await
                            .map(|data| data.into_iter().collect::<Vec<News>>())
                    }
                }).await
            },
        )
    }
1386
    /// Fetches up to `limit` similar-symbol recommendations for every tracked
    /// symbol.
    ///
    /// The body is generated by `batch_fetch_cached!` from the pieces below:
    /// the cache field, a fetch guard keyed by `limit`, the `(symbol, limit)`
    /// cache key, the response type/field, and the per-symbol fetch routed
    /// through `Capability::CORPORATE`. The per-symbol fetch fails with
    /// `FinanceError::InternalError` when the answering provider's id is not
    /// recognized by `Provider::from_id_str`.
    // NOTE(review): the macro presumably performs the same cache check / guard /
    // per-symbol fetch sequence as `charts` — confirm against its definition.
    pub async fn recommendations(&self, limit: u32) -> Result<BatchRecommendationsResponse> {
        batch_fetch_cached!(self;
            cache: recommendations_cache,
            guard: map(recommendations_fetch, limit),
            key: |s| (s.clone(), limit),
            response: BatchRecommendationsResponse.recommendations,
            fetch: |providers, symbol| {
                let sym = symbol.to_string();
                providers.fetch(Capability::CORPORATE, move |p| {
                    let sym = sym.clone();
                    let p = p.clone();
                    async move {
                        let items = p.fetch_similar_symbols(&sym, limit).await?;
                        Ok(recommendation_from_similar(
                            sym,
                            Some(Provider::from_id_str(p.id()).ok_or_else(|| {
                                FinanceError::InternalError(format!("unknown provider id: {}", p.id()))
                            })?),
                            items,
                            Some(limit),
                        ))
                    }
                }).await
            },
        )
    }
1441
    /// Fetches option chains for every tracked symbol; `date` is passed
    /// unchanged to the provider's `fetch_options`.
    ///
    /// The body is generated by `batch_fetch_cached!` from the pieces below:
    /// the cache field, a fetch guard keyed by `date`, the `(symbol, date)`
    /// cache key, the response type/field, and the per-symbol fetch routed
    /// through `Capability::OPTIONS`.
    // NOTE(review): the macro presumably performs the same cache check / guard /
    // per-symbol fetch sequence as `charts` — confirm against its definition.
    pub async fn options(&self, date: Option<i64>) -> Result<BatchOptionsResponse> {
        batch_fetch_cached!(self;
            cache: options_cache,
            guard: map(options_fetch, date),
            key: |s| (s.clone(), date),
            response: BatchOptionsResponse.options,
            fetch: |providers, symbol| {
                let sym = symbol.to_string();
                providers.fetch(Capability::OPTIONS, move |p| {
                    let sym = sym.clone();
                    let p = p.clone();
                    async move {
                        p.fetch_options(&sym, date).await
                    }
                }).await
            },
        )
    }
1484
#[cfg(feature = "indicators")]
/// Computes an indicator summary for every symbol in this batch.
///
/// The summaries are derived from the candles returned by
/// [`charts`](Self::charts) for the same `interval` and `range`, and are
/// cached under the key `(symbol, interval, range)`.
///
/// Flow:
/// 1. If `all_cached` reports a hit for every symbol, the response is built
///    straight from the indicators cache.
/// 2. Otherwise the fetch guard for `(interval, range)` is taken and held for
///    the remainder of the call, and the cache is checked a second time
///    (presumably so that work finished by a concurrent caller is reused).
/// 3. Charts are fetched, summaries are calculated, stored in the cache when
///    `cache_ttl` is set, and returned.
///
/// # Errors
///
/// Propagates the error returned by `charts`. Per-symbol chart errors are
/// copied into the `errors` map of the response instead of failing the call.
pub async fn indicators(
    &self,
    interval: Interval,
    range: TimeRange,
) -> Result<BatchIndicatorsResponse> {
    let key_of = |sym: &Arc<str>| (sym.clone(), interval, range);

    // Fast path: answer from the cache while holding only a read lock.
    {
        let cached = self.indicators_cache.read().await;
        if self.all_cached(&cached, self.symbols.iter().map(&key_of)) {
            let mut hit = BatchIndicatorsResponse::with_capacity(self.symbols.len());
            hit.indicators
                .extend(self.symbols.iter().filter_map(|sym| {
                    cached
                        .get(&key_of(sym))
                        .map(|entry| (sym.to_string(), entry.value.clone()))
                }));
            return Ok(hit);
        }
    }

    // Slow path: take the guard for this (interval, range) and keep it until
    // the function returns.
    let in_flight = Self::get_fetch_guard(&self.indicators_fetch, (interval, range)).await;
    let _permit = in_flight.lock().await;

    // Second look at the cache now that the guard is held.
    {
        let cached = self.indicators_cache.read().await;
        if self.all_cached(&cached, self.symbols.iter().map(&key_of)) {
            let mut hit = BatchIndicatorsResponse::with_capacity(self.symbols.len());
            hit.indicators
                .extend(self.symbols.iter().filter_map(|sym| {
                    cached
                        .get(&key_of(sym))
                        .map(|entry| (sym.to_string(), entry.value.clone()))
                }));
            return Ok(hit);
        }
    }

    let charts_response = self.charts(interval, range).await?;

    // Calculate everything before touching the write lock so the lock is
    // only held for the inserts themselves.
    let computed: Vec<(String, indicators::IndicatorsSummary)> = charts_response
        .charts
        .iter()
        .map(|(symbol, chart)| {
            (
                symbol.to_string(),
                indicators::summary::calculate_indicators(&chart.candles),
            )
        })
        .collect();

    if self.cache_ttl.is_some() {
        let mut cache = self.indicators_cache.write().await;
        for (symbol, summary) in &computed {
            let key: Arc<str> = symbol.as_str().into();
            self.cache_insert(&mut cache, key_of(&key), summary.clone());
        }
    }

    let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
    response.indicators.extend(computed);
    response.errors.extend(
        charts_response
            .errors
            .iter()
            .map(|(symbol, error)| (symbol.to_string(), error.clone())),
    );

    Ok(response)
}
1588
1589 pub fn add_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1610 use std::collections::HashSet;
1612
1613 let existing: HashSet<&str> = self.symbols.iter().map(|s| &**s).collect();
1614 let to_add: Vec<Arc<str>> = symbols
1615 .iter()
1616 .map(|s| s.as_ref())
1617 .filter(|s| !existing.contains(s))
1618 .map(|s| s.into())
1619 .collect();
1620
1621 self.symbols.extend(to_add);
1622 }
1623
#[cfg(feature = "backtesting")]
/// Runs a portfolio backtest over the symbols in this batch.
///
/// Charts for `interval`/`range` are fetched for all symbols, dividends for
/// `range` are attached where available, and the resulting per-symbol data is
/// handed to a `PortfolioEngine` built from `config` (or the default
/// configuration when `config` is `None`). `factory` is passed through to the
/// engine's `run` call.
///
/// Symbols that have no chart in the fetched batch are left out of the run.
/// A failure to fetch dividends is not fatal: the backtest simply proceeds
/// with no dividends.
///
/// # Errors
///
/// Returns an error when `config.validate` rejects the configuration for the
/// current number of symbols, a `BacktestError::ChartError` carrying the
/// stringified error when the chart fetch fails, or whatever the engine's
/// `run` returns.
pub async fn backtest<S, F>(
    &self,
    interval: Interval,
    range: TimeRange,
    config: Option<backtesting::portfolio::PortfolioConfig>,
    factory: F,
) -> backtesting::Result<backtesting::portfolio::PortfolioResult>
where
    S: backtesting::Strategy,
    F: Fn(&str) -> S,
{
    use crate::backtesting::portfolio::{PortfolioEngine, SymbolData};

    let config = config.unwrap_or_default();
    config.validate(self.symbols.len())?;

    let charts = match self.charts(interval, range).await {
        Ok(batch) => batch,
        Err(e) => return Err(backtesting::BacktestError::ChartError(e.to_string()).into()),
    };

    // Best effort: fall back to an empty map if dividends cannot be fetched.
    let dividends_by_symbol = match self.dividends(range).await {
        Ok(batch) => batch.dividends,
        Err(_) => Default::default(),
    };

    let mut symbol_data: Vec<SymbolData> = Vec::new();
    for sym in &self.symbols {
        // Only symbols with a chart take part in the backtest.
        let Some(chart) = charts.charts.get(sym.as_ref()) else {
            continue;
        };
        let divs = dividends_by_symbol
            .get(sym.as_ref())
            .cloned()
            .unwrap_or_default();
        symbol_data
            .push(SymbolData::new(sym.as_ref(), chart.candles.clone()).with_dividends(divs));
    }

    PortfolioEngine::new(config).run(&symbol_data, factory)
}
1708
1709 pub async fn remove_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1726 use std::collections::HashSet;
1727 let to_remove: HashSet<&str> = symbols.iter().map(|s| s.as_ref()).collect();
1728
1729 self.symbols.retain(|s| !to_remove.contains(&**s));
1731
1732 let (
1734 mut quote_cache,
1735 mut chart_cache,
1736 mut events_cache,
1737 mut financials_cache,
1738 mut news_cache,
1739 mut recommendations_cache,
1740 mut options_cache,
1741 mut spark_cache,
1742 ) = tokio::join!(
1743 self.quote_cache.write(),
1744 self.chart_cache.write(),
1745 self.events_cache.write(),
1746 self.financials_cache.write(),
1747 self.news_cache.write(),
1748 self.recommendations_cache.write(),
1749 self.options_cache.write(),
1750 self.spark_cache.write(),
1751 );
1752
1753 for symbol in &to_remove {
1755 let key: Arc<str> = (*symbol).into();
1756 quote_cache.remove(&key);
1757 events_cache.remove(&key);
1758 news_cache.remove(&key);
1759 }
1760
1761 chart_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1763 financials_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1764 recommendations_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1765 options_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1766 spark_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1767
1768 drop((
1770 quote_cache,
1771 chart_cache,
1772 events_cache,
1773 financials_cache,
1774 news_cache,
1775 recommendations_cache,
1776 options_cache,
1777 spark_cache,
1778 ));
1779
1780 #[cfg(feature = "indicators")]
1781 self.indicators_cache
1782 .write()
1783 .await
1784 .retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1785 }
1786
1787 pub async fn clear_cache(&self) {
1792 tokio::join!(
1793 async { self.quote_cache.write().await.clear() },
1795 async { self.chart_cache.write().await.clear() },
1796 async { self.events_cache.write().await.clear() },
1797 async { self.financials_cache.write().await.clear() },
1798 async { self.news_cache.write().await.clear() },
1799 async { self.recommendations_cache.write().await.clear() },
1800 async { self.options_cache.write().await.clear() },
1801 async { self.spark_cache.write().await.clear() },
1802 async {
1803 #[cfg(feature = "indicators")]
1804 self.indicators_cache.write().await.clear();
1805 },
1806 async { self.charts_fetch.write().await.clear() },
1808 async { self.financials_fetch.write().await.clear() },
1809 async { self.recommendations_fetch.write().await.clear() },
1810 async { self.options_fetch.write().await.clear() },
1811 async { self.spark_fetch.write().await.clear() },
1812 async {
1813 #[cfg(feature = "indicators")]
1814 self.indicators_fetch.write().await.clear();
1815 },
1816 );
1817 }
1818
1819 pub async fn clear_quote_cache(&self) {
1823 self.quote_cache.write().await.clear();
1824 }
1825
1826 pub async fn clear_chart_cache(&self) {
1831 tokio::join!(
1832 async { self.chart_cache.write().await.clear() },
1833 async { self.events_cache.write().await.clear() },
1834 async { self.spark_cache.write().await.clear() },
1835 async {
1836 #[cfg(feature = "indicators")]
1837 self.indicators_cache.write().await.clear();
1838 },
1839 );
1840 }
1841}
1842
1843#[cfg(test)]
1844mod tests {
1845 use super::*;
1846
1847 #[tokio::test]
1848 #[ignore = "requires network access"]
1849 async fn test_tickers_quotes() {
1850 let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
1851 let result = tickers.quotes().await.unwrap();
1852
1853 assert!(result.success_count() > 0);
1854 }
1855
1856 #[tokio::test]
1857 #[ignore = "requires network access"]
1858 async fn test_tickers_charts() {
1859 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1860 let result = tickers
1861 .charts(Interval::OneDay, TimeRange::FiveDays)
1862 .await
1863 .unwrap();
1864
1865 assert!(result.success_count() > 0);
1866 }
1867
1868 #[tokio::test]
1869 #[ignore = "requires network access"]
1870 async fn test_tickers_spark() {
1871 let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
1872 let result = tickers
1873 .spark(Interval::FiveMinutes, TimeRange::OneDay)
1874 .await
1875 .unwrap();
1876
1877 assert!(result.success_count() > 0);
1878
1879 if let Some(spark) = result.sparks.get("AAPL") {
1881 assert!(!spark.closes.is_empty());
1882 assert_eq!(spark.symbol, "AAPL");
1883 assert!(spark.percent_change().is_some());
1885 }
1886 }
1887
1888 #[tokio::test]
1889 #[ignore = "requires network access"]
1890 async fn test_tickers_dividends() {
1891 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1892 let result = tickers.dividends(TimeRange::OneYear).await.unwrap();
1893
1894 assert!(result.success_count() > 0);
1895
1896 if let Some(dividends) = result.dividends.get("AAPL")
1898 && !dividends.is_empty()
1899 {
1900 let div = ÷nds[0];
1901 assert!(div.timestamp > 0);
1902 assert!(div.amount > 0.0);
1903 }
1904 }
1905
1906 #[tokio::test]
1907 #[ignore = "requires network access"]
1908 async fn test_tickers_splits() {
1909 let tickers = Tickers::new(["NVDA", "TSLA"]).await.unwrap();
1910 let result = tickers.splits(TimeRange::FiveYears).await.unwrap();
1911
1912 assert!(result.success_count() > 0);
1914
1915 for splits in result.splits.values() {
1917 for split in splits {
1918 assert!(split.timestamp > 0);
1919 assert!(split.numerator > 0.0);
1920 assert!(split.denominator > 0.0);
1921 assert!(!split.ratio.is_empty());
1922 }
1923 }
1924 }
1925
1926 #[tokio::test]
1927 #[ignore = "requires network access"]
1928 async fn test_tickers_capital_gains() {
1929 let tickers = Tickers::new(["VFIAX", "VTI"]).await.unwrap();
1930 let result = tickers.capital_gains(TimeRange::TwoYears).await.unwrap();
1931
1932 assert!(result.success_count() > 0);
1934
1935 for gains in result.capital_gains.values() {
1937 for gain in gains {
1938 assert!(gain.timestamp > 0);
1939 assert!(gain.amount >= 0.0);
1940 }
1941 }
1942 }
1943
1944 #[tokio::test]
1945 #[ignore = "requires network access"]
1946 async fn test_tickers_financials() {
1947 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1948 let result = tickers
1949 .financials(StatementType::Income, Frequency::Annual)
1950 .await
1951 .unwrap();
1952
1953 assert!(result.success_count() > 0);
1954
1955 for (symbol, stmt) in &result.financials {
1957 assert_eq!(stmt.symbol, *symbol);
1958 assert_eq!(stmt.statement_type, "income");
1959 assert_eq!(stmt.frequency, "annual");
1960 assert!(!stmt.statement.is_empty());
1961
1962 if let Some(revenue) = stmt.statement.get("TotalRevenue") {
1964 assert!(!revenue.is_empty());
1965 }
1966 }
1967 }
1968
1969 #[tokio::test]
1970 #[ignore = "requires network access"]
1971 async fn test_tickers_news() {
1972 let tickers = Tickers::new(["AAPL", "TSLA"]).await.unwrap();
1973 let result = tickers.news().await.unwrap();
1974
1975 assert!(result.success_count() > 0);
1976
1977 for articles in result.news.values() {
1979 if !articles.is_empty() {
1980 let article = &articles[0];
1981 assert!(!article.title.is_empty());
1982 assert!(!article.link.is_empty());
1983 assert!(!article.source.is_empty());
1984 }
1985 }
1986 }
1987
1988 #[tokio::test]
1989 #[ignore = "requires network access"]
1990 async fn test_tickers_recommendations() {
1991 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1992 let result = tickers.recommendations(5).await.unwrap();
1993
1994 assert!(result.success_count() > 0);
1995
1996 for (symbol, rec) in &result.recommendations {
1998 assert_eq!(rec.symbol, *symbol);
1999 assert!(rec.count() > 0);
2000 for similar in &rec.recommendations {
2001 assert!(!similar.symbol.is_empty());
2002 }
2003 }
2004 }
2005
2006 #[tokio::test]
2007 #[ignore = "requires network access"]
2008 async fn test_tickers_options() {
2009 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
2010 let result = tickers.options(None).await.unwrap();
2011
2012 assert!(result.success_count() > 0);
2013
2014 for opts in result.options.values() {
2016 assert!(!opts.expiration_dates().is_empty());
2017 }
2018 }
2019
2020 #[tokio::test]
2021 #[ignore = "requires network access"]
2022 #[cfg(feature = "indicators")]
2023 async fn test_tickers_indicators() {
2024 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
2025 let result = tickers
2026 .indicators(Interval::OneDay, TimeRange::ThreeMonths)
2027 .await
2028 .unwrap();
2029
2030 assert!(result.success_count() > 0);
2031
2032 for ind in result.indicators.values() {
2034 assert!(ind.rsi_14.is_some() || ind.sma_20.is_some());
2036 }
2037 }
2038
2039 #[tokio::test]
2040 async fn test_tickers_add_symbols() {
2041 let mut tickers = Tickers::new(["AAPL"]).await.unwrap();
2042 assert_eq!(tickers.len(), 1);
2043 assert_eq!(tickers.symbols(), &["AAPL"]);
2044
2045 tickers.add_symbols(&["MSFT", "GOOGL"]);
2046 assert_eq!(tickers.len(), 3);
2047 assert!(tickers.symbols().contains(&"AAPL"));
2048 assert!(tickers.symbols().contains(&"MSFT"));
2049 assert!(tickers.symbols().contains(&"GOOGL"));
2050
2051 tickers.add_symbols(&["AAPL"]);
2053 assert_eq!(tickers.len(), 3);
2054 }
2055
2056 #[tokio::test]
2057 #[ignore = "requires network access"]
2058 async fn test_tickers_remove_symbols() {
2059 let mut tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
2060 assert_eq!(tickers.len(), 3);
2061
2062 let _ = tickers.quotes().await;
2064
2065 tickers.remove_symbols(&["MSFT"]).await;
2067 assert_eq!(tickers.len(), 2);
2068 assert!(tickers.symbols().contains(&"AAPL"));
2069 assert!(!tickers.symbols().contains(&"MSFT"));
2070 assert!(tickers.symbols().contains(&"GOOGL"));
2071
2072 let quotes = tickers.quotes().await.unwrap();
2074 assert!(!quotes.quotes.contains_key("MSFT"));
2075 assert_eq!(quotes.quotes.len(), 2);
2076 }
2077}