Skip to main content

finance_query/tickers/
core.rs

1//! Tickers implementation for batch operations on multiple symbols.
2//!
3//! Optimizes data fetching by using batch endpoints and concurrent requests.
4
5use super::macros::{batch_fetch_cached, define_batch_response};
6use crate::client::{ClientConfig, YahooClient};
7use crate::constants::{Frequency, Interval, StatementType, TimeRange};
8use crate::error::{FinanceError, Result};
9use crate::models::chart::events::ChartEvents;
10use crate::models::chart::response::ChartResponse;
11use crate::models::chart::{CapitalGain, Chart, Dividend, Split};
12use crate::models::financials::FinancialStatement;
13use crate::models::news::News;
14use crate::models::options::Options;
15use crate::models::quote::Quote;
16use crate::models::recommendation::Recommendation;
17use crate::models::spark::Spark;
18use crate::models::spark::response::SparkResponse;
19use crate::utils::{CacheEntry, EVICTION_THRESHOLD, filter_by_range};
20use futures::stream::{self, StreamExt};
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::sync::RwLock;
25
// Type aliases — MapCache wraps values in CacheEntry for TTL support.
// Every cache is an `Arc`-shared hash map behind tokio's async `RwLock`.
type MapCache<K, V> = Arc<RwLock<HashMap<K, CacheEntry<V>>>>;
// Charts are cached per symbol and per requested (interval, range) pair.
type ChartCacheKey = (Arc<str>, Interval, TimeRange);
// Quotes are keyed by symbol only.
type QuoteCache = MapCache<Arc<str>, Quote>;
type ChartCache = MapCache<ChartCacheKey, Chart>;
// Chart events (filled by `charts()` from each chart result) are keyed by symbol only.
type EventsCache = MapCache<Arc<str>, ChartEvents>;
type FinancialsCache = MapCache<(Arc<str>, StatementType, Frequency), FinancialStatement>;
type NewsCache = MapCache<Arc<str>, Vec<News>>;
// NOTE(review): the `u32` key component is presumably the requested result limit — confirm
// against the recommendations fetch method.
type RecommendationsCache = MapCache<(Arc<str>, u32), Recommendation>;
// NOTE(review): the `Option<i64>` key component is presumably an optional expiration
// timestamp — confirm against the options fetch method.
type OptionsCache = MapCache<(Arc<str>, Option<i64>), Options>;
// Same shape as `ChartCacheKey`, kept as a separate alias for the spark cache.
type SparkCacheKey = (Arc<str>, Interval, TimeRange);
type SparkCache = MapCache<SparkCacheKey, Spark>;
#[cfg(feature = "indicators")]
type IndicatorsCache =
    MapCache<(Arc<str>, Interval, TimeRange), crate::indicators::IndicatorsSummary>;

// Fetch guards for request deduplication — prevent concurrent duplicate fetches.
// A guard is an async mutex with no payload; a batch method holds its lock while fetching.
type FetchGuard = Arc<tokio::sync::Mutex<()>>;
// One guard per request-parameter key, created on demand by `Tickers::get_fetch_guard`.
type FetchGuardMap<K> = Arc<RwLock<HashMap<K, FetchGuard>>>;
45
// Generate all batch response types.
//
// Each invocation of `define_batch_response!` (imported from `super::macros`) declares one
// response type from a doc comment, a type name, a field name and a per-symbol value type.
// As used in this file, the generated types offer a `with_capacity` constructor, the named
// per-symbol map keyed by symbol string, and an `errors` map from symbol to error message.
// NOTE(review): the exact expansion lives in the macro definition — confirm there.
define_batch_response! {
    /// Response containing quotes for multiple symbols.
    BatchQuotesResponse => quotes: Quote
}

define_batch_response! {
    /// Response containing charts for multiple symbols.
    BatchChartsResponse => charts: Chart
}

define_batch_response! {
    /// Response containing spark data for multiple symbols.
    ///
    /// Spark data is optimized for sparkline rendering with only close prices.
    /// Unlike charts, spark data is fetched in a single batch request.
    BatchSparksResponse => sparks: Spark
}

define_batch_response! {
    /// Response containing dividends for multiple symbols.
    BatchDividendsResponse => dividends: Vec<Dividend>
}

define_batch_response! {
    /// Response containing splits for multiple symbols.
    BatchSplitsResponse => splits: Vec<Split>
}

define_batch_response! {
    /// Response containing capital gains for multiple symbols.
    BatchCapitalGainsResponse => capital_gains: Vec<CapitalGain>
}

define_batch_response! {
    /// Response containing financial statements for multiple symbols.
    BatchFinancialsResponse => financials: FinancialStatement
}

define_batch_response! {
    /// Response containing news articles for multiple symbols.
    BatchNewsResponse => news: Vec<News>
}

define_batch_response! {
    /// Response containing recommendations for multiple symbols.
    BatchRecommendationsResponse => recommendations: Recommendation
}

define_batch_response! {
    /// Response containing options chains for multiple symbols.
    BatchOptionsResponse => options: Options
}

