1use super::macros::{batch_fetch_cached, define_batch_response};
6use crate::client::{ClientConfig, YahooClient};
7use crate::constants::{Frequency, Interval, StatementType, TimeRange};
8use crate::error::{FinanceError, Result};
9use crate::models::chart::events::ChartEvents;
10use crate::models::chart::response::ChartResponse;
11use crate::models::chart::{CapitalGain, Chart, Dividend, Split};
12use crate::models::financials::FinancialStatement;
13use crate::models::news::News;
14use crate::models::options::Options;
15use crate::models::quote::Quote;
16use crate::models::recommendation::Recommendation;
17use crate::models::spark::Spark;
18use crate::models::spark::response::SparkResponse;
19use crate::utils::{CacheEntry, EVICTION_THRESHOLD, filter_by_range};
20use futures::stream::{self, StreamExt};
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::sync::RwLock;
25
/// Shared map guarded by an async `RwLock`; each value is wrapped in a
/// `CacheEntry` so its freshness can be checked against a TTL.
type MapCache<K, V> = Arc<RwLock<HashMap<K, CacheEntry<V>>>>;
/// Chart cache key: `(symbol, interval, range)`.
type ChartCacheKey = (Arc<str>, Interval, TimeRange);
/// Quotes keyed by symbol.
type QuoteCache = MapCache<Arc<str>, Quote>;
/// Charts keyed by `(symbol, interval, range)`.
type ChartCache = MapCache<ChartCacheKey, Chart>;
/// Chart events (source of dividends / splits / capital gains) keyed by symbol.
type EventsCache = MapCache<Arc<str>, ChartEvents>;
/// Financial statements keyed by `(symbol, statement type, frequency)`.
type FinancialsCache = MapCache<(Arc<str>, StatementType, Frequency), FinancialStatement>;
/// News lists keyed by symbol.
type NewsCache = MapCache<Arc<str>, Vec<News>>;
/// Recommendations keyed by `(symbol, limit)`.
type RecommendationsCache = MapCache<(Arc<str>, u32), Recommendation>;
/// Option chains keyed by `(symbol, optional date)`.
type OptionsCache = MapCache<(Arc<str>, Option<i64>), Options>;
/// Spark cache key: `(symbol, interval, range)`.
type SparkCacheKey = (Arc<str>, Interval, TimeRange);
/// Sparks keyed by `(symbol, interval, range)`.
type SparkCache = MapCache<SparkCacheKey, Spark>;
/// Indicator summaries keyed by `(symbol, interval, range)`.
#[cfg(feature = "indicators")]
type IndicatorsCache =
    MapCache<(Arc<str>, Interval, TimeRange), crate::indicators::IndicatorsSummary>;

/// Async mutex held while a batch fetch is in flight, so concurrent callers
/// wait and then re-check the cache instead of issuing the same fetch.
type FetchGuard = Arc<tokio::sync::Mutex<()>>;
/// One `FetchGuard` per distinct request-parameter key `K`.
type FetchGuardMap<K> = Arc<RwLock<HashMap<K, FetchGuard>>>;
45
// Batch response types generated by `define_batch_response!`. In this file each
// generated type is built with `with_capacity(n)` and exposes the named payload
// field plus an `errors` field, both populated with symbol-string keys.
// NOTE(review): the macro definition is not visible here — confirm the exact
// generated shape (derives, extra helpers) against `super::macros`.

// Result of `Tickers::quotes`.
define_batch_response! {
    BatchQuotesResponse => quotes: Quote
}

// Result of `Tickers::charts` and `Tickers::charts_range`.
define_batch_response! {
    BatchChartsResponse => charts: Chart
}

// Result of `Tickers::spark`.
define_batch_response! {
    BatchSparksResponse => sparks: Spark
}

// Result of `Tickers::dividends`.
define_batch_response! {
    BatchDividendsResponse => dividends: Vec<Dividend>
}

// Result of `Tickers::splits`.
define_batch_response! {
    BatchSplitsResponse => splits: Vec<Split>
}

// Result of `Tickers::capital_gains`.
define_batch_response! {
    BatchCapitalGainsResponse => capital_gains: Vec<CapitalGain>
}

// Result of `Tickers::financials`.
define_batch_response! {
    BatchFinancialsResponse => financials: FinancialStatement
}

// Result of `Tickers::news`.
define_batch_response! {
    BatchNewsResponse => news: Vec<News>
}

// Result of `Tickers::recommendations`.
define_batch_response! {
    BatchRecommendationsResponse => recommendations: Recommendation
}

// Result of `Tickers::options`.
define_batch_response! {
    BatchOptionsResponse => options: Options
}

