1use super::macros::{batch_fetch_cached, define_batch_response};
6use crate::client::{ClientConfig, YahooClient};
7use crate::constants::{Frequency, Interval, StatementType, TimeRange};
8use crate::error::{FinanceError, Result};
9use crate::models::chart::events::ChartEvents;
10use crate::models::chart::response::ChartResponse;
11use crate::models::chart::{CapitalGain, Chart, Dividend, Split};
12use crate::models::financials::FinancialStatement;
13use crate::models::news::News;
14use crate::models::options::Options;
15use crate::models::quote::Quote;
16use crate::models::recommendation::Recommendation;
17use crate::models::spark::Spark;
18use crate::models::spark::response::SparkResponse;
19use crate::utils::{CacheEntry, EVICTION_THRESHOLD, filter_by_range};
20use futures::stream::{self, StreamExt};
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::sync::RwLock;
25
/// Shared map of cache entries behind an async read/write lock.
type MapCache<K, V> = Arc<RwLock<HashMap<K, CacheEntry<V>>>>;
/// Chart cache key: `(symbol, interval, range)`.
type ChartCacheKey = (Arc<str>, Interval, TimeRange);
/// Cached quotes keyed by symbol.
type QuoteCache = MapCache<Arc<str>, Quote>;
/// Cached charts keyed by [`ChartCacheKey`].
type ChartCache = MapCache<ChartCacheKey, Chart>;
/// Cached chart events (source of dividends, splits and capital gains) keyed by symbol.
type EventsCache = MapCache<Arc<str>, ChartEvents>;
/// Cached financial statements keyed by `(symbol, statement type, frequency)`.
type FinancialsCache = MapCache<(Arc<str>, StatementType, Frequency), FinancialStatement>;
/// Cached news lists keyed by symbol.
type NewsCache = MapCache<Arc<str>, Vec<News>>;
/// Cached recommendations keyed by `(symbol, limit)`.
type RecommendationsCache = MapCache<(Arc<str>, u32), Recommendation>;
/// Cached option chains keyed by `(symbol, date)`, where `date` is the optional value passed to `options`.
type OptionsCache = MapCache<(Arc<str>, Option<i64>), Options>;
/// Spark cache key: `(symbol, interval, range)`.
type SparkCacheKey = (Arc<str>, Interval, TimeRange);
/// Cached sparks keyed by [`SparkCacheKey`].
type SparkCache = MapCache<SparkCacheKey, Spark>;
/// Cached indicator summaries keyed by `(symbol, interval, range)`.
#[cfg(feature = "indicators")]
type IndicatorsCache =
    MapCache<(Arc<str>, Interval, TimeRange), crate::indicators::IndicatorsSummary>;

/// Mutex held while a batch fetch is in flight so concurrent callers wait instead of
/// issuing the same request.
type FetchGuard = Arc<tokio::sync::Mutex<()>>;
/// Lazily populated map of [`FetchGuard`]s, one per distinct request parameter set.
type FetchGuardMap<K> = Arc<RwLock<HashMap<K, FetchGuard>>>;
45
// Each invocation below generates a batch response type. From its use in this file the
// generated type exposes the named per-symbol map (e.g. `quotes`), an `errors` map of
// per-symbol error messages, and a `with_capacity` constructor.
// NOTE(review): the exact generated shape lives in `super::macros` — confirm there.

// Result of `Tickers::quotes`.
define_batch_response! {
    BatchQuotesResponse => quotes: Quote
}

// Result of `Tickers::charts` and `Tickers::charts_range`.
define_batch_response! {
    BatchChartsResponse => charts: Chart
}

// Result of `Tickers::spark`.
define_batch_response! {
    BatchSparksResponse => sparks: Spark
}

// Result of `Tickers::dividends`.
define_batch_response! {
    BatchDividendsResponse => dividends: Vec<Dividend>
}

// Result of `Tickers::splits`.
define_batch_response! {
    BatchSplitsResponse => splits: Vec<Split>
}

// Result of `Tickers::capital_gains`.
define_batch_response! {
    BatchCapitalGainsResponse => capital_gains: Vec<CapitalGain>
}

// Result of `Tickers::financials`.
define_batch_response! {
    BatchFinancialsResponse => financials: FinancialStatement
}

// Result of `Tickers::news`.
define_batch_response! {
    BatchNewsResponse => news: Vec<News>
}

// Result of `Tickers::recommendations`.
define_batch_response! {
    BatchRecommendationsResponse => recommendations: Recommendation
}

// Result of `Tickers::options`.
define_batch_response! {
    BatchOptionsResponse => options: Options
}