// Only generated when the `indicators` feature is enabled.
#[cfg(feature = "indicators")]
define_batch_response! {
    /// Response containing technical indicators for multiple symbols.
    BatchIndicatorsResponse => indicators: crate::indicators::IndicatorsSummary
}
105
/// Default maximum concurrent requests for batch operations.
///
/// This is the initial `max_concurrency` of every new [`TickersBuilder`]; callers can
/// override it with [`TickersBuilder::max_concurrency`].
const DEFAULT_MAX_CONCURRENCY: usize = 10;
108
/// Builder for [`Tickers`].
///
/// Collects the symbol list and client/caching options, then creates the instance with
/// [`TickersBuilder::build`]. Obtain one through [`Tickers::builder`].
pub struct TickersBuilder {
    // Symbols to manage, stored as cheaply clonable `Arc<str>`.
    symbols: Vec<Arc<str>>,
    // Configuration for a newly created client; unused when `shared_client` is set.
    config: ClientConfig,
    // Existing session to reuse instead of creating a new client.
    shared_client: Option<crate::ticker::ClientHandle>,
    // Upper bound on concurrently running per-symbol requests (the setter clamps it to >= 1).
    max_concurrency: usize,
    // Cache time-to-live; `None` leaves response caching disabled.
    cache_ttl: Option<Duration>,
    // Whether `quotes()` issues the additional logo request.
    include_logo: bool,
}
118
119impl TickersBuilder {
120    fn new<S, I>(symbols: I) -> Self
121    where
122        S: Into<String>,
123        I: IntoIterator<Item = S>,
124    {
125        Self {
126            symbols: symbols.into_iter().map(|s| s.into().into()).collect(),
127            config: ClientConfig::default(),
128            shared_client: None,
129            max_concurrency: DEFAULT_MAX_CONCURRENCY,
130            cache_ttl: None,
131            include_logo: false,
132        }
133    }
134
135    /// Set the region (automatically sets correct lang and region code)
136    pub fn region(mut self, region: crate::constants::Region) -> Self {
137        self.config.lang = region.lang().to_string();
138        self.config.region = region.region().to_string();
139        self
140    }
141
142    /// Set the language code (e.g., "en-US", "ja-JP", "de-DE")
143    pub fn lang(mut self, lang: impl Into<String>) -> Self {
144        self.config.lang = lang.into();
145        self
146    }
147
148    /// Set the region code (e.g., "US", "JP", "DE")
149    pub fn region_code(mut self, region: impl Into<String>) -> Self {
150        self.config.region = region.into();
151        self
152    }
153
154    /// Set the HTTP request timeout
155    pub fn timeout(mut self, timeout: Duration) -> Self {
156        self.config.timeout = timeout;
157        self
158    }
159
160    /// Set the proxy URL
161    pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
162        self.config.proxy = Some(proxy.into());
163        self
164    }
165
166    /// Set a complete ClientConfig
167    pub fn config(mut self, config: ClientConfig) -> Self {
168        self.config = config;
169        self
170    }
171
172    /// Set the maximum number of concurrent requests for batch operations.
173    ///
174    /// Controls how many HTTP requests run in parallel when methods like
175    /// `charts()`, `financials()`, or `news()` fetch data for each symbol.
176    /// Default is 10.
177    ///
178    /// Lower values reduce the risk of rate limiting from Yahoo Finance.
179    /// Higher values increase throughput for large symbol lists.
180    pub fn max_concurrency(mut self, n: usize) -> Self {
181        self.max_concurrency = n.max(1);
182        self
183    }
184
185    /// Share an existing authenticated session instead of creating a new one.
186    ///
187    /// This avoids redundant authentication when you have multiple `Tickers`
188    /// instances or want to share a session with individual [`crate::Ticker`] instances.
189    ///
190    /// Obtain a [`ClientHandle`](crate::ClientHandle) from any existing
191    /// [`Ticker`](crate::Ticker) via [`Ticker::client_handle()`](crate::Ticker::client_handle).
192    ///
193    /// When set, the builder's `config`, `timeout`, `proxy`, `lang`, and `region`
194    /// settings are ignored (the shared session's configuration is used instead).
195    ///
196    /// # Example
197    ///
198    /// ```no_run
199    /// use finance_query::{Ticker, Tickers};
200    ///
201    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
202    /// let aapl = Ticker::new("AAPL").await?;
203    /// let handle = aapl.client_handle();
204    ///
205    /// let tickers = Tickers::builder(["MSFT", "GOOGL"])
206    ///     .client(handle)
207    ///     .build()
208    ///     .await?;
209    /// # Ok(())
210    /// # }
211    /// ```
212    pub fn client(mut self, handle: crate::ticker::ClientHandle) -> Self {
213        self.shared_client = Some(handle);
214        self
215    }
216
217    /// Enable response caching with a time-to-live.
218    ///
219    /// By default caching is **disabled** — every call fetches fresh data.
220    /// When enabled, responses are reused until the TTL expires. Stale
221    /// entries are evicted on the next write.
222    ///
223    /// # Example
224    ///
225    /// ```no_run
226    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
227    /// use finance_query::Tickers;
228    /// use std::time::Duration;
229    ///
230    /// let tickers = Tickers::builder(["AAPL", "MSFT"])
231    ///     .cache(Duration::from_secs(30))
232    ///     .build()
233    ///     .await?;
234    /// # Ok(())
235    /// # }
236    /// ```
237    pub fn cache(mut self, ttl: Duration) -> Self {
238        self.cache_ttl = Some(ttl);
239        self
240    }
241
242    /// Include company logo URLs in quote responses.
243    ///
244    /// When enabled, `quotes()` will fetch logo URLs in parallel with the
245    /// quote batch request, adding a small extra request.
246    pub fn logo(mut self) -> Self {
247        self.include_logo = true;
248        self
249    }
250
251    /// Build the Tickers instance
252    pub async fn build(self) -> Result<Tickers> {
253        let client = match self.shared_client {
254            Some(handle) => handle.0,
255            None => Arc::new(YahooClient::new(self.config).await?),
256        };
257
258        Ok(Tickers {
259            symbols: self.symbols,
260            client,
261            max_concurrency: self.max_concurrency,
262            cache_ttl: self.cache_ttl,
263            include_logo: self.include_logo,
264            quote_cache: Default::default(),
265            chart_cache: Default::default(),
266            events_cache: Default::default(),
267            financials_cache: Default::default(),
268            news_cache: Default::default(),
269            recommendations_cache: Default::default(),
270            options_cache: Default::default(),
271            spark_cache: Default::default(),
272            #[cfg(feature = "indicators")]
273            indicators_cache: Default::default(),
274
275            // Initialize fetch guards for request deduplication
276            quotes_fetch: Arc::new(tokio::sync::Mutex::new(())),
277            charts_fetch: Default::default(),
278            financials_fetch: Default::default(),
279            news_fetch: Arc::new(tokio::sync::Mutex::new(())),
280            recommendations_fetch: Default::default(),
281            options_fetch: Default::default(),
282            spark_fetch: Default::default(),
283            #[cfg(feature = "indicators")]
284            indicators_fetch: Default::default(),
285        })
286    }
287}
288
/// Multi-symbol ticker for efficient batch operations.
///
/// `Tickers` optimizes data fetching for multiple symbols by:
/// - Using batch endpoints where available (e.g., /v7/finance/quote)
/// - Fetching concurrently when batch endpoints don't exist
/// - Sharing a single authenticated client across all symbols
/// - Caching results per symbol when a TTL is configured through
///   [`TickersBuilder::cache`] (caching is off by default)
///
/// # Example
///
/// ```no_run
/// use finance_query::Tickers;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// // Create tickers for multiple symbols
/// let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
///
/// // Batch fetch all quotes (single API call)
/// let quotes = tickers.quotes().await?;
/// for (symbol, quote) in &quotes.quotes {
///     let price = quote.regular_market_price.as_ref().and_then(|v| v.raw).unwrap_or(0.0);
///     println!("{}: ${:.2}", symbol, price);
/// }
///
/// // Fetch charts concurrently
/// use finance_query::{Interval, TimeRange};
/// let charts = tickers.charts(Interval::OneDay, TimeRange::OneMonth).await?;
/// # Ok(())
/// # }
/// ```
pub struct Tickers {
    // Symbols managed by this instance, in the order they were supplied.
    symbols: Vec<Arc<str>>,
    // Client shared by every request; also handed out via `client_handle()`.
    client: Arc<YahooClient>,
    // Upper bound on concurrently running per-symbol requests.
    max_concurrency: usize,
    // Cache time-to-live; `None` means the TTL-bound caches are never written.
    cache_ttl: Option<Duration>,
    // Whether `quotes()` issues the additional logo request.
    include_logo: bool,
    quote_cache: QuoteCache,
    chart_cache: ChartCache,
    // Filled by `charts()` whenever a chart result carries events, even when no TTL is set.
    events_cache: EventsCache,
    financials_cache: FinancialsCache,
    news_cache: NewsCache,
    recommendations_cache: RecommendationsCache,
    options_cache: OptionsCache,
    spark_cache: SparkCache,
    #[cfg(feature = "indicators")]
    indicators_cache: IndicatorsCache,