// Result of `Tickers::indicators` (only with the `indicators` feature).
#[cfg(feature = "indicators")]
define_batch_response! {
    BatchIndicatorsResponse => indicators: crate::indicators::IndicatorsSummary
}
105
/// Default value of `TickersBuilder::max_concurrency`: the number of per-symbol
/// requests allowed in flight at once (it is passed to `buffer_unordered`).
const DEFAULT_MAX_CONCURRENCY: usize = 10;
108
/// Builder for [`Tickers`]; obtain one through `Tickers::builder`.
pub struct TickersBuilder {
    /// Symbols the resulting `Tickers` will operate on.
    symbols: Vec<Arc<str>>,
    /// Client configuration, used only when no shared client is supplied.
    config: ClientConfig,
    /// Existing client to reuse instead of creating a new one in `build`.
    shared_client: Option<crate::ticker::ClientHandle>,
    /// Maximum number of per-symbol requests in flight at once (at least 1).
    max_concurrency: usize,
    /// Cache time-to-live; `None` leaves the TTL-based caches disabled.
    cache_ttl: Option<Duration>,
    /// Whether `quotes` should issue the additional logo-URL request.
    include_logo: bool,
}
118
119impl TickersBuilder {
120 fn new<S, I>(symbols: I) -> Self
121 where
122 S: Into<String>,
123 I: IntoIterator<Item = S>,
124 {
125 Self {
126 symbols: symbols.into_iter().map(|s| s.into().into()).collect(),
127 config: ClientConfig::default(),
128 shared_client: None,
129 max_concurrency: DEFAULT_MAX_CONCURRENCY,
130 cache_ttl: None,
131 include_logo: false,
132 }
133 }
134
135 pub fn region(mut self, region: crate::constants::Region) -> Self {
137 self.config.lang = region.lang().to_string();
138 self.config.region = region.region().to_string();
139 self
140 }
141
142 pub fn lang(mut self, lang: impl Into<String>) -> Self {
144 self.config.lang = lang.into();
145 self
146 }
147
148 pub fn region_code(mut self, region: impl Into<String>) -> Self {
150 self.config.region = region.into();
151 self
152 }
153
154 pub fn timeout(mut self, timeout: Duration) -> Self {
156 self.config.timeout = timeout;
157 self
158 }
159
160 pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
162 self.config.proxy = Some(proxy.into());
163 self
164 }
165
166 pub fn config(mut self, config: ClientConfig) -> Self {
168 self.config = config;
169 self
170 }
171
172 pub fn max_concurrency(mut self, n: usize) -> Self {
181 self.max_concurrency = n.max(1);
182 self
183 }
184
185 pub fn client(mut self, handle: crate::ticker::ClientHandle) -> Self {
213 self.shared_client = Some(handle);
214 self
215 }
216
217 pub fn cache(mut self, ttl: Duration) -> Self {
238 self.cache_ttl = Some(ttl);
239 self
240 }
241
242 pub fn logo(mut self) -> Self {
247 self.include_logo = true;
248 self
249 }
250
251 pub async fn build(self) -> Result<Tickers> {
253 let client = match self.shared_client {
254 Some(handle) => handle.0,
255 None => Arc::new(YahooClient::new(self.config).await?),
256 };
257
258 Ok(Tickers {
259 symbols: self.symbols,
260 client,
261 max_concurrency: self.max_concurrency,
262 cache_ttl: self.cache_ttl,
263 include_logo: self.include_logo,
264 quote_cache: Default::default(),
265 chart_cache: Default::default(),
266 events_cache: Default::default(),
267 financials_cache: Default::default(),
268 news_cache: Default::default(),
269 recommendations_cache: Default::default(),
270 options_cache: Default::default(),
271 spark_cache: Default::default(),
272 #[cfg(feature = "indicators")]
273 indicators_cache: Default::default(),
274
275 quotes_fetch: Arc::new(tokio::sync::Mutex::new(())),
277 charts_fetch: Default::default(),
278 financials_fetch: Default::default(),
279 news_fetch: Arc::new(tokio::sync::Mutex::new(())),
280 recommendations_fetch: Default::default(),
281 options_fetch: Default::default(),
282 spark_fetch: Default::default(),
283 #[cfg(feature = "indicators")]
284 indicators_fetch: Default::default(),
285 })
286 }
287}
288
/// Batch client operating on a fixed list of symbols through one shared
/// `YahooClient`, with optional TTL-based caching per endpoint.
pub struct Tickers {
    /// Symbols every batch operation runs over.
    symbols: Vec<Arc<str>>,
    /// Client shared by all requests (can be handed out via `client_handle`).
    client: Arc<YahooClient>,
    /// Maximum number of per-symbol requests in flight at once.
    max_concurrency: usize,
    /// Cache time-to-live; `None` means `cache_insert` stores nothing.
    cache_ttl: Option<Duration>,
    /// Whether `quotes` issues the additional logo-URL request.
    include_logo: bool,
    /// Cached quotes, keyed by symbol.
    quote_cache: QuoteCache,
    /// Cached charts, keyed by `(symbol, interval, range)`.
    chart_cache: ChartCache,
    /// Chart events backing `dividends`, `splits` and `capital_gains`.
    events_cache: EventsCache,
    /// Cached financial statements.
    financials_cache: FinancialsCache,
    /// Cached news lists.
    news_cache: NewsCache,
    /// Cached recommendations.
    recommendations_cache: RecommendationsCache,
    /// Cached option chains.
    options_cache: OptionsCache,
    /// Cached sparks.
    spark_cache: SparkCache,
    /// Cached indicator summaries.
    #[cfg(feature = "indicators")]
    indicators_cache: IndicatorsCache,