// Result of `Tickers::indicators` (only with the `indicators` feature).
#[cfg(feature = "indicators")]
define_batch_response! {
    BatchIndicatorsResponse => indicators: crate::indicators::IndicatorsSummary
}
105
/// Concurrency limit a [`TickersBuilder`] starts with until `max_concurrency` overrides it.
const DEFAULT_MAX_CONCURRENCY: usize = 10;
108
/// Builder that collects the settings for a [`Tickers`] instance.
///
/// Created through [`Tickers::builder`] and finished with [`TickersBuilder::build`].
pub struct TickersBuilder {
    /// Symbols the resulting `Tickers` will operate on, in the order supplied.
    symbols: Vec<Arc<str>>,
    /// Client configuration; only used by `build` when no shared client was supplied.
    config: ClientConfig,
    /// Existing client to reuse instead of creating a new one in `build`.
    shared_client: Option<crate::ticker::ClientHandle>,
    /// Upper bound on concurrently running per-symbol requests (the setter keeps it >= 1).
    max_concurrency: usize,
    /// Cache time-to-live; `None` leaves response caching disabled.
    cache_ttl: Option<Duration>,
    /// Whether `Tickers::quotes` also requests logo URL fields.
    include_logo: bool,
}
118
119impl TickersBuilder {
120 fn new<S, I>(symbols: I) -> Self
121 where
122 S: Into<String>,
123 I: IntoIterator<Item = S>,
124 {
125 Self {
126 symbols: symbols.into_iter().map(|s| s.into().into()).collect(),
127 config: ClientConfig::default(),
128 shared_client: None,
129 max_concurrency: DEFAULT_MAX_CONCURRENCY,
130 cache_ttl: None,
131 include_logo: false,
132 }
133 }
134
135 pub fn region(mut self, region: crate::constants::Region) -> Self {
137 self.config.lang = region.lang().to_string();
138 self.config.region = region.region().to_string();
139 self
140 }
141
142 pub fn lang(mut self, lang: impl Into<String>) -> Self {
144 self.config.lang = lang.into();
145 self
146 }
147
148 pub fn region_code(mut self, region: impl Into<String>) -> Self {
150 self.config.region = region.into();
151 self
152 }
153
154 pub fn timeout(mut self, timeout: Duration) -> Self {
156 self.config.timeout = timeout;
157 self
158 }
159
160 pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
162 self.config.proxy = Some(proxy.into());
163 self
164 }
165
166 pub fn config(mut self, config: ClientConfig) -> Self {
168 self.config = config;
169 self
170 }
171
172 pub fn max_concurrency(mut self, n: usize) -> Self {
181 self.max_concurrency = n.max(1);
182 self
183 }
184
185 pub fn client(mut self, handle: crate::ticker::ClientHandle) -> Self {
213 self.shared_client = Some(handle);
214 self
215 }
216
217 pub fn cache(mut self, ttl: Duration) -> Self {
238 self.cache_ttl = Some(ttl);
239 self
240 }
241
242 pub fn logo(mut self) -> Self {
247 self.include_logo = true;
248 self
249 }
250
251 pub async fn build(self) -> Result<Tickers> {
253 let client = match self.shared_client {
254 Some(handle) => handle.0,
255 None => Arc::new(YahooClient::new(self.config).await?),
256 };
257
258 Ok(Tickers {
259 symbols: self.symbols,
260 client,
261 max_concurrency: self.max_concurrency,
262 cache_ttl: self.cache_ttl,
263 include_logo: self.include_logo,
264 quote_cache: Default::default(),
265 chart_cache: Default::default(),
266 events_cache: Default::default(),
267 financials_cache: Default::default(),
268 news_cache: Default::default(),
269 recommendations_cache: Default::default(),
270 options_cache: Default::default(),
271 spark_cache: Default::default(),
272 #[cfg(feature = "indicators")]
273 indicators_cache: Default::default(),
274
275 quotes_fetch: Arc::new(tokio::sync::Mutex::new(())),
277 charts_fetch: Default::default(),
278 financials_fetch: Default::default(),
279 news_fetch: Arc::new(tokio::sync::Mutex::new(())),
280 recommendations_fetch: Default::default(),
281 options_fetch: Default::default(),
282 spark_fetch: Default::default(),
283 #[cfg(feature = "indicators")]
284 indicators_fetch: Default::default(),
285 })
286 }
287}
288
/// Batch handle over a set of symbols that share one client and per-endpoint caches.
///
/// Built through [`Tickers::new`] or [`Tickers::builder`].
pub struct Tickers {
    /// Symbols every batch method operates on, in insertion order.
    symbols: Vec<Arc<str>>,
    /// Client used for all requests; may be shared via `client_handle`.
    client: Arc<YahooClient>,
    /// Upper bound on concurrently running per-symbol requests.
    max_concurrency: usize,
    /// Cache time-to-live; `None` means results are not stored in the TTL caches.
    cache_ttl: Option<Duration>,
    /// Whether `quotes` also requests logo URL fields.
    include_logo: bool,
    /// Quotes by symbol.
    quote_cache: QuoteCache,
    /// Charts by `(symbol, interval, range)`.
    chart_cache: ChartCache,
    /// Chart events by symbol; filled by chart fetches and read by
    /// `dividends`/`splits`/`capital_gains`.
    events_cache: EventsCache,
    /// Financial statements by `(symbol, statement type, frequency)`.
    financials_cache: FinancialsCache,
    /// News by symbol.
    news_cache: NewsCache,
    /// Recommendations by `(symbol, limit)`.
    recommendations_cache: RecommendationsCache,
    /// Option chains by `(symbol, date)`.
    options_cache: OptionsCache,
    /// Sparks by `(symbol, interval, range)`.
    spark_cache: SparkCache,
    /// Indicator summaries by `(symbol, interval, range)`.
    #[cfg(feature = "indicators")]
    indicators_cache: IndicatorsCache,

