Skip to main content

finance_query/tickers/
core.rs

1//! Tickers implementation for batch operations on multiple symbols.
2//!
3//! Optimizes data fetching by using batch endpoints and concurrent requests.
4
5use super::macros::{batch_fetch_cached, define_batch_response};
6use crate::client::{ClientConfig, YahooClient};
7use crate::constants::{Frequency, Interval, StatementType, TimeRange};
8use crate::error::{FinanceError, Result};
9use crate::models::chart::events::ChartEvents;
10use crate::models::chart::response::ChartResponse;
11use crate::models::chart::{CapitalGain, Chart, Dividend, Split};
12use crate::models::financials::FinancialStatement;
13use crate::models::news::News;
14use crate::models::options::Options;
15use crate::models::quote::Quote;
16use crate::models::recommendation::Recommendation;
17use crate::models::spark::Spark;
18use crate::models::spark::response::SparkResponse;
19use crate::utils::{CacheEntry, EVICTION_THRESHOLD, filter_by_range};
20use futures::stream::{self, StreamExt};
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::sync::RwLock;
25
26// Type aliases — MapCache wraps values in CacheEntry for TTL support.
27type MapCache<K, V> = Arc<RwLock<HashMap<K, CacheEntry<V>>>>;
28type ChartCacheKey = (Arc<str>, Interval, TimeRange);
29type QuoteCache = MapCache<Arc<str>, Quote>;
30type ChartCache = MapCache<ChartCacheKey, Chart>;
31type EventsCache = MapCache<Arc<str>, ChartEvents>;
32type FinancialsCache = MapCache<(Arc<str>, StatementType, Frequency), FinancialStatement>;
33type NewsCache = MapCache<Arc<str>, Vec<News>>;
34type RecommendationsCache = MapCache<(Arc<str>, u32), Recommendation>;
35type OptionsCache = MapCache<(Arc<str>, Option<i64>), Options>;
36type SparkCacheKey = (Arc<str>, Interval, TimeRange);
37type SparkCache = MapCache<SparkCacheKey, Spark>;
38#[cfg(feature = "indicators")]
39type IndicatorsCache =
40    MapCache<(Arc<str>, Interval, TimeRange), crate::indicators::IndicatorsSummary>;
41
42// Fetch guards for request deduplication — prevent concurrent duplicate fetches
43type FetchGuard = Arc<tokio::sync::Mutex<()>>;
44type FetchGuardMap<K> = Arc<RwLock<HashMap<K, FetchGuard>>>;
45
46// Generate all batch response types
47define_batch_response! {
48    /// Response containing quotes for multiple symbols.
49    BatchQuotesResponse => quotes: Quote
50}
51
52define_batch_response! {
53    /// Response containing charts for multiple symbols.
54    BatchChartsResponse => charts: Chart
55}
56
57define_batch_response! {
58    /// Response containing spark data for multiple symbols.
59    ///
60    /// Spark data is optimized for sparkline rendering with only close prices.
61    /// Unlike charts, spark data is fetched in a single batch request.
62    BatchSparksResponse => sparks: Spark
63}
64
65define_batch_response! {
66    /// Response containing dividends for multiple symbols.
67    BatchDividendsResponse => dividends: Vec<Dividend>
68}
69
70define_batch_response! {
71    /// Response containing splits for multiple symbols.
72    BatchSplitsResponse => splits: Vec<Split>
73}
74
75define_batch_response! {
76    /// Response containing capital gains for multiple symbols.
77    BatchCapitalGainsResponse => capital_gains: Vec<CapitalGain>
78}
79
80define_batch_response! {
81    /// Response containing financial statements for multiple symbols.
82    BatchFinancialsResponse => financials: FinancialStatement
83}
84
85define_batch_response! {
86    /// Response containing news articles for multiple symbols.
87    BatchNewsResponse => news: Vec<News>
88}
89
90define_batch_response! {
91    /// Response containing recommendations for multiple symbols.
92    BatchRecommendationsResponse => recommendations: Recommendation
93}
94
95define_batch_response! {
96    /// Response containing options chains for multiple symbols.
97    BatchOptionsResponse => options: Options
98}
99
100#[cfg(feature = "indicators")]
101define_batch_response! {
102    /// Response containing technical indicators for multiple symbols.
103    BatchIndicatorsResponse => indicators: crate::indicators::IndicatorsSummary
104}
105
106/// Default maximum concurrent requests for batch operations.
107const DEFAULT_MAX_CONCURRENCY: usize = 10;
108
109/// Builder for Tickers
110pub struct TickersBuilder {
111    symbols: Vec<Arc<str>>,
112    config: ClientConfig,
113    shared_client: Option<crate::ticker::ClientHandle>,
114    max_concurrency: usize,
115    cache_ttl: Option<Duration>,
116    include_logo: bool,
117}
118
119impl TickersBuilder {
120    fn new<S, I>(symbols: I) -> Self
121    where
122        S: Into<String>,
123        I: IntoIterator<Item = S>,
124    {
125        Self {
126            symbols: symbols.into_iter().map(|s| s.into().into()).collect(),
127            config: ClientConfig::default(),
128            shared_client: None,
129            max_concurrency: DEFAULT_MAX_CONCURRENCY,
130            cache_ttl: None,
131            include_logo: false,
132        }
133    }
134
135    /// Set the region (automatically sets correct lang and region code)
136    pub fn region(mut self, region: crate::constants::Region) -> Self {
137        self.config.lang = region.lang().to_string();
138        self.config.region = region.region().to_string();
139        self
140    }
141
142    /// Set the language code (e.g., "en-US", "ja-JP", "de-DE")
143    pub fn lang(mut self, lang: impl Into<String>) -> Self {
144        self.config.lang = lang.into();
145        self
146    }
147
148    /// Set the region code (e.g., "US", "JP", "DE")
149    pub fn region_code(mut self, region: impl Into<String>) -> Self {
150        self.config.region = region.into();
151        self
152    }
153
154    /// Set the HTTP request timeout
155    pub fn timeout(mut self, timeout: Duration) -> Self {
156        self.config.timeout = timeout;
157        self
158    }
159
160    /// Set the proxy URL
161    pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
162        self.config.proxy = Some(proxy.into());
163        self
164    }
165
166    /// Set a complete ClientConfig
167    pub fn config(mut self, config: ClientConfig) -> Self {
168        self.config = config;
169        self
170    }
171
172    /// Set the maximum number of concurrent requests for batch operations.
173    ///
174    /// Controls how many HTTP requests run in parallel when methods like
175    /// `charts()`, `financials()`, or `news()` fetch data for each symbol.
176    /// Default is 10.
177    ///
178    /// Lower values reduce the risk of rate limiting from Yahoo Finance.
179    /// Higher values increase throughput for large symbol lists.
180    pub fn max_concurrency(mut self, n: usize) -> Self {
181        self.max_concurrency = n.max(1);
182        self
183    }
184
185    /// Share an existing authenticated session instead of creating a new one.
186    ///
187    /// This avoids redundant authentication when you have multiple `Tickers`
188    /// instances or want to share a session with individual [`crate::Ticker`] instances.
189    ///
190    /// Obtain a [`ClientHandle`](crate::ClientHandle) from any existing
191    /// [`Ticker`](crate::Ticker) via [`Ticker::client_handle()`](crate::Ticker::client_handle).
192    ///
193    /// When set, the builder's `config`, `timeout`, `proxy`, `lang`, and `region`
194    /// settings are ignored (the shared session's configuration is used instead).
195    ///
196    /// # Example
197    ///
198    /// ```no_run
199    /// use finance_query::{Ticker, Tickers};
200    ///
201    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
202    /// let aapl = Ticker::new("AAPL").await?;
203    /// let handle = aapl.client_handle();
204    ///
205    /// let tickers = Tickers::builder(["MSFT", "GOOGL"])
206    ///     .client(handle)
207    ///     .build()
208    ///     .await?;
209    /// # Ok(())
210    /// # }
211    /// ```
212    pub fn client(mut self, handle: crate::ticker::ClientHandle) -> Self {
213        self.shared_client = Some(handle);
214        self
215    }
216
217    /// Enable response caching with a time-to-live.
218    ///
219    /// By default caching is **disabled** — every call fetches fresh data.
220    /// When enabled, responses are reused until the TTL expires. Stale
221    /// entries are evicted on the next write.
222    ///
223    /// # Example
224    ///
225    /// ```no_run
226    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
227    /// use finance_query::Tickers;
228    /// use std::time::Duration;
229    ///
230    /// let tickers = Tickers::builder(["AAPL", "MSFT"])
231    ///     .cache(Duration::from_secs(30))
232    ///     .build()
233    ///     .await?;
234    /// # Ok(())
235    /// # }
236    /// ```
237    pub fn cache(mut self, ttl: Duration) -> Self {
238        self.cache_ttl = Some(ttl);
239        self
240    }
241
242    /// Include company logo URLs in quote responses.
243    ///
244    /// When enabled, `quotes()` will fetch logo URLs in parallel with the
245    /// quote batch request, adding a small extra request.
246    pub fn logo(mut self) -> Self {
247        self.include_logo = true;
248        self
249    }
250
251    /// Build the Tickers instance
252    pub async fn build(self) -> Result<Tickers> {
253        let client = match self.shared_client {
254            Some(handle) => handle.0,
255            None => Arc::new(YahooClient::new(self.config).await?),
256        };
257
258        Ok(Tickers {
259            symbols: self.symbols,
260            client,
261            max_concurrency: self.max_concurrency,
262            cache_ttl: self.cache_ttl,
263            include_logo: self.include_logo,
264            quote_cache: Default::default(),
265            chart_cache: Default::default(),
266            events_cache: Default::default(),
267            financials_cache: Default::default(),
268            news_cache: Default::default(),
269            recommendations_cache: Default::default(),
270            options_cache: Default::default(),
271            spark_cache: Default::default(),
272            #[cfg(feature = "indicators")]
273            indicators_cache: Default::default(),
274
275            // Initialize fetch guards for request deduplication
276            quotes_fetch: Arc::new(tokio::sync::Mutex::new(())),
277            charts_fetch: Default::default(),
278            financials_fetch: Default::default(),
279            news_fetch: Arc::new(tokio::sync::Mutex::new(())),
280            recommendations_fetch: Default::default(),
281            options_fetch: Default::default(),
282            spark_fetch: Default::default(),
283            #[cfg(feature = "indicators")]
284            indicators_fetch: Default::default(),
285        })
286    }
287}
288
289/// Multi-symbol ticker for efficient batch operations.
290///
291/// `Tickers` optimizes data fetching for multiple symbols by:
292/// - Using batch endpoints where available (e.g., /v7/finance/quote)
293/// - Fetching concurrently when batch endpoints don't exist
294/// - Sharing a single authenticated client across all symbols
295/// - Caching results per symbol
296///
297/// # Example
298///
299/// ```no_run
300/// use finance_query::Tickers;
301///
302/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
303/// // Create tickers for multiple symbols
304/// let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
305///
306/// // Batch fetch all quotes (single API call)
307/// let quotes = tickers.quotes().await?;
308/// for (symbol, quote) in &quotes.quotes {
309///     let price = quote.regular_market_price.as_ref().and_then(|v| v.raw).unwrap_or(0.0);
310///     println!("{}: ${:.2}", symbol, price);
311/// }
312///
313/// // Fetch charts concurrently
314/// use finance_query::{Interval, TimeRange};
315/// let charts = tickers.charts(Interval::OneDay, TimeRange::OneMonth).await?;
316/// # Ok(())
317/// # }
318/// ```
319pub struct Tickers {
320    symbols: Vec<Arc<str>>,
321    client: Arc<YahooClient>,
322    max_concurrency: usize,
323    cache_ttl: Option<Duration>,
324    include_logo: bool,
325    quote_cache: QuoteCache,
326    chart_cache: ChartCache,
327    events_cache: EventsCache,
328    financials_cache: FinancialsCache,
329    news_cache: NewsCache,
330    recommendations_cache: RecommendationsCache,
331    options_cache: OptionsCache,
332    spark_cache: SparkCache,
333    #[cfg(feature = "indicators")]
334    indicators_cache: IndicatorsCache,
335
336    // Fetch guards prevent duplicate concurrent requests
337    quotes_fetch: FetchGuard,
338    charts_fetch: FetchGuardMap<(Interval, TimeRange)>,
339    financials_fetch: FetchGuardMap<(StatementType, Frequency)>,
340    news_fetch: FetchGuard,
341    recommendations_fetch: FetchGuardMap<u32>,
342    options_fetch: FetchGuardMap<Option<i64>>,
343    spark_fetch: FetchGuardMap<(Interval, TimeRange)>,
344    #[cfg(feature = "indicators")]
345    indicators_fetch: FetchGuardMap<(Interval, TimeRange)>,
346}
347
348impl std::fmt::Debug for Tickers {
349    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350        f.debug_struct("Tickers")
351            .field("symbols", &self.symbols)
352            .field("max_concurrency", &self.max_concurrency)
353            .field("cache_ttl", &self.cache_ttl)
354            .finish_non_exhaustive()
355    }
356}
357
358impl Tickers {
359    /// Creates new tickers with default configuration
360    ///
361    /// # Arguments
362    ///
363    /// * `symbols` - Iterable of stock symbols (e.g., `["AAPL", "MSFT"]`)
364    ///
365    /// # Example
366    ///
367    /// ```no_run
368    /// use finance_query::Tickers;
369    ///
370    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
371    /// let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
372    /// # Ok(())
373    /// # }
374    /// ```
375    pub async fn new<S, I>(symbols: I) -> Result<Self>
376    where
377        S: Into<String>,
378        I: IntoIterator<Item = S>,
379    {
380        Self::builder(symbols).build().await
381    }
382
383    /// Creates a new builder for Tickers
384    pub fn builder<S, I>(symbols: I) -> TickersBuilder
385    where
386        S: Into<String>,
387        I: IntoIterator<Item = S>,
388    {
389        TickersBuilder::new(symbols)
390    }
391
392    /// Returns the symbols this tickers instance manages
393    pub fn symbols(&self) -> Vec<&str> {
394        self.symbols.iter().map(|s| &**s).collect()
395    }
396
397    /// Number of symbols
398    pub fn len(&self) -> usize {
399        self.symbols.len()
400    }
401
402    /// Check if empty
403    pub fn is_empty(&self) -> bool {
404        self.symbols.is_empty()
405    }
406
407    /// Returns a shareable handle to this instance's authenticated session.
408    ///
409    /// Pass the handle to [`Ticker`](crate::Ticker) or other `Tickers` builders
410    /// via `.client()` to reuse the same session without re-authenticating.
411    pub fn client_handle(&self) -> crate::ticker::ClientHandle {
412        crate::ticker::ClientHandle(Arc::clone(&self.client))
413    }
414
415    /// Returns `true` if a cache entry exists and has not exceeded the TTL.
416    #[inline]
417    fn is_cache_fresh<T>(&self, entry: Option<&CacheEntry<T>>) -> bool {
418        CacheEntry::is_fresh_with_ttl(entry, self.cache_ttl)
419    }
420
421    /// Returns `true` if all keys are present and fresh in a map cache.
422    fn all_cached<K: Eq + std::hash::Hash, V>(
423        &self,
424        map: &HashMap<K, CacheEntry<V>>,
425        keys: impl Iterator<Item = K>,
426    ) -> bool {
427        let Some(ttl) = self.cache_ttl else {
428            return false;
429        };
430        keys.into_iter()
431            .all(|k| map.get(&k).map(|e| e.is_fresh(ttl)).unwrap_or(false))
432    }
433
434    /// Insert into a map cache, amortizing stale-entry eviction.
435    ///
436    /// Only sweeps stale entries when the map exceeds [`EVICTION_THRESHOLD`],
437    /// avoiding O(n) scans on every write.
438    #[inline]
439    fn cache_insert<K: Eq + std::hash::Hash, V>(
440        &self,
441        map: &mut HashMap<K, CacheEntry<V>>,
442        key: K,
443        value: V,
444    ) {
445        if let Some(ttl) = self.cache_ttl {
446            if map.len() >= EVICTION_THRESHOLD {
447                map.retain(|_, entry| entry.is_fresh(ttl));
448            }
449            map.insert(key, CacheEntry::new(value));
450        }
451    }
452
453    /// Batch fetch quotes for all symbols.
454    ///
455    /// Uses /v7/finance/quote endpoint - fetches all symbols in a single API call.
456    /// When logos are enabled, makes a parallel call for logo URLs.
457    ///
458    /// Use [`TickersBuilder::logo()`](TickersBuilder::logo) to enable logo fetching
459    /// for this tickers instance.
460    pub async fn quotes(&self) -> Result<BatchQuotesResponse> {
461        // Fast path: check if all symbols are cached
462        {
463            let cache = self.quote_cache.read().await;
464            if self.all_cached(&cache, self.symbols.iter().cloned()) {
465                let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
466                for symbol in &self.symbols {
467                    if let Some(entry) = cache.get(symbol) {
468                        response
469                            .quotes
470                            .insert(symbol.to_string(), entry.value.clone());
471                    }
472                }
473                return Ok(response);
474            }
475        }
476
477        // Slow path: acquire fetch guard to prevent duplicate concurrent requests
478        let _fetch_guard = self.quotes_fetch.lock().await;
479
480        // Double-check: another task may have fetched while we waited
481        {
482            let cache = self.quote_cache.read().await;
483            if self.all_cached(&cache, self.symbols.iter().cloned()) {
484                let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
485                for symbol in &self.symbols {
486                    if let Some(entry) = cache.get(symbol) {
487                        response
488                            .quotes
489                            .insert(symbol.to_string(), entry.value.clone());
490                    }
491                }
492                return Ok(response);
493            }
494        }
495
496        // Fetch batch quotes (no lock held during HTTP I/O)
497        let symbols_ref: Vec<&str> = self.symbols.iter().map(|s| &**s).collect();
498
499        // Yahoo requires separate calls for quotes vs logos
500        // When include_logo=true, fetch both in parallel
501        let (json, logos) = if self.include_logo {
502            let quote_future = crate::endpoints::quotes::fetch_with_fields(
503                &self.client,
504                &symbols_ref,
505                None,  // all fields
506                true,  // formatted
507                false, // no logo params for main call
508            );
509            let logo_future = crate::endpoints::quotes::fetch_with_fields(
510                &self.client,
511                &symbols_ref,
512                Some(&["logoUrl", "companyLogoUrl"]), // only logo fields
513                true,
514                true, // include logo params
515            );
516            let (quote_result, logo_result) = tokio::join!(quote_future, logo_future);
517            (quote_result?, logo_result.ok())
518        } else {
519            let json = crate::endpoints::quotes::fetch_with_fields(
520                &self.client,
521                &symbols_ref,
522                None,
523                true,
524                false,
525            )
526            .await?;
527            (json, None)
528        };
529
530        // Build logo lookup map if we have logos
531        let logo_map: std::collections::HashMap<String, (Option<String>, Option<String>)> = logos
532            .and_then(|l| l.get("quoteResponse")?.get("result")?.as_array().cloned())
533            .map(|results| {
534                results
535                    .iter()
536                    .filter_map(|r| {
537                        let symbol = r.get("symbol")?.as_str()?.to_string();
538                        let logo_url = r
539                            .get("logoUrl")
540                            .and_then(|v| v.as_str())
541                            .map(|s| s.to_string());
542                        let company_logo_url = r
543                            .get("companyLogoUrl")
544                            .and_then(|v| v.as_str())
545                            .map(|s| s.to_string());
546                        Some((symbol, (logo_url, company_logo_url)))
547                    })
548                    .collect()
549            })
550            .unwrap_or_default();
551
552        // Parse response
553        let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
554
555        if let Some(quote_response) = json.get("quoteResponse") {
556            if let Some(results) = quote_response.get("result").and_then(|r| r.as_array()) {
557                // Parse all quotes first (no lock held)
558                let mut parsed_quotes: Vec<(String, Quote)> = Vec::new();
559
560                for result in results {
561                    if let Some(symbol) = result.get("symbol").and_then(|s| s.as_str()) {
562                        match Quote::from_batch_response(result) {
563                            Ok(mut quote) => {
564                                // Merge logo URLs if we have them
565                                if let Some((logo_url, company_logo_url)) = logo_map.get(symbol) {
566                                    if quote.logo_url.is_none() {
567                                        quote.logo_url = logo_url.clone();
568                                    }
569                                    if quote.company_logo_url.is_none() {
570                                        quote.company_logo_url = company_logo_url.clone();
571                                    }
572                                }
573                                parsed_quotes.push((symbol.to_string(), quote));
574                            }
575                            Err(e) => {
576                                response.errors.insert(symbol.to_string(), e.to_string());
577                            }
578                        }
579                    }
580                }
581
582                // Now acquire write lock briefly for batch cache insertion
583                if self.cache_ttl.is_some() {
584                    let mut cache = self.quote_cache.write().await;
585                    for (symbol, quote) in &parsed_quotes {
586                        self.cache_insert(&mut cache, symbol.as_str().into(), quote.clone());
587                    }
588                }
589
590                // Populate response (no lock needed)
591                for (symbol, quote) in parsed_quotes {
592                    response.quotes.insert(symbol, quote);
593                }
594            }
595
596            // Track missing symbols
597            for symbol in &self.symbols {
598                let symbol_str = &**symbol;
599                if !response.quotes.contains_key(symbol_str)
600                    && !response.errors.contains_key(symbol_str)
601                {
602                    response.errors.insert(
603                        symbol.to_string(),
604                        "Symbol not found in response".to_string(),
605                    );
606                }
607            }
608        }
609
610        Ok(response)
611    }
612
613    /// Get a specific quote by symbol (from cache or fetch all)
614    pub async fn quote(&self, symbol: &str) -> Result<Quote> {
615        {
616            let cache = self.quote_cache.read().await;
617            if let Some(entry) = cache.get(symbol)
618                && self.is_cache_fresh(Some(entry))
619            {
620                return Ok(entry.value.clone());
621            }
622        }
623
624        let response = self.quotes().await?;
625
626        response
627            .quotes
628            .get(symbol)
629            .cloned()
630            .ok_or_else(|| FinanceError::SymbolNotFound {
631                symbol: Some(symbol.to_string()),
632                context: response
633                    .errors
634                    .get(symbol)
635                    .cloned()
636                    .unwrap_or_else(|| "Symbol not found".to_string()),
637            })
638    }
639
640    /// Helper to get or create a fetch guard for a given key.
641    ///
642    /// Returns the guard from the map, never a locally-created copy that
643    /// could diverge under contention.
644    async fn get_fetch_guard<K: Clone + Eq + std::hash::Hash>(
645        guard_map: &FetchGuardMap<K>,
646        key: K,
647    ) -> FetchGuard {
648        {
649            let guards = guard_map.read().await;
650            if let Some(guard) = guards.get(&key) {
651                return Arc::clone(guard);
652            }
653        }
654
655        let mut guards = guard_map.write().await;
656        Arc::clone(
657            guards
658                .entry(key)
659                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
660        )
661    }
662
663    /// Batch fetch charts for all symbols concurrently
664    ///
665    /// Chart data cannot be batched in a single request, so this fetches
666    /// all charts concurrently using tokio for maximum performance.
667    pub async fn charts(
668        &self,
669        interval: Interval,
670        range: TimeRange,
671    ) -> Result<BatchChartsResponse> {
672        // Fast path: check if all symbols are cached
673        {
674            let cache = self.chart_cache.read().await;
675            if self.all_cached(
676                &cache,
677                self.symbols.iter().map(|s| (s.clone(), interval, range)),
678            ) {
679                let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
680                for symbol in &self.symbols {
681                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
682                        response
683                            .charts
684                            .insert(symbol.to_string(), entry.value.clone());
685                    }
686                }
687                return Ok(response);
688            }
689        }
690
691        // Slow path: acquire fetch guard to prevent duplicate concurrent requests
692        let fetch_guard = Self::get_fetch_guard(&self.charts_fetch, (interval, range)).await;
693        let _guard = fetch_guard.lock().await;
694
695        // Double-check: another task may have fetched while we waited
696        {
697            let cache = self.chart_cache.read().await;
698            if self.all_cached(
699                &cache,
700                self.symbols.iter().map(|s| (s.clone(), interval, range)),
701            ) {
702                let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
703                for symbol in &self.symbols {
704                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
705                        response
706                            .charts
707                            .insert(symbol.to_string(), entry.value.clone());
708                    }
709                }
710                return Ok(response);
711            }
712        }
713
714        // Fetch all charts concurrently (no lock held during HTTP I/O)
715        let futures: Vec<_> = self
716            .symbols
717            .iter()
718            .map(|symbol| {
719                let client = Arc::clone(&self.client);
720                let symbol = Arc::clone(symbol);
721                async move {
722                    let result = client.get_chart(&symbol, interval, range).await;
723                    (symbol, result)
724                }
725            })
726            .collect();
727
728        let results: Vec<_> = stream::iter(futures)
729            .buffer_unordered(self.max_concurrency)
730            .collect()
731            .await;
732
733        let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
734
735        // Parse all charts first (no locks held)
736        let mut parsed_charts: Vec<(Arc<str>, Chart)> = Vec::new();
737        let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
738
739        for (symbol, result) in results {
740            match result {
741                Ok(json) => match ChartResponse::from_json(json) {
742                    Ok(chart_response) => {
743                        if let Some(mut chart_results) = chart_response.chart.result {
744                            if let Some(chart_result) = chart_results.pop() {
745                                // Collect events for later caching
746                                if let Some(events) = chart_result.events.clone() {
747                                    parsed_events.push((Arc::clone(&symbol), events));
748                                }
749
750                                let chart = Chart {
751                                    symbol: symbol.to_string(),
752                                    meta: chart_result.meta.clone(),
753                                    candles: chart_result.to_candles(),
754                                    interval: Some(interval),
755                                    range: Some(range),
756                                };
757                                parsed_charts.push((symbol, chart));
758                            } else {
759                                response
760                                    .errors
761                                    .insert(symbol.to_string(), "Empty chart response".to_string());
762                            }
763                        } else {
764                            response.errors.insert(
765                                symbol.to_string(),
766                                "No chart data in response".to_string(),
767                            );
768                        }
769                    }
770                    Err(e) => {
771                        response.errors.insert(symbol.to_string(), e.to_string());
772                    }
773                },
774                Err(e) => {
775                    response.errors.insert(symbol.to_string(), e.to_string());
776                }
777            }
778        }
779
780        // Move into cache, then clone for response — avoids double-clone
781        if self.cache_ttl.is_some() {
782            let mut cache = self.chart_cache.write().await;
783
784            // Cache all charts (consuming parsed_charts) and collect keys
785            let cache_keys: Vec<_> = parsed_charts
786                .into_iter()
787                .map(|(symbol, chart)| {
788                    self.cache_insert(&mut cache, (symbol.clone(), interval, range), chart);
789                    symbol
790                })
791                .collect();
792
793            // Clone from cache into response (convert Arc<str> → String)
794            for symbol in cache_keys {
795                if let Some(cached) = cache.get(&(symbol.clone(), interval, range)) {
796                    response
797                        .charts
798                        .insert(symbol.to_string(), cached.value.clone());
799                }
800            }
801        } else {
802            // No caching: directly populate response (convert Arc<str> → String)
803            for (symbol, chart) in parsed_charts {
804                response.charts.insert(symbol.to_string(), chart);
805            }
806        }
807
808        // Always store events — they are derived data, not TTL-bounded cache
809        if !parsed_events.is_empty() {
810            let mut events_cache = self.events_cache.write().await;
811            for (symbol, events) in parsed_events {
812                events_cache
813                    .entry(symbol)
814                    .or_insert_with(|| CacheEntry::new(events));
815            }
816        }
817
818        Ok(response)
819    }
820
821    /// Get a specific chart by symbol
822    pub async fn chart(&self, symbol: &str, interval: Interval, range: TimeRange) -> Result<Chart> {
823        {
824            let cache = self.chart_cache.read().await;
825            let key: Arc<str> = symbol.into();
826            if let Some(entry) = cache.get(&(key, interval, range))
827                && self.is_cache_fresh(Some(entry))
828            {
829                return Ok(entry.value.clone());
830            }
831        }
832
833        let response = self.charts(interval, range).await?;
834
835        response
836            .charts
837            .get(symbol)
838            .cloned()
839            .ok_or_else(|| FinanceError::SymbolNotFound {
840                symbol: Some(symbol.to_string()),
841                context: response
842                    .errors
843                    .get(symbol)
844                    .cloned()
845                    .unwrap_or_else(|| "Symbol not found".to_string()),
846            })
847    }
848
849    /// Batch fetch chart data for a custom date range for all symbols concurrently.
850    ///
851    /// Unlike [`charts()`](Self::charts) which uses predefined time ranges,
852    /// this method accepts absolute start/end timestamps. Results are **not cached**
853    /// since custom ranges have unbounded key space.
854    ///
855    /// # Arguments
856    ///
857    /// * `interval` - Time interval between data points
858    /// * `start` - Start date as Unix timestamp (seconds since epoch)
859    /// * `end` - End date as Unix timestamp (seconds since epoch)
860    pub async fn charts_range(
861        &self,
862        interval: Interval,
863        start: i64,
864        end: i64,
865    ) -> Result<BatchChartsResponse> {
866        let futures: Vec<_> = self
867            .symbols
868            .iter()
869            .map(|symbol| {
870                let client = Arc::clone(&self.client);
871                let symbol = Arc::clone(symbol);
872                async move {
873                    let result = client.get_chart_range(&symbol, interval, start, end).await;
874                    (symbol, result)
875                }
876            })
877            .collect();
878
879        let results: Vec<_> = stream::iter(futures)
880            .buffer_unordered(self.max_concurrency)
881            .collect()
882            .await;
883
884        let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
885        let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
886
887        for (symbol, result) in results {
888            match result {
889                Ok(json) => match ChartResponse::from_json(json) {
890                    Ok(chart_response) => {
891                        if let Some(mut chart_results) = chart_response.chart.result {
892                            if let Some(chart_result) = chart_results.pop() {
893                                // Collect events for later caching
894                                if let Some(events) = chart_result.events.clone() {
895                                    parsed_events.push((Arc::clone(&symbol), events));
896                                }
897
898                                let chart = Chart {
899                                    symbol: symbol.to_string(),
900                                    meta: chart_result.meta.clone(),
901                                    candles: chart_result.to_candles(),
902                                    interval: Some(interval),
903                                    range: None,
904                                };
905                                response.charts.insert(symbol.to_string(), chart);
906                            } else {
907                                response
908                                    .errors
909                                    .insert(symbol.to_string(), "Empty chart response".to_string());
910                            }
911                        } else {
912                            response.errors.insert(
913                                symbol.to_string(),
914                                "No chart data in response".to_string(),
915                            );
916                        }
917                    }
918                    Err(e) => {
919                        response.errors.insert(symbol.to_string(), e.to_string());
920                    }
921                },
922                Err(e) => {
923                    response.errors.insert(symbol.to_string(), e.to_string());
924                }
925            }
926        }
927
928        // Always store events — they are derived data, not TTL-bounded cache
929        if !parsed_events.is_empty() {
930            let mut events_cache = self.events_cache.write().await;
931            for (symbol, events) in parsed_events {
932                events_cache
933                    .entry(symbol)
934                    .or_insert_with(|| CacheEntry::new(events));
935            }
936        }
937
938        Ok(response)
939    }
940
941    /// Ensures events are loaded for all symbols using chart requests.
942    ///
943    /// Fetches events concurrently for symbols that don't have cached events.
944    /// Uses `TimeRange::Max` to get full event history (Yahoo returns all
945    /// dividends/splits/capital gains regardless of chart range).
946    ///
947    /// Events are always stored regardless of `cache_ttl` because they are
948    /// derived data (not a TTL-bounded cache). When `cache_ttl` is `None`,
949    /// events persist for the lifetime of the `Tickers` instance.
950    async fn ensure_events_loaded(&self) -> Result<()> {
951        // Check which symbols need event data (existence check, not TTL-based)
952        let symbols_to_fetch: Vec<Arc<str>> = {
953            let cache = self.events_cache.read().await;
954            self.symbols
955                .iter()
956                .filter(|sym| !cache.contains_key(*sym))
957                .cloned()
958                .collect()
959        };
960
961        if symbols_to_fetch.is_empty() {
962            return Ok(());
963        }
964
965        // Fetch events concurrently for all symbols that need it
966        let futures: Vec<_> = symbols_to_fetch
967            .iter()
968            .map(|symbol| {
969                let client = Arc::clone(&self.client);
970                let symbol = Arc::clone(symbol);
971                async move {
972                    let result = crate::endpoints::chart::fetch(
973                        &client,
974                        &symbol,
975                        Interval::OneDay,
976                        TimeRange::Max,
977                    )
978                    .await;
979                    (symbol, result)
980                }
981            })
982            .collect();
983
984        let results: Vec<_> = stream::iter(futures)
985            .buffer_unordered(self.max_concurrency)
986            .collect()
987            .await;
988
989        // Parse and cache events
990        let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
991
992        for (symbol, result) in results {
993            if let Ok(json) = result
994                && let Ok(chart_response) = ChartResponse::from_json(json)
995                && let Some(mut chart_results) = chart_response.chart.result
996                && let Some(chart_result) = chart_results.pop()
997                && let Some(events) = chart_result.events
998            {
999                parsed_events.push((symbol, events));
1000            }
1001        }
1002
1003        // Always store events — they are derived data, not TTL-bounded cache
1004        if !parsed_events.is_empty() {
1005            let mut events_cache = self.events_cache.write().await;
1006            for (symbol, events) in parsed_events {
1007                events_cache.insert(symbol, CacheEntry::new(events));
1008            }
1009        }
1010
1011        Ok(())
1012    }
1013
1014    /// Batch fetch spark data for all symbols in a single request.
1015    ///
1016    /// Spark data is optimized for sparkline rendering, returning only close prices.
1017    /// Unlike `charts()`, this fetches all symbols in ONE API call, making it
1018    /// much more efficient for displaying price trends on dashboards or watchlists.
1019    ///
1020    /// # Arguments
1021    ///
1022    /// * `interval` - Time interval between data points (e.g., `Interval::FiveMinutes`)
1023    /// * `range` - Time range to fetch (e.g., `TimeRange::OneDay`)
1024    ///
1025    /// # Example
1026    ///
1027    /// ```no_run
1028    /// use finance_query::{Tickers, Interval, TimeRange};
1029    ///
1030    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1031    /// let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
1032    /// let sparks = tickers.spark(Interval::FiveMinutes, TimeRange::OneDay).await?;
1033    ///
1034    /// for (symbol, spark) in &sparks.sparks {
1035    ///     if let Some(change) = spark.percent_change() {
1036    ///         println!("{}: {:.2}%", symbol, change);
1037    ///     }
1038    /// }
1039    /// # Ok(())
1040    /// # }
1041    /// ```
1042    pub async fn spark(&self, interval: Interval, range: TimeRange) -> Result<BatchSparksResponse> {
1043        // Fast path: check if all symbols are cached
1044        {
1045            let cache = self.spark_cache.read().await;
1046            if self.all_cached(
1047                &cache,
1048                self.symbols.iter().map(|s| (s.clone(), interval, range)),
1049            ) {
1050                let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1051                for symbol in &self.symbols {
1052                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
1053                        response
1054                            .sparks
1055                            .insert(symbol.to_string(), entry.value.clone());
1056                    }
1057                }
1058                return Ok(response);
1059            }
1060        }
1061
1062        // Slow path: acquire fetch guard
1063        let fetch_guard = Self::get_fetch_guard(&self.spark_fetch, (interval, range)).await;
1064        let _guard = fetch_guard.lock().await;
1065
1066        // Double-check after guard
1067        {
1068            let cache = self.spark_cache.read().await;
1069            if self.all_cached(
1070                &cache,
1071                self.symbols.iter().map(|s| (s.clone(), interval, range)),
1072            ) {
1073                let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1074                for symbol in &self.symbols {
1075                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
1076                        response
1077                            .sparks
1078                            .insert(symbol.to_string(), entry.value.clone());
1079                    }
1080                }
1081                return Ok(response);
1082            }
1083        }
1084
1085        // Fetch (single batch API call, no lock held during I/O)
1086        let symbols_ref: Vec<&str> = self.symbols.iter().map(|s| &**s).collect();
1087        let json =
1088            crate::endpoints::spark::fetch(&self.client, &symbols_ref, interval, range).await?;
1089
1090        let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1091
1092        match SparkResponse::from_json(json) {
1093            Ok(spark_response) => {
1094                let mut parsed_sparks: Vec<(Arc<str>, Spark)> = Vec::new();
1095
1096                if let Some(results) = spark_response.spark.result {
1097                    for result in &results {
1098                        if let Some(spark) = Spark::from_response(
1099                            result,
1100                            Some(interval.as_str().to_string()),
1101                            Some(range.as_str().to_string()),
1102                        ) {
1103                            let sym: Arc<str> = result.symbol.as_str().into();
1104                            parsed_sparks.push((sym, spark));
1105                        } else {
1106                            response.errors.insert(
1107                                result.symbol.to_string(),
1108                                "Failed to parse spark data".to_string(),
1109                            );
1110                        }
1111                    }
1112                }
1113
1114                // Cache all parsed sparks
1115                if self.cache_ttl.is_some() {
1116                    let mut cache = self.spark_cache.write().await;
1117                    for (symbol, spark) in &parsed_sparks {
1118                        self.cache_insert(
1119                            &mut cache,
1120                            (symbol.clone(), interval, range),
1121                            spark.clone(),
1122                        );
1123                    }
1124                }
1125
1126                // Build response
1127                for (symbol, spark) in parsed_sparks {
1128                    response.sparks.insert(symbol.to_string(), spark);
1129                }
1130
1131                // Track missing symbols
1132                for symbol in &self.symbols {
1133                    let symbol_str = &**symbol;
1134                    if !response.sparks.contains_key(symbol_str)
1135                        && !response.errors.contains_key(symbol_str)
1136                    {
1137                        response.errors.insert(
1138                            symbol.to_string(),
1139                            "Symbol not found in response".to_string(),
1140                        );
1141                    }
1142                }
1143            }
1144            Err(e) => {
1145                for symbol in &self.symbols {
1146                    response.errors.insert(symbol.to_string(), e.to_string());
1147                }
1148            }
1149        }
1150
1151        Ok(response)
1152    }
1153
1154    /// Batch fetch dividends for all symbols
1155    ///
1156    /// Returns dividend history for all symbols, filtered by the specified time range.
1157    /// Dividends are cached per symbol after the first chart fetch.
1158    ///
1159    /// # Arguments
1160    ///
1161    /// * `range` - Time range to filter dividends
1162    ///
1163    /// # Example
1164    ///
1165    /// ```no_run
1166    /// use finance_query::{Tickers, TimeRange};
1167    ///
1168    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1169    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1170    /// let dividends = tickers.dividends(TimeRange::OneYear).await?;
1171    ///
1172    /// for (symbol, divs) in &dividends.dividends {
1173    ///     println!("{}: {} dividends", symbol, divs.len());
1174    /// }
1175    /// # Ok(())
1176    /// # }
1177    /// ```
1178    pub async fn dividends(&self, range: TimeRange) -> Result<BatchDividendsResponse> {
1179        let mut response = BatchDividendsResponse::with_capacity(self.symbols.len());
1180
1181        // Fetch events efficiently (1-day chart request per symbol)
1182        self.ensure_events_loaded().await?;
1183
1184        let events_cache = self.events_cache.read().await;
1185
1186        for symbol in &self.symbols {
1187            if let Some(entry) = events_cache.get(symbol) {
1188                let all_dividends = entry.value.to_dividends();
1189                let filtered = filter_by_range(all_dividends, range);
1190                response.dividends.insert(symbol.to_string(), filtered);
1191            } else {
1192                response
1193                    .errors
1194                    .insert(symbol.to_string(), "No events data available".to_string());
1195            }
1196        }
1197
1198        Ok(response)
1199    }
1200
1201    /// Batch fetch stock splits for all symbols
1202    ///
1203    /// Returns stock split history for all symbols, filtered by the specified time range.
1204    /// Splits are cached per symbol after the first chart fetch.
1205    ///
1206    /// # Arguments
1207    ///
1208    /// * `range` - Time range to filter splits
1209    ///
1210    /// # Example
1211    ///
1212    /// ```no_run
1213    /// use finance_query::{Tickers, TimeRange};
1214    ///
1215    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1216    /// let tickers = Tickers::new(["NVDA", "TSLA"]).await?;
1217    /// let splits = tickers.splits(TimeRange::FiveYears).await?;
1218    ///
1219    /// for (symbol, sp) in &splits.splits {
1220    ///     for split in sp {
1221    ///         println!("{}: {}", symbol, split.ratio);
1222    ///     }
1223    /// }
1224    /// # Ok(())
1225    /// # }
1226    /// ```
1227    pub async fn splits(&self, range: TimeRange) -> Result<BatchSplitsResponse> {
1228        let mut response = BatchSplitsResponse::with_capacity(self.symbols.len());
1229
1230        // Fetch events efficiently (1-day chart request per symbol)
1231        self.ensure_events_loaded().await?;
1232
1233        let events_cache = self.events_cache.read().await;
1234
1235        for symbol in &self.symbols {
1236            if let Some(entry) = events_cache.get(symbol) {
1237                let all_splits = entry.value.to_splits();
1238                let filtered = filter_by_range(all_splits, range);
1239                response.splits.insert(symbol.to_string(), filtered);
1240            } else {
1241                response
1242                    .errors
1243                    .insert(symbol.to_string(), "No events data available".to_string());
1244            }
1245        }
1246
1247        Ok(response)
1248    }
1249
1250    /// Batch fetch capital gains for all symbols
1251    ///
1252    /// Returns capital gain distribution history for all symbols, filtered by the
1253    /// specified time range. This is primarily relevant for mutual funds and ETFs.
1254    /// Capital gains are cached per symbol after the first chart fetch.
1255    ///
1256    /// # Arguments
1257    ///
1258    /// * `range` - Time range to filter capital gains
1259    ///
1260    /// # Example
1261    ///
1262    /// ```no_run
1263    /// use finance_query::{Tickers, TimeRange};
1264    ///
1265    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1266    /// let tickers = Tickers::new(["VFIAX", "VTI"]).await?;
1267    /// let gains = tickers.capital_gains(TimeRange::TwoYears).await?;
1268    ///
1269    /// for (symbol, cg) in &gains.capital_gains {
1270    ///     println!("{}: {} distributions", symbol, cg.len());
1271    /// }
1272    /// # Ok(())
1273    /// # }
1274    /// ```
1275    pub async fn capital_gains(&self, range: TimeRange) -> Result<BatchCapitalGainsResponse> {
1276        let mut response = BatchCapitalGainsResponse::with_capacity(self.symbols.len());
1277
1278        // Fetch events efficiently (1-day chart request per symbol)
1279        self.ensure_events_loaded().await?;
1280
1281        let events_cache = self.events_cache.read().await;
1282
1283        for symbol in &self.symbols {
1284            if let Some(entry) = events_cache.get(symbol) {
1285                let all_gains = entry.value.to_capital_gains();
1286                let filtered = filter_by_range(all_gains, range);
1287                response.capital_gains.insert(symbol.to_string(), filtered);
1288            } else {
1289                response
1290                    .errors
1291                    .insert(symbol.to_string(), "No events data available".to_string());
1292            }
1293        }
1294
1295        Ok(response)
1296    }
1297
1298    /// Batch fetch financial statements for all symbols
1299    ///
1300    /// Fetches the specified financial statement type for all symbols concurrently.
1301    /// Financial statements are cached per (symbol, statement_type, frequency) tuple.
1302    ///
1303    /// # Arguments
1304    ///
1305    /// * `statement_type` - Type of statement (Income, Balance, CashFlow)
1306    /// * `frequency` - Annual or Quarterly
1307    ///
1308    /// # Example
1309    ///
1310    /// ```no_run
1311    /// use finance_query::{Tickers, StatementType, Frequency};
1312    ///
1313    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1314    /// let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
1315    /// let financials = tickers.financials(StatementType::Income, Frequency::Annual).await?;
1316    ///
1317    /// for (symbol, stmt) in &financials.financials {
1318    ///     if let Some(revenue) = stmt.statement.get("TotalRevenue") {
1319    ///         println!("{}: {:?}", symbol, revenue);
1320    ///     }
1321    /// }
1322    /// # Ok(())
1323    /// # }
1324    /// ```
1325    pub async fn financials(
1326        &self,
1327        statement_type: StatementType,
1328        frequency: Frequency,
1329    ) -> Result<BatchFinancialsResponse> {
1330        batch_fetch_cached!(self;
1331            cache: financials_cache,
1332            guard: map(financials_fetch, (statement_type, frequency)),
1333            key: |s| (s.clone(), statement_type, frequency),
1334            response: BatchFinancialsResponse.financials,
1335            fetch: |client, symbol| client.get_financials(&symbol, statement_type, frequency).await,
1336        )
1337    }
1338
1339    /// Batch fetch news articles for all symbols
1340    ///
1341    /// Fetches recent news articles for all symbols concurrently using scrapers.
1342    /// News articles are cached per symbol.
1343    ///
1344    /// # Example
1345    ///
1346    /// ```no_run
1347    /// use finance_query::Tickers;
1348    ///
1349    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1350    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1351    /// let news = tickers.news().await?;
1352    ///
1353    /// for (symbol, articles) in &news.news {
1354    ///     println!("{}: {} articles", symbol, articles.len());
1355    ///     for article in articles.iter().take(3) {
1356    ///         println!("  - {}", article.title);
1357    ///     }
1358    /// }
1359    /// # Ok(())
1360    /// # }
1361    /// ```
1362    pub async fn news(&self) -> Result<BatchNewsResponse> {
1363        batch_fetch_cached!(self;
1364            cache: news_cache,
1365            guard: simple(news_fetch),
1366            key: |s| s.clone(),
1367            response: BatchNewsResponse.news,
1368            fetch: |_client, symbol| crate::scrapers::stockanalysis::scrape_symbol_news(&symbol).await,
1369        )
1370    }
1371
1372    /// Batch fetch recommendations for all symbols
1373    ///
1374    /// Fetches analyst recommendations and similar stocks for all symbols concurrently.
1375    /// Recommendations are cached per (symbol, limit) tuple — different limits
1376    /// produce different API responses and are cached independently.
1377    ///
1378    /// # Arguments
1379    ///
1380    /// * `limit` - Maximum number of similar stocks to return per symbol
1381    ///
1382    /// # Example
1383    ///
1384    /// ```no_run
1385    /// use finance_query::Tickers;
1386    ///
1387    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1388    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1389    /// let recommendations = tickers.recommendations(10).await?;
1390    ///
1391    /// for (symbol, rec) in &recommendations.recommendations {
1392    ///     println!("{}: {} recommendations", symbol, rec.count());
1393    ///     for similar in &rec.recommendations {
1394    ///         println!("  - {}: score {}", similar.symbol, similar.score);
1395    ///     }
1396    /// }
1397    /// # Ok(())
1398    /// # }
1399    /// ```
1400    pub async fn recommendations(&self, limit: u32) -> Result<BatchRecommendationsResponse> {
1401        batch_fetch_cached!(self;
1402            cache: recommendations_cache,
1403            guard: map(recommendations_fetch, limit),
1404            key: |s| (s.clone(), limit),
1405            response: BatchRecommendationsResponse.recommendations,
1406            fetch: |client, symbol| {
1407                let json = client.get_recommendations(&symbol, limit).await?;
1408                let rec_response =
1409                    crate::models::recommendation::response::RecommendationResponse::from_json(json)?;
1410                Ok(Recommendation {
1411                    symbol: symbol.to_string(),
1412                    recommendations: rec_response
1413                        .finance
1414                        .result
1415                        .iter()
1416                        .flat_map(|r| &r.recommended_symbols)
1417                        .cloned()
1418                        .collect(),
1419                })
1420            },
1421        )
1422    }
1423
1424    /// Batch fetch options chains for all symbols
1425    ///
1426    /// Fetches options chains for the specified expiration date for all symbols concurrently.
1427    /// Options are cached per (symbol, date) tuple.
1428    ///
1429    /// # Arguments
1430    ///
1431    /// * `date` - Optional expiration date (Unix timestamp). If None, fetches nearest expiration.
1432    ///
1433    /// # Example
1434    ///
1435    /// ```no_run
1436    /// use finance_query::Tickers;
1437    ///
1438    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1439    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1440    /// let options = tickers.options(None).await?;
1441    ///
1442    /// for (symbol, opts) in &options.options {
1443    ///     println!("{}: {} expirations", symbol, opts.expiration_dates().len());
1444    /// }
1445    /// # Ok(())
1446    /// # }
1447    /// ```
1448    pub async fn options(&self, date: Option<i64>) -> Result<BatchOptionsResponse> {
1449        batch_fetch_cached!(self;
1450            cache: options_cache,
1451            guard: map(options_fetch, date),
1452            key: |s| (s.clone(), date),
1453            response: BatchOptionsResponse.options,
1454            fetch: |client, symbol| {
1455                let json = client.get_options(&symbol, date).await?;
1456                Ok(serde_json::from_value::<Options>(json)?)
1457            },
1458        )
1459    }
1460
1461    /// Batch calculate all technical indicators for all symbols
1462    ///
1463    /// Calculates complete indicator summaries for all symbols from their chart data.
1464    /// Indicators are cached per (symbol, interval, range) tuple.
1465    ///
1466    /// # Arguments
1467    ///
1468    /// * `interval` - The time interval for each candle
1469    /// * `range` - The time range to fetch data for
1470    ///
1471    /// # Example
1472    ///
1473    /// ```no_run
1474    /// use finance_query::{Tickers, Interval, TimeRange};
1475    ///
1476    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1477    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1478    /// let indicators = tickers.indicators(Interval::OneDay, TimeRange::ThreeMonths).await?;
1479    ///
1480    /// for (symbol, ind) in &indicators.indicators {
1481    ///     println!("{}: RSI(14) = {:?}, SMA(20) = {:?}", symbol, ind.rsi_14, ind.sma_20);
1482    /// }
1483    /// # Ok(())
1484    /// # }
1485    /// ```
1486    #[cfg(feature = "indicators")]
1487    pub async fn indicators(
1488        &self,
1489        interval: Interval,
1490        range: TimeRange,
1491    ) -> Result<BatchIndicatorsResponse> {
1492        let cache_key_for = |symbol: &Arc<str>| (symbol.clone(), interval, range);
1493
1494        // Fast path: check if all symbols are cached
1495        {
1496            let cache = self.indicators_cache.read().await;
1497            if self.all_cached(&cache, self.symbols.iter().map(&cache_key_for)) {
1498                let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
1499                for symbol in &self.symbols {
1500                    if let Some(entry) = cache.get(&cache_key_for(symbol)) {
1501                        response
1502                            .indicators
1503                            .insert(symbol.to_string(), entry.value.clone());
1504                    }
1505                }
1506                return Ok(response);
1507            }
1508        }
1509
1510        // Slow path: acquire fetch guard to prevent duplicate concurrent calculations
1511        let fetch_guard = Self::get_fetch_guard(&self.indicators_fetch, (interval, range)).await;
1512        let _guard = fetch_guard.lock().await;
1513
1514        // Double-check: another task may have computed while we waited
1515        {
1516            let cache = self.indicators_cache.read().await;
1517            if self.all_cached(&cache, self.symbols.iter().map(&cache_key_for)) {
1518                let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
1519                for symbol in &self.symbols {
1520                    if let Some(entry) = cache.get(&cache_key_for(symbol)) {
1521                        response
1522                            .indicators
1523                            .insert(symbol.to_string(), entry.value.clone());
1524                    }
1525                }
1526                return Ok(response);
1527            }
1528        }
1529
1530        // Fetch charts first (which may already be cached, has its own deduplication)
1531        let charts_response = self.charts(interval, range).await?;
1532
1533        let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
1534
1535        // Calculate all indicators first (no lock held)
1536        let mut calculated_indicators: Vec<(String, crate::indicators::IndicatorsSummary)> =
1537            Vec::new();
1538
1539        for (symbol, chart) in &charts_response.charts {
1540            let indicators = crate::indicators::summary::calculate_indicators(&chart.candles);
1541            calculated_indicators.push((symbol.to_string(), indicators));
1542        }
1543
1544        // Now acquire write lock briefly for batch cache insertion
1545        if self.cache_ttl.is_some() {
1546            let mut cache = self.indicators_cache.write().await;
1547            for (symbol, indicators) in &calculated_indicators {
1548                let key: Arc<str> = symbol.as_str().into();
1549                self.cache_insert(&mut cache, cache_key_for(&key), indicators.clone());
1550            }
1551        }
1552
1553        // Populate response (no lock needed)
1554        for (symbol, indicators) in calculated_indicators {
1555            response.indicators.insert(symbol, indicators);
1556        }
1557
1558        // Add errors from chart fetch
1559        for (symbol, error) in &charts_response.errors {
1560            response.errors.insert(symbol.to_string(), error.clone());
1561        }
1562
1563        Ok(response)
1564    }
1565
1566    // ========================================================================
1567    // Dynamic Symbol Management
1568    // ========================================================================
1569
1570    /// Add symbols to the watch list
1571    ///
1572    /// Adds new symbols to track without affecting existing cached data.
1573    ///
1574    /// # Example
1575    ///
1576    /// ```no_run
1577    /// use finance_query::Tickers;
1578    ///
1579    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1580    /// let mut tickers = Tickers::new(["AAPL"]).await?;
1581    /// tickers.add_symbols(&["MSFT", "GOOGL"]);
1582    /// assert_eq!(tickers.len(), 3);
1583    /// # Ok(())
1584    /// # }
1585    /// ```
1586    pub fn add_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1587        // Use HashSet for O(n+m) deduplication instead of O(n*m) linear search
1588        use std::collections::HashSet;
1589
1590        let existing: HashSet<&str> = self.symbols.iter().map(|s| &**s).collect();
1591        let to_add: Vec<Arc<str>> = symbols
1592            .iter()
1593            .map(|s| s.as_ref())
1594            .filter(|s| !existing.contains(s))
1595            .map(|s| s.into())
1596            .collect();
1597
1598        self.symbols.extend(to_add);
1599    }
1600
1601    /// Remove symbols from the watch list
1602    ///
1603    /// Removes symbols and clears their cached data to free memory.
1604    ///
1605    /// # Example
1606    ///
1607    /// ```no_run
1608    /// use finance_query::Tickers;
1609    ///
1610    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1611    /// let mut tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
1612    /// tickers.remove_symbols(&["MSFT"]);
1613    /// assert_eq!(tickers.len(), 2);
1614    /// # Ok(())
1615    /// # }
1616    /// ```
1617    pub async fn remove_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1618        use std::collections::HashSet;
1619        let to_remove: HashSet<&str> = symbols.iter().map(|s| s.as_ref()).collect();
1620
1621        // Remove from symbol list — O(1) lookup per element
1622        self.symbols.retain(|s| !to_remove.contains(&**s));
1623
1624        // Acquire all independent write locks in parallel
1625        let (
1626            mut quote_cache,
1627            mut chart_cache,
1628            mut events_cache,
1629            mut financials_cache,
1630            mut news_cache,
1631            mut recommendations_cache,
1632            mut options_cache,
1633            mut spark_cache,
1634        ) = tokio::join!(
1635            self.quote_cache.write(),
1636            self.chart_cache.write(),
1637            self.events_cache.write(),
1638            self.financials_cache.write(),
1639            self.news_cache.write(),
1640            self.recommendations_cache.write(),
1641            self.options_cache.write(),
1642            self.spark_cache.write(),
1643        );
1644
1645        // Simple key caches — O(1) per removal
1646        for symbol in &to_remove {
1647            let key: Arc<str> = (*symbol).into();
1648            quote_cache.remove(&key);
1649            events_cache.remove(&key);
1650            news_cache.remove(&key);
1651        }
1652
1653        // Composite key caches — O(n) retain but O(1) contains check
1654        chart_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1655        financials_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1656        recommendations_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1657        options_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1658        spark_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1659
1660        // Drop all guards before cfg-gated lock
1661        drop((
1662            quote_cache,
1663            chart_cache,
1664            events_cache,
1665            financials_cache,
1666            news_cache,
1667            recommendations_cache,
1668            options_cache,
1669            spark_cache,
1670        ));
1671
1672        #[cfg(feature = "indicators")]
1673        self.indicators_cache
1674            .write()
1675            .await
1676            .retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1677    }
1678
1679    /// Clear all cached data and fetch guards, forcing fresh fetches on next access.
1680    ///
1681    /// Use this when you need up-to-date data from a long-lived `Tickers` instance.
1682    /// Also clears fetch guard maps to prevent unbounded growth.
1683    pub async fn clear_cache(&self) {
1684        tokio::join!(
1685            // Data caches
1686            async { self.quote_cache.write().await.clear() },
1687            async { self.chart_cache.write().await.clear() },
1688            async { self.events_cache.write().await.clear() },
1689            async { self.financials_cache.write().await.clear() },
1690            async { self.news_cache.write().await.clear() },
1691            async { self.recommendations_cache.write().await.clear() },
1692            async { self.options_cache.write().await.clear() },
1693            async { self.spark_cache.write().await.clear() },
1694            async {
1695                #[cfg(feature = "indicators")]
1696                self.indicators_cache.write().await.clear();
1697            },
1698            // Fetch guard maps (prevent unbounded growth)
1699            async { self.charts_fetch.write().await.clear() },
1700            async { self.financials_fetch.write().await.clear() },
1701            async { self.recommendations_fetch.write().await.clear() },
1702            async { self.options_fetch.write().await.clear() },
1703            async { self.spark_fetch.write().await.clear() },
1704            async {
1705                #[cfg(feature = "indicators")]
1706                self.indicators_fetch.write().await.clear();
1707            },
1708        );
1709    }
1710
1711    /// Clear only the cached quote data.
1712    ///
1713    /// The next call to `quotes()` or `quote()` will re-fetch from the API.
1714    pub async fn clear_quote_cache(&self) {
1715        self.quote_cache.write().await.clear();
1716    }
1717
1718    /// Clear only the cached chart, spark, and events data.
1719    ///
1720    /// The next call to `charts()`, `spark()`, `dividends()`, `splits()`,
1721    /// or `capital_gains()` will re-fetch from the API.
1722    pub async fn clear_chart_cache(&self) {
1723        tokio::join!(
1724            async { self.chart_cache.write().await.clear() },
1725            async { self.events_cache.write().await.clear() },
1726            async { self.spark_cache.write().await.clear() },
1727            async {
1728                #[cfg(feature = "indicators")]
1729                self.indicators_cache.write().await.clear();
1730            },
1731        );
1732    }
1733}
1734
1735#[cfg(test)]
1736mod tests {
1737    use super::*;
1738
1739    #[tokio::test]
1740    #[ignore] // Requires network access
1741    async fn test_tickers_quotes() {
1742        let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
1743        let result = tickers.quotes().await.unwrap();
1744
1745        assert!(result.success_count() > 0);
1746    }
1747
1748    #[tokio::test]
1749    #[ignore] // Requires network access
1750    async fn test_tickers_charts() {
1751        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1752        let result = tickers
1753            .charts(Interval::OneDay, TimeRange::FiveDays)
1754            .await
1755            .unwrap();
1756
1757        assert!(result.success_count() > 0);
1758    }
1759
1760    #[tokio::test]
1761    #[ignore = "requires network access"]
1762    async fn test_tickers_spark() {
1763        let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
1764        let result = tickers
1765            .spark(Interval::FiveMinutes, TimeRange::OneDay)
1766            .await
1767            .unwrap();
1768
1769        assert!(result.success_count() > 0);
1770
1771        // Verify spark data structure
1772        if let Some(spark) = result.sparks.get("AAPL") {
1773            assert!(!spark.closes.is_empty());
1774            assert_eq!(spark.symbol, "AAPL");
1775            // Verify helper methods work
1776            assert!(spark.percent_change().is_some());
1777        }
1778    }
1779
1780    #[tokio::test]
1781    #[ignore = "requires network access"]
1782    async fn test_tickers_dividends() {
1783        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1784        let result = tickers.dividends(TimeRange::OneYear).await.unwrap();
1785
1786        assert!(result.success_count() > 0);
1787
1788        // Verify dividend data structure
1789        if let Some(dividends) = result.dividends.get("AAPL")
1790            && !dividends.is_empty()
1791        {
1792            let div = &dividends[0];
1793            assert!(div.timestamp > 0);
1794            assert!(div.amount > 0.0);
1795        }
1796    }
1797
1798    #[tokio::test]
1799    #[ignore = "requires network access"]
1800    async fn test_tickers_splits() {
1801        let tickers = Tickers::new(["NVDA", "TSLA"]).await.unwrap();
1802        let result = tickers.splits(TimeRange::FiveYears).await.unwrap();
1803
1804        // Note: Not all symbols have splits, so we just check for successful response
1805        assert!(result.success_count() > 0);
1806
1807        // If there are splits, verify structure
1808        for splits in result.splits.values() {
1809            for split in splits {
1810                assert!(split.timestamp > 0);
1811                assert!(split.numerator > 0.0);
1812                assert!(split.denominator > 0.0);
1813                assert!(!split.ratio.is_empty());
1814            }
1815        }
1816    }
1817
1818    #[tokio::test]
1819    #[ignore = "requires network access"]
1820    async fn test_tickers_capital_gains() {
1821        let tickers = Tickers::new(["VFIAX", "VTI"]).await.unwrap();
1822        let result = tickers.capital_gains(TimeRange::TwoYears).await.unwrap();
1823
1824        // Note: Not all symbols have capital gains distributions
1825        assert!(result.success_count() > 0);
1826
1827        // If there are capital gains, verify structure
1828        for gains in result.capital_gains.values() {
1829            for gain in gains {
1830                assert!(gain.timestamp > 0);
1831                assert!(gain.amount >= 0.0);
1832            }
1833        }
1834    }
1835
1836    #[tokio::test]
1837    #[ignore = "requires network access"]
1838    async fn test_tickers_financials() {
1839        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1840        let result = tickers
1841            .financials(StatementType::Income, Frequency::Annual)
1842            .await
1843            .unwrap();
1844
1845        assert!(result.success_count() > 0);
1846
1847        // Verify financial statement structure
1848        for (symbol, stmt) in &result.financials {
1849            assert_eq!(stmt.symbol, *symbol);
1850            assert_eq!(stmt.statement_type, "income");
1851            assert_eq!(stmt.frequency, "annual");
1852            assert!(!stmt.statement.is_empty());
1853
1854            // Common income statement fields
1855            if let Some(revenue) = stmt.statement.get("TotalRevenue") {
1856                assert!(!revenue.is_empty());
1857            }
1858        }
1859    }
1860
1861    #[tokio::test]
1862    #[ignore = "requires network access"]
1863    async fn test_tickers_news() {
1864        let tickers = Tickers::new(["AAPL", "TSLA"]).await.unwrap();
1865        let result = tickers.news().await.unwrap();
1866
1867        assert!(result.success_count() > 0);
1868
1869        // Verify news structure
1870        for articles in result.news.values() {
1871            if !articles.is_empty() {
1872                let article = &articles[0];
1873                assert!(!article.title.is_empty());
1874                assert!(!article.link.is_empty());
1875                assert!(!article.source.is_empty());
1876            }
1877        }
1878    }
1879
1880    #[tokio::test]
1881    #[ignore = "requires network access"]
1882    async fn test_tickers_recommendations() {
1883        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1884        let result = tickers.recommendations(5).await.unwrap();
1885
1886        assert!(result.success_count() > 0);
1887
1888        // Verify recommendations structure
1889        for (symbol, rec) in &result.recommendations {
1890            assert_eq!(rec.symbol, *symbol);
1891            assert!(rec.count() > 0);
1892            for similar in &rec.recommendations {
1893                assert!(!similar.symbol.is_empty());
1894            }
1895        }
1896    }
1897
1898    #[tokio::test]
1899    #[ignore = "requires network access"]
1900    async fn test_tickers_options() {
1901        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1902        let result = tickers.options(None).await.unwrap();
1903
1904        assert!(result.success_count() > 0);
1905
1906        // Verify options structure
1907        for opts in result.options.values() {
1908            assert!(!opts.expiration_dates().is_empty());
1909        }
1910    }
1911
1912    #[tokio::test]
1913    #[ignore = "requires network access"]
1914    #[cfg(feature = "indicators")]
1915    async fn test_tickers_indicators() {
1916        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
1917        let result = tickers
1918            .indicators(Interval::OneDay, TimeRange::ThreeMonths)
1919            .await
1920            .unwrap();
1921
1922        assert!(result.success_count() > 0);
1923
1924        // Verify indicators structure
1925        for ind in result.indicators.values() {
1926            // Check that at least some indicators are present
1927            assert!(ind.rsi_14.is_some() || ind.sma_20.is_some());
1928        }
1929    }
1930
1931    #[tokio::test]
1932    async fn test_tickers_add_symbols() {
1933        let mut tickers = Tickers::new(["AAPL"]).await.unwrap();
1934        assert_eq!(tickers.len(), 1);
1935        assert_eq!(tickers.symbols(), &["AAPL"]);
1936
1937        tickers.add_symbols(&["MSFT", "GOOGL"]);
1938        assert_eq!(tickers.len(), 3);
1939        assert!(tickers.symbols().contains(&"AAPL"));
1940        assert!(tickers.symbols().contains(&"MSFT"));
1941        assert!(tickers.symbols().contains(&"GOOGL"));
1942
1943        // Adding duplicate shouldn't increase count
1944        tickers.add_symbols(&["AAPL"]);
1945        assert_eq!(tickers.len(), 3);
1946    }
1947
1948    #[tokio::test]
1949    #[ignore = "requires network access"]
1950    async fn test_tickers_remove_symbols() {
1951        let mut tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
1952        assert_eq!(tickers.len(), 3);
1953
1954        // Fetch some data to populate caches
1955        let _ = tickers.quotes().await;
1956
1957        // Remove one symbol
1958        tickers.remove_symbols(&["MSFT"]).await;
1959        assert_eq!(tickers.len(), 2);
1960        assert!(tickers.symbols().contains(&"AAPL"));
1961        assert!(!tickers.symbols().contains(&"MSFT"));
1962        assert!(tickers.symbols().contains(&"GOOGL"));
1963
1964        // Verify cache was cleared
1965        let quotes = tickers.quotes().await.unwrap();
1966        assert!(!quotes.quotes.contains_key("MSFT"));
1967        assert_eq!(quotes.quotes.len(), 2);
1968    }
1969}