    // Fetch guards: held while a batch fetch is running so that concurrent
    // callers wait and re-check the cache rather than repeating the request.
    /// Guard for `quotes` (single guard: the request has no parameters).
    quotes_fetch: FetchGuard,
    /// Guards for `charts`, one per `(interval, range)`.
    charts_fetch: FetchGuardMap<(Interval, TimeRange)>,
    /// Guards for `financials`, one per `(statement type, frequency)`.
    financials_fetch: FetchGuardMap<(StatementType, Frequency)>,
    /// Guard for `news`.
    news_fetch: FetchGuard,
    /// Guards for `recommendations`, one per limit.
    recommendations_fetch: FetchGuardMap<u32>,
    /// Guards for `options`, one per optional date.
    options_fetch: FetchGuardMap<Option<i64>>,
    /// Guards for `spark`, one per `(interval, range)`.
    spark_fetch: FetchGuardMap<(Interval, TimeRange)>,
    /// Guards for `indicators`, one per `(interval, range)`.
    #[cfg(feature = "indicators")]
    indicators_fetch: FetchGuardMap<(Interval, TimeRange)>,
}
347
348impl std::fmt::Debug for Tickers {
349 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350 f.debug_struct("Tickers")
351 .field("symbols", &self.symbols)
352 .field("max_concurrency", &self.max_concurrency)
353 .field("cache_ttl", &self.cache_ttl)
354 .finish_non_exhaustive()
355 }
356}
357
358impl Tickers {
359 pub async fn new<S, I>(symbols: I) -> Result<Self>
376 where
377 S: Into<String>,
378 I: IntoIterator<Item = S>,
379 {
380 Self::builder(symbols).build().await
381 }
382
    /// Returns a [`TickersBuilder`] for `symbols` so that client settings,
    /// concurrency, caching and logo fetching can be customised before `build`.
    pub fn builder<S, I>(symbols: I) -> TickersBuilder
    where
        S: Into<String>,
        I: IntoIterator<Item = S>,
    {
        TickersBuilder::new(symbols)
    }
391
392 pub fn symbols(&self) -> Vec<&str> {
394 self.symbols.iter().map(|s| &**s).collect()
395 }
396
    /// Returns the number of tracked symbols.
    pub fn len(&self) -> usize {
        self.symbols.len()
    }
401
    /// Returns `true` when no symbols are tracked.
    pub fn is_empty(&self) -> bool {
        self.symbols.is_empty()
    }
406
    /// Returns a handle sharing this instance's client (an `Arc` clone), e.g.
    /// for passing to `TickersBuilder::client`.
    pub fn client_handle(&self) -> crate::ticker::ClientHandle {
        crate::ticker::ClientHandle(Arc::clone(&self.client))
    }
414
    /// Checks a single (optional) cache entry against this instance's TTL by
    /// delegating to `CacheEntry::is_fresh_with_ttl`.
    #[inline]
    fn is_cache_fresh<T>(&self, entry: Option<&CacheEntry<T>>) -> bool {
        CacheEntry::is_fresh_with_ttl(entry, self.cache_ttl)
    }
420
421 fn all_cached<K: Eq + std::hash::Hash, V>(
423 &self,
424 map: &HashMap<K, CacheEntry<V>>,
425 keys: impl Iterator<Item = K>,
426 ) -> bool {
427 let Some(ttl) = self.cache_ttl else {
428 return false;
429 };
430 keys.into_iter()
431 .all(|k| map.get(&k).map(|e| e.is_fresh(ttl)).unwrap_or(false))
432 }
433
434 #[inline]
439 fn cache_insert<K: Eq + std::hash::Hash, V>(
440 &self,
441 map: &mut HashMap<K, CacheEntry<V>>,
442 key: K,
443 value: V,
444 ) {
445 if let Some(ttl) = self.cache_ttl {
446 if map.len() >= EVICTION_THRESHOLD {
447 map.retain(|_, entry| entry.is_fresh(ttl));
448 }
449 map.insert(key, CacheEntry::new(value));
450 }
451 }
452
453 pub async fn quotes(&self) -> Result<BatchQuotesResponse> {
461 {
463 let cache = self.quote_cache.read().await;
464 if self.all_cached(&cache, self.symbols.iter().cloned()) {
465 let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
466 for symbol in &self.symbols {
467 if let Some(entry) = cache.get(symbol) {
468 response
469 .quotes
470 .insert(symbol.to_string(), entry.value.clone());
471 }
472 }
473 return Ok(response);
474 }
475 }
476
477 let _fetch_guard = self.quotes_fetch.lock().await;
479
480 {
482 let cache = self.quote_cache.read().await;
483 if self.all_cached(&cache, self.symbols.iter().cloned()) {
484 let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
485 for symbol in &self.symbols {
486 if let Some(entry) = cache.get(symbol) {
487 response
488 .quotes
489 .insert(symbol.to_string(), entry.value.clone());
490 }
491 }
492 return Ok(response);
493 }
494 }
495
496 let symbols_ref: Vec<&str> = self.symbols.iter().map(|s| &**s).collect();
498
499 let (json, logos) = if self.include_logo {
502 let quote_future = crate::endpoints::quotes::fetch_with_fields(
503 &self.client,
504 &symbols_ref,
505 None, true, false, );
509 let logo_future = crate::endpoints::quotes::fetch_with_fields(
510 &self.client,
511 &symbols_ref,
512 Some(&["logoUrl", "companyLogoUrl"]), true,
514 true, );
516 let (quote_result, logo_result) = tokio::join!(quote_future, logo_future);
517 (quote_result?, logo_result.ok())
518 } else {
519 let json = crate::endpoints::quotes::fetch_with_fields(
520 &self.client,
521 &symbols_ref,
522 None,
523 true,
524 false,
525 )
526 .await?;
527 (json, None)
528 };
529
530 let logo_map: std::collections::HashMap<String, (Option<String>, Option<String>)> = logos
532 .and_then(|l| l.get("quoteResponse")?.get("result")?.as_array().cloned())
533 .map(|results| {
534 results
535 .iter()
536 .filter_map(|r| {
537 let symbol = r.get("symbol")?.as_str()?.to_string();
538 let logo_url = r
539 .get("logoUrl")
540 .and_then(|v| v.as_str())
541 .map(|s| s.to_string());
542 let company_logo_url = r
543 .get("companyLogoUrl")
544 .and_then(|v| v.as_str())
545 .map(|s| s.to_string());
546 Some((symbol, (logo_url, company_logo_url)))
547 })
548 .collect()
549 })
550 .unwrap_or_default();
551
552 let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
554
555 if let Some(quote_response) = json.get("quoteResponse") {
556 if let Some(results) = quote_response.get("result").and_then(|r| r.as_array()) {
557 let mut parsed_quotes: Vec<(String, Quote)> = Vec::new();
559
560 for result in results {
561 if let Some(symbol) = result.get("symbol").and_then(|s| s.as_str()) {
562 match Quote::from_batch_response(result) {
563 Ok(mut quote) => {
564 if let Some((logo_url, company_logo_url)) = logo_map.get(symbol) {
566 if quote.logo_url.is_none() {
567 quote.logo_url = logo_url.clone();
568 }
569 if quote.company_logo_url.is_none() {
570 quote.company_logo_url = company_logo_url.clone();
571 }
572 }
573 parsed_quotes.push((symbol.to_string(), quote));
574 }
575 Err(e) => {
576 response.errors.insert(symbol.to_string(), e.to_string());
577 }
578 }
579 }
580 }
581
582 if self.cache_ttl.is_some() {
584 let mut cache = self.quote_cache.write().await;
585 for (symbol, quote) in &parsed_quotes {
586 self.cache_insert(&mut cache, symbol.as_str().into(), quote.clone());
587 }
588 }
589
590 for (symbol, quote) in parsed_quotes {
592 response.quotes.insert(symbol, quote);
593 }
594 }
595
596 for symbol in &self.symbols {
598 let symbol_str = &**symbol;
599 if !response.quotes.contains_key(symbol_str)
600 && !response.errors.contains_key(symbol_str)
601 {
602 response.errors.insert(
603 symbol.to_string(),
604 "Symbol not found in response".to_string(),
605 );
606 }
607 }
608 }
609
610 Ok(response)
611 }
612
613 pub async fn quote(&self, symbol: &str) -> Result<Quote> {
615 {
616 let cache = self.quote_cache.read().await;
617 if let Some(entry) = cache.get(symbol)
618 && self.is_cache_fresh(Some(entry))
619 {
620 return Ok(entry.value.clone());
621 }
622 }
623
624 let response = self.quotes().await?;
625
626 response
627 .quotes
628 .get(symbol)
629 .cloned()
630 .ok_or_else(|| FinanceError::SymbolNotFound {
631 symbol: Some(symbol.to_string()),
632 context: response
633 .errors
634 .get(symbol)
635 .cloned()
636 .unwrap_or_else(|| "Symbol not found".to_string()),
637 })
638 }
639
640 async fn get_fetch_guard<K: Clone + Eq + std::hash::Hash>(
645 guard_map: &FetchGuardMap<K>,
646 key: K,
647 ) -> FetchGuard {
648 {
649 let guards = guard_map.read().await;
650 if let Some(guard) = guards.get(&key) {
651 return Arc::clone(guard);
652 }
653 }
654
655 let mut guards = guard_map.write().await;
656 Arc::clone(
657 guards
658 .entry(key)
659 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
660 )
661 }
662
    /// Fetches charts for every tracked symbol at the given interval and range.
    ///
    /// When caching is enabled and every symbol has a fresh cached chart for
    /// `(interval, range)`, the cached charts are returned without any request.
    /// Otherwise one request per symbol is issued, at most `max_concurrency`
    /// at a time, while holding the fetch guard for `(interval, range)`.
    ///
    /// Per-symbol failures are reported in the response's `errors` map rather
    /// than failing the whole call. Chart events found in the responses are
    /// stored in the events cache for symbols that have no entry yet.
    pub async fn charts(
        &self,
        interval: Interval,
        range: TimeRange,
    ) -> Result<BatchChartsResponse> {
        // Fast path: everything is already cached and fresh.
        {
            let cache = self.chart_cache.read().await;
            if self.all_cached(
                &cache,
                self.symbols.iter().map(|s| (s.clone(), interval, range)),
            ) {
                let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
                for symbol in &self.symbols {
                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
                        response
                            .charts
                            .insert(symbol.to_string(), entry.value.clone());
                    }
                }
                return Ok(response);
            }
        }

        // Slow path: one fetch at a time per (interval, range).
        let fetch_guard = Self::get_fetch_guard(&self.charts_fetch, (interval, range)).await;
        let _guard = fetch_guard.lock().await;

        // Re-check: another task may have filled the cache while we waited.
        {
            let cache = self.chart_cache.read().await;
            if self.all_cached(
                &cache,
                self.symbols.iter().map(|s| (s.clone(), interval, range)),
            ) {
                let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
                for symbol in &self.symbols {
                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
                        response
                            .charts
                            .insert(symbol.to_string(), entry.value.clone());
                    }
                }
                return Ok(response);
            }
        }

        // One request future per symbol; each carries its symbol along so the
        // unordered results can be attributed afterwards.
        let futures: Vec<_> = self
            .symbols
            .iter()
            .map(|symbol| {
                let client = Arc::clone(&self.client);
                let symbol = Arc::clone(symbol);
                async move {
                    let result = client.get_chart(&symbol, interval, range).await;
                    (symbol, result)
                }
            })
            .collect();

        let results: Vec<_> = stream::iter(futures)
            .buffer_unordered(self.max_concurrency)
            .collect()
            .await;

        let mut response = BatchChartsResponse::with_capacity(self.symbols.len());

        // Parse everything first so each cache lock is taken once for the batch.
        let mut parsed_charts: Vec<(Arc<str>, Chart)> = Vec::new();
        let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();

        for (symbol, result) in results {
            match result {
                Ok(json) => match ChartResponse::from_json(json) {
                    Ok(chart_response) => {
                        if let Some(mut chart_results) = chart_response.chart.result {
                            if let Some(chart_result) = chart_results.pop() {
                                if let Some(events) = chart_result.events.clone() {
                                    parsed_events.push((Arc::clone(&symbol), events));
                                }

                                let chart = Chart {
                                    symbol: symbol.to_string(),
                                    meta: chart_result.meta.clone(),
                                    candles: chart_result.to_candles(),
                                    interval: Some(interval),
                                    range: Some(range),
                                };
                                parsed_charts.push((symbol, chart));
                            } else {
                                response
                                    .errors
                                    .insert(symbol.to_string(), "Empty chart response".to_string());
                            }
                        } else {
                            response.errors.insert(
                                symbol.to_string(),
                                "No chart data in response".to_string(),
                            );
                        }
                    }
                    Err(e) => {
                        response.errors.insert(symbol.to_string(), e.to_string());
                    }
                },
                Err(e) => {
                    response.errors.insert(symbol.to_string(), e.to_string());
                }
            }
        }

        if self.cache_ttl.is_some() {
            let mut cache = self.chart_cache.write().await;

            // Move each chart into the cache, keeping only the symbols...
            let cache_keys: Vec<_> = parsed_charts
                .into_iter()
                .map(|(symbol, chart)| {
                    self.cache_insert(&mut cache, (symbol.clone(), interval, range), chart);
                    symbol
                })
                .collect();

            // ...then fill the response from the freshly cached entries.
            for symbol in cache_keys {
                if let Some(cached) = cache.get(&(symbol.clone(), interval, range)) {
                    response
                        .charts
                        .insert(symbol.to_string(), cached.value.clone());
                }
            }
        } else {
            // Caching disabled: move the charts straight into the response.
            for (symbol, chart) in parsed_charts {
                response.charts.insert(symbol.to_string(), chart);
            }
        }

        // Events are stored regardless of `cache_ttl`, and only when the symbol
        // has no entry yet.
        // NOTE(review): these events come from whatever `range` was requested,
        // while `ensure_events_loaded` treats any existing entry as loaded —
        // confirm that a short-range chart cannot leave an incomplete history here.
        if !parsed_events.is_empty() {
            let mut events_cache = self.events_cache.write().await;
            for (symbol, events) in parsed_events {
                events_cache
                    .entry(symbol)
                    .or_insert_with(|| CacheEntry::new(events));
            }
        }

        Ok(response)
    }