    /// Held while `quotes` fetches, so concurrent callers re-check the cache afterwards.
    quotes_fetch: FetchGuard,
    /// Per-`(interval, range)` guards for `charts`.
    charts_fetch: FetchGuardMap<(Interval, TimeRange)>,
    /// Per-`(statement type, frequency)` guards for `financials`.
    financials_fetch: FetchGuardMap<(StatementType, Frequency)>,
    /// Guard for `news`.
    news_fetch: FetchGuard,
    /// Per-`limit` guards for `recommendations`.
    recommendations_fetch: FetchGuardMap<u32>,
    /// Per-`date` guards for `options`.
    options_fetch: FetchGuardMap<Option<i64>>,
    /// Per-`(interval, range)` guards for `spark`.
    spark_fetch: FetchGuardMap<(Interval, TimeRange)>,
    /// Per-`(interval, range)` guards for `indicators`.
    #[cfg(feature = "indicators")]
    indicators_fetch: FetchGuardMap<(Interval, TimeRange)>,
}
347
348impl std::fmt::Debug for Tickers {
349 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350 f.debug_struct("Tickers")
351 .field("symbols", &self.symbols)
352 .field("max_concurrency", &self.max_concurrency)
353 .field("cache_ttl", &self.cache_ttl)
354 .finish_non_exhaustive()
355 }
356}
357
358impl Tickers {
359 pub async fn new<S, I>(symbols: I) -> Result<Self>
376 where
377 S: Into<String>,
378 I: IntoIterator<Item = S>,
379 {
380 Self::builder(symbols).build().await
381 }
382
    /// Returns a [`TickersBuilder`] for `symbols` so settings can be customized before
    /// calling `build`.
    pub fn builder<S, I>(symbols: I) -> TickersBuilder
    where
        S: Into<String>,
        I: IntoIterator<Item = S>,
    {
        TickersBuilder::new(symbols)
    }
391
392 pub fn symbols(&self) -> Vec<&str> {
394 self.symbols.iter().map(|s| &**s).collect()
395 }
396
    /// Returns the number of tracked symbols.
    pub fn len(&self) -> usize {
        self.symbols.len()
    }
401
    /// Returns `true` when no symbols are tracked.
    pub fn is_empty(&self) -> bool {
        self.symbols.is_empty()
    }
406
    /// Returns a handle wrapping a new reference to this instance's client, suitable for
    /// passing to `TickersBuilder::client`.
    pub fn client_handle(&self) -> crate::ticker::ClientHandle {
        crate::ticker::ClientHandle(Arc::clone(&self.client))
    }
414
    /// Checks `entry` against the configured TTL by delegating to
    /// `CacheEntry::is_fresh_with_ttl`.
    // NOTE(review): presumably false when caching is disabled or `entry` is `None` —
    // confirm in `crate::utils`.
    #[inline]
    fn is_cache_fresh<T>(&self, entry: Option<&CacheEntry<T>>) -> bool {
        CacheEntry::is_fresh_with_ttl(entry, self.cache_ttl)
    }
420
421 fn all_cached<K: Eq + std::hash::Hash, V>(
423 &self,
424 map: &HashMap<K, CacheEntry<V>>,
425 keys: impl Iterator<Item = K>,
426 ) -> bool {
427 let Some(ttl) = self.cache_ttl else {
428 return false;
429 };
430 keys.into_iter()
431 .all(|k| map.get(&k).map(|e| e.is_fresh(ttl)).unwrap_or(false))
432 }
433
434 #[inline]
439 fn cache_insert<K: Eq + std::hash::Hash, V>(
440 &self,
441 map: &mut HashMap<K, CacheEntry<V>>,
442 key: K,
443 value: V,
444 ) {
445 if let Some(ttl) = self.cache_ttl {
446 if map.len() >= EVICTION_THRESHOLD {
447 map.retain(|_, entry| entry.is_fresh(ttl));
448 }
449 map.insert(key, CacheEntry::new(value));
450 }
451 }
452
453 pub async fn quotes(&self) -> Result<BatchQuotesResponse> {
461 {
463 let cache = self.quote_cache.read().await;
464 if self.all_cached(&cache, self.symbols.iter().cloned()) {
465 let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
466 for symbol in &self.symbols {
467 if let Some(entry) = cache.get(symbol) {
468 response
469 .quotes
470 .insert(symbol.to_string(), entry.value.clone());
471 }
472 }
473 return Ok(response);
474 }
475 }
476
477 let _fetch_guard = self.quotes_fetch.lock().await;
479
480 {
482 let cache = self.quote_cache.read().await;
483 if self.all_cached(&cache, self.symbols.iter().cloned()) {
484 let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
485 for symbol in &self.symbols {
486 if let Some(entry) = cache.get(symbol) {
487 response
488 .quotes
489 .insert(symbol.to_string(), entry.value.clone());
490 }
491 }
492 return Ok(response);
493 }
494 }
495
496 let symbols_ref: Vec<&str> = self.symbols.iter().map(|s| &**s).collect();
498
499 let (json, logos) = if self.include_logo {
502 let quote_future = crate::endpoints::quotes::fetch_with_fields(
503 &self.client,
504 &symbols_ref,
505 None, true, false, );
509 let logo_future = crate::endpoints::quotes::fetch_with_fields(
510 &self.client,
511 &symbols_ref,
512 Some(&["logoUrl", "companyLogoUrl"]), true,
514 true, );
516 let (quote_result, logo_result) = tokio::join!(quote_future, logo_future);
517 (quote_result?, logo_result.ok())
518 } else {
519 let json = crate::endpoints::quotes::fetch_with_fields(
520 &self.client,
521 &symbols_ref,
522 None,
523 true,
524 false,
525 )
526 .await?;
527 (json, None)
528 };
529
530 let logo_map: std::collections::HashMap<String, (Option<String>, Option<String>)> = logos
532 .and_then(|l| l.get("quoteResponse")?.get("result")?.as_array().cloned())
533 .map(|results| {
534 results
535 .iter()
536 .filter_map(|r| {
537 let symbol = r.get("symbol")?.as_str()?.to_string();
538 let logo_url = r
539 .get("logoUrl")
540 .and_then(|v| v.as_str())
541 .map(|s| s.to_string());
542 let company_logo_url = r
543 .get("companyLogoUrl")
544 .and_then(|v| v.as_str())
545 .map(|s| s.to_string());
546 Some((symbol, (logo_url, company_logo_url)))
547 })
548 .collect()
549 })
550 .unwrap_or_default();
551
552 let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
554
555 if let Some(quote_response) = json.get("quoteResponse") {
556 if let Some(results) = quote_response.get("result").and_then(|r| r.as_array()) {
557 let mut parsed_quotes: Vec<(String, Quote)> = Vec::new();
559
560 for result in results {
561 if let Some(symbol) = result.get("symbol").and_then(|s| s.as_str()) {
562 match Quote::from_batch_response(result) {
563 Ok(mut quote) => {
564 if let Some((logo_url, company_logo_url)) = logo_map.get(symbol) {
566 if quote.logo_url.is_none() {
567 quote.logo_url = logo_url.clone();
568 }
569 if quote.company_logo_url.is_none() {
570 quote.company_logo_url = company_logo_url.clone();
571 }
572 }
573 parsed_quotes.push((symbol.to_string(), quote));
574 }
575 Err(e) => {
576 response.errors.insert(symbol.to_string(), e.to_string());
577 }
578 }
579 }
580 }
581
582 if self.cache_ttl.is_some() {
584 let mut cache = self.quote_cache.write().await;
585 for (symbol, quote) in &parsed_quotes {
586 self.cache_insert(&mut cache, symbol.as_str().into(), quote.clone());
587 }
588 }
589
590 for (symbol, quote) in parsed_quotes {
592 response.quotes.insert(symbol, quote);
593 }
594 }
595
596 for symbol in &self.symbols {
598 let symbol_str = &**symbol;
599 if !response.quotes.contains_key(symbol_str)
600 && !response.errors.contains_key(symbol_str)
601 {
602 response.errors.insert(
603 symbol.to_string(),
604 "Symbol not found in response".to_string(),
605 );
606 }
607 }
608 }
609
610 Ok(response)
611 }
612
613 pub async fn quote(&self, symbol: &str) -> Result<Quote> {
615 {
616 let cache = self.quote_cache.read().await;
617 if let Some(entry) = cache.get(symbol)
618 && self.is_cache_fresh(Some(entry))
619 {
620 return Ok(entry.value.clone());
621 }
622 }
623
624 let response = self.quotes().await?;
625
626 response
627 .quotes
628 .get(symbol)
629 .cloned()
630 .ok_or_else(|| FinanceError::SymbolNotFound {
631 symbol: Some(symbol.to_string()),
632 context: response
633 .errors
634 .get(symbol)
635 .cloned()
636 .unwrap_or_else(|| "Symbol not found".to_string()),
637 })
638 }
639
640 async fn get_fetch_guard<K: Clone + Eq + std::hash::Hash>(
645 guard_map: &FetchGuardMap<K>,
646 key: K,
647 ) -> FetchGuard {
648 {
649 let guards = guard_map.read().await;
650 if let Some(guard) = guards.get(&key) {
651 return Arc::clone(guard);
652 }
653 }
654
655 let mut guards = guard_map.write().await;
656 Arc::clone(
657 guards
658 .entry(key)
659 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
660 )
661 }
662
663 pub async fn charts(
668 &self,
669 interval: Interval,
670 range: TimeRange,
671 ) -> Result<BatchChartsResponse> {
672 {
674 let cache = self.chart_cache.read().await;
675 if self.all_cached(
676 &cache,
677 self.symbols.iter().map(|s| (s.clone(), interval, range)),
678 ) {
679 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
680 for symbol in &self.symbols {
681 if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
682 response
683 .charts
684 .insert(symbol.to_string(), entry.value.clone());
685 }
686 }
687 return Ok(response);
688 }
689 }
690
691 let fetch_guard = Self::get_fetch_guard(&self.charts_fetch, (interval, range)).await;
693 let _guard = fetch_guard.lock().await;
694
695 {
697 let cache = self.chart_cache.read().await;
698 if self.all_cached(
699 &cache,
700 self.symbols.iter().map(|s| (s.clone(), interval, range)),
701 ) {
702 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
703 for symbol in &self.symbols {
704 if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
705 response
706 .charts
707 .insert(symbol.to_string(), entry.value.clone());
708 }
709 }
710 return Ok(response);
711 }
712 }
713
714 let futures: Vec<_> = self
716 .symbols
717 .iter()
718 .map(|symbol| {
719 let client = Arc::clone(&self.client);
720 let symbol = Arc::clone(symbol);
721 async move {
722 let result = client.get_chart(&symbol, interval, range).await;
723 (symbol, result)
724 }
725 })
726 .collect();
727
728 let results: Vec<_> = stream::iter(futures)
729 .buffer_unordered(self.max_concurrency)
730 .collect()
731 .await;
732
733 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
734
735 let mut parsed_charts: Vec<(Arc<str>, Chart)> = Vec::new();
737 let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
738
739 for (symbol, result) in results {
740 match result {
741 Ok(json) => match ChartResponse::from_json(json) {
742 Ok(chart_response) => {
743 if let Some(mut chart_results) = chart_response.chart.result {
744 if let Some(chart_result) = chart_results.pop() {
745 if let Some(events) = chart_result.events.clone() {
747 parsed_events.push((Arc::clone(&symbol), events));
748 }
749
750 let chart = Chart {
751 symbol: symbol.to_string(),
752 meta: chart_result.meta.clone(),
753 candles: chart_result.to_candles(),
754 interval: Some(interval),
755 range: Some(range),
756 };
757 parsed_charts.push((symbol, chart));
758 } else {
759 response
760 .errors
761 .insert(symbol.to_string(), "Empty chart response".to_string());
762 }
763 } else {
764 response.errors.insert(
765 symbol.to_string(),
766 "No chart data in response".to_string(),
767 );
768 }
769 }
770 Err(e) => {
771 response.errors.insert(symbol.to_string(), e.to_string());
772 }
773 },
774 Err(e) => {
775 response.errors.insert(symbol.to_string(), e.to_string());
776 }
777 }
778 }
779
780 if self.cache_ttl.is_some() {
782 let mut cache = self.chart_cache.write().await;
783
784 let cache_keys: Vec<_> = parsed_charts
786 .into_iter()
787 .map(|(symbol, chart)| {
788 self.cache_insert(&mut cache, (symbol.clone(), interval, range), chart);
789 symbol
790 })
791 .collect();
792
793 for symbol in cache_keys {
795 if let Some(cached) = cache.get(&(symbol.clone(), interval, range)) {
796 response
797 .charts
798 .insert(symbol.to_string(), cached.value.clone());
799 }
800 }
801 } else {
802 for (symbol, chart) in parsed_charts {
804 response.charts.insert(symbol.to_string(), chart);
805 }
806 }
807
808 if !parsed_events.is_empty() {
810 let mut events_cache = self.events_cache.write().await;
811 for (symbol, events) in parsed_events {
812 events_cache
813 .entry(symbol)
814 .or_insert_with(|| CacheEntry::new(events));
815 }
816 }
817
818 Ok(response)
819 }
820
821 pub async fn chart(&self, symbol: &str, interval: Interval, range: TimeRange) -> Result<Chart> {
823 {
824 let cache = self.chart_cache.read().await;
825 let key: Arc<str> = symbol.into();
826 if let Some(entry) = cache.get(&(key, interval, range))
827 && self.is_cache_fresh(Some(entry))
828 {
829 return Ok(entry.value.clone());
830 }
831 }
832
833 let response = self.charts(interval, range).await?;
834
835 response
836 .charts
837 .get(symbol)
838 .cloned()
839 .ok_or_else(|| FinanceError::SymbolNotFound {
840 symbol: Some(symbol.to_string()),
841 context: response
842 .errors
843 .get(symbol)
844 .cloned()
845 .unwrap_or_else(|| "Symbol not found".to_string()),
846 })
847 }
848
849 pub async fn charts_range(
861 &self,
862 interval: Interval,
863 start: i64,
864 end: i64,
865 ) -> Result<BatchChartsResponse> {
866 let futures: Vec<_> = self
867 .symbols
868 .iter()
869 .map(|symbol| {
870 let client = Arc::clone(&self.client);
871 let symbol = Arc::clone(symbol);
872 async move {
873 let result = client.get_chart_range(&symbol, interval, start, end).await;
874 (symbol, result)
875 }
876 })
877 .collect();
878
879 let results: Vec<_> = stream::iter(futures)
880 .buffer_unordered(self.max_concurrency)
881 .collect()
882 .await;
883
884 let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
885 let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
886
887 for (symbol, result) in results {
888 match result {
889 Ok(json) => match ChartResponse::from_json(json) {
890 Ok(chart_response) => {
891 if let Some(mut chart_results) = chart_response.chart.result {
892 if let Some(chart_result) = chart_results.pop() {
893 if let Some(events) = chart_result.events.clone() {
895 parsed_events.push((Arc::clone(&symbol), events));
896 }
897
898 let chart = Chart {
899 symbol: symbol.to_string(),
900 meta: chart_result.meta.clone(),
901 candles: chart_result.to_candles(),
902 interval: Some(interval),
903 range: None,
904 };
905 response.charts.insert(symbol.to_string(), chart);
906 } else {
907 response
908 .errors
909 .insert(symbol.to_string(), "Empty chart response".to_string());
910 }
911 } else {
912 response.errors.insert(
913 symbol.to_string(),
914 "No chart data in response".to_string(),
915 );
916 }
917 }
918 Err(e) => {
919 response.errors.insert(symbol.to_string(), e.to_string());
920 }
921 },
922 Err(e) => {
923 response.errors.insert(symbol.to_string(), e.to_string());
924 }
925 }
926 }
927
928 if !parsed_events.is_empty() {
930 let mut events_cache = self.events_cache.write().await;
931 for (symbol, events) in parsed_events {
932 events_cache
933 .entry(symbol)
934 .or_insert_with(|| CacheEntry::new(events));
935 }
936 }
937
938 Ok(response)
939 }
940
941 async fn ensure_events_loaded(&self) -> Result<()> {
951 let symbols_to_fetch: Vec<Arc<str>> = {
953 let cache = self.events_cache.read().await;
954 self.symbols
955 .iter()
956 .filter(|sym| !cache.contains_key(*sym))
957 .cloned()
958 .collect()
959 };
960
961 if symbols_to_fetch.is_empty() {
962 return Ok(());
963 }
964
965 let futures: Vec<_> = symbols_to_fetch
967 .iter()
968 .map(|symbol| {
969 let client = Arc::clone(&self.client);
970 let symbol = Arc::clone(symbol);
971 async move {
972 let result = crate::endpoints::chart::fetch(
973 &client,
974 &symbol,
975 Interval::OneDay,
976 TimeRange::Max,
977 )
978 .await;
979 (symbol, result)
980 }
981 })
982 .collect();
983
984 let results: Vec<_> = stream::iter(futures)
985 .buffer_unordered(self.max_concurrency)
986 .collect()
987 .await;
988
989 let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
991
992 for (symbol, result) in results {
993 if let Ok(json) = result
994 && let Ok(chart_response) = ChartResponse::from_json(json)
995 && let Some(mut chart_results) = chart_response.chart.result
996 && let Some(chart_result) = chart_results.pop()
997 && let Some(events) = chart_result.events
998 {
999 parsed_events.push((symbol, events));
1000 }
1001 }
1002
1003 if !parsed_events.is_empty() {
1005 let mut events_cache = self.events_cache.write().await;
1006 for (symbol, events) in parsed_events {
1007 events_cache.insert(symbol, CacheEntry::new(events));
1008 }
1009 }
1010
1011 Ok(())
1012 }
1013
1014 pub async fn spark(&self, interval: Interval, range: TimeRange) -> Result<BatchSparksResponse> {
1043 {
1045 let cache = self.spark_cache.read().await;
1046 if self.all_cached(
1047 &cache,
1048 self.symbols.iter().map(|s| (s.clone(), interval, range)),
1049 ) {
1050 let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1051 for symbol in &self.symbols {
1052 if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
1053 response
1054 .sparks
1055 .insert(symbol.to_string(), entry.value.clone());
1056 }
1057 }
1058 return Ok(response);
1059 }
1060 }
1061
1062 let fetch_guard = Self::get_fetch_guard(&self.spark_fetch, (interval, range)).await;
1064 let _guard = fetch_guard.lock().await;
1065
1066 {
1068 let cache = self.spark_cache.read().await;
1069 if self.all_cached(
1070 &cache,
1071 self.symbols.iter().map(|s| (s.clone(), interval, range)),
1072 ) {
1073 let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1074 for symbol in &self.symbols {
1075 if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
1076 response
1077 .sparks
1078 .insert(symbol.to_string(), entry.value.clone());
1079 }
1080 }
1081 return Ok(response);
1082 }
1083 }
1084
1085 let symbols_ref: Vec<&str> = self.symbols.iter().map(|s| &**s).collect();
1087 let json =
1088 crate::endpoints::spark::fetch(&self.client, &symbols_ref, interval, range).await?;
1089
1090 let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1091
1092 match SparkResponse::from_json(json) {
1093 Ok(spark_response) => {
1094 let mut parsed_sparks: Vec<(Arc<str>, Spark)> = Vec::new();
1095
1096 if let Some(results) = spark_response.spark.result {
1097 for result in &results {
1098 if let Some(spark) = Spark::from_response(
1099 result,
1100 Some(interval.as_str().to_string()),
1101 Some(range.as_str().to_string()),
1102 ) {
1103 let sym: Arc<str> = result.symbol.as_str().into();
1104 parsed_sparks.push((sym, spark));
1105 } else {
1106 response.errors.insert(
1107 result.symbol.to_string(),
1108 "Failed to parse spark data".to_string(),
1109 );
1110 }
1111 }
1112 }
1113
1114 if self.cache_ttl.is_some() {
1116 let mut cache = self.spark_cache.write().await;
1117 for (symbol, spark) in &parsed_sparks {
1118 self.cache_insert(
1119 &mut cache,
1120 (symbol.clone(), interval, range),
1121 spark.clone(),
1122 );
1123 }
1124 }
1125
1126 for (symbol, spark) in parsed_sparks {
1128 response.sparks.insert(symbol.to_string(), spark);
1129 }
1130
1131 for symbol in &self.symbols {
1133 let symbol_str = &**symbol;
1134 if !response.sparks.contains_key(symbol_str)
1135 && !response.errors.contains_key(symbol_str)
1136 {
1137 response.errors.insert(
1138 symbol.to_string(),
1139 "Symbol not found in response".to_string(),
1140 );
1141 }
1142 }
1143 }
1144 Err(e) => {
1145 for symbol in &self.symbols {
1146 response.errors.insert(symbol.to_string(), e.to_string());
1147 }
1148 }
1149 }
1150
1151 Ok(response)
1152 }
1153
1154 pub async fn dividends(&self, range: TimeRange) -> Result<BatchDividendsResponse> {
1179 let mut response = BatchDividendsResponse::with_capacity(self.symbols.len());
1180
1181 self.ensure_events_loaded().await?;
1183
1184 let events_cache = self.events_cache.read().await;
1185
1186 for symbol in &self.symbols {
1187 if let Some(entry) = events_cache.get(symbol) {
1188 let all_dividends = entry.value.to_dividends();
1189 let filtered = filter_by_range(all_dividends, range);
1190 response.dividends.insert(symbol.to_string(), filtered);
1191 } else {
1192 response
1193 .errors
1194 .insert(symbol.to_string(), "No events data available".to_string());
1195 }
1196 }
1197
1198 Ok(response)
1199 }
1200
1201 pub async fn splits(&self, range: TimeRange) -> Result<BatchSplitsResponse> {
1228 let mut response = BatchSplitsResponse::with_capacity(self.symbols.len());
1229
1230 self.ensure_events_loaded().await?;
1232
1233 let events_cache = self.events_cache.read().await;
1234
1235 for symbol in &self.symbols {
1236 if let Some(entry) = events_cache.get(symbol) {
1237 let all_splits = entry.value.to_splits();
1238 let filtered = filter_by_range(all_splits, range);
1239 response.splits.insert(symbol.to_string(), filtered);
1240 } else {
1241 response
1242 .errors
1243 .insert(symbol.to_string(), "No events data available".to_string());
1244 }
1245 }
1246
1247 Ok(response)
1248 }
1249
1250 pub async fn capital_gains(&self, range: TimeRange) -> Result<BatchCapitalGainsResponse> {
1276 let mut response = BatchCapitalGainsResponse::with_capacity(self.symbols.len());
1277
1278 self.ensure_events_loaded().await?;
1280
1281 let events_cache = self.events_cache.read().await;
1282
1283 for symbol in &self.symbols {
1284 if let Some(entry) = events_cache.get(symbol) {
1285 let all_gains = entry.value.to_capital_gains();
1286 let filtered = filter_by_range(all_gains, range);
1287 response.capital_gains.insert(symbol.to_string(), filtered);
1288 } else {
1289 response
1290 .errors
1291 .insert(symbol.to_string(), "No events data available".to_string());
1292 }
1293 }
1294
1295 Ok(response)
1296 }
1297
    /// Fetches the financial statement of the given type and frequency for every tracked
    /// symbol.
    ///
    /// The body is generated by `batch_fetch_cached!`, configured with
    /// `financials_cache` keyed by `(symbol, statement_type, frequency)`, a fetch guard
    /// per `(statement_type, frequency)`, and `client.get_financials` as the per-symbol
    /// fetch.
    // NOTE(review): the macro presumably mirrors the hand-written cache check / guard /
    // concurrent fetch flow of `charts` — confirm in `super::macros`.
    pub async fn financials(
        &self,
        statement_type: StatementType,
        frequency: Frequency,
    ) -> Result<BatchFinancialsResponse> {
        batch_fetch_cached!(self;
            cache: financials_cache,
            guard: map(financials_fetch, (statement_type, frequency)),
            key: |s| (s.clone(), statement_type, frequency),
            response: BatchFinancialsResponse.financials,
            fetch: |client, symbol| client.get_financials(&symbol, statement_type, frequency).await,
        )
    }