    // Fetch guards prevent duplicate concurrent requests.
    // Guard keys hold only the request parameters, not the symbol: e.g. `charts()`
    // takes one guard per (interval, range) for its whole batch.
    quotes_fetch: FetchGuard,
    charts_fetch: FetchGuardMap<(Interval, TimeRange)>,
    financials_fetch: FetchGuardMap<(StatementType, Frequency)>,
    news_fetch: FetchGuard,
    recommendations_fetch: FetchGuardMap<u32>,
    options_fetch: FetchGuardMap<Option<i64>>,
    spark_fetch: FetchGuardMap<(Interval, TimeRange)>,
    #[cfg(feature = "indicators")]
    indicators_fetch: FetchGuardMap<(Interval, TimeRange)>,
}
347
348impl std::fmt::Debug for Tickers {
349    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
350        f.debug_struct("Tickers")
351            .field("symbols", &self.symbols)
352            .field("max_concurrency", &self.max_concurrency)
353            .field("cache_ttl", &self.cache_ttl)
354            .finish_non_exhaustive()
355    }
356}
357
358impl Tickers {
359    /// Creates new tickers with default configuration
360    ///
361    /// # Arguments
362    ///
363    /// * `symbols` - Iterable of stock symbols (e.g., `["AAPL", "MSFT"]`)
364    ///
365    /// # Example
366    ///
367    /// ```no_run
368    /// use finance_query::Tickers;
369    ///
370    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
371    /// let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
372    /// # Ok(())
373    /// # }
374    /// ```
375    pub async fn new<S, I>(symbols: I) -> Result<Self>
376    where
377        S: Into<String>,
378        I: IntoIterator<Item = S>,
379    {
380        Self::builder(symbols).build().await
381    }
382
    /// Creates a new builder for Tickers.
    ///
    /// The returned [`TickersBuilder`] starts from the default configuration; chain its
    /// setters and finish with [`TickersBuilder::build`].
    ///
    /// # Arguments
    ///
    /// * `symbols` - Iterable of stock symbols (e.g., `["AAPL", "MSFT"]`)
    pub fn builder<S, I>(symbols: I) -> TickersBuilder
    where
        S: Into<String>,
        I: IntoIterator<Item = S>,
    {
        TickersBuilder::new(symbols)
    }
391
392    /// Returns the symbols this tickers instance manages
393    pub fn symbols(&self) -> Vec<&str> {
394        self.symbols.iter().map(|s| &**s).collect()
395    }
396
    /// Number of symbols managed by this instance.
    ///
    /// Counts the stored symbol list as supplied at construction; duplicates are not
    /// collapsed.
    pub fn len(&self) -> usize {
        self.symbols.len()
    }
401
    /// Returns `true` if this instance manages no symbols.
    pub fn is_empty(&self) -> bool {
        self.symbols.is_empty()
    }
406
407    /// Returns a shareable handle to this instance's authenticated session.
408    ///
409    /// Pass the handle to [`Ticker`](crate::Ticker) or other `Tickers` builders
410    /// via `.client()` to reuse the same session without re-authenticating.
411    pub fn client_handle(&self) -> crate::ticker::ClientHandle {
412        crate::ticker::ClientHandle(Arc::clone(&self.client))
413    }
414
    /// Returns `true` if a cache entry exists and has not exceeded the TTL.
    ///
    /// Delegates to `CacheEntry::is_fresh_with_ttl`, passing this instance's `cache_ttl`.
    /// NOTE(review): presumably yields `false` when `entry` is `None` or caching is
    /// disabled (`cache_ttl` is `None`) — confirm against `CacheEntry`.
    #[inline]
    fn is_cache_fresh<T>(&self, entry: Option<&CacheEntry<T>>) -> bool {
        CacheEntry::is_fresh_with_ttl(entry, self.cache_ttl)
    }
420
421    /// Returns `true` if all keys are present and fresh in a map cache.
422    fn all_cached<K: Eq + std::hash::Hash, V>(
423        &self,
424        map: &HashMap<K, CacheEntry<V>>,
425        keys: impl Iterator<Item = K>,
426    ) -> bool {
427        let Some(ttl) = self.cache_ttl else {
428            return false;
429        };
430        keys.into_iter()
431            .all(|k| map.get(&k).map(|e| e.is_fresh(ttl)).unwrap_or(false))
432    }
433
434    /// Insert into a map cache, amortizing stale-entry eviction.
435    ///
436    /// Only sweeps stale entries when the map exceeds [`EVICTION_THRESHOLD`],
437    /// avoiding O(n) scans on every write.
438    #[inline]
439    fn cache_insert<K: Eq + std::hash::Hash, V>(
440        &self,
441        map: &mut HashMap<K, CacheEntry<V>>,
442        key: K,
443        value: V,
444    ) {
445        if let Some(ttl) = self.cache_ttl {
446            if map.len() >= EVICTION_THRESHOLD {
447                map.retain(|_, entry| entry.is_fresh(ttl));
448            }
449            map.insert(key, CacheEntry::new(value));
450        }
451    }
452
453    /// Batch fetch quotes for all symbols.
454    ///
455    /// Uses /v7/finance/quote endpoint - fetches all symbols in a single API call.
456    /// When logos are enabled, makes a parallel call for logo URLs.
457    ///
458    /// Use [`TickersBuilder::logo()`](TickersBuilder::logo) to enable logo fetching
459    /// for this tickers instance.
460    pub async fn quotes(&self) -> Result<BatchQuotesResponse> {
461        // Fast path: check if all symbols are cached
462        {
463            let cache = self.quote_cache.read().await;
464            if self.all_cached(&cache, self.symbols.iter().cloned()) {
465                let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
466                for symbol in &self.symbols {
467                    if let Some(entry) = cache.get(symbol) {
468                        response
469                            .quotes
470                            .insert(symbol.to_string(), entry.value.clone());
471                    }
472                }
473                return Ok(response);
474            }
475        }
476
477        // Slow path: acquire fetch guard to prevent duplicate concurrent requests
478        let _fetch_guard = self.quotes_fetch.lock().await;
479
480        // Double-check: another task may have fetched while we waited
481        {
482            let cache = self.quote_cache.read().await;
483            if self.all_cached(&cache, self.symbols.iter().cloned()) {
484                let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
485                for symbol in &self.symbols {
486                    if let Some(entry) = cache.get(symbol) {
487                        response
488                            .quotes
489                            .insert(symbol.to_string(), entry.value.clone());
490                    }
491                }
492                return Ok(response);
493            }
494        }
495
496        // Fetch batch quotes (no lock held during HTTP I/O)
497        let symbols_ref: Vec<&str> = self.symbols.iter().map(|s| &**s).collect();
498
499        // Yahoo requires separate calls for quotes vs logos
500        // When include_logo=true, fetch both in parallel
501        let (json, logos) = if self.include_logo {
502            let quote_future = crate::endpoints::quotes::fetch_with_fields(
503                &self.client,
504                &symbols_ref,
505                None,  // all fields
506                true,  // formatted
507                false, // no logo params for main call
508            );
509            let logo_future = crate::endpoints::quotes::fetch_with_fields(
510                &self.client,
511                &symbols_ref,
512                Some(&["logoUrl", "companyLogoUrl"]), // only logo fields
513                true,
514                true, // include logo params
515            );
516            let (quote_result, logo_result) = tokio::join!(quote_future, logo_future);
517            (quote_result?, logo_result.ok())
518        } else {
519            let json = crate::endpoints::quotes::fetch_with_fields(
520                &self.client,
521                &symbols_ref,
522                None,
523                true,
524                false,
525            )
526            .await?;
527            (json, None)
528        };
529
530        // Build logo lookup map if we have logos
531        let logo_map: std::collections::HashMap<String, (Option<String>, Option<String>)> = logos
532            .and_then(|l| l.get("quoteResponse")?.get("result")?.as_array().cloned())
533            .map(|results| {
534                results
535                    .iter()
536                    .filter_map(|r| {
537                        let symbol = r.get("symbol")?.as_str()?.to_string();
538                        let logo_url = r
539                            .get("logoUrl")
540                            .and_then(|v| v.as_str())
541                            .map(|s| s.to_string());
542                        let company_logo_url = r
543                            .get("companyLogoUrl")
544                            .and_then(|v| v.as_str())
545                            .map(|s| s.to_string());
546                        Some((symbol, (logo_url, company_logo_url)))
547                    })
548                    .collect()
549            })
550            .unwrap_or_default();
551
552        // Parse response
553        let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
554
555        if let Some(quote_response) = json.get("quoteResponse") {
556            if let Some(results) = quote_response.get("result").and_then(|r| r.as_array()) {
557                // Parse all quotes first (no lock held)
558                let mut parsed_quotes: Vec<(String, Quote)> = Vec::new();
559
560                for result in results {
561                    if let Some(symbol) = result.get("symbol").and_then(|s| s.as_str()) {
562                        match Quote::from_batch_response(result) {
563                            Ok(mut quote) => {
564                                // Merge logo URLs if we have them
565                                if let Some((logo_url, company_logo_url)) = logo_map.get(symbol) {
566                                    if quote.logo_url.is_none() {
567                                        quote.logo_url = logo_url.clone();
568                                    }
569                                    if quote.company_logo_url.is_none() {
570                                        quote.company_logo_url = company_logo_url.clone();
571                                    }
572                                }
573                                parsed_quotes.push((symbol.to_string(), quote));
574                            }
575                            Err(e) => {
576                                response.errors.insert(symbol.to_string(), e.to_string());
577                            }
578                        }
579                    }
580                }
581
582                // Now acquire write lock briefly for batch cache insertion
583                if self.cache_ttl.is_some() {
584                    let mut cache = self.quote_cache.write().await;
585                    for (symbol, quote) in &parsed_quotes {
586                        self.cache_insert(&mut cache, symbol.as_str().into(), quote.clone());
587                    }
588                }
589
590                // Populate response (no lock needed)
591                for (symbol, quote) in parsed_quotes {
592                    response.quotes.insert(symbol, quote);
593                }
594            }
595
596            // Track missing symbols
597            for symbol in &self.symbols {
598                let symbol_str = &**symbol;
599                if !response.quotes.contains_key(symbol_str)
600                    && !response.errors.contains_key(symbol_str)
601                {
602                    response.errors.insert(
603                        symbol.to_string(),
604                        "Symbol not found in response".to_string(),
605                    );
606                }
607            }
608        }
609
610        Ok(response)
611    }
612
613    /// Get a specific quote by symbol (from cache or fetch all)
614    pub async fn quote(&self, symbol: &str) -> Result<Quote> {
615        {
616            let cache = self.quote_cache.read().await;
617            if let Some(entry) = cache.get(symbol)
618                && self.is_cache_fresh(Some(entry))
619            {
620                return Ok(entry.value.clone());
621            }
622        }
623
624        let response = self.quotes().await?;
625
626        response
627            .quotes
628            .get(symbol)
629            .cloned()
630            .ok_or_else(|| FinanceError::SymbolNotFound {
631                symbol: Some(symbol.to_string()),
632                context: response
633                    .errors
634                    .get(symbol)
635                    .cloned()
636                    .unwrap_or_else(|| "Symbol not found".to_string()),
637            })
638    }
639
640    /// Helper to get or create a fetch guard for a given key.
641    ///
642    /// Returns the guard from the map, never a locally-created copy that
643    /// could diverge under contention.
644    async fn get_fetch_guard<K: Clone + Eq + std::hash::Hash>(
645        guard_map: &FetchGuardMap<K>,
646        key: K,
647    ) -> FetchGuard {
648        {
649            let guards = guard_map.read().await;
650            if let Some(guard) = guards.get(&key) {
651                return Arc::clone(guard);
652            }
653        }
654
655        let mut guards = guard_map.write().await;
656        Arc::clone(
657            guards
658                .entry(key)
659                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
660        )
661    }
662
663    /// Batch fetch charts for all symbols concurrently
664    ///
665    /// Chart data cannot be batched in a single request, so this fetches
666    /// all charts concurrently using tokio for maximum performance.
667    pub async fn charts(
668        &self,
669        interval: Interval,
670        range: TimeRange,
671    ) -> Result<BatchChartsResponse> {
672        // Fast path: check if all symbols are cached
673        {
674            let cache = self.chart_cache.read().await;
675            if self.all_cached(
676                &cache,
677                self.symbols.iter().map(|s| (s.clone(), interval, range)),
678            ) {
679                let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
680                for symbol in &self.symbols {
681                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
682                        response
683                            .charts
684                            .insert(symbol.to_string(), entry.value.clone());
685                    }
686                }
687                return Ok(response);
688            }
689        }
690
691        // Slow path: acquire fetch guard to prevent duplicate concurrent requests
692        let fetch_guard = Self::get_fetch_guard(&self.charts_fetch, (interval, range)).await;
693        let _guard = fetch_guard.lock().await;
694
695        // Double-check: another task may have fetched while we waited
696        {
697            let cache = self.chart_cache.read().await;
698            if self.all_cached(
699                &cache,
700                self.symbols.iter().map(|s| (s.clone(), interval, range)),
701            ) {
702                let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
703                for symbol in &self.symbols {
704                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
705                        response
706                            .charts
707                            .insert(symbol.to_string(), entry.value.clone());
708                    }
709                }
710                return Ok(response);
711            }
712        }
713
714        // Fetch all charts concurrently (no lock held during HTTP I/O)
715        let futures: Vec<_> = self
716            .symbols
717            .iter()
718            .map(|symbol| {
719                let client = Arc::clone(&self.client);
720                let symbol = Arc::clone(symbol);
721                async move {
722                    let result = client.get_chart(&symbol, interval, range).await;
723                    (symbol, result)
724                }
725            })
726            .collect();
727
728        let results: Vec<_> = stream::iter(futures)
729            .buffer_unordered(self.max_concurrency)
730            .collect()
731            .await;
732
733        let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
734
735        // Parse all charts first (no locks held)
736        let mut parsed_charts: Vec<(Arc<str>, Chart)> = Vec::new();
737        let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
738
739        for (symbol, result) in results {
740            match result {
741                Ok(json) => match ChartResponse::from_json(json) {
742                    Ok(chart_response) => {
743                        if let Some(mut chart_results) = chart_response.chart.result {
744                            if let Some(chart_result) = chart_results.pop() {
745                                // Collect events for later caching
746                                if let Some(events) = chart_result.events.clone() {
747                                    parsed_events.push((Arc::clone(&symbol), events));
748                                }
749
750                                let chart = Chart {
751                                    symbol: symbol.to_string(),
752                                    meta: chart_result.meta.clone(),
753                                    candles: chart_result.to_candles(),
754                                    interval: Some(interval),
755                                    range: Some(range),
756                                };
757                                parsed_charts.push((symbol, chart));
758                            } else {
759                                response
760                                    .errors
761                                    .insert(symbol.to_string(), "Empty chart response".to_string());
762                            }
763                        } else {
764                            response.errors.insert(
765                                symbol.to_string(),
766                                "No chart data in response".to_string(),
767                            );
768                        }
769                    }
770                    Err(e) => {
771                        response.errors.insert(symbol.to_string(), e.to_string());
772                    }
773                },
774                Err(e) => {
775                    response.errors.insert(symbol.to_string(), e.to_string());
776                }
777            }
778        }
779
780        // Move into cache, then clone for response — avoids double-clone
781        if self.cache_ttl.is_some() {
782            let mut cache = self.chart_cache.write().await;
783
784            // Cache all charts (consuming parsed_charts) and collect keys
785            let cache_keys: Vec<_> = parsed_charts
786                .into_iter()
787                .map(|(symbol, chart)| {
788                    self.cache_insert(&mut cache, (symbol.clone(), interval, range), chart);
789                    symbol
790                })
791                .collect();
792
793            // Clone from cache into response (convert Arc<str> → String)
794            for symbol in cache_keys {
795                if let Some(cached) = cache.get(&(symbol.clone(), interval, range)) {
796                    response
797                        .charts
798                        .insert(symbol.to_string(), cached.value.clone());
799                }
800            }
801        } else {
802            // No caching: directly populate response (convert Arc<str> → String)
803            for (symbol, chart) in parsed_charts {
804                response.charts.insert(symbol.to_string(), chart);
805            }
806        }
807
808        // Always store events — they are derived data, not TTL-bounded cache
809        if !parsed_events.is_empty() {
810            let mut events_cache = self.events_cache.write().await;
811            for (symbol, events) in parsed_events {
812                events_cache
813                    .entry(symbol)
814                    .or_insert_with(|| CacheEntry::new(events));
815            }
816        }
817
818        Ok(response)
819    }
820
821    /// Get a specific chart by symbol
822    pub async fn chart(&self, symbol: &str, interval: Interval, range: TimeRange) -> Result<Chart> {
823        {
824            let cache = self.chart_cache.read().await;
825            let key: Arc<str> = symbol.into();
826            if let Some(entry) = cache.get(&(key, interval, range))
827                && self.is_cache_fresh(Some(entry))
828            {
829                return Ok(entry.value.clone());
830            }
831        }
832
833        let response = self.charts(interval, range).await?;
834
835        response
836            .charts
837            .get(symbol)
838            .cloned()
839            .ok_or_else(|| FinanceError::SymbolNotFound {
840                symbol: Some(symbol.to_string()),
841                context: response
842                    .errors
843                    .get(symbol)
844                    .cloned()
845                    .unwrap_or_else(|| "Symbol not found".to_string()),
846            })
847    }
848
849    /// Batch fetch chart data for a custom date range for all symbols concurrently.
850    ///
851    /// Unlike [`charts()`](Self::charts) which uses predefined time ranges,
852    /// this method accepts absolute start/end timestamps. Results are **not cached**
853    /// since custom ranges have unbounded key space.
854    ///
855    /// # Arguments
856    ///
857    /// * `interval` - Time interval between data points
858    /// * `start` - Start date as Unix timestamp (seconds since epoch)
859    /// * `end` - End date as Unix timestamp (seconds since epoch)
860    pub async fn charts_range(
861        &self,
862        interval: Interval,
863        start: i64,
864        end: i64,
865    ) -> Result<BatchChartsResponse> {
866        let futures: Vec<_> = self
867            .symbols
868            .iter()
869            .map(|symbol| {
870                let client = Arc::clone(&self.client);
871                let symbol = Arc::clone(symbol);
872                async move {
873                    let result = client.get_chart_range(&symbol, interval, start, end).await;
874                    (symbol, result)
875                }
876            })
877            .collect();
878
879        let results: Vec<_> = stream::iter(futures)
880            .buffer_unordered(self.max_concurrency)
881            .collect()
882            .await;
883
884        let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
885        let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
886
887        for (symbol, result) in results {
888            match result {
889                Ok(json) => match ChartResponse::from_json(json) {
890                    Ok(chart_response) => {
891                        if let Some(mut chart_results) = chart_response.chart.result {
892                            if let Some(chart_result) = chart_results.pop() {
893                                // Collect events for later caching
894                                if let Some(events) = chart_result.events.clone() {
895                                    parsed_events.push((Arc::clone(&symbol), events));
896                                }
897
898                                let chart = Chart {
899                                    symbol: symbol.to_string(),
900                                    meta: chart_result.meta.clone(),
901                                    candles: chart_result.to_candles(),
902                                    interval: Some(interval),
903                                    range: None,
904                                };
905                                response.charts.insert(symbol.to_string(), chart);
906                            } else {
907                                response
908                                    .errors
909                                    .insert(symbol.to_string(), "Empty chart response".to_string());
910                            }
911                        } else {
912                            response.errors.insert(
913                                symbol.to_string(),
914                                "No chart data in response".to_string(),
915                            );
916                        }
917                    }
918                    Err(e) => {
919                        response.errors.insert(symbol.to_string(), e.to_string());
920                    }
921                },
922                Err(e) => {
923                    response.errors.insert(symbol.to_string(), e.to_string());
924                }
925            }
926        }
927
928        // Always store events — they are derived data, not TTL-bounded cache
929        if !parsed_events.is_empty() {
930            let mut events_cache = self.events_cache.write().await;
931            for (symbol, events) in parsed_events {
932                events_cache
933                    .entry(symbol)
934                    .or_insert_with(|| CacheEntry::new(events));
935            }
936        }
937
938        Ok(response)
939    }
940
941    /// Ensures events are loaded for all symbols using chart requests.
942    ///
943    /// Fetches events concurrently for symbols that don't have cached events.
944    /// Uses `TimeRange::Max` to get full event history (Yahoo returns all
945    /// dividends/splits/capital gains regardless of chart range).
946    ///
947    /// Events are always stored regardless of `cache_ttl` because they are
948    /// derived data (not a TTL-bounded cache). When `cache_ttl` is `None`,
949    /// events persist for the lifetime of the `Tickers` instance.
950    async fn ensure_events_loaded(&self) -> Result<()> {
951        // Check which symbols need event data (existence check, not TTL-based)
952        let symbols_to_fetch: Vec<Arc<str>> = {
953            let cache = self.events_cache.read().await;
954            self.symbols
955                .iter()
956                .filter(|sym| !cache.contains_key(*sym))
957                .cloned()
958                .collect()
959        };
960
961        if symbols_to_fetch.is_empty() {
962            return Ok(());
963        }
964
965        // Fetch events concurrently for all symbols that need it
966        let futures: Vec<_> = symbols_to_fetch
967            .iter()
968            .map(|symbol| {
969                let client = Arc::clone(&self.client);
970                let symbol = Arc::clone(symbol);
971                async move {
972                    let result = crate::endpoints::chart::fetch(
973                        &client,
974                        &symbol,
975                        Interval::OneDay,
976                        TimeRange::Max,
977                    )
978                    .await;
979                    (symbol, result)
980                }
981            })
982            .collect();
983
984        let results: Vec<_> = stream::iter(futures)
985            .buffer_unordered(self.max_concurrency)
986            .collect()
987            .await;
988
989        // Parse and cache events
990        let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
991
992        for (symbol, result) in results {
993            if let Ok(json) = result
994                && let Ok(chart_response) = ChartResponse::from_json(json)
995                && let Some(mut chart_results) = chart_response.chart.result
996                && let Some(chart_result) = chart_results.pop()
997                && let Some(events) = chart_result.events
998            {
999                parsed_events.push((symbol, events));
1000            }
1001        }
1002
1003        // Always store events — they are derived data, not TTL-bounded cache
1004        if !parsed_events.is_empty() {
1005            let mut events_cache = self.events_cache.write().await;
1006            for (symbol, events) in parsed_events {
1007                events_cache.insert(symbol, CacheEntry::new(events));
1008            }
1009        }
1010
1011        Ok(())
1012    }
1013
1014    /// Batch fetch spark data for all symbols in a single request.
1015    ///
1016    /// Spark data is optimized for sparkline rendering, returning only close prices.
1017    /// Unlike `charts()`, this fetches all symbols in ONE API call, making it
1018    /// much more efficient for displaying price trends on dashboards or watchlists.
1019    ///
1020    /// # Arguments
1021    ///
1022    /// * `interval` - Time interval between data points (e.g., `Interval::FiveMinutes`)
1023    /// * `range` - Time range to fetch (e.g., `TimeRange::OneDay`)
1024    ///
1025    /// # Example
1026    ///
1027    /// ```no_run
1028    /// use finance_query::{Tickers, Interval, TimeRange};
1029    ///
1030    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1031    /// let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
1032    /// let sparks = tickers.spark(Interval::FiveMinutes, TimeRange::OneDay).await?;
1033    ///
1034    /// for (symbol, spark) in &sparks.sparks {
1035    ///     if let Some(change) = spark.percent_change() {
1036    ///         println!("{}: {:.2}%", symbol, change);
1037    ///     }
1038    /// }
1039    /// # Ok(())
1040    /// # }
1041    /// ```
1042    pub async fn spark(&self, interval: Interval, range: TimeRange) -> Result<BatchSparksResponse> {
1043        // Fast path: check if all symbols are cached
1044        {
1045            let cache = self.spark_cache.read().await;
1046            if self.all_cached(
1047                &cache,
1048                self.symbols.iter().map(|s| (s.clone(), interval, range)),
1049            ) {
1050                let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1051                for symbol in &self.symbols {
1052                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
1053                        response
1054                            .sparks
1055                            .insert(symbol.to_string(), entry.value.clone());
1056                    }
1057                }
1058                return Ok(response);
1059            }
1060        }
1061
1062        // Slow path: acquire fetch guard
1063        let fetch_guard = Self::get_fetch_guard(&self.spark_fetch, (interval, range)).await;
1064        let _guard = fetch_guard.lock().await;
1065
1066        // Double-check after guard
1067        {
1068            let cache = self.spark_cache.read().await;
1069            if self.all_cached(
1070                &cache,
1071                self.symbols.iter().map(|s| (s.clone(), interval, range)),
1072            ) {
1073                let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1074                for symbol in &self.symbols {
1075                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
1076                        response
1077                            .sparks
1078                            .insert(symbol.to_string(), entry.value.clone());
1079                    }
1080                }
1081                return Ok(response);
1082            }
1083        }
1084
1085        // Fetch (single batch API call, no lock held during I/O)
1086        let symbols_ref: Vec<&str> = self.symbols.iter().map(|s| &**s).collect();
1087        let json =
1088            crate::endpoints::spark::fetch(&self.client, &symbols_ref, interval, range).await?;
1089
1090        let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1091
1092        match SparkResponse::from_json(json) {
1093            Ok(spark_response) => {
1094                let mut parsed_sparks: Vec<(Arc<str>, Spark)> = Vec::new();
1095
1096                if let Some(results) = spark_response.spark.result {
1097                    for result in &results {
1098                        if let Some(spark) = Spark::from_response(
1099                            result,
1100                            Some(interval.as_str().to_string()),
1101                            Some(range.as_str().to_string()),
1102                        ) {
1103                            let sym: Arc<str> = result.symbol.as_str().into();
1104                            parsed_sparks.push((sym, spark));
1105                        } else {
1106                            response.errors.insert(
1107                                result.symbol.to_string(),
1108                                "Failed to parse spark data".to_string(),
1109                            );
1110                        }
1111                    }
1112                }
1113
1114                // Cache all parsed sparks
1115                if self.cache_ttl.is_some() {
1116                    let mut cache = self.spark_cache.write().await;
1117                    for (symbol, spark) in &parsed_sparks {
1118                        self.cache_insert(
1119                            &mut cache,
1120                            (symbol.clone(), interval, range),
1121                            spark.clone(),
1122                        );
1123                    }
1124                }
1125
1126                // Build response
1127                for (symbol, spark) in parsed_sparks {
1128                    response.sparks.insert(symbol.to_string(), spark);
1129                }
1130
1131                // Track missing symbols
1132                for symbol in &self.symbols {
1133                    let symbol_str = &**symbol;
1134                    if !response.sparks.contains_key(symbol_str)
1135                        && !response.errors.contains_key(symbol_str)
1136                    {
1137                        response.errors.insert(
1138                            symbol.to_string(),
1139                            "Symbol not found in response".to_string(),
1140                        );
1141                    }
1142                }
1143            }
1144            Err(e) => {
1145                for symbol in &self.symbols {
1146                    response.errors.insert(symbol.to_string(), e.to_string());
1147                }
1148            }
1149        }
1150
1151        Ok(response)
1152    }
1153
1154    /// Batch fetch dividends for all symbols
1155    ///
1156    /// Returns dividend history for all symbols, filtered by the specified time range.
1157    /// Dividends are cached per symbol after the first chart fetch.
1158    ///
1159    /// # Arguments
1160    ///
1161    /// * `range` - Time range to filter dividends
1162    ///
1163    /// # Example
1164    ///
1165    /// ```no_run
1166    /// use finance_query::{Tickers, TimeRange};
1167    ///
1168    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1169    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1170    /// let dividends = tickers.dividends(TimeRange::OneYear).await?;
1171    ///
1172    /// for (symbol, divs) in &dividends.dividends {
1173    ///     println!("{}: {} dividends", symbol, divs.len());
1174    /// }
1175    /// # Ok(())
1176    /// # }
1177    /// ```
1178    pub async fn dividends(&self, range: TimeRange) -> Result<BatchDividendsResponse> {
1179        let mut response = BatchDividendsResponse::with_capacity(self.symbols.len());
1180
1181        // Fetch events efficiently (1-day chart request per symbol)
1182        self.ensure_events_loaded().await?;
1183
1184        let events_cache = self.events_cache.read().await;
1185
1186        for symbol in &self.symbols {
1187            if let Some(entry) = events_cache.get(symbol) {
1188                let all_dividends = entry.value.to_dividends();
1189                let filtered = filter_by_range(all_dividends, range);
1190                response.dividends.insert(symbol.to_string(), filtered);
1191            } else {
1192                response
1193                    .errors
1194                    .insert(symbol.to_string(), "No events data available".to_string());
1195            }
1196        }
1197
1198        Ok(response)
1199    }
1200
1201    /// Batch fetch stock splits for all symbols
1202    ///
1203    /// Returns stock split history for all symbols, filtered by the specified time range.
1204    /// Splits are cached per symbol after the first chart fetch.
1205    ///
1206    /// # Arguments
1207    ///
1208    /// * `range` - Time range to filter splits
1209    ///
1210    /// # Example
1211    ///
1212    /// ```no_run
1213    /// use finance_query::{Tickers, TimeRange};
1214    ///
1215    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1216    /// let tickers = Tickers::new(["NVDA", "TSLA"]).await?;
1217    /// let splits = tickers.splits(TimeRange::FiveYears).await?;
1218    ///
1219    /// for (symbol, sp) in &splits.splits {
1220    ///     for split in sp {
1221    ///         println!("{}: {}", symbol, split.ratio);
1222    ///     }
1223    /// }
1224    /// # Ok(())
1225    /// # }
1226    /// ```
1227    pub async fn splits(&self, range: TimeRange) -> Result<BatchSplitsResponse> {
1228        let mut response = BatchSplitsResponse::with_capacity(self.symbols.len());
1229
1230        // Fetch events efficiently (1-day chart request per symbol)
1231        self.ensure_events_loaded().await?;
1232
1233        let events_cache = self.events_cache.read().await;
1234
1235        for symbol in &self.symbols {
1236            if let Some(entry) = events_cache.get(symbol) {
1237                let all_splits = entry.value.to_splits();
1238                let filtered = filter_by_range(all_splits, range);
1239                response.splits.insert(symbol.to_string(), filtered);
1240            } else {
1241                response
1242                    .errors
1243                    .insert(symbol.to_string(), "No events data available".to_string());
1244            }
1245        }
1246
1247        Ok(response)
1248    }
1249
1250    /// Batch fetch capital gains for all symbols
1251    ///
1252    /// Returns capital gain distribution history for all symbols, filtered by the
1253    /// specified time range. This is primarily relevant for mutual funds and ETFs.
1254    /// Capital gains are cached per symbol after the first chart fetch.
1255    ///
1256    /// # Arguments
1257    ///
1258    /// * `range` - Time range to filter capital gains
1259    ///
1260    /// # Example
1261    ///
1262    /// ```no_run
1263    /// use finance_query::{Tickers, TimeRange};
1264    ///
1265    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1266    /// let tickers = Tickers::new(["VFIAX", "VTI"]).await?;
1267    /// let gains = tickers.capital_gains(TimeRange::TwoYears).await?;
1268    ///
1269    /// for (symbol, cg) in &gains.capital_gains {
1270    ///     println!("{}: {} distributions", symbol, cg.len());
1271    /// }
1272    /// # Ok(())
1273    /// # }
1274    /// ```
1275    pub async fn capital_gains(&self, range: TimeRange) -> Result<BatchCapitalGainsResponse> {
1276        let mut response = BatchCapitalGainsResponse::with_capacity(self.symbols.len());
1277
1278        // Fetch events efficiently (1-day chart request per symbol)
1279        self.ensure_events_loaded().await?;
1280
1281        let events_cache = self.events_cache.read().await;
1282
1283        for symbol in &self.symbols {
1284            if let Some(entry) = events_cache.get(symbol) {
1285                let all_gains = entry.value.to_capital_gains();
1286                let filtered = filter_by_range(all_gains, range);
1287                response.capital_gains.insert(symbol.to_string(), filtered);
1288            } else {
1289                response
1290                    .errors
1291                    .insert(symbol.to_string(), "No events data available".to_string());
1292            }
1293        }
1294
1295        Ok(response)
1296    }
1297
1298    /// Batch fetch financial statements for all symbols
1299    ///
1300    /// Fetches the specified financial statement type for all symbols concurrently.
1301    /// Financial statements are cached per (symbol, statement_type, frequency) tuple.
1302    ///
1303    /// # Arguments
1304    ///
1305    /// * `statement_type` - Type of statement (Income, Balance, CashFlow)
1306    /// * `frequency` - Annual or Quarterly
1307    ///
1308    /// # Example
1309    ///
1310    /// ```no_run
1311    /// use finance_query::{Tickers, StatementType, Frequency};
1312    ///
1313    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1314    /// let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
1315    /// let financials = tickers.financials(StatementType::Income, Frequency::Annual).await?;
1316    ///
1317    /// for (symbol, stmt) in &financials.financials {
1318    ///     if let Some(revenue) = stmt.statement.get("TotalRevenue") {
1319    ///         println!("{}: {:?}", symbol, revenue);
1320    ///     }
1321    /// }
1322    /// # Ok(())
1323    /// # }
1324    /// ```
1325    pub async fn financials(
1326        &self,
1327        statement_type: StatementType,
1328        frequency: Frequency,
1329    ) -> Result<BatchFinancialsResponse> {
1330        batch_fetch_cached!(self;
1331            cache: financials_cache,
1332            guard: map(financials_fetch, (statement_type, frequency)),
1333            key: |s| (s.clone(), statement_type, frequency),
1334            response: BatchFinancialsResponse.financials,
1335            fetch: |client, symbol| client.get_financials(&symbol, statement_type, frequency).await,
1336        )
1337    }
1338
1339    /// Batch fetch news articles for all symbols
1340    ///
1341    /// Fetches recent news articles for all symbols concurrently using scrapers.
1342    /// News articles are cached per symbol.
1343    ///
1344    /// # Example
1345    ///
1346    /// ```no_run
1347    /// use finance_query::Tickers;
1348    ///
1349    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1350    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1351    /// let news = tickers.news().await?;
1352    ///
1353    /// for (symbol, articles) in &news.news {
1354    ///     println!("{}: {} articles", symbol, articles.len());
1355    ///     for article in articles.iter().take(3) {
1356    ///         println!("  - {}", article.title);
1357    ///     }
1358    /// }
1359    /// # Ok(())
1360    /// # }
1361    /// ```
1362    pub async fn news(&self) -> Result<BatchNewsResponse> {
1363        batch_fetch_cached!(self;
1364            cache: news_cache,
1365            guard: simple(news_fetch),
1366            key: |s| s.clone(),
1367            response: BatchNewsResponse.news,
1368            fetch: |_client, symbol| crate::scrapers::stockanalysis::scrape_symbol_news(&symbol).await,
1369        )
1370    }
1371
1372    /// Batch fetch recommendations for all symbols
1373    ///
1374    /// Fetches analyst recommendations and similar stocks for all symbols concurrently.
1375    /// Recommendations are cached per (symbol, limit) tuple — different limits
1376    /// produce different API responses and are cached independently.
1377    ///
1378    /// # Arguments
1379    ///
1380    /// * `limit` - Maximum number of similar stocks to return per symbol
1381    ///
1382    /// # Example
1383    ///
1384    /// ```no_run
1385    /// use finance_query::Tickers;
1386    ///
1387    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1388    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1389    /// let recommendations = tickers.recommendations(10).await?;
1390    ///
1391    /// for (symbol, rec) in &recommendations.recommendations {
1392    ///     println!("{}: {} recommendations", symbol, rec.count());
1393    ///     for similar in &rec.recommendations {
1394    ///         println!("  - {}: score {}", similar.symbol, similar.score);
1395    ///     }
1396    /// }
1397    /// # Ok(())
1398    /// # }
1399    /// ```
1400    pub async fn recommendations(&self, limit: u32) -> Result<BatchRecommendationsResponse> {
1401        batch_fetch_cached!(self;
1402            cache: recommendations_cache,
1403            guard: map(recommendations_fetch, limit),
1404            key: |s| (s.clone(), limit),
1405            response: BatchRecommendationsResponse.recommendations,
1406            fetch: |client, symbol| {
1407                let json = client.get_recommendations(&symbol, limit).await?;
1408                let rec_response =
1409                    crate::models::recommendation::response::RecommendationResponse::from_json(json)?;
1410                Ok(Recommendation {
1411                    symbol: symbol.to_string(),
1412                    recommendations: rec_response
1413                        .finance
1414                        .result
1415                        .iter()
1416                        .flat_map(|r| &r.recommended_symbols)
1417                        .cloned()
1418                        .collect(),
1419                })
1420            },
1421        )
1422    }
1423
1424    /// Batch fetch options chains for all symbols
1425    ///
1426    /// Fetches options chains for the specified expiration date for all symbols concurrently.
1427    /// Options are cached per (symbol, date) tuple.
1428    ///
1429    /// # Arguments
1430    ///
1431    /// * `date` - Optional expiration date (Unix timestamp). If None, fetches nearest expiration.
1432    ///
1433    /// # Example
1434    ///
1435    /// ```no_run
1436    /// use finance_query::Tickers;
1437    ///
1438    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1439    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1440    /// let options = tickers.options(None).await?;
1441    ///
1442    /// for (symbol, opts) in &options.options {
1443    ///     println!("{}: {} expirations", symbol, opts.expiration_dates().len());
1444    /// }
1445    /// # Ok(())
1446    /// # }
1447    /// ```
1448    pub async fn options(&self, date: Option<i64>) -> Result<BatchOptionsResponse> {
1449        batch_fetch_cached!(self;
1450            cache: options_cache,
1451            guard: map(options_fetch, date),
1452            key: |s| (s.clone(), date),
1453            response: BatchOptionsResponse.options,
1454            fetch: |client, symbol| {
1455                let json = client.get_options(&symbol, date).await?;
1456                Ok(serde_json::from_value::<Options>(json)?)
1457            },
1458        )
1459    }
1460
1461    /// Batch calculate all technical indicators for all symbols
1462    ///
1463    /// Calculates complete indicator summaries for all symbols from their chart data.
1464    /// Indicators are cached per (symbol, interval, range) tuple.
1465    ///
1466    /// # Arguments
1467    ///
1468    /// * `interval` - The time interval for each candle
1469    /// * `range` - The time range to fetch data for
1470    ///
1471    /// # Example
1472    ///
1473    /// ```no_run
1474    /// use finance_query::{Tickers, Interval, TimeRange};
1475    ///
1476    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1477    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1478    /// let indicators = tickers.indicators(Interval::OneDay, TimeRange::ThreeMonths).await?;
1479    ///
1480    /// for (symbol, ind) in &indicators.indicators {
1481    ///     println!("{}: RSI(14) = {:?}, SMA(20) = {:?}", symbol, ind.rsi_14, ind.sma_20);
1482    /// }
1483    /// # Ok(())
1484    /// # }
1485    /// ```
1486    #[cfg(feature = "indicators")]
1487    pub async fn indicators(
1488        &self,
1489        interval: Interval,
1490        range: TimeRange,
1491    ) -> Result<BatchIndicatorsResponse> {
1492        let cache_key_for = |symbol: &Arc<str>| (symbol.clone(), interval, range);
1493
1494        // Fast path: check if all symbols are cached
1495        {
1496            let cache = self.indicators_cache.read().await;
1497            if self.all_cached(&cache, self.symbols.iter().map(&cache_key_for)) {
1498                let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
1499                for symbol in &self.symbols {
1500                    if let Some(entry) = cache.get(&cache_key_for(symbol)) {
1501                        response
1502                            .indicators
1503                            .insert(symbol.to_string(), entry.value.clone());
1504                    }
1505                }
1506                return Ok(response);
1507            }
1508        }
1509
1510        // Slow path: acquire fetch guard to prevent duplicate concurrent calculations
1511        let fetch_guard = Self::get_fetch_guard(&self.indicators_fetch, (interval, range)).await;
1512        let _guard = fetch_guard.lock().await;
1513
1514        // Double-check: another task may have computed while we waited
1515        {
1516            let cache = self.indicators_cache.read().await;
1517            if self.all_cached(&cache, self.symbols.iter().map(&cache_key_for)) {
1518                let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
1519                for symbol in &self.symbols {
1520                    if let Some(entry) = cache.get(&cache_key_for(symbol)) {
1521                        response
1522                            .indicators
1523                            .insert(symbol.to_string(), entry.value.clone());
1524                    }
1525                }
1526                return Ok(response);
1527            }
1528        }
1529
1530        // Fetch charts first (which may already be cached, has its own deduplication)
1531        let charts_response = self.charts(interval, range).await?;
1532
1533        let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());
1534
1535        // Calculate all indicators first (no lock held)
1536        let mut calculated_indicators: Vec<(String, crate::indicators::IndicatorsSummary)> =
1537            Vec::new();
1538
1539        for (symbol, chart) in &charts_response.charts {
1540            let indicators = crate::indicators::summary::calculate_indicators(&chart.candles);
1541            calculated_indicators.push((symbol.to_string(), indicators));
1542        }
1543
1544        // Now acquire write lock briefly for batch cache insertion
1545        if self.cache_ttl.is_some() {
1546            let mut cache = self.indicators_cache.write().await;
1547            for (symbol, indicators) in &calculated_indicators {
1548                let key: Arc<str> = symbol.as_str().into();
1549                self.cache_insert(&mut cache, cache_key_for(&key), indicators.clone());
1550            }
1551        }
1552
1553        // Populate response (no lock needed)
1554        for (symbol, indicators) in calculated_indicators {
1555            response.indicators.insert(symbol, indicators);
1556        }
1557
1558        // Add errors from chart fetch
1559        for (symbol, error) in &charts_response.errors {
1560            response.errors.insert(symbol.to_string(), error.clone());
1561        }
1562
1563        Ok(response)
1564    }
1565
1566    // ========================================================================
1567    // Dynamic Symbol Management
1568    // ========================================================================
1569
1570    /// Add symbols to the watch list
1571    ///
1572    /// Adds new symbols to track without affecting existing cached data.
1573    ///
1574    /// # Example
1575    ///
1576    /// ```no_run
1577    /// use finance_query::Tickers;
1578    ///
1579    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1580    /// let mut tickers = Tickers::new(["AAPL"]).await?;
1581    /// tickers.add_symbols(&["MSFT", "GOOGL"]);
1582    /// assert_eq!(tickers.len(), 3);
1583    /// # Ok(())
1584    /// # }
1585    /// ```
1586    pub fn add_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1587        // Use HashSet for O(n+m) deduplication instead of O(n*m) linear search
1588        use std::collections::HashSet;
1589
1590        let existing: HashSet<&str> = self.symbols.iter().map(|s| &**s).collect();
1591        let to_add: Vec<Arc<str>> = symbols
1592            .iter()
1593            .map(|s| s.as_ref())
1594            .filter(|s| !existing.contains(s))
1595            .map(|s| s.into())
1596            .collect();
1597
1598        self.symbols.extend(to_add);
1599    }
1600
1601    // ========================================================================
1602    // Portfolio Backtesting
1603    // ========================================================================
1604
1605    /// Run a multi-symbol portfolio backtest across all tracked symbols.
1606    ///
1607    /// Fetches charts and dividends for each symbol concurrently, then runs
1608    /// the portfolio engine with the given strategy factory. Capital is shared
1609    /// across all symbols according to the [`PortfolioConfig`] allocation rules.
1610    ///
1611    /// `factory` is called once per symbol to produce an independent strategy
1612    /// instance:
1613    ///
1614    /// ```no_run
1615    /// use finance_query::{Tickers, Interval, TimeRange};
1616    /// use finance_query::backtesting::{SmaCrossover, BacktestConfig};
1617    /// use finance_query::backtesting::portfolio::{PortfolioConfig, RebalanceMode};
1618    ///
1619    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1620    /// let tickers = Tickers::new(["AAPL", "MSFT", "NVDA"]).await?;
1621    ///
1622    /// let config = PortfolioConfig::new(BacktestConfig::default())
1623    ///     .max_total_positions(2)
1624    ///     .rebalance(RebalanceMode::EqualWeight);
1625    ///
1626    /// let result = tickers.backtest(
1627    ///     Interval::OneDay,
1628    ///     TimeRange::TwoYears,
1629    ///     Some(config),
1630    ///     |_sym| SmaCrossover::new(10, 50),
1631    /// ).await?;
1632    ///
1633    /// println!("Portfolio return: {:.2}%", result.portfolio_metrics.total_return_pct);
1634    /// # Ok(())
1635    /// # }
1636    /// ```
1637    ///
1638    /// [`PortfolioConfig`]: crate::backtesting::portfolio::PortfolioConfig
1639    #[cfg(feature = "backtesting")]
1640    pub async fn backtest<S, F>(
1641        &self,
1642        interval: Interval,
1643        range: TimeRange,
1644        config: Option<crate::backtesting::portfolio::PortfolioConfig>,
1645        factory: F,
1646    ) -> crate::backtesting::Result<crate::backtesting::portfolio::PortfolioResult>
1647    where
1648        S: crate::backtesting::Strategy,
1649        F: Fn(&str) -> S,
1650    {
1651        use crate::backtesting::portfolio::{PortfolioEngine, SymbolData};
1652
1653        let config = config.unwrap_or_default();
1654        config.validate(self.symbols.len())?;
1655
1656        // Fetch charts for all symbols (uses the batch chart cache)
1657        let charts = self
1658            .charts(interval, range)
1659            .await
1660            .map_err(|e| crate::backtesting::BacktestError::ChartError(e.to_string()))?;
1661
1662        // Fetch dividends for all symbols (events cache is already warm after charts())
1663        // Treat errors as "no dividends" — dividend processing is best-effort
1664        let dividends_map = self
1665            .dividends(range)
1666            .await
1667            .map(|b| b.dividends)
1668            .unwrap_or_default();
1669
1670        // Assemble SymbolData slices — skip symbols with no chart data
1671        let symbol_data: Vec<SymbolData> = self
1672            .symbols
1673            .iter()
1674            .filter_map(|sym| {
1675                charts.charts.get(sym.as_ref()).map(|chart| {
1676                    let divs = dividends_map.get(sym.as_ref()).cloned().unwrap_or_default();
1677                    SymbolData::new(sym.as_ref(), chart.candles.clone()).with_dividends(divs)
1678                })
1679            })
1680            .collect();
1681
1682        let engine = PortfolioEngine::new(config);
1683        engine.run(&symbol_data, factory)
1684    }
1685
1686    /// Remove symbols from the watch list
1687    ///
1688    /// Removes symbols and clears their cached data to free memory.
1689    ///
1690    /// # Example
1691    ///
1692    /// ```no_run
1693    /// use finance_query::Tickers;
1694    ///
1695    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1696    /// let mut tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
1697    /// tickers.remove_symbols(&["MSFT"]);
1698    /// assert_eq!(tickers.len(), 2);
1699    /// # Ok(())
1700    /// # }
1701    /// ```
1702    pub async fn remove_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1703        use std::collections::HashSet;
1704        let to_remove: HashSet<&str> = symbols.iter().map(|s| s.as_ref()).collect();
1705
1706        // Remove from symbol list — O(1) lookup per element
1707        self.symbols.retain(|s| !to_remove.contains(&**s));
1708
1709        // Acquire all independent write locks in parallel
1710        let (
1711            mut quote_cache,
1712            mut chart_cache,
1713            mut events_cache,
1714            mut financials_cache,
1715            mut news_cache,
1716            mut recommendations_cache,
1717            mut options_cache,
1718            mut spark_cache,
1719        ) = tokio::join!(
1720            self.quote_cache.write(),
1721            self.chart_cache.write(),
1722            self.events_cache.write(),
1723            self.financials_cache.write(),
1724            self.news_cache.write(),
1725            self.recommendations_cache.write(),
1726            self.options_cache.write(),
1727            self.spark_cache.write(),
1728        );
1729
1730        // Simple key caches — O(1) per removal
1731        for symbol in &to_remove {
1732            let key: Arc<str> = (*symbol).into();
1733            quote_cache.remove(&key);
1734            events_cache.remove(&key);
1735            news_cache.remove(&key);
1736        }
1737
1738        // Composite key caches — O(n) retain but O(1) contains check
1739        chart_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1740        financials_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1741        recommendations_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1742        options_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1743        spark_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1744
1745        // Drop all guards before cfg-gated lock
1746        drop((
1747            quote_cache,
1748            chart_cache,
1749            events_cache,
1750            financials_cache,
1751            news_cache,
1752            recommendations_cache,
1753            options_cache,
1754            spark_cache,
1755        ));
1756
1757        #[cfg(feature = "indicators")]
1758        self.indicators_cache
1759            .write()
1760            .await
1761            .retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1762    }
1763
1764    /// Clear all cached data and fetch guards, forcing fresh fetches on next access.
1765    ///
1766    /// Use this when you need up-to-date data from a long-lived `Tickers` instance.
1767    /// Also clears fetch guard maps to prevent unbounded growth.
1768    pub async fn clear_cache(&self) {
1769        tokio::join!(
1770            // Data caches
1771            async { self.quote_cache.write().await.clear() },
1772            async { self.chart_cache.write().await.clear() },
1773            async { self.events_cache.write().await.clear() },
1774            async { self.financials_cache.write().await.clear() },
1775            async { self.news_cache.write().await.clear() },
1776            async { self.recommendations_cache.write().await.clear() },
1777            async { self.options_cache.write().await.clear() },
1778            async { self.spark_cache.write().await.clear() },
1779            async {
1780                #[cfg(feature = "indicators")]
1781                self.indicators_cache.write().await.clear();
1782            },
1783            // Fetch guard maps (prevent unbounded growth)
1784            async { self.charts_fetch.write().await.clear() },
1785            async { self.financials_fetch.write().await.clear() },
1786            async { self.recommendations_fetch.write().await.clear() },
1787            async { self.options_fetch.write().await.clear() },
1788            async { self.spark_fetch.write().await.clear() },
1789            async {
1790                #[cfg(feature = "indicators")]
1791                self.indicators_fetch.write().await.clear();
1792            },
1793        );
1794    }
1795
1796    /// Clear only the cached quote data.
1797    ///
1798    /// The next call to `quotes()` or `quote()` will re-fetch from the API.
1799    pub async fn clear_quote_cache(&self) {
1800        self.quote_cache.write().await.clear();
1801    }
1802
1803    /// Clear only the cached chart, spark, and events data.
1804    ///
1805    /// The next call to `charts()`, `spark()`, `dividends()`, `splits()`,
1806    /// or `capital_gains()` will re-fetch from the API.
1807    pub async fn clear_chart_cache(&self) {
1808        tokio::join!(
1809            async { self.chart_cache.write().await.clear() },
1810            async { self.events_cache.write().await.clear() },
1811            async { self.spark_cache.write().await.clear() },
1812            async {
1813                #[cfg(feature = "indicators")]
1814                self.indicators_cache.write().await.clear();
1815            },
1816        );
1817    }
1818}
1819
#[cfg(test)]
mod tests {
    // Tests for `Tickers`. Every test marked `#[ignore]` needs network access
    // and is skipped by default; run those with `cargo test -- --ignored`.
    use super::*;