820
821 pub async fn chart(&self, symbol: &str, interval: Interval, range: TimeRange) -> Result<Chart> {
823 {
824 let cache = self.chart_cache.read().await;
825 let key: Arc<str> = symbol.into();
826 if let Some(entry) = cache.get(&(key, interval, range))
827 && self.is_cache_fresh(Some(entry))
828 {
829 return Ok(entry.value.clone());
830 }
831 }
832
833 let response = self.charts(interval, range).await?;
834
835 response
836 .charts
837 .get(symbol)
838 .cloned()
839 .ok_or_else(|| FinanceError::SymbolNotFound {
840 symbol: Some(symbol.to_string()),
841 context: response
842 .errors
843 .get(symbol)
844 .cloned()
845 .unwrap_or_else(|| "Symbol not found".to_string()),
846 })
847 }
848
849 pub async fn charts_range(
861 &self,
862 interval: Interval,
863 start: i64,
864 end: i64,
865 ) -> Result<BatchChartsResponse> {
866 let futures: Vec<_> = self
867 .symbols
868 .iter()
869 .map(|symbol| {
870 let client = Arc::clone(&self.client);
871 let symbol = Arc::clone(symbol);
872 async move {
873 let result = client.get_chart_range(&symbol, interval, start, end).await;
874 (symbol, result)
875 }
876 })
877 .collect();
878
879 let results: Vec<_> = stream::iter(futures)
880 .buffer_unordered(self.max_concurrency)
881 .collect()
882 .await;
883
884 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
885 let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
886
887 for (symbol, result) in results {
888 match result {
889 Ok(json) => match ChartResponse::from_json(json) {
890 Ok(chart_response) => {
891 if let Some(mut chart_results) = chart_response.chart.result {
892 if let Some(chart_result) = chart_results.pop() {
893 if let Some(events) = chart_result.events.clone() {
895 parsed_events.push((Arc::clone(&symbol), events));
896 }
897
898 let chart = Chart {
899 symbol: symbol.to_string(),
900 meta: chart_result.meta.clone(),
901 candles: chart_result.to_candles(),
902 interval: Some(interval),
903 range: None,
904 };
905 response.charts.insert(symbol.to_string(), chart);
906 } else {
907 response
908 .errors
909 .insert(symbol.to_string(), "Empty chart response".to_string());
910 }
911 } else {
912 response.errors.insert(
913 symbol.to_string(),
914 "No chart data in response".to_string(),
915 );
916 }
917 }
918 Err(e) => {
919 response.errors.insert(symbol.to_string(), e.to_string());
920 }
921 },
922 Err(e) => {
923 response.errors.insert(symbol.to_string(), e.to_string());
924 }
925 }
926 }
927
928 if !parsed_events.is_empty() {
930 let mut events_cache = self.events_cache.write().await;
931 for (symbol, events) in parsed_events {
932 events_cache
933 .entry(symbol)
934 .or_insert_with(|| CacheEntry::new(events));
935 }
936 }
937
938 Ok(response)
939 }
940
    /// Makes sure the events cache has an entry for every tracked symbol it can
    /// obtain one for.
    ///
    /// Symbols without an entry are fetched with a daily chart over
    /// `TimeRange::Max`, at most `max_concurrency` at a time. Failed requests,
    /// unparsable payloads and responses without events are skipped silently,
    /// so this currently always returns `Ok(())`; callers detect missing data
    /// by looking the symbol up in the events cache afterwards.
    async fn ensure_events_loaded(&self) -> Result<()> {
        // Only presence is checked here; entry freshness is not consulted.
        let symbols_to_fetch: Vec<Arc<str>> = {
            let cache = self.events_cache.read().await;
            self.symbols
                .iter()
                .filter(|sym| !cache.contains_key(*sym))
                .cloned()
                .collect()
        };

        if symbols_to_fetch.is_empty() {
            return Ok(());
        }

        let futures: Vec<_> = symbols_to_fetch
            .iter()
            .map(|symbol| {
                let client = Arc::clone(&self.client);
                let symbol = Arc::clone(symbol);
                async move {
                    let result = crate::endpoints::chart::fetch(
                        &client,
                        &symbol,
                        Interval::OneDay,
                        TimeRange::Max,
                    )
                    .await;
                    (symbol, result)
                }
            })
            .collect();

        let results: Vec<_> = stream::iter(futures)
            .buffer_unordered(self.max_concurrency)
            .collect()
            .await;

        // Parse outside the lock so the write lock is held only for the inserts.
        let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();

        for (symbol, result) in results {
            // Any failure along the chain simply leaves the symbol without events.
            if let Ok(json) = result
                && let Ok(chart_response) = ChartResponse::from_json(json)
                && let Some(mut chart_results) = chart_response.chart.result
                && let Some(chart_result) = chart_results.pop()
                && let Some(events) = chart_result.events
            {
                parsed_events.push((symbol, events));
            }
        }

        if !parsed_events.is_empty() {
            let mut events_cache = self.events_cache.write().await;
            for (symbol, events) in parsed_events {
                // Unlike `charts`, this overwrites whatever entry may exist by now.
                events_cache.insert(symbol, CacheEntry::new(events));
            }
        }

        Ok(())
    }
1013
    /// Fetches spark data for every tracked symbol with one batched request.
    ///
    /// When caching is enabled and every symbol has a fresh cached spark for
    /// `(interval, range)`, the cached values are returned without a request.
    /// Otherwise the data is fetched while holding the fetch guard for
    /// `(interval, range)`.
    ///
    /// Symbols that cannot be parsed or are absent from the response are
    /// reported in the response's `errors` map; if the payload as a whole
    /// cannot be parsed, every symbol receives that parse error.
    ///
    /// # Errors
    ///
    /// Returns an error when the spark request itself fails.
    pub async fn spark(&self, interval: Interval, range: TimeRange) -> Result<BatchSparksResponse> {
        // Fast path: everything is already cached and fresh.
        {
            let cache = self.spark_cache.read().await;
            if self.all_cached(
                &cache,
                self.symbols.iter().map(|s| (s.clone(), interval, range)),
            ) {
                let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
                for symbol in &self.symbols {
                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
                        response
                            .sparks
                            .insert(symbol.to_string(), entry.value.clone());
                    }
                }
                return Ok(response);
            }
        }

        // Slow path: one fetch at a time per (interval, range).
        let fetch_guard = Self::get_fetch_guard(&self.spark_fetch, (interval, range)).await;
        let _guard = fetch_guard.lock().await;

        // Re-check: another task may have filled the cache while we waited.
        {
            let cache = self.spark_cache.read().await;
            if self.all_cached(
                &cache,
                self.symbols.iter().map(|s| (s.clone(), interval, range)),
            ) {
                let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
                for symbol in &self.symbols {
                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
                        response
                            .sparks
                            .insert(symbol.to_string(), entry.value.clone());
                    }
                }
                return Ok(response);
            }
        }

        // All symbols go out in a single request.
        let symbols_ref: Vec<&str> = self.symbols.iter().map(|s| &**s).collect();
        let json =
            crate::endpoints::spark::fetch(&self.client, &symbols_ref, interval, range).await?;

        let mut response = BatchSparksResponse::with_capacity(self.symbols.len());

        match SparkResponse::from_json(json) {
            Ok(spark_response) => {
                let mut parsed_sparks: Vec<(Arc<str>, Spark)> = Vec::new();

                if let Some(results) = spark_response.spark.result {
                    for result in &results {
                        if let Some(spark) = Spark::from_response(
                            result,
                            Some(interval.as_str().to_string()),
                            Some(range.as_str().to_string()),
                        ) {
                            let sym: Arc<str> = result.symbol.as_str().into();
                            parsed_sparks.push((sym, spark));
                        } else {
                            response.errors.insert(
                                result.symbol.to_string(),
                                "Failed to parse spark data".to_string(),
                            );
                        }
                    }
                }

                // Take the write lock once for the whole batch.
                if self.cache_ttl.is_some() {
                    let mut cache = self.spark_cache.write().await;
                    for (symbol, spark) in &parsed_sparks {
                        self.cache_insert(
                            &mut cache,
                            (symbol.clone(), interval, range),
                            spark.clone(),
                        );
                    }
                }

                for (symbol, spark) in parsed_sparks {
                    response.sparks.insert(symbol.to_string(), spark);
                }

                // Anything requested but neither returned nor already flagged is missing.
                for symbol in &self.symbols {
                    let symbol_str = &**symbol;
                    if !response.sparks.contains_key(symbol_str)
                        && !response.errors.contains_key(symbol_str)
                    {
                        response.errors.insert(
                            symbol.to_string(),
                            "Symbol not found in response".to_string(),
                        );
                    }
                }
            }
            Err(e) => {
                // The whole payload is unusable: attribute the error to every symbol.
                for symbol in &self.symbols {
                    response.errors.insert(symbol.to_string(), e.to_string());
                }
            }
        }

        Ok(response)
    }