1338
    /// Fetches news for every tracked symbol.
    ///
    /// The body is generated by `batch_fetch_cached!`, configured with `news_cache` keyed
    /// by symbol and the single `news_fetch` guard. The per-symbol fetch does not use the
    /// client; it calls `crate::scrapers::stockanalysis::scrape_symbol_news`.
    // NOTE(review): caching and error aggregation are implemented by the macro — confirm
    // the details in `super::macros`.
    pub async fn news(&self) -> Result<BatchNewsResponse> {
        batch_fetch_cached!(self;
            cache: news_cache,
            guard: simple(news_fetch),
            key: |s| s.clone(),
            response: BatchNewsResponse.news,
            fetch: |_client, symbol| crate::scrapers::stockanalysis::scrape_symbol_news(&symbol).await,
        )
    }
1371
    /// Fetches recommendations for every tracked symbol, passing `limit` to the client.
    ///
    /// The body is generated by `batch_fetch_cached!`, configured with
    /// `recommendations_cache` keyed by `(symbol, limit)` and a fetch guard per `limit`.
    /// For each symbol the raw response is parsed into a `RecommendationResponse`, and
    /// the `recommended_symbols` of all its results are flattened into one
    /// `Recommendation`.
    // NOTE(review): caching and error aggregation are implemented by the macro — confirm
    // the details in `super::macros`.
    pub async fn recommendations(&self, limit: u32) -> Result<BatchRecommendationsResponse> {
        batch_fetch_cached!(self;
            cache: recommendations_cache,
            guard: map(recommendations_fetch, limit),
            key: |s| (s.clone(), limit),
            response: BatchRecommendationsResponse.recommendations,
            fetch: |client, symbol| {
                let json = client.get_recommendations(&symbol, limit).await?;
                let rec_response =
                    crate::models::recommendation::response::RecommendationResponse::from_json(json)?;
                Ok(Recommendation {
                    symbol: symbol.to_string(),
                    recommendations: rec_response
                        .finance
                        .result
                        .iter()
                        .flat_map(|r| &r.recommended_symbols)
                        .cloned()
                        .collect(),
                })
            },
        )
    }
1423
    /// Fetches option chains for every tracked symbol, passing the optional `date` to the
    /// client.
    ///
    /// The body is generated by `batch_fetch_cached!`, configured with `options_cache`
    /// keyed by `(symbol, date)` and a fetch guard per `date`. Each response is
    /// deserialized into `Options` with `serde_json::from_value`.
    // NOTE(review): `date` is forwarded unchanged to `get_options`; presumably an
    // expiration timestamp — confirm against the client.
    pub async fn options(&self, date: Option<i64>) -> Result<BatchOptionsResponse> {
        batch_fetch_cached!(self;
            cache: options_cache,
            guard: map(options_fetch, date),
            key: |s| (s.clone(), date),
            response: BatchOptionsResponse.options,
            fetch: |client, symbol| {
                let json = client.get_options(&symbol, date).await?;
                Ok(serde_json::from_value::<Options>(json)?)
            },
        )
    }
1460
1461 #[cfg(feature = "indicators")]
1487 pub async fn indicators(
1488 &self,
1489 interval: Interval,
1490 range: TimeRange,
1491 ) -> Result<BatchIndicatorsResponse> {
1492 let cache_key_for = |symbol: &Arc<str>| (symbol.clone(), interval, range);
1493
1494 {
1496 let cache = self.indicators_cache.read().await;
1497 if self.all_cached(&cache, self.symbols.iter().map(&cache_key_for)) {
1498 let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
1499 for symbol in &self.symbols {
1500 if let Some(entry) = cache.get(&cache_key_for(symbol)) {
1501 response
1502 .indicators
1503 .insert(symbol.to_string(), entry.value.clone());
1504 }
1505 }
1506 return Ok(response);
1507 }
1508 }
1509
1510 let fetch_guard = Self::get_fetch_guard(&self.indicators_fetch, (interval, range)).await;
1512 let _guard = fetch_guard.lock().await;
1513
1514 {
1516 let cache = self.indicators_cache.read().await;
1517 if self.all_cached(&cache, self.symbols.iter().map(&cache_key_for)) {
1518 let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
1519 for symbol in &self.symbols {
1520 if let Some(entry) = cache.get(&cache_key_for(symbol)) {
1521 response
1522 .indicators
1523 .insert(symbol.to_string(), entry.value.clone());
1524 }
1525 }
1526 return Ok(response);
1527 }
1528 }
1529
1530 let charts_response = self.charts(interval, range).await?;
1532
1533 let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
1534
1535 let mut calculated_indicators: Vec<(String, crate::indicators::IndicatorsSummary)> =
1537 Vec::new();
1538
1539 for (symbol, chart) in &charts_response.charts {
1540 let indicators = crate::indicators::summary::calculate_indicators(&chart.candles);
1541 calculated_indicators.push((symbol.to_string(), indicators));
1542 }
1543
1544 if self.cache_ttl.is_some() {
1546 let mut cache = self.indicators_cache.write().await;
1547 for (symbol, indicators) in &calculated_indicators {
1548 let key: Arc<str> = symbol.as_str().into();
1549 self.cache_insert(&mut cache, cache_key_for(&key), indicators.clone());
1550 }
1551 }
1552
1553 for (symbol, indicators) in calculated_indicators {
1555 response.indicators.insert(symbol, indicators);
1556 }
1557
1558 for (symbol, error) in &charts_response.errors {
1560 response.errors.insert(symbol.to_string(), error.clone());
1561 }
1562
1563 Ok(response)
1564 }
1565
1566 pub fn add_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1587 use std::collections::HashSet;
1589
1590 let existing: HashSet<&str> = self.symbols.iter().map(|s| &**s).collect();
1591 let to_add: Vec<Arc<str>> = symbols
1592 .iter()
1593 .map(|s| s.as_ref())
1594 .filter(|s| !existing.contains(s))
1595 .map(|s| s.into())
1596 .collect();
1597
1598 self.symbols.extend(to_add);
1599 }
1600
1601 pub async fn remove_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1618 use std::collections::HashSet;
1619 let to_remove: HashSet<&str> = symbols.iter().map(|s| s.as_ref()).collect();
1620
1621 self.symbols.retain(|s| !to_remove.contains(&**s));
1623
1624 let (
1626 mut quote_cache,
1627 mut chart_cache,
1628 mut events_cache,
1629 mut financials_cache,
1630 mut news_cache,
1631 mut recommendations_cache,
1632 mut options_cache,
1633 mut spark_cache,
1634 ) = tokio::join!(
1635 self.quote_cache.write(),
1636 self.chart_cache.write(),
1637 self.events_cache.write(),
1638 self.financials_cache.write(),
1639 self.news_cache.write(),
1640 self.recommendations_cache.write(),
1641 self.options_cache.write(),
1642 self.spark_cache.write(),
1643 );
1644
1645 for symbol in &to_remove {
1647 let key: Arc<str> = (*symbol).into();
1648 quote_cache.remove(&key);
1649 events_cache.remove(&key);
1650 news_cache.remove(&key);
1651 }
1652
1653 chart_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1655 financials_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1656 recommendations_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1657 options_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1658 spark_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1659
1660 drop((
1662 quote_cache,
1663 chart_cache,
1664 events_cache,
1665 financials_cache,
1666 news_cache,
1667 recommendations_cache,
1668 options_cache,
1669 spark_cache,
1670 ));
1671
1672 #[cfg(feature = "indicators")]
1673 self.indicators_cache
1674 .write()
1675 .await
1676 .retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1677 }
1678
1679 pub async fn clear_cache(&self) {
1684 tokio::join!(
1685 async { self.quote_cache.write().await.clear() },
1687 async { self.chart_cache.write().await.clear() },
1688 async { self.events_cache.write().await.clear() },
1689 async { self.financials_cache.write().await.clear() },
1690 async { self.news_cache.write().await.clear() },
1691 async { self.recommendations_cache.write().await.clear() },
1692 async { self.options_cache.write().await.clear() },
1693 async { self.spark_cache.write().await.clear() },
1694 async {
1695 #[cfg(feature = "indicators")]
1696 self.indicators_cache.write().await.clear();
1697 },
1698 async { self.charts_fetch.write().await.clear() },
1700 async { self.financials_fetch.write().await.clear() },
1701 async { self.recommendations_fetch.write().await.clear() },
1702 async { self.options_fetch.write().await.clear() },
1703 async { self.spark_fetch.write().await.clear() },
1704 async {
1705 #[cfg(feature = "indicators")]
1706 self.indicators_fetch.write().await.clear();
1707 },
1708 );
1709 }
1710
1711 pub async fn clear_quote_cache(&self) {
1715 self.quote_cache.write().await.clear();
1716 }
1717
1718 pub async fn clear_chart_cache(&self) {
1723 tokio::join!(
1724 async { self.chart_cache.write().await.clear() },
1725 async { self.events_cache.write().await.clear() },
1726 async { self.spark_cache.write().await.clear() },
1727 async {
1728 #[cfg(feature = "indicators")]
1729 self.indicators_cache.write().await.clear();
1730 },
1731 );
1732 }
1733}
1734
1735#[cfg(test)]
1736mod tests {
1737 use super::*;
1738
1739 #[tokio::test]
1740 #[ignore] async fn test_tickers_quotes() {
1742 let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
1743 let result = tickers.quotes().await.unwrap();
1744
1745 assert!(result.success_count() > 0);
1746 }
1747
1748 #[tokio::test]
1749 #[ignore] async fn test_tickers_charts() {
1751 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1752 let result = tickers
1753 .charts(Interval::OneDay, TimeRange::FiveDays)
1754 .await
1755 .unwrap();
1756
1757 assert!(result.success_count() > 0);
1758 }
1759
1760 #[tokio::test]
1761 #[ignore = "requires network access"]
1762 async fn test_tickers_spark() {
1763 let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
1764 let result = tickers
1765 .spark(Interval::FiveMinutes, TimeRange::OneDay)
1766 .await
1767 .unwrap();
1768
1769 assert!(result.success_count() > 0);
1770
1771 if let Some(spark) = result.sparks.get("AAPL") {
1773 assert!(!spark.closes.is_empty());
1774 assert_eq!(spark.symbol, "AAPL");
1775 assert!(spark.percent_change().is_some());
1777 }
1778 }
1779
1780 #[tokio::test]
1781 #[ignore = "requires network access"]
1782 async fn test_tickers_dividends() {
1783 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1784 let result = tickers.dividends(TimeRange::OneYear).await.unwrap();
1785
1786 assert!(result.success_count() > 0);
1787
1788 if let Some(dividends) = result.dividends.get("AAPL")
1790 && !dividends.is_empty()
1791 {
1792 let div = ÷nds[0];
1793 assert!(div.timestamp > 0);
1794 assert!(div.amount > 0.0);
1795 }
1796 }
1797
1798 #[tokio::test]
1799 #[ignore = "requires network access"]
1800 async fn test_tickers_splits() {
1801 let tickers = Tickers::new(["NVDA", "TSLA"]).await.unwrap();
1802 let result = tickers.splits(TimeRange::FiveYears).await.unwrap();
1803
1804 assert!(result.success_count() > 0);
1806
1807 for splits in result.splits.values() {
1809 for split in splits {
1810 assert!(split.timestamp > 0);
1811 assert!(split.numerator > 0.0);
1812 assert!(split.denominator > 0.0);
1813 assert!(!split.ratio.is_empty());
1814 }
1815 }
1816 }
1817
1818 #[tokio::test]
1819 #[ignore = "requires network access"]
1820 async fn test_tickers_capital_gains() {
1821 let tickers = Tickers::new(["VFIAX", "VTI"]).await.unwrap();
1822 let result = tickers.capital_gains(TimeRange::TwoYears).await.unwrap();
1823
1824 assert!(result.success_count() > 0);
1826
1827 for gains in result.capital_gains.values() {
1829 for gain in gains {
1830 assert!(gain.timestamp > 0);
1831 assert!(gain.amount >= 0.0);
1832 }
1833 }
1834 }
1835
1836 #[tokio::test]
1837 #[ignore = "requires network access"]
1838 async fn test_tickers_financials() {
1839 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1840 let result = tickers
1841 .financials(StatementType::Income, Frequency::Annual)
1842 .await
1843 .unwrap();
1844
1845 assert!(result.success_count() > 0);
1846
1847 for (symbol, stmt) in &result.financials {
1849 assert_eq!(stmt.symbol, *symbol);
1850 assert_eq!(stmt.statement_type, "income");
1851 assert_eq!(stmt.frequency, "annual");
1852 assert!(!stmt.statement.is_empty());
1853
1854 if let Some(revenue) = stmt.statement.get("TotalRevenue") {
1856 assert!(!revenue.is_empty());
1857 }
1858 }
1859 }
1860
1861 #[tokio::test]
1862 #[ignore = "requires network access"]
1863 async fn test_tickers_news() {
1864 let tickers = Tickers::new(["AAPL", "TSLA"]).await.unwrap();
1865 let result = tickers.news().await.unwrap();
1866
1867 assert!(result.success_count() > 0);
1868
1869 for articles in result.news.values() {
1871 if !articles.is_empty() {
1872 let article = &articles[0];
1873 assert!(!article.title.is_empty());
1874 assert!(!article.link.is_empty());
1875 assert!(!article.source.is_empty());
1876 }
1877 }
1878 }
1879
1880 #[tokio::test]
1881 #[ignore = "requires network access"]
1882 async fn test_tickers_recommendations() {
1883 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1884 let result = tickers.recommendations(5).await.unwrap();
1885
1886 assert!(result.success_count() > 0);
1887
1888 for (symbol, rec) in &result.recommendations {
1890 assert_eq!(rec.symbol, *symbol);
1891 assert!(rec.count() > 0);
1892 for similar in &rec.recommendations {
1893 assert!(!similar.symbol.is_empty());
1894 }
1895 }
1896 }
1897
1898 #[tokio::test]
1899 #[ignore = "requires network access"]
1900 async fn test_tickers_options() {
1901 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1902 let result = tickers.options(None).await.unwrap();
1903
1904 assert!(result.success_count() > 0);
1905
1906 for opts in result.options.values() {
1908 assert!(!opts.expiration_dates().is_empty());
1909 }
1910 }
1911
1912 #[tokio::test]
1913 #[ignore = "requires network access"]
1914 #[cfg(feature = "indicators")]
1915 async fn test_tickers_indicators() {
1916 let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1917 let result = tickers
1918 .indicators(Interval::OneDay, TimeRange::ThreeMonths)
1919 .await
1920 .unwrap();
1921
1922 assert!(result.success_count() > 0);
1923
1924 for ind in result.indicators.values() {
1926 assert!(ind.rsi_14.is_some() || ind.sma_20.is_some());
1928 }
1929 }
1930
1931 #[tokio::test]
1932 async fn test_tickers_add_symbols() {
1933 let mut tickers = Tickers::new(["AAPL"]).await.unwrap();
1934 assert_eq!(tickers.len(), 1);
1935 assert_eq!(tickers.symbols(), &["AAPL"]);
1936
1937 tickers.add_symbols(&["MSFT", "GOOGL"]);
1938 assert_eq!(tickers.len(), 3);
1939 assert!(tickers.symbols().contains(&"AAPL"));
1940 assert!(tickers.symbols().contains(&"MSFT"));
1941 assert!(tickers.symbols().contains(&"GOOGL"));
1942
1943 tickers.add_symbols(&["AAPL"]);
1945 assert_eq!(tickers.len(), 3);
1946 }
1947
1948 #[tokio::test]
1949 #[ignore = "requires network access"]
1950 async fn test_tickers_remove_symbols() {
1951 let mut tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
1952 assert_eq!(tickers.len(), 3);
1953
1954 let _ = tickers.quotes().await;
1956
1957 tickers.remove_symbols(&["MSFT"]).await;
1959 assert_eq!(tickers.len(), 2);
1960 assert!(tickers.symbols().contains(&"AAPL"));
1961 assert!(!tickers.symbols().contains(&"MSFT"));
1962 assert!(tickers.symbols().contains(&"GOOGL"));
1963
1964 let quotes = tickers.quotes().await.unwrap();
1966 assert!(!quotes.quotes.contains_key("MSFT"));
1967 assert_eq!(quotes.quotes.len(), 2);
1968 }
1969}