    /// Batch quotes succeed for at least one symbol.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_quotes() {
        let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
        let result = tickers.quotes().await.unwrap();

        assert!(result.success_count() > 0);
    }

    /// Batch charts succeed for at least one symbol.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_charts() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let result = tickers
            .charts(Interval::OneDay, TimeRange::FiveDays)
            .await
            .unwrap();

        assert!(result.success_count() > 0);
    }

    /// Spark data is returned and its helper methods work.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_spark() {
        let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
        let result = tickers
            .spark(Interval::FiveMinutes, TimeRange::OneDay)
            .await
            .unwrap();

        assert!(result.success_count() > 0);

        // Verify spark data structure
        if let Some(spark) = result.sparks.get("AAPL") {
            assert!(!spark.closes.is_empty());
            assert_eq!(spark.symbol, "AAPL");
            // Verify helper methods work
            assert!(spark.percent_change().is_some());
        }
    }

    /// Dividends are returned with sane timestamps and amounts.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_dividends() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let result = tickers.dividends(TimeRange::OneYear).await.unwrap();

        assert!(result.success_count() > 0);

        // Verify dividend data structure (only if AAPL has at least one entry)
        if let Some(div) = result.dividends.get("AAPL").and_then(|d| d.first()) {
            assert!(div.timestamp > 0);
            assert!(div.amount > 0.0);
        }
    }

    /// Splits, when present, carry a valid ratio.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_splits() {
        let tickers = Tickers::new(["NVDA", "TSLA"]).await.unwrap();
        let result = tickers.splits(TimeRange::FiveYears).await.unwrap();

        // Note: Not all symbols have splits, so we just check for successful response
        assert!(result.success_count() > 0);

        // If there are splits, verify structure
        for splits in result.splits.values() {
            for split in splits {
                assert!(split.timestamp > 0);
                assert!(split.numerator > 0.0);
                assert!(split.denominator > 0.0);
                assert!(!split.ratio.is_empty());
            }
        }
    }

    /// Capital gains, when present, are non-negative.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_capital_gains() {
        let tickers = Tickers::new(["VFIAX", "VTI"]).await.unwrap();
        let result = tickers.capital_gains(TimeRange::TwoYears).await.unwrap();

        // Note: Not all symbols have capital gains distributions
        assert!(result.success_count() > 0);

        // If there are capital gains, verify structure
        for gains in result.capital_gains.values() {
            for gain in gains {
                assert!(gain.timestamp > 0);
                assert!(gain.amount >= 0.0);
            }
        }
    }

    /// Annual income statements come back tagged with the requested type and frequency.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_financials() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let result = tickers
            .financials(StatementType::Income, Frequency::Annual)
            .await
            .unwrap();

        assert!(result.success_count() > 0);

        // Verify financial statement structure
        for (symbol, stmt) in &result.financials {
            assert_eq!(stmt.symbol, *symbol);
            assert_eq!(stmt.statement_type, "income");
            assert_eq!(stmt.frequency, "annual");
            assert!(!stmt.statement.is_empty());

            // Common income statement fields
            if let Some(revenue) = stmt.statement.get("TotalRevenue") {
                assert!(!revenue.is_empty());
            }
        }
    }

    /// News articles carry a title, link, and source.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_news() {
        let tickers = Tickers::new(["AAPL", "TSLA"]).await.unwrap();
        let result = tickers.news().await.unwrap();

        assert!(result.success_count() > 0);

        // Verify news structure on the first article of each symbol, if any
        for articles in result.news.values() {
            if let Some(article) = articles.first() {
                assert!(!article.title.is_empty());
                assert!(!article.link.is_empty());
                assert!(!article.source.is_empty());
            }
        }
    }

    /// Recommendations are keyed by the requesting symbol and non-empty.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_recommendations() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let result = tickers.recommendations(5).await.unwrap();

        assert!(result.success_count() > 0);

        // Verify recommendations structure
        for (symbol, rec) in &result.recommendations {
            assert_eq!(rec.symbol, *symbol);
            assert!(rec.count() > 0);
            for similar in &rec.recommendations {
                assert!(!similar.symbol.is_empty());
            }
        }
    }

    /// Option chains expose at least one expiration date.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_options() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let result = tickers.options(None).await.unwrap();

        assert!(result.success_count() > 0);

        // Verify options structure
        for opts in result.options.values() {
            assert!(!opts.expiration_dates().is_empty());
        }
    }

    /// Indicator summaries contain at least one of RSI(14) or SMA(20).
    #[tokio::test]
    #[ignore = "requires network access"]
    #[cfg(feature = "indicators")]
    async fn test_tickers_indicators() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let result = tickers
            .indicators(Interval::OneDay, TimeRange::ThreeMonths)
            .await
            .unwrap();

        assert!(result.success_count() > 0);

        // Verify indicators structure
        for ind in result.indicators.values() {
            // Check that at least some indicators are present
            assert!(ind.rsi_14.is_some() || ind.sma_20.is_some());
        }
    }

    /// Adding symbols grows the list and ignores already-tracked symbols.
    // NOTE(review): this is the only test that is not `#[ignore]`d — confirm
    // that `Tickers::new` does not itself need network access.
    #[tokio::test]
    async fn test_tickers_add_symbols() {
        let mut tickers = Tickers::new(["AAPL"]).await.unwrap();
        assert_eq!(tickers.len(), 1);
        assert_eq!(tickers.symbols(), &["AAPL"]);

        tickers.add_symbols(&["MSFT", "GOOGL"]);
        assert_eq!(tickers.len(), 3);
        assert!(tickers.symbols().contains(&"AAPL"));
        assert!(tickers.symbols().contains(&"MSFT"));
        assert!(tickers.symbols().contains(&"GOOGL"));

        // Adding duplicate shouldn't increase count
        tickers.add_symbols(&["AAPL"]);
        assert_eq!(tickers.len(), 3);
    }

    /// Removing a symbol drops it from the list and from subsequent quote batches.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_remove_symbols() {
        let mut tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
        assert_eq!(tickers.len(), 3);

        // Fetch some data to populate caches
        let _ = tickers.quotes().await;

        // Remove one symbol
        tickers.remove_symbols(&["MSFT"]).await;
        assert_eq!(tickers.len(), 2);
        assert!(tickers.symbols().contains(&"AAPL"));
        assert!(!tickers.symbols().contains(&"MSFT"));
        assert!(tickers.symbols().contains(&"GOOGL"));

        // Verify cache was cleared
        let quotes = tickers.quotes().await.unwrap();
        assert!(!quotes.quotes.contains_key("MSFT"));
        assert_eq!(quotes.quotes.len(), 2);
    }
}