1153
1154 pub async fn dividends(&self, range: TimeRange) -> Result<BatchDividendsResponse> {
1179 let mut response = BatchDividendsResponse::with_capacity(self.symbols.len());
1180
1181 self.ensure_events_loaded().await?;
1183
1184 let events_cache = self.events_cache.read().await;
1185
1186 for symbol in &self.symbols {
1187 if let Some(entry) = events_cache.get(symbol) {
1188 let all_dividends = entry.value.to_dividends();
1189 let filtered = filter_by_range(all_dividends, range);
1190 response.dividends.insert(symbol.to_string(), filtered);
1191 } else {
1192 response
1193 .errors
1194 .insert(symbol.to_string(), "No events data available".to_string());
1195 }
1196 }
1197
1198 Ok(response)
1199 }
1200
1201 pub async fn splits(&self, range: TimeRange) -> Result<BatchSplitsResponse> {
1228 let mut response = BatchSplitsResponse::with_capacity(self.symbols.len());
1229
1230 self.ensure_events_loaded().await?;
1232
1233 let events_cache = self.events_cache.read().await;
1234
1235 for symbol in &self.symbols {
1236 if let Some(entry) = events_cache.get(symbol) {
1237 let all_splits = entry.value.to_splits();
1238 let filtered = filter_by_range(all_splits, range);
1239 response.splits.insert(symbol.to_string(), filtered);
1240 } else {
1241 response
1242 .errors
1243 .insert(symbol.to_string(), "No events data available".to_string());
1244 }
1245 }
1246
1247 Ok(response)
1248 }
1249
1250 pub async fn capital_gains(&self, range: TimeRange) -> Result<BatchCapitalGainsResponse> {
1276 let mut response = BatchCapitalGainsResponse::with_capacity(self.symbols.len());
1277
1278 self.ensure_events_loaded().await?;
1280
1281 let events_cache = self.events_cache.read().await;
1282
1283 for symbol in &self.symbols {
1284 if let Some(entry) = events_cache.get(symbol) {
1285 let all_gains = entry.value.to_capital_gains();
1286 let filtered = filter_by_range(all_gains, range);
1287 response.capital_gains.insert(symbol.to_string(), filtered);
1288 } else {
1289 response
1290 .errors
1291 .insert(symbol.to_string(), "No events data available".to_string());
1292 }
1293 }
1294
1295 Ok(response)
1296 }
1297
    /// Fetches the requested financial statement for every tracked symbol.
    ///
    /// The work is delegated to `batch_fetch_cached!`, configured with
    /// `financials_cache`, a fetch guard per `(statement_type, frequency)`, the
    /// cache key `(symbol, statement_type, frequency)` and a per-symbol fetch
    /// through `client.get_financials`.
    /// NOTE(review): the macro body is not visible here; presumably it follows
    /// the same cache-check / guard / concurrent-fetch flow as `charts` — confirm.
    pub async fn financials(
        &self,
        statement_type: StatementType,
        frequency: Frequency,
    ) -> Result<BatchFinancialsResponse> {
        batch_fetch_cached!(self;
            cache: financials_cache,
            guard: map(financials_fetch, (statement_type, frequency)),
            key: |s| (s.clone(), statement_type, frequency),
            response: BatchFinancialsResponse.financials,
            fetch: |client, symbol| client.get_financials(&symbol, statement_type, frequency).await,
        )
    }
1338
    /// Fetches news for every tracked symbol.
    ///
    /// The work is delegated to `batch_fetch_cached!`, configured with
    /// `news_cache`, the single `news_fetch` guard and the symbol as cache key.
    /// The per-symbol fetch does not use the client: it calls
    /// `crate::scrapers::stockanalysis::scrape_symbol_news`.
    /// NOTE(review): the macro body is not visible here — confirm its exact
    /// caching and error-reporting behaviour.
    pub async fn news(&self) -> Result<BatchNewsResponse> {
        batch_fetch_cached!(self;
            cache: news_cache,
            guard: simple(news_fetch),
            key: |s| s.clone(),
            response: BatchNewsResponse.news,
            fetch: |_client, symbol| crate::scrapers::stockanalysis::scrape_symbol_news(&symbol).await,
        )
    }
1371
    /// Fetches recommendations for every tracked symbol, passing `limit`
    /// through to the recommendations request.
    ///
    /// The work is delegated to `batch_fetch_cached!`, configured with
    /// `recommendations_cache`, a fetch guard per `limit` and the cache key
    /// `(symbol, limit)`. For each symbol the raw payload is parsed into a
    /// `RecommendationResponse`, and the `recommended_symbols` of all its
    /// results are flattened into one `Recommendation`.
    /// NOTE(review): the macro body is not visible here — confirm its exact
    /// caching and error-reporting behaviour.
    pub async fn recommendations(&self, limit: u32) -> Result<BatchRecommendationsResponse> {
        batch_fetch_cached!(self;
            cache: recommendations_cache,
            guard: map(recommendations_fetch, limit),
            key: |s| (s.clone(), limit),
            response: BatchRecommendationsResponse.recommendations,
            fetch: |client, symbol| {
                let json = client.get_recommendations(&symbol, limit).await?;
                let rec_response =
                    crate::models::recommendation::response::RecommendationResponse::from_json(json)?;
                Ok(Recommendation {
                    symbol: symbol.to_string(),
                    // Merge the recommended symbols of every result entry.
                    recommendations: rec_response
                        .finance
                        .result
                        .iter()
                        .flat_map(|r| &r.recommended_symbols)
                        .cloned()
                        .collect(),
                })
            },
        )
    }
1423
    /// Fetches option chains for every tracked symbol; `date` is passed
    /// unchanged to `client.get_options`.
    ///
    /// The work is delegated to `batch_fetch_cached!`, configured with
    /// `options_cache`, a fetch guard per `date` and the cache key
    /// `(symbol, date)`. Each payload is deserialised into `Options` with
    /// `serde_json::from_value`.
    /// NOTE(review): the macro body is not visible here — confirm its exact
    /// caching and error-reporting behaviour.
    pub async fn options(&self, date: Option<i64>) -> Result<BatchOptionsResponse> {
        batch_fetch_cached!(self;
            cache: options_cache,
            guard: map(options_fetch, date),
            key: |s| (s.clone(), date),
            response: BatchOptionsResponse.options,
            fetch: |client, symbol| {
                let json = client.get_options(&symbol, date).await?;
                Ok(serde_json::from_value::<Options>(json)?)
            },
        )
    }
1460
    /// Computes an indicators summary for every tracked symbol from its chart
    /// at the given interval and range.
    ///
    /// When caching is enabled and every symbol has a fresh cached summary for
    /// `(interval, range)`, the cached values are returned directly. Otherwise
    /// charts are obtained through `charts` (which has its own cache and guard)
    /// and the indicators are calculated from each chart's candles. Errors
    /// reported by `charts` are copied into this response's `errors` map.
    ///
    /// # Errors
    ///
    /// Propagates errors returned by `charts`.
    #[cfg(feature = "indicators")]
    pub async fn indicators(
        &self,
        interval: Interval,
        range: TimeRange,
    ) -> Result<BatchIndicatorsResponse> {
        let cache_key_for = |symbol: &Arc<str>| (symbol.clone(), interval, range);

        // Fast path: everything is already cached and fresh.
        {
            let cache = self.indicators_cache.read().await;
            if self.all_cached(&cache, self.symbols.iter().map(&cache_key_for)) {
                let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
                for symbol in &self.symbols {
                    if let Some(entry) = cache.get(&cache_key_for(symbol)) {
                        response
                            .indicators
                            .insert(symbol.to_string(), entry.value.clone());
                    }
                }
                return Ok(response);
            }
        }

        // Slow path: one computation at a time per (interval, range).
        let fetch_guard = Self::get_fetch_guard(&self.indicators_fetch, (interval, range)).await;
        let _guard = fetch_guard.lock().await;

        // Re-check: another task may have filled the cache while we waited.
        {
            let cache = self.indicators_cache.read().await;
            if self.all_cached(&cache, self.symbols.iter().map(&cache_key_for)) {
                let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
                for symbol in &self.symbols {
                    if let Some(entry) = cache.get(&cache_key_for(symbol)) {
                        response
                            .indicators
                            .insert(symbol.to_string(), entry.value.clone());
                    }
                }
                return Ok(response);
            }
        }

        // `charts` takes the charts guard while the indicators guard is held.
        let charts_response = self.charts(interval, range).await?;

        let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());

        // Calculate first so the cache write lock is held only for the inserts.
        let mut calculated_indicators: Vec<(String, crate::indicators::IndicatorsSummary)> =
            Vec::new();

        for (symbol, chart) in &charts_response.charts {
            let indicators = crate::indicators::summary::calculate_indicators(&chart.candles);
            calculated_indicators.push((symbol.to_string(), indicators));
        }

        if self.cache_ttl.is_some() {
            let mut cache = self.indicators_cache.write().await;
            for (symbol, indicators) in &calculated_indicators {
                let key: Arc<str> = symbol.as_str().into();
                self.cache_insert(&mut cache, cache_key_for(&key), indicators.clone());
            }
        }

        for (symbol, indicators) in calculated_indicators {
            response.indicators.insert(symbol, indicators);
        }

        // Symbols whose chart failed carry the chart error through.
        for (symbol, error) in &charts_response.errors {
            response.errors.insert(symbol.to_string(), error.clone());
        }

        Ok(response)
    }
1565
1566 pub fn add_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1587 use std::collections::HashSet;
1589
1590 let existing: HashSet<&str> = self.symbols.iter().map(|s| &**s).collect();
1591 let to_add: Vec<Arc<str>> = symbols
1592 .iter()
1593 .map(|s| s.as_ref())
1594 .filter(|s| !existing.contains(s))
1595 .map(|s| s.into())
1596 .collect();
1597
1598 self.symbols.extend(to_add);
1599 }
1600
/// Runs a portfolio backtest over the tracked symbols.
///
/// Charts are fetched with [`Self::charts`] and dividends with
/// [`Self::dividends`]; each symbol that has a chart in the response is
/// handed to the portfolio engine together with its dividends.
///
/// # Arguments
///
/// * `interval` - Candle interval requested for the charts.
/// * `range` - Time range requested for both charts and dividends.
/// * `config` - Portfolio configuration; `None` uses the default.
/// * `factory` - Called with a symbol to build the strategy for that symbol.
///
/// # Errors
///
/// Returns an error if the configuration fails validation against the
/// number of tracked symbols, if the chart request fails (reported as
/// `BacktestError::ChartError`), or if the engine run fails.
#[cfg(feature = "backtesting")]
pub async fn backtest<S, F>(
    &self,
    interval: Interval,
    range: TimeRange,
    config: Option<crate::backtesting::portfolio::PortfolioConfig>,
    factory: F,
) -> crate::backtesting::Result<crate::backtesting::portfolio::PortfolioResult>
where
    S: crate::backtesting::Strategy,
    F: Fn(&str) -> S,
{
    use crate::backtesting::portfolio::{PortfolioEngine, SymbolData};

    let config = config.unwrap_or_default();
    config.validate(self.symbols.len())?;

    let fetched = self.charts(interval, range).await;
    let charts =
        fetched.map_err(|e| crate::backtesting::BacktestError::ChartError(e.to_string()))?;

    // Dividends are best-effort: a failed request yields an empty map.
    let dividends_map = match self.dividends(range).await {
        Ok(batch) => batch.dividends,
        Err(_) => Default::default(),
    };

    let mut symbol_data: Vec<SymbolData> = Vec::new();
    for sym in &self.symbols {
        // Symbols without a chart in the response are left out of the run.
        let Some(chart) = charts.charts.get(sym.as_ref()) else {
            continue;
        };
        let divs = dividends_map.get(sym.as_ref()).cloned().unwrap_or_default();
        let data = SymbolData::new(sym.as_ref(), chart.candles.clone()).with_dividends(divs);
        symbol_data.push(data);
    }

    PortfolioEngine::new(config).run(&symbol_data, factory)
}
1685
1686 pub async fn remove_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1703 use std::collections::HashSet;
1704 let to_remove: HashSet<&str> = symbols.iter().map(|s| s.as_ref()).collect();
1705
1706 self.symbols.retain(|s| !to_remove.contains(&**s));
1708
1709 let (
1711 mut quote_cache,
1712 mut chart_cache,
1713 mut events_cache,
1714 mut financials_cache,
1715 mut news_cache,
1716 mut recommendations_cache,
1717 mut options_cache,
1718 mut spark_cache,
1719 ) = tokio::join!(
1720 self.quote_cache.write(),
1721 self.chart_cache.write(),
1722 self.events_cache.write(),
1723 self.financials_cache.write(),
1724 self.news_cache.write(),
1725 self.recommendations_cache.write(),
1726 self.options_cache.write(),
1727 self.spark_cache.write(),
1728 );
1729
1730 for symbol in &to_remove {
1732 let key: Arc<str> = (*symbol).into();
1733 quote_cache.remove(&key);
1734 events_cache.remove(&key);
1735 news_cache.remove(&key);
1736 }
1737
1738 chart_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1740 financials_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1741 recommendations_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1742 options_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1743 spark_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1744
1745 drop((
1747 quote_cache,
1748 chart_cache,
1749 events_cache,
1750 financials_cache,
1751 news_cache,
1752 recommendations_cache,
1753 options_cache,
1754 spark_cache,
1755 ));
1756
1757 #[cfg(feature = "indicators")]
1758 self.indicators_cache
1759 .write()
1760 .await
1761 .retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1762 }
1763
1764 pub async fn clear_cache(&self) {
1769 tokio::join!(
1770 async { self.quote_cache.write().await.clear() },
1772 async { self.chart_cache.write().await.clear() },
1773 async { self.events_cache.write().await.clear() },
1774 async { self.financials_cache.write().await.clear() },
1775 async { self.news_cache.write().await.clear() },
1776 async { self.recommendations_cache.write().await.clear() },
1777 async { self.options_cache.write().await.clear() },
1778 async { self.spark_cache.write().await.clear() },
1779 async {
1780 #[cfg(feature = "indicators")]
1781 self.indicators_cache.write().await.clear();
1782 },
1783 async { self.charts_fetch.write().await.clear() },
1785 async { self.financials_fetch.write().await.clear() },
1786 async { self.recommendations_fetch.write().await.clear() },
1787 async { self.options_fetch.write().await.clear() },
1788 async { self.spark_fetch.write().await.clear() },
1789 async {
1790 #[cfg(feature = "indicators")]
1791 self.indicators_fetch.write().await.clear();
1792 },
1793 );
1794 }
1795
1796 pub async fn clear_quote_cache(&self) {
1800 self.quote_cache.write().await.clear();
1801 }
1802
1803 pub async fn clear_chart_cache(&self) {
1808 tokio::join!(
1809 async { self.chart_cache.write().await.clear() },
1810 async { self.events_cache.write().await.clear() },
1811 async { self.spark_cache.write().await.clear() },
1812 async {
1813 #[cfg(feature = "indicators")]
1814 self.indicators_cache.write().await.clear();
1815 },
1816 );
1817 }
1818}
1819
1820#[cfg(test)]
1821mod tests {
1822 use super::*;
1823
1824 #[tokio::test]
1825 #[ignore] async fn test_tickers_quotes() {
1827 let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
1828 let result = tickers.quotes().await.unwrap();
1829
1830 assert!(result.success_count() > 0);
1831 }
1832
1833 #[tokio::test]
1834 #[ignore] async fn test_tickers_charts() {
1836 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1837 let result = tickers
1838 .charts(Interval::OneDay, TimeRange::FiveDays)
1839 .await
1840 .unwrap();
1841
1842 assert!(result.success_count() > 0);
1843 }
1844
1845 #[tokio::test]
1846 #[ignore = "requires network access"]
1847 async fn test_tickers_spark() {
1848 let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
1849 let result = tickers
1850 .spark(Interval::FiveMinutes, TimeRange::OneDay)
1851 .await
1852 .unwrap();
1853
1854 assert!(result.success_count() > 0);
1855
1856 if let Some(spark) = result.sparks.get("AAPL") {
1858 assert!(!spark.closes.is_empty());
1859 assert_eq!(spark.symbol, "AAPL");
1860 assert!(spark.percent_change().is_some());
1862 }
1863 }
1864
1865 #[tokio::test]
1866 #[ignore = "requires network access"]
1867 async fn test_tickers_dividends() {
1868 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1869 let result = tickers.dividends(TimeRange::OneYear).await.unwrap();
1870
1871 assert!(result.success_count() > 0);
1872
1873 if let Some(dividends) = result.dividends.get("AAPL")
1875 && !dividends.is_empty()
1876 {
1877 let div = ÷nds[0];
1878 assert!(div.timestamp > 0);
1879 assert!(div.amount > 0.0);
1880 }
1881 }
1882
1883 #[tokio::test]
1884 #[ignore = "requires network access"]
1885 async fn test_tickers_splits() {
1886 let tickers = Tickers::new(["NVDA", "TSLA"]).await.unwrap();
1887 let result = tickers.splits(TimeRange::FiveYears).await.unwrap();
1888
1889 assert!(result.success_count() > 0);
1891
1892 for splits in result.splits.values() {
1894 for split in splits {
1895 assert!(split.timestamp > 0);
1896 assert!(split.numerator > 0.0);
1897 assert!(split.denominator > 0.0);
1898 assert!(!split.ratio.is_empty());
1899 }
1900 }
1901 }
1902
1903 #[tokio::test]
1904 #[ignore = "requires network access"]
1905 async fn test_tickers_capital_gains() {
1906 let tickers = Tickers::new(["VFIAX", "VTI"]).await.unwrap();
1907 let result = tickers.capital_gains(TimeRange::TwoYears).await.unwrap();
1908
1909 assert!(result.success_count() > 0);
1911
1912 for gains in result.capital_gains.values() {
1914 for gain in gains {
1915 assert!(gain.timestamp > 0);
1916 assert!(gain.amount >= 0.0);
1917 }
1918 }
1919 }
1920
1921 #[tokio::test]
1922 #[ignore = "requires network access"]
1923 async fn test_tickers_financials() {
1924 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1925 let result = tickers
1926 .financials(StatementType::Income, Frequency::Annual)
1927 .await
1928 .unwrap();
1929
1930 assert!(result.success_count() > 0);
1931
1932 for (symbol, stmt) in &result.financials {
1934 assert_eq!(stmt.symbol, *symbol);
1935 assert_eq!(stmt.statement_type, "income");
1936 assert_eq!(stmt.frequency, "annual");
1937 assert!(!stmt.statement.is_empty());
1938
1939 if let Some(revenue) = stmt.statement.get("TotalRevenue") {
1941 assert!(!revenue.is_empty());
1942 }
1943 }
1944 }
1945
1946 #[tokio::test]
1947 #[ignore = "requires network access"]
1948 async fn test_tickers_news() {
1949 let tickers = Tickers::new(["AAPL", "TSLA"]).await.unwrap();
1950 let result = tickers.news().await.unwrap();
1951
1952 assert!(result.success_count() > 0);
1953
1954 for articles in result.news.values() {
1956 if !articles.is_empty() {
1957 let article = &articles[0];
1958 assert!(!article.title.is_empty());
1959 assert!(!article.link.is_empty());
1960 assert!(!article.source.is_empty());
1961 }
1962 }
1963 }
1964
1965 #[tokio::test]
1966 #[ignore = "requires network access"]
1967 async fn test_tickers_recommendations() {
1968 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1969 let result = tickers.recommendations(5).await.unwrap();
1970
1971 assert!(result.success_count() > 0);
1972
1973 for (symbol, rec) in &result.recommendations {
1975 assert_eq!(rec.symbol, *symbol);
1976 assert!(rec.count() > 0);
1977 for similar in &rec.recommendations {
1978 assert!(!similar.symbol.is_empty());
1979 }
1980 }
1981 }
1982
1983 #[tokio::test]
1984 #[ignore = "requires network access"]
1985 async fn test_tickers_options() {
1986 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1987 let result = tickers.options(None).await.unwrap();
1988
1989 assert!(result.success_count() > 0);
1990
1991 for opts in result.options.values() {
1993 assert!(!opts.expiration_dates().is_empty());
1994 }
1995 }
1996
1997 #[tokio::test]
1998 #[ignore = "requires network access"]
1999 #[cfg(feature = "indicators")]
2000 async fn test_tickers_indicators() {
2001 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
2002 let result = tickers
2003 .indicators(Interval::OneDay, TimeRange::ThreeMonths)
2004 .await
2005 .unwrap();
2006
2007 assert!(result.success_count() > 0);
2008
2009 for ind in result.indicators.values() {
2011 assert!(ind.rsi_14.is_some() || ind.sma_20.is_some());
2013 }
2014 }
2015
2016 #[tokio::test]
2017 async fn test_tickers_add_symbols() {
2018 let mut tickers = Tickers::new(["AAPL"]).await.unwrap();
2019 assert_eq!(tickers.len(), 1);
2020 assert_eq!(tickers.symbols(), &["AAPL"]);
2021
2022 tickers.add_symbols(&["MSFT", "GOOGL"]);
2023 assert_eq!(tickers.len(), 3);
2024 assert!(tickers.symbols().contains(&"AAPL"));
2025 assert!(tickers.symbols().contains(&"MSFT"));
2026 assert!(tickers.symbols().contains(&"GOOGL"));
2027
2028 tickers.add_symbols(&["AAPL"]);
2030 assert_eq!(tickers.len(), 3);
2031 }
2032
2033 #[tokio::test]
2034 #[ignore = "requires network access"]
2035 async fn test_tickers_remove_symbols() {
2036 let mut tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
2037 assert_eq!(tickers.len(), 3);
2038
2039 let _ = tickers.quotes().await;
2041
2042 tickers.remove_symbols(&["MSFT"]).await;
2044 assert_eq!(tickers.len(), 2);
2045 assert!(tickers.symbols().contains(&"AAPL"));
2046 assert!(!tickers.symbols().contains(&"MSFT"));
2047 assert!(tickers.symbols().contains(&"GOOGL"));
2048
2049 let quotes = tickers.quotes().await.unwrap();
2051 assert!(!quotes.quotes.contains_key("MSFT"));
2052 assert_eq!(quotes.quotes.len(), 2);
2053 }
2054}