Skip to main content

finance_query/tickers/
core.rs

1//! Tickers implementation for batch operations on multiple symbols.
2//!
3//! Optimizes data fetching by using batch endpoints and concurrent requests.
4
5use super::macros::{batch_fetch_cached, define_batch_response};
6use crate::adapters::yahoo::client::ClientConfig;
7#[cfg(feature = "backtesting")]
8use crate::backtesting;
9use crate::constants::{Frequency, Interval, Region, StatementType, TimeRange};
10use crate::error::{FinanceError, Result};
11use crate::format::Both;
12#[cfg(any(feature = "backtesting", feature = "indicators"))]
13use crate::indicators;
14use crate::models::chart::events::ChartEvents;
15use crate::models::chart::spark::Spark;
16use crate::models::chart::spark::response::SparkResponse;
17use crate::models::chart::{CapitalGain, Chart, Dividend, Split};
18use crate::models::corporate::news::News;
19use crate::models::corporate::recommendation::Recommendation;
20use crate::models::format::Format;
21use crate::models::fundamentals::FinancialStatement;
22use crate::models::options::Options;
23use crate::models::quote::{Quote, QuoteSummaryResponse};
24
25use crate::providers::types::recommendation_from_similar;
26use crate::providers::yahoo::YahooProvider;
27use crate::providers::{
28    Capability, Fetch, Provider, ProviderAdapter, ProviderSet, Routes, build_providers,
29};
30use crate::ticker::ClientHandle;
31use crate::utils::{CacheEntry, EVICTION_THRESHOLD, filter_by_range};
32use futures::stream::{self, StreamExt};
33use std::collections::HashMap;
34use std::sync::Arc;
35use std::time::Duration;
36use tokio::sync::RwLock;
37
38// Type aliases — MapCache wraps values in CacheEntry for TTL support.
// Every cache is an `Arc<RwLock<HashMap<..>>>`, so it can be read concurrently
// and is only locked exclusively while entries are written.
39type MapCache<K, V> = Arc<RwLock<HashMap<K, CacheEntry<V>>>>;
// Charts are cached per (symbol, interval, range) combination.
40type ChartCacheKey = (Arc<str>, Interval, TimeRange);
// Quotes are keyed by symbol only.
41type QuoteCache = MapCache<Arc<str>, Quote>;
42type ChartCache = MapCache<ChartCacheKey, Chart>;
43type EventsCache = MapCache<Arc<str>, ChartEvents>;
44type FinancialsCache = MapCache<(Arc<str>, StatementType, Frequency), FinancialStatement>;
45type NewsCache = MapCache<Arc<str>, Vec<News>>;
// NOTE(review): the `u32` key component is presumably the requested result limit — confirm.
46type RecommendationsCache = MapCache<(Arc<str>, u32), Recommendation>;
// NOTE(review): the `Option<i64>` key component is presumably the expiration timestamp — confirm.
47type OptionsCache = MapCache<(Arc<str>, Option<i64>), Options>;
// Same shape as `ChartCacheKey`, kept as a separate alias for the spark cache.
48type SparkCacheKey = (Arc<str>, Interval, TimeRange);
49type SparkCache = MapCache<SparkCacheKey, Spark>;
50#[cfg(feature = "indicators")]
51type IndicatorsCache = MapCache<(Arc<str>, Interval, TimeRange), indicators::IndicatorsSummary>;
52
53// Fetch guards for request deduplication — prevent concurrent duplicate fetches
// `FetchGuard` is a single async mutex; `FetchGuardMap` holds one guard per
// request key so fetches with different parameters do not block each other.
54type FetchGuard = Arc<tokio::sync::Mutex<()>>;
55type FetchGuardMap<K> = Arc<RwLock<HashMap<K, FetchGuard>>>;
56
57// Generate all batch response types
// Each invocation has the form `TypeName => field: ValueType`. As used later in
// this file, the generated `BatchQuotesResponse` / `BatchChartsResponse` expose
// `with_capacity(n)`, a per-symbol map named after `field`, and an `errors` map
// of symbol -> message; the remaining types presumably follow the same shape
// (see `define_batch_response!` in `super::macros` to confirm).
58define_batch_response! {
59    /// Response containing quotes for multiple symbols.
60    BatchQuotesResponse => quotes: Quote
61}
62
63define_batch_response! {
64    /// Response containing charts for multiple symbols.
65    BatchChartsResponse => charts: Chart
66}
67
68define_batch_response! {
69    /// Response containing spark data for multiple symbols.
70    ///
71    /// Spark data is optimized for sparkline rendering with only close prices.
72    /// Unlike charts, spark data is fetched in a single batch request.
73    BatchSparksResponse => sparks: Spark
74}
75
76define_batch_response! {
77    /// Response containing dividends for multiple symbols.
78    BatchDividendsResponse => dividends: Vec<Dividend>
79}
80
81define_batch_response! {
82    /// Response containing splits for multiple symbols.
83    BatchSplitsResponse => splits: Vec<Split>
84}
85
86define_batch_response! {
87    /// Response containing capital gains for multiple symbols.
88    BatchCapitalGainsResponse => capital_gains: Vec<CapitalGain>
89}
90
91define_batch_response! {
92    /// Response containing financial statements for multiple symbols.
93    BatchFinancialsResponse => financials: FinancialStatement
94}
95
96define_batch_response! {
97    /// Response containing news articles for multiple symbols.
98    BatchNewsResponse => news: Vec<News>
99}
100
101define_batch_response! {
102    /// Response containing recommendations for multiple symbols.
103    BatchRecommendationsResponse => recommendations: Recommendation
104}
105
106define_batch_response! {
107    /// Response containing options chains for multiple symbols.
108    BatchOptionsResponse => options: Options
109}
110
// Only generated when the `indicators` feature is enabled.
111#[cfg(feature = "indicators")]
112define_batch_response! {
113    /// Response containing technical indicators for multiple symbols.
114    BatchIndicatorsResponse => indicators: indicators::IndicatorsSummary
115}
116
117/// Default maximum concurrent requests for batch operations.
///
/// Used as the initial `max_concurrency` of a new [`TickersBuilder`]; callers
/// override it with [`TickersBuilder::max_concurrency`].
118const DEFAULT_MAX_CONCURRENCY: usize = 10;
119
120/// Builder for Tickers
///
/// Created through [`Tickers::builder`]; configure it with the chained setter
/// methods and finish with [`TickersBuilder::build`].
121pub struct TickersBuilder {
    /// Symbols the resulting `Tickers` will manage, in the order supplied.
122    symbols: Vec<Arc<str>>,
    /// Client settings (language, region, timeout, proxy) used when `build`
    /// has to create its own provider set.
123    config: ClientConfig,
    /// Existing session to reuse instead of creating a new one.
124    shared_client: Option<ClientHandle>,
    /// Pre-built provider set; takes precedence over `shared_client` in `build`.
125    injected_providers: Option<Arc<ProviderSet>>,
    /// Upper bound on concurrently running per-symbol requests (always >= 1).
126    max_concurrency: usize,
    /// Cache time-to-live; `None` disables caching.
127    cache_ttl: Option<Duration>,
    /// Whether `quotes()` also fetches logo URLs.
128    include_logo: bool,
129}
130
131impl TickersBuilder {
132    fn new<S, I>(symbols: I) -> Self
133    where
134        S: Into<String>,
135        I: IntoIterator<Item = S>,
136    {
137        Self {
138            symbols: symbols.into_iter().map(|s| s.into().into()).collect(),
139            config: ClientConfig::default(),
140            shared_client: None,
141            injected_providers: None,
142            max_concurrency: DEFAULT_MAX_CONCURRENCY,
143            cache_ttl: None,
144            include_logo: false,
145        }
146    }
147
148    /// Set the region (automatically sets correct lang and region code)
149    pub fn region(mut self, region: Region) -> Self {
150        self.config.lang = region.lang().to_string();
151        self.config.region = region.region().to_string();
152        self
153    }
154
155    /// Set the language code (e.g., "en-US", "ja-JP", "de-DE")
156    pub fn lang(mut self, lang: impl Into<String>) -> Self {
157        self.config.lang = lang.into();
158        self
159    }
160
161    /// Set the region code (e.g., "US", "JP", "DE")
162    pub fn region_code(mut self, region: impl Into<String>) -> Self {
163        self.config.region = region.into();
164        self
165    }
166
167    /// Set the HTTP request timeout
168    pub fn timeout(mut self, timeout: Duration) -> Self {
169        self.config.timeout = timeout;
170        self
171    }
172
173    /// Set the proxy URL
174    pub fn proxy(mut self, proxy: impl Into<String>) -> Self {
175        self.config.proxy = Some(proxy.into());
176        self
177    }
178
179    /// Set a complete ClientConfig
180    pub fn config(mut self, config: ClientConfig) -> Self {
181        self.config = config;
182        self
183    }
184
185    /// Set the maximum number of concurrent requests for batch operations.
186    ///
187    /// Controls how many HTTP requests run in parallel when methods like
188    /// `charts()`, `financials()`, or `news()` fetch data for each symbol.
189    /// Default is 10.
190    ///
191    /// Lower values reduce the risk of rate limiting from Yahoo Finance.
192    /// Higher values increase throughput for large symbol lists.
193    pub fn max_concurrency(mut self, n: usize) -> Self {
194        self.max_concurrency = n.max(1);
195        self
196    }
197
198    /// Enable response caching with a time-to-live.
199    ///
200    /// By default caching is **disabled** — every call fetches fresh data.
201    /// When enabled, responses are reused until the TTL expires. Stale
202    /// entries are evicted on the next write.
203    ///
204    /// # Example
205    ///
206    /// ```no_run
207    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
208    /// use finance_query::Tickers;
209    /// use std::time::Duration;
210    ///
211    /// let tickers = Tickers::builder(["AAPL", "MSFT"])
212    ///     .cache(Duration::from_secs(30))
213    ///     .build()
214    ///     .await?;
215    /// # Ok(())
216    /// # }
217    /// ```
218    pub fn cache(mut self, ttl: Duration) -> Self {
219        self.cache_ttl = Some(ttl);
220        self
221    }
222
223    /// Include company logo URLs in quote responses.
224    ///
225    /// When enabled, `quotes()` will fetch logo URLs in parallel with the
226    /// quote batch request, adding a small extra request.
227    pub fn logo(mut self) -> Self {
228        self.include_logo = true;
229        self
230    }
231
232    /// Pre-inject a shared provider set (used by [`Providers::tickers`]).
233    pub(crate) fn with_provider_set(mut self, set: Arc<ProviderSet>) -> Self {
234        self.injected_providers = Some(set);
235        self
236    }
237
238    /// Share an existing authenticated session instead of creating a new one.
239    ///
240    /// Avoids redundant auth handshakes when combining `Tickers` with other
241    /// `Ticker` instances. Obtain a handle from any existing `Ticker` or
242    /// `Tickers` via `.client_handle()`.
243    pub fn client(mut self, handle: ClientHandle) -> Self {
244        self.shared_client = Some(handle);
245        self
246    }
247
248    /// Build the Tickers instance
249    pub async fn build(self) -> Result<Tickers> {
250        let providers = if let Some(set) = self.injected_providers {
251            set
252        } else if let Some(handle) = self.shared_client {
253            let yahoo = YahooProvider::from_client(handle.0);
254            let client = yahoo.client_arc();
255            Arc::new(ProviderSet::new(
256                vec![Arc::new(yahoo) as Arc<dyn ProviderAdapter>],
257                Some(client),
258                Routes::new(Fetch::Sequential),
259            ))
260        } else {
261            Arc::new(
262                build_providers(
263                    &[Provider::Yahoo],
264                    &self.config,
265                    Routes::new(Fetch::Sequential),
266                )
267                .await?,
268            )
269        };
270
271        Ok(Tickers {
272            symbols: self.symbols,
273            providers,
274            max_concurrency: self.max_concurrency,
275            cache_ttl: self.cache_ttl,
276            include_logo: self.include_logo,
277            quote_cache: Default::default(),
278            chart_cache: Default::default(),
279            events_cache: Default::default(),
280            financials_cache: Default::default(),
281            news_cache: Default::default(),
282            recommendations_cache: Default::default(),
283            options_cache: Default::default(),
284            spark_cache: Default::default(),
285            #[cfg(feature = "indicators")]
286            indicators_cache: Default::default(),
287
288            // Initialize fetch guards for request deduplication
289            quotes_fetch: Arc::new(tokio::sync::Mutex::new(())),
290            charts_fetch: Default::default(),
291            financials_fetch: Default::default(),
292            news_fetch: Arc::new(tokio::sync::Mutex::new(())),
293            recommendations_fetch: Default::default(),
294            options_fetch: Default::default(),
295            spark_fetch: Default::default(),
296            #[cfg(feature = "indicators")]
297            indicators_fetch: Default::default(),
298        })
299    }
300}
301
302/// Multi-symbol ticker for efficient batch operations.
303///
304/// `Tickers` optimizes data fetching for multiple symbols by:
305/// - Using batch endpoints where available (e.g., /v7/finance/quote)
306/// - Fetching concurrently when batch endpoints don't exist
307/// - Sharing a single authenticated client across all symbols
308/// - Caching results per symbol
309///
310/// # Example
311///
312/// ```no_run
313/// use finance_query::Tickers;
314///
315/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
316/// // Create tickers for multiple symbols
317/// let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
318///
319/// // Batch fetch all quotes (single API call)
320/// let quotes = tickers.quotes().await?;
321/// for (symbol, quote) in &quotes.quotes {
322///     let price = quote.regular_market_price.as_ref().and_then(|v| v.raw).unwrap_or(0.0);
323///     println!("{}: ${:.2}", symbol, price);
324/// }
325///
326/// // Fetch charts concurrently
327/// use finance_query::{Interval, TimeRange};
328/// let charts = tickers.charts(Interval::OneDay, TimeRange::OneMonth).await?;
329/// # Ok(())
330/// # }
331/// ```
332pub struct Tickers {
    /// Symbols managed by this instance, in the order given to the builder.
333    symbols: Vec<Arc<str>>,
    /// Provider set every fetch is dispatched through.
334    providers: Arc<ProviderSet>,
    /// Maximum number of per-symbol requests in flight at once.
335    max_concurrency: usize,
    /// Cache time-to-live; `None` means nothing is ever cached.
336    cache_ttl: Option<Duration>,
    /// Whether `quotes()` also requests logo URLs.
337    include_logo: bool,
    // Per-endpoint caches; only written when `cache_ttl` is set.
338    quote_cache: QuoteCache,
339    chart_cache: ChartCache,
340    events_cache: EventsCache,
341    financials_cache: FinancialsCache,
342    news_cache: NewsCache,
343    recommendations_cache: RecommendationsCache,
344    options_cache: OptionsCache,
345    spark_cache: SparkCache,
346    #[cfg(feature = "indicators")]
347    indicators_cache: IndicatorsCache,
348
349    // Fetch guards prevent duplicate concurrent requests
    // Endpoints without request parameters use a single guard; parameterized
    // endpoints keep one guard per parameter combination.
350    quotes_fetch: FetchGuard,
351    charts_fetch: FetchGuardMap<(Interval, TimeRange)>,
352    financials_fetch: FetchGuardMap<(StatementType, Frequency)>,
353    news_fetch: FetchGuard,
354    recommendations_fetch: FetchGuardMap<u32>,
355    options_fetch: FetchGuardMap<Option<i64>>,
356    spark_fetch: FetchGuardMap<(Interval, TimeRange)>,
357    #[cfg(feature = "indicators")]
358    indicators_fetch: FetchGuardMap<(Interval, TimeRange)>,
359}
360
361impl std::fmt::Debug for Tickers {
362    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
363        f.debug_struct("Tickers")
364            .field("symbols", &self.symbols)
365            .field("max_concurrency", &self.max_concurrency)
366            .field("cache_ttl", &self.cache_ttl)
367            .finish_non_exhaustive()
368    }
369}
370
371impl Tickers {
372    /// Creates new tickers with default configuration
373    ///
374    /// # Arguments
375    ///
376    /// * `symbols` - Iterable of stock symbols (e.g., `["AAPL", "MSFT"]`)
377    ///
378    /// # Example
379    ///
380    /// ```no_run
381    /// use finance_query::Tickers;
382    ///
383    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
384    /// let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
385    /// # Ok(())
386    /// # }
387    /// ```
388    pub async fn new<S, I>(symbols: I) -> Result<Self>
389    where
390        S: Into<String>,
391        I: IntoIterator<Item = S>,
392    {
393        Self::builder(symbols).build().await
394    }
395
396    /// Creates a new builder for Tickers
    ///
    /// The returned [`TickersBuilder`] starts from default settings; chain its
    /// setter methods and call `build()` to obtain a `Tickers`.
397    pub fn builder<S, I>(symbols: I) -> TickersBuilder
398    where
399        S: Into<String>,
400        I: IntoIterator<Item = S>,
401    {
402        TickersBuilder::new(symbols)
403    }
404
405    /// Returns the symbols this tickers instance manages
406    pub fn symbols(&self) -> Vec<&str> {
407        self.symbols.iter().map(|s| &**s).collect()
408    }
409
410    /// Number of symbols
    ///
    /// Counts the entries supplied to the builder as-is.
411    pub fn len(&self) -> usize {
412        self.symbols.len()
413    }
414
415    /// Check if empty
    ///
    /// Returns `true` when this instance manages no symbols at all.
416    pub fn is_empty(&self) -> bool {
417        self.symbols.is_empty()
418    }
419
420    /// Returns a handle to the underlying Yahoo Finance session.
421    ///
422    /// Pass to [`Ticker::builder`](crate::Ticker::builder) or other
423    /// [`Tickers::builder`] calls via `.client(handle)` to share the
424    /// authenticated session without a new auth handshake.
425    pub fn client_handle(&self) -> ClientHandle {
426        ClientHandle(
427            self.providers
428                .first_yahoo()
429                .expect("Tickers always uses a Yahoo session"),
430        )
431    }
432
433    /// Returns `true` if a cache entry exists and has not exceeded the TTL.
    ///
    /// Delegates to `CacheEntry::is_fresh_with_ttl` with this instance's
    /// `cache_ttl`. NOTE(review): presumably reports `false` when caching is
    /// disabled (`cache_ttl` is `None`) — confirm against `CacheEntry`.
434    #[inline]
435    fn is_cache_fresh<T>(&self, entry: Option<&CacheEntry<T>>) -> bool {
436        CacheEntry::is_fresh_with_ttl(entry, self.cache_ttl)
437    }
438
439    /// Returns `true` if all keys are present and fresh in a map cache.
440    fn all_cached<K: Eq + std::hash::Hash, V>(
441        &self,
442        map: &HashMap<K, CacheEntry<V>>,
443        keys: impl Iterator<Item = K>,
444    ) -> bool {
445        let Some(ttl) = self.cache_ttl else {
446            return false;
447        };
448        keys.into_iter()
449            .all(|k| map.get(&k).map(|e| e.is_fresh(ttl)).unwrap_or(false))
450    }
451
452    /// Insert into a map cache, amortizing stale-entry eviction.
453    ///
454    /// Only sweeps stale entries when the map exceeds [`EVICTION_THRESHOLD`],
455    /// avoiding O(n) scans on every write.
456    #[inline]
457    fn cache_insert<K: Eq + std::hash::Hash, V>(
458        &self,
459        map: &mut HashMap<K, CacheEntry<V>>,
460        key: K,
461        value: V,
462    ) {
463        if let Some(ttl) = self.cache_ttl {
464            if map.len() >= EVICTION_THRESHOLD {
465                map.retain(|_, entry| entry.is_fresh(ttl));
466            }
467            map.insert(key, CacheEntry::new(value));
468        }
469    }
470
471    /// Batch fetch quotes for all symbols.
472    ///
473    /// Dispatches through the configured provider set. When logos are enabled,
474    /// fetches logo URLs in parallel via the Yahoo client.
475    ///
476    /// Use [`TickersBuilder::logo()`](TickersBuilder::logo) to enable logo fetching
477    /// for this tickers instance.
478    pub async fn quotes(&self) -> Result<BatchQuotesResponse> {
479        // Fast path: check if all symbols are cached
480        {
481            let cache = self.quote_cache.read().await;
482            if self.all_cached(&cache, self.symbols.iter().cloned()) {
483                let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
484                for symbol in &self.symbols {
485                    if let Some(entry) = cache.get(symbol) {
486                        response
487                            .quotes
488                            .insert(symbol.to_string(), entry.value.clone());
489                    }
490                }
491                return Ok(response);
492            }
493        }
494
495        let _fetch_guard = self.quotes_fetch.lock().await;
496
497        // Double-check: another task may have fetched while we waited
498        {
499            let cache = self.quote_cache.read().await;
500            if self.all_cached(&cache, self.symbols.iter().cloned()) {
501                let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
502                for symbol in &self.symbols {
503                    if let Some(entry) = cache.get(symbol) {
504                        response
505                            .quotes
506                            .insert(symbol.to_string(), entry.value.clone());
507                    }
508                }
509                return Ok(response);
510            }
511        }
512
513        let symbol_strings: Vec<String> = self.symbols.iter().map(|s| s.to_string()).collect();
514        let mut response = BatchQuotesResponse::with_capacity(self.symbols.len());
515
516        let (quote_data, logos) = if self.include_logo {
517            // Fire logo fetch in parallel with quote fetch; logos are Yahoo-only
518            let providers_logo = Arc::clone(&self.providers);
519            let syms_logo = symbol_strings.clone();
520            let logo_future = async move {
521                if let Ok(client) = providers_logo.first_yahoo() {
522                    let syms_ref: Vec<&str> = syms_logo.iter().map(String::as_str).collect();
523                    crate::adapters::yahoo::quote::quotes::fetch_with_fields(
524                        &client,
525                        &syms_ref,
526                        Some(&["logoUrl", "companyLogoUrl"]),
527                        true,
528                        true,
529                    )
530                    .await
531                    .ok()
532                } else {
533                    None
534                }
535            };
536
537            let providers_quote = Arc::clone(&self.providers);
538            let syms_quote = symbol_strings.clone();
539            let quote_future = async move {
540                providers_quote
541                    .fetch(Capability::QUOTE, |p| {
542                        let syms = syms_quote.clone();
543                        let p = p.clone();
544                        async move {
545                            let syms_ref: Vec<&str> = syms.iter().map(String::as_str).collect();
546                            p.fetch_quotes_batch(&syms_ref).await
547                        }
548                    })
549                    .await
550            };
551
552            let (batch_result, logo_result) = tokio::join!(quote_future, logo_future);
553            let quote_data = match batch_result {
554                Ok(data) => data,
555                Err(_) => {
556                    self.fetch_quotes_per_symbol(&symbol_strings, &mut response)
557                        .await
558                }
559            };
560            (quote_data, logo_result)
561        } else {
562            let providers = Arc::clone(&self.providers);
563            let syms = symbol_strings.clone();
564            let batch_result = providers
565                .fetch(Capability::QUOTE, |p| {
566                    let syms = syms.clone();
567                    let p = p.clone();
568                    async move {
569                        let syms_ref: Vec<&str> = syms.iter().map(String::as_str).collect();
570                        p.fetch_quotes_batch(&syms_ref).await
571                    }
572                })
573                .await;
574            let data = match batch_result {
575                Ok(data) => data,
576                Err(_) => {
577                    self.fetch_quotes_per_symbol(&symbol_strings, &mut response)
578                        .await
579                }
580            };
581            (data, None)
582        };
583
584        let logo_map: HashMap<String, (Option<String>, Option<String>)> = logos
585            .and_then(|l| l.get("quoteResponse")?.get("result")?.as_array().cloned())
586            .map(|results| {
587                results
588                    .iter()
589                    .filter_map(|r| {
590                        let symbol = r.get("symbol")?.as_str()?.to_string();
591                        let logo_url = r.get("logoUrl").and_then(|v| v.as_str()).map(String::from);
592                        let company_logo_url = r
593                            .get("companyLogoUrl")
594                            .and_then(|v| v.as_str())
595                            .map(String::from);
596                        Some((symbol, (logo_url, company_logo_url)))
597                    })
598                    .collect()
599            })
600            .unwrap_or_default();
601
602        let mut parsed_quotes: Vec<(String, Quote)> = Vec::new();
603
604        for (symbol, summary) in quote_data {
605            let logo_url = logo_map.get(&symbol).and_then(|(l, _)| l.clone());
606            let company_logo_url = logo_map.get(&symbol).and_then(|(_, c)| c.clone());
607            let quote = Quote::from_response(&summary, logo_url, company_logo_url);
608            parsed_quotes.push((symbol, quote));
609        }
610
611        if self.cache_ttl.is_some() {
612            let mut cache = self.quote_cache.write().await;
613            for (symbol, quote) in &parsed_quotes {
614                self.cache_insert(&mut cache, symbol.as_str().into(), quote.clone());
615            }
616        }
617
618        for (symbol, quote) in parsed_quotes {
619            response.quotes.insert(symbol, quote);
620        }
621
622        // Track missing symbols
623        for symbol in &self.symbols {
624            let s = &**symbol;
625            if !response.quotes.contains_key(s) && !response.errors.contains_key(s) {
626                response.errors.insert(
627                    symbol.to_string(),
628                    "Symbol not found in response".to_string(),
629                );
630            }
631        }
632
633        Ok(response)
634    }
635
636    /// Fallback for when no provider supports `fetch_quotes_batch`.
637    /// Fetches each symbol individually; failures go into `response.errors`.
638    async fn fetch_quotes_per_symbol(
639        &self,
640        symbols: &[String],
641        response: &mut BatchQuotesResponse,
642    ) -> Vec<(String, QuoteSummaryResponse)> {
643        let futures: Vec<_> = symbols
644            .iter()
645            .map(|sym| {
646                let providers = Arc::clone(&self.providers);
647                let sym = sym.clone();
648                async move {
649                    let result = providers
650                        .fetch(Capability::QUOTE, |p| {
651                            let sym = sym.clone();
652                            let p = p.clone();
653                            async move { p.fetch_quote(&sym).await }
654                        })
655                        .await;
656                    (sym, result)
657                }
658            })
659            .collect();
660
661        let results: Vec<_> = stream::iter(futures)
662            .buffer_unordered(self.max_concurrency)
663            .collect()
664            .await;
665
666        let mut successes = Vec::new();
667        for (sym, result) in results {
668            match result {
669                Ok(resp) => successes.push((sym, resp)),
670                Err(e) => {
671                    response.errors.insert(sym, e.to_string());
672                }
673            }
674        }
675        successes
676    }
677
678    /// Get a specific quote by symbol (from cache or fetch all)
679    pub async fn quote<F>(&self, symbol: &str) -> Result<Quote<F>>
680    where
681        F: Format,
682        Quote<Both>: Into<Quote<F>>,
683    {
684        {
685            let cache = self.quote_cache.read().await;
686            if let Some(entry) = cache.get(symbol)
687                && self.is_cache_fresh(Some(entry))
688            {
689                return Ok(entry.value.clone().into());
690            }
691        }
692
693        let response = self.quotes().await?;
694
695        response
696            .quotes
697            .get(symbol)
698            .cloned()
699            .map(Into::into)
700            .ok_or_else(|| FinanceError::SymbolNotFound {
701                symbol: Some(symbol.to_string()),
702                context: response
703                    .errors
704                    .get(symbol)
705                    .cloned()
706                    .unwrap_or_else(|| "Symbol not found".to_string()),
707            })
708    }
709
710    /// Helper to get or create a fetch guard for a given key.
711    ///
712    /// Returns the guard from the map, never a locally-created copy that
713    /// could diverge under contention.
714    async fn get_fetch_guard<K: Clone + Eq + std::hash::Hash>(
715        guard_map: &FetchGuardMap<K>,
716        key: K,
717    ) -> FetchGuard {
718        {
719            let guards = guard_map.read().await;
720            if let Some(guard) = guards.get(&key) {
721                return Arc::clone(guard);
722            }
723        }
724
725        let mut guards = guard_map.write().await;
726        Arc::clone(
727            guards
728                .entry(key)
729                .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
730        )
731    }
732
733    /// Batch fetch charts for all symbols concurrently
734    ///
735    /// Chart data cannot be batched in a single request, so this fetches
736    /// all charts concurrently using tokio for maximum performance.
737    pub async fn charts(
738        &self,
739        interval: Interval,
740        range: TimeRange,
741    ) -> Result<BatchChartsResponse> {
742        // Fast path: check if all symbols are cached
743        {
744            let cache = self.chart_cache.read().await;
745            if self.all_cached(
746                &cache,
747                self.symbols.iter().map(|s| (s.clone(), interval, range)),
748            ) {
749                let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
750                for symbol in &self.symbols {
751                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
752                        response
753                            .charts
754                            .insert(symbol.to_string(), entry.value.clone());
755                    }
756                }
757                return Ok(response);
758            }
759        }
760
761        // Slow path: acquire fetch guard to prevent duplicate concurrent requests
762        let fetch_guard = Self::get_fetch_guard(&self.charts_fetch, (interval, range)).await;
763        let _guard = fetch_guard.lock().await;
764
765        // Double-check: another task may have fetched while we waited
766        {
767            let cache = self.chart_cache.read().await;
768            if self.all_cached(
769                &cache,
770                self.symbols.iter().map(|s| (s.clone(), interval, range)),
771            ) {
772                let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
773                for symbol in &self.symbols {
774                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
775                        response
776                            .charts
777                            .insert(symbol.to_string(), entry.value.clone());
778                    }
779                }
780                return Ok(response);
781            }
782        }
783
784        // Fetch all charts concurrently via provider dispatch (no lock held during I/O)
785        let futures: Vec<_> = self
786            .symbols
787            .iter()
788            .map(|symbol| {
789                let providers = Arc::clone(&self.providers);
790                let symbol = Arc::clone(symbol);
791                async move {
792                    let sym = symbol.to_string();
793                    let result = providers
794                        .fetch(Capability::CHART, |p| {
795                            let sym = sym.clone();
796                            let p = p.clone();
797                            async move { p.fetch_chart(&sym, interval, range).await }
798                        })
799                        .await;
800                    (symbol, result)
801                }
802            })
803            .collect();
804
805        let results: Vec<_> = stream::iter(futures)
806            .buffer_unordered(self.max_concurrency)
807            .collect()
808            .await;
809
810        let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
811        let mut parsed_charts: Vec<(Arc<str>, Chart)> = Vec::new();
812
813        for (symbol, result) in results {
814            match result {
815                Ok(data) => {
816                    let chart = data;
817                    parsed_charts.push((symbol, chart));
818                }
819                Err(e) => {
820                    response.errors.insert(symbol.to_string(), e.to_string());
821                }
822            }
823        }
824
825        // Move into cache, then clone for response — avoids double-clone
826        if self.cache_ttl.is_some() {
827            let mut cache = self.chart_cache.write().await;
828            let cache_keys: Vec<_> = parsed_charts
829                .into_iter()
830                .map(|(symbol, chart)| {
831                    self.cache_insert(&mut cache, (symbol.clone(), interval, range), chart);
832                    symbol
833                })
834                .collect();
835            for symbol in cache_keys {
836                if let Some(cached) = cache.get(&(symbol.clone(), interval, range)) {
837                    response
838                        .charts
839                        .insert(symbol.to_string(), cached.value.clone());
840                }
841            }
842        } else {
843            for (symbol, chart) in parsed_charts {
844                response.charts.insert(symbol.to_string(), chart);
845            }
846        }
847
848        Ok(response)
849    }
850
851    /// Get a specific chart by symbol
852    pub async fn chart(&self, symbol: &str, interval: Interval, range: TimeRange) -> Result<Chart> {
853        {
854            let cache = self.chart_cache.read().await;
855            let key: Arc<str> = symbol.into();
856            if let Some(entry) = cache.get(&(key, interval, range))
857                && self.is_cache_fresh(Some(entry))
858            {
859                return Ok(entry.value.clone());
860            }
861        }
862
863        let response = self.charts(interval, range).await?;
864
865        response
866            .charts
867            .get(symbol)
868            .cloned()
869            .ok_or_else(|| FinanceError::SymbolNotFound {
870                symbol: Some(symbol.to_string()),
871                context: response
872                    .errors
873                    .get(symbol)
874                    .cloned()
875                    .unwrap_or_else(|| "Symbol not found".to_string()),
876            })
877    }
878
879    /// Batch fetch chart data for a custom date range for all symbols concurrently.
880    ///
881    /// Unlike [`charts()`](Self::charts) which uses predefined time ranges,
882    /// this method accepts absolute start/end timestamps. Results are **not cached**
883    /// since custom ranges have unbounded key space.
884    ///
885    /// # Arguments
886    ///
887    /// * `interval` - Time interval between data points
888    /// * `start` - Start date as Unix timestamp (seconds since epoch)
889    /// * `end` - End date as Unix timestamp (seconds since epoch)
890    pub async fn charts_range(
891        &self,
892        interval: Interval,
893        start: i64,
894        end: i64,
895    ) -> Result<BatchChartsResponse> {
896        let futures: Vec<_> = self
897            .symbols
898            .iter()
899            .map(|symbol| {
900                let providers = Arc::clone(&self.providers);
901                let symbol = Arc::clone(symbol);
902                async move {
903                    let sym = symbol.to_string();
904                    let result = providers
905                        .fetch(Capability::CHART, |p| {
906                            let sym = sym.clone();
907                            let p = p.clone();
908                            async move { p.fetch_chart_range(&sym, interval, start, end).await }
909                        })
910                        .await;
911                    (symbol, result)
912                }
913            })
914            .collect();
915
916        let results: Vec<_> = stream::iter(futures)
917            .buffer_unordered(self.max_concurrency)
918            .collect()
919            .await;
920
921        let mut response = BatchChartsResponse::with_capacity(self.symbols.len());
922
923        for (symbol, result) in results {
924            match result {
925                Ok(data) => {
926                    let chart = data;
927                    response.charts.insert(symbol.to_string(), chart);
928                }
929                Err(e) => {
930                    response.errors.insert(symbol.to_string(), e.to_string());
931                }
932            }
933        }
934
935        Ok(response)
936    }
937
938    /// Ensures events are loaded for all symbols using chart requests.
939    ///
940    /// Fetches events concurrently for symbols that don't have cached events.
941    /// Uses `TimeRange::Max` to get full event history (Yahoo returns all
942    /// dividends/splits/capital gains regardless of chart range).
943    ///
944    /// Events are always stored regardless of `cache_ttl` because they are
945    /// derived data (not a TTL-bounded cache). When `cache_ttl` is `None`,
946    /// events persist for the lifetime of the `Tickers` instance.
947    async fn ensure_events_loaded(&self) -> Result<()> {
948        // Check which symbols need event data (existence check, not TTL-based)
949        let symbols_to_fetch: Vec<Arc<str>> = {
950            let cache = self.events_cache.read().await;
951            self.symbols
952                .iter()
953                .filter(|sym| !cache.contains_key(*sym))
954                .cloned()
955                .collect()
956        };
957
958        if symbols_to_fetch.is_empty() {
959            return Ok(());
960        }
961
962        // Fetch events concurrently for all symbols that need it via provider dispatch
963        let futures: Vec<_> = symbols_to_fetch
964            .iter()
965            .map(|symbol| {
966                let providers = Arc::clone(&self.providers);
967                let symbol = Arc::clone(symbol);
968                async move {
969                    let sym = symbol.to_string();
970                    let result = providers
971                        .fetch(Capability::CORPORATE, |p| {
972                            let sym = sym.clone();
973                            let p = p.clone();
974                            async move { p.fetch_events(&sym).await }
975                        })
976                        .await;
977                    (symbol, result)
978                }
979            })
980            .collect();
981
982        let results: Vec<_> = stream::iter(futures)
983            .buffer_unordered(self.max_concurrency)
984            .collect()
985            .await;
986
987        let mut parsed_events: Vec<(Arc<str>, ChartEvents)> = Vec::new();
988
989        for (symbol, result) in results {
990            if let Ok(events_data) = result {
991                parsed_events.push((symbol, events_data));
992            }
993        }
994
995        // Always store events — they are derived data, not TTL-bounded cache
996        if !parsed_events.is_empty() {
997            let mut events_cache = self.events_cache.write().await;
998            for (symbol, events) in parsed_events {
999                events_cache.insert(symbol, CacheEntry::new(events));
1000            }
1001        }
1002
1003        Ok(())
1004    }
1005
1006    /// Batch fetch spark data for all symbols in a single request.
1007    ///
1008    /// Spark data is optimized for sparkline rendering, returning only close prices.
1009    /// Unlike `charts()`, this fetches all symbols in ONE API call, making it
1010    /// much more efficient for displaying price trends on dashboards or watchlists.
1011    ///
1012    /// # Arguments
1013    ///
1014    /// * `interval` - Time interval between data points (e.g., `Interval::FiveMinutes`)
1015    /// * `range` - Time range to fetch (e.g., `TimeRange::OneDay`)
1016    ///
1017    /// # Example
1018    ///
1019    /// ```no_run
1020    /// use finance_query::{Tickers, Interval, TimeRange};
1021    ///
1022    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1023    /// let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
1024    /// let sparks = tickers.spark(Interval::FiveMinutes, TimeRange::OneDay).await?;
1025    ///
1026    /// for (symbol, spark) in &sparks.sparks {
1027    ///     if let Some(change) = spark.percent_change() {
1028    ///         println!("{}: {:.2}%", symbol, change);
1029    ///     }
1030    /// }
1031    /// # Ok(())
1032    /// # }
1033    /// ```
1034    pub async fn spark(&self, interval: Interval, range: TimeRange) -> Result<BatchSparksResponse> {
1035        // Fast path: check if all symbols are cached
1036        {
1037            let cache = self.spark_cache.read().await;
1038            if self.all_cached(
1039                &cache,
1040                self.symbols.iter().map(|s| (s.clone(), interval, range)),
1041            ) {
1042                let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1043                for symbol in &self.symbols {
1044                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
1045                        response
1046                            .sparks
1047                            .insert(symbol.to_string(), entry.value.clone());
1048                    }
1049                }
1050                return Ok(response);
1051            }
1052        }
1053
1054        // Slow path: acquire fetch guard
1055        let fetch_guard = Self::get_fetch_guard(&self.spark_fetch, (interval, range)).await;
1056        let _guard = fetch_guard.lock().await;
1057
1058        // Double-check after guard
1059        {
1060            let cache = self.spark_cache.read().await;
1061            if self.all_cached(
1062                &cache,
1063                self.symbols.iter().map(|s| (s.clone(), interval, range)),
1064            ) {
1065                let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1066                for symbol in &self.symbols {
1067                    if let Some(entry) = cache.get(&(symbol.clone(), interval, range)) {
1068                        response
1069                            .sparks
1070                            .insert(symbol.to_string(), entry.value.clone());
1071                    }
1072                }
1073                return Ok(response);
1074            }
1075        }
1076
1077        // Spark is a Yahoo-specific batch endpoint with no provider abstraction equivalent
1078        let client = self.providers.first_yahoo()?;
1079        let symbols_ref: Vec<&str> = self.symbols.iter().map(|s| &**s).collect();
1080        let json =
1081            crate::adapters::yahoo::quote::spark::fetch(&client, &symbols_ref, interval, range)
1082                .await?;
1083
1084        let mut response = BatchSparksResponse::with_capacity(self.symbols.len());
1085
1086        match SparkResponse::from_json(json) {
1087            Ok(spark_response) => {
1088                let mut parsed_sparks: Vec<(Arc<str>, Spark)> = Vec::new();
1089
1090                if let Some(results) = spark_response.spark.result {
1091                    for result in &results {
1092                        if let Some(spark) = Spark::from_response(
1093                            result,
1094                            Some(interval.as_str().to_string()),
1095                            Some(range.as_str().to_string()),
1096                        ) {
1097                            let sym: Arc<str> = result.symbol.as_str().into();
1098                            parsed_sparks.push((sym, spark));
1099                        } else {
1100                            response.errors.insert(
1101                                result.symbol.to_string(),
1102                                "Failed to parse spark data".to_string(),
1103                            );
1104                        }
1105                    }
1106                }
1107
1108                // Cache all parsed sparks
1109                if self.cache_ttl.is_some() {
1110                    let mut cache = self.spark_cache.write().await;
1111                    for (symbol, spark) in &parsed_sparks {
1112                        self.cache_insert(
1113                            &mut cache,
1114                            (symbol.clone(), interval, range),
1115                            spark.clone(),
1116                        );
1117                    }
1118                }
1119
1120                // Build response
1121                for (symbol, spark) in parsed_sparks {
1122                    response.sparks.insert(symbol.to_string(), spark);
1123                }
1124
1125                // Track missing symbols
1126                for symbol in &self.symbols {
1127                    let symbol_str = &**symbol;
1128                    if !response.sparks.contains_key(symbol_str)
1129                        && !response.errors.contains_key(symbol_str)
1130                    {
1131                        response.errors.insert(
1132                            symbol.to_string(),
1133                            "Symbol not found in response".to_string(),
1134                        );
1135                    }
1136                }
1137            }
1138            Err(e) => {
1139                for symbol in &self.symbols {
1140                    response.errors.insert(symbol.to_string(), e.to_string());
1141                }
1142            }
1143        }
1144
1145        Ok(response)
1146    }
1147
1148    /// Batch fetch dividends for all symbols
1149    ///
1150    /// Returns dividend history for all symbols, filtered by the specified time range.
1151    /// Dividends are cached per symbol after the first chart fetch.
1152    ///
1153    /// # Arguments
1154    ///
1155    /// * `range` - Time range to filter dividends
1156    ///
1157    /// # Example
1158    ///
1159    /// ```no_run
1160    /// use finance_query::{Tickers, TimeRange};
1161    ///
1162    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1163    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1164    /// let dividends = tickers.dividends(TimeRange::OneYear).await?;
1165    ///
1166    /// for (symbol, divs) in &dividends.dividends {
1167    ///     println!("{}: {} dividends", symbol, divs.len());
1168    /// }
1169    /// # Ok(())
1170    /// # }
1171    /// ```
1172    pub async fn dividends(&self, range: TimeRange) -> Result<BatchDividendsResponse> {
1173        let mut response = BatchDividendsResponse::with_capacity(self.symbols.len());
1174
1175        // Fetch events efficiently (1-day chart request per symbol)
1176        self.ensure_events_loaded().await?;
1177
1178        let events_cache = self.events_cache.read().await;
1179
1180        for symbol in &self.symbols {
1181            if let Some(entry) = events_cache.get(symbol) {
1182                let all_dividends = entry.value.to_dividends();
1183                let filtered = filter_by_range(all_dividends, range);
1184                response.dividends.insert(symbol.to_string(), filtered);
1185            } else {
1186                response
1187                    .errors
1188                    .insert(symbol.to_string(), "No events data available".to_string());
1189            }
1190        }
1191
1192        Ok(response)
1193    }
1194
1195    /// Batch fetch stock splits for all symbols
1196    ///
1197    /// Returns stock split history for all symbols, filtered by the specified time range.
1198    /// Splits are cached per symbol after the first chart fetch.
1199    ///
1200    /// # Arguments
1201    ///
1202    /// * `range` - Time range to filter splits
1203    ///
1204    /// # Example
1205    ///
1206    /// ```no_run
1207    /// use finance_query::{Tickers, TimeRange};
1208    ///
1209    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1210    /// let tickers = Tickers::new(["NVDA", "TSLA"]).await?;
1211    /// let splits = tickers.splits(TimeRange::FiveYears).await?;
1212    ///
1213    /// for (symbol, sp) in &splits.splits {
1214    ///     for split in sp {
1215    ///         println!("{}: {}", symbol, split.ratio);
1216    ///     }
1217    /// }
1218    /// # Ok(())
1219    /// # }
1220    /// ```
1221    pub async fn splits(&self, range: TimeRange) -> Result<BatchSplitsResponse> {
1222        let mut response = BatchSplitsResponse::with_capacity(self.symbols.len());
1223
1224        // Fetch events efficiently (1-day chart request per symbol)
1225        self.ensure_events_loaded().await?;
1226
1227        let events_cache = self.events_cache.read().await;
1228
1229        for symbol in &self.symbols {
1230            if let Some(entry) = events_cache.get(symbol) {
1231                let all_splits = entry.value.to_splits();
1232                let filtered = filter_by_range(all_splits, range);
1233                response.splits.insert(symbol.to_string(), filtered);
1234            } else {
1235                response
1236                    .errors
1237                    .insert(symbol.to_string(), "No events data available".to_string());
1238            }
1239        }
1240
1241        Ok(response)
1242    }
1243
1244    /// Batch fetch capital gains for all symbols
1245    ///
1246    /// Returns capital gain distribution history for all symbols, filtered by the
1247    /// specified time range. This is primarily relevant for mutual funds and ETFs.
1248    /// Capital gains are cached per symbol after the first chart fetch.
1249    ///
1250    /// # Arguments
1251    ///
1252    /// * `range` - Time range to filter capital gains
1253    ///
1254    /// # Example
1255    ///
1256    /// ```no_run
1257    /// use finance_query::{Tickers, TimeRange};
1258    ///
1259    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1260    /// let tickers = Tickers::new(["VFIAX", "VTI"]).await?;
1261    /// let gains = tickers.capital_gains(TimeRange::TwoYears).await?;
1262    ///
1263    /// for (symbol, cg) in &gains.capital_gains {
1264    ///     println!("{}: {} distributions", symbol, cg.len());
1265    /// }
1266    /// # Ok(())
1267    /// # }
1268    /// ```
1269    pub async fn capital_gains(&self, range: TimeRange) -> Result<BatchCapitalGainsResponse> {
1270        let mut response = BatchCapitalGainsResponse::with_capacity(self.symbols.len());
1271
1272        // Fetch events efficiently (1-day chart request per symbol)
1273        self.ensure_events_loaded().await?;
1274
1275        let events_cache = self.events_cache.read().await;
1276
1277        for symbol in &self.symbols {
1278            if let Some(entry) = events_cache.get(symbol) {
1279                let all_gains = entry.value.to_capital_gains();
1280                let filtered = filter_by_range(all_gains, range);
1281                response.capital_gains.insert(symbol.to_string(), filtered);
1282            } else {
1283                response
1284                    .errors
1285                    .insert(symbol.to_string(), "No events data available".to_string());
1286            }
1287        }
1288
1289        Ok(response)
1290    }
1291
1292    /// Batch fetch financial statements for all symbols
1293    ///
1294    /// Fetches the specified financial statement type for all symbols concurrently.
1295    /// Financial statements are cached per (symbol, statement_type, frequency) tuple.
1296    ///
1297    /// # Arguments
1298    ///
1299    /// * `statement_type` - Type of statement (Income, Balance, CashFlow)
1300    /// * `frequency` - Annual or Quarterly
1301    ///
1302    /// # Example
1303    ///
1304    /// ```no_run
1305    /// use finance_query::{Tickers, StatementType, Frequency};
1306    ///
1307    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1308    /// let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
1309    /// let financials = tickers.financials(StatementType::Income, Frequency::Annual).await?;
1310    ///
1311    /// for (symbol, stmt) in &financials.financials {
1312    ///     if let Some(revenue) = stmt.statement.get("TotalRevenue") {
1313    ///         println!("{}: {:?}", symbol, revenue);
1314    ///     }
1315    /// }
1316    /// # Ok(())
1317    /// # }
1318    /// ```
1319    pub async fn financials(
1320        &self,
1321        statement_type: StatementType,
1322        frequency: Frequency,
1323    ) -> Result<BatchFinancialsResponse> {
1324        batch_fetch_cached!(self;
1325            cache: financials_cache,
1326            guard: map(financials_fetch, (statement_type, frequency)),
1327            key: |s| (s.clone(), statement_type, frequency),
1328            response: BatchFinancialsResponse.financials,
1329            fetch: |providers, symbol| {
1330                let sym = symbol.to_string();
1331                providers.fetch(Capability::FUNDAMENTALS, move |p| {
1332                    let sym = sym.clone();
1333                    let p = p.clone();
1334                    async move {
1335                        p.fetch_financials(&sym, statement_type, frequency)
1336                            .await
1337                    }
1338                }).await
1339            },
1340        )
1341    }
1342
1343    /// Batch fetch news articles for all symbols
1344    ///
1345    /// Fetches recent news articles for all symbols concurrently using scrapers.
1346    /// News articles are cached per symbol.
1347    ///
1348    /// # Example
1349    ///
1350    /// ```no_run
1351    /// use finance_query::Tickers;
1352    ///
1353    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1354    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1355    /// let news = tickers.news().await?;
1356    ///
1357    /// for (symbol, articles) in &news.news {
1358    ///     println!("{}: {} articles", symbol, articles.len());
1359    ///     for article in articles.iter().take(3) {
1360    ///         println!("  - {}", article.title);
1361    ///     }
1362    /// }
1363    /// # Ok(())
1364    /// # }
1365    /// ```
1366    pub async fn news(&self) -> Result<BatchNewsResponse> {
1367        batch_fetch_cached!(self;
1368            cache: news_cache,
1369            guard: simple(news_fetch),
1370            key: |s| s.clone(),
1371            response: BatchNewsResponse.news,
1372            fetch: |providers, symbol| {
1373                let sym = symbol.to_string();
1374                providers.fetch(Capability::CORPORATE, move |p| {
1375                    let sym = sym.clone();
1376                    let p = p.clone();
1377                    async move {
1378                        p.fetch_news(&sym)
1379                            .await
1380                            .map(|data| data.into_iter().collect::<Vec<News>>())
1381                    }
1382                }).await
1383            },
1384        )
1385    }
1386
1387    /// Batch fetch recommendations for all symbols
1388    ///
1389    /// Fetches analyst recommendations and similar stocks for all symbols concurrently.
1390    /// Recommendations are cached per (symbol, limit) tuple — different limits
1391    /// produce different API responses and are cached independently.
1392    ///
1393    /// # Arguments
1394    ///
1395    /// * `limit` - Maximum number of similar stocks to return per symbol
1396    ///
1397    /// # Example
1398    ///
1399    /// ```no_run
1400    /// use finance_query::Tickers;
1401    ///
1402    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1403    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1404    /// let recommendations = tickers.recommendations(10).await?;
1405    ///
1406    /// for (symbol, rec) in &recommendations.recommendations {
1407    ///     println!("{}: {} recommendations", symbol, rec.count());
1408    ///     for similar in &rec.recommendations {
1409    ///         println!("  - {}: score {}", similar.symbol, similar.score);
1410    ///     }
1411    /// }
1412    /// # Ok(())
1413    /// # }
1414    /// ```
1415    pub async fn recommendations(&self, limit: u32) -> Result<BatchRecommendationsResponse> {
1416        batch_fetch_cached!(self;
1417            cache: recommendations_cache,
1418            guard: map(recommendations_fetch, limit),
1419            key: |s| (s.clone(), limit),
1420            response: BatchRecommendationsResponse.recommendations,
1421            fetch: |providers, symbol| {
1422                let sym = symbol.to_string();
1423                providers.fetch(Capability::CORPORATE, move |p| {
1424                    let sym = sym.clone();
1425                    let p = p.clone();
1426                    async move {
1427                        let items = p.fetch_similar_symbols(&sym, limit).await?;
1428                        Ok(recommendation_from_similar(
1429                            sym,
1430                            Some(Provider::from_id_str(p.id()).ok_or_else(|| {
1431                                FinanceError::InternalError(format!("unknown provider id: {}", p.id()))
1432                            })?),
1433                            items,
1434                            Some(limit),
1435                        ))
1436                    }
1437                }).await
1438            },
1439        )
1440    }
1441
1442    /// Batch fetch options chains for all symbols
1443    ///
1444    /// Fetches options chains for the specified expiration date for all symbols concurrently.
1445    /// Options are cached per (symbol, date) tuple.
1446    ///
1447    /// # Arguments
1448    ///
1449    /// * `date` - Optional expiration date (Unix timestamp). If None, fetches nearest expiration.
1450    ///
1451    /// # Example
1452    ///
1453    /// ```no_run
1454    /// use finance_query::Tickers;
1455    ///
1456    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1457    /// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
1458    /// let options = tickers.options(None).await?;
1459    ///
1460    /// for (symbol, opts) in &options.options {
1461    ///     println!("{}: {} expirations", symbol, opts.expiration_dates().len());
1462    /// }
1463    /// # Ok(())
1464    /// # }
1465    /// ```
1466    pub async fn options(&self, date: Option<i64>) -> Result<BatchOptionsResponse> {
1467        batch_fetch_cached!(self;
1468            cache: options_cache,
1469            guard: map(options_fetch, date),
1470            key: |s| (s.clone(), date),
1471            response: BatchOptionsResponse.options,
1472            fetch: |providers, symbol| {
1473                let sym = symbol.to_string();
1474                providers.fetch(Capability::OPTIONS, move |p| {
1475                    let sym = sym.clone();
1476                    let p = p.clone();
1477                    async move {
1478                        p.fetch_options(&sym, date).await
1479                    }
1480                }).await
1481            },
1482        )
1483    }
1484
/// Compute the technical indicator summary of every tracked symbol.
///
/// Summaries are calculated from the candles returned by
/// [`charts()`](Self::charts) for the same `interval` and `range`. When every
/// symbol already has a cached summary under `(symbol, interval, range)`,
/// the response is assembled from the cache and nothing is recomputed.
/// New summaries are written to the cache only when `cache_ttl` is set.
///
/// Per-symbol errors reported by the chart fetch are copied into the
/// response's `errors` map.
///
/// # Arguments
///
/// * `interval` - The time interval for each candle
/// * `range` - The time range to fetch data for
///
/// # Example
///
/// ```no_run
/// use finance_query::{Tickers, Interval, TimeRange};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let tickers = Tickers::new(["AAPL", "MSFT"]).await?;
/// let indicators = tickers.indicators(Interval::OneDay, TimeRange::ThreeMonths).await?;
///
/// for (symbol, ind) in &indicators.indicators {
///     println!("{}: RSI(14) = {:?}, SMA(20) = {:?}", symbol, ind.rsi_14, ind.sma_20);
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "indicators")]
pub async fn indicators(
    &self,
    interval: Interval,
    range: TimeRange,
) -> Result<BatchIndicatorsResponse> {
    let key_of = |ticker: &Arc<str>| (ticker.clone(), interval, range);

    // First look: answer straight from the cache when it covers every symbol.
    {
        let cached = self.indicators_cache.read().await;
        if self.all_cached(&cached, self.symbols.iter().map(&key_of)) {
            let mut hit = BatchIndicatorsResponse::with_capacity(self.symbols.len());
            for ticker in &self.symbols {
                if let Some(entry) = cached.get(&key_of(ticker)) {
                    hit.indicators.insert(ticker.to_string(), entry.value.clone());
                }
            }
            return Ok(hit);
        }
    }

    // Cache miss: take the fetch guard for this (interval, range) so that
    // concurrent callers do not repeat the same calculation.
    let gate = Self::get_fetch_guard(&self.indicators_fetch, (interval, range)).await;
    let _held = gate.lock().await;

    // Second look: another task may have filled the cache while we waited.
    {
        let cached = self.indicators_cache.read().await;
        if self.all_cached(&cached, self.symbols.iter().map(&key_of)) {
            let mut hit = BatchIndicatorsResponse::with_capacity(self.symbols.len());
            for ticker in &self.symbols {
                if let Some(entry) = cached.get(&key_of(ticker)) {
                    hit.indicators.insert(ticker.to_string(), entry.value.clone());
                }
            }
            return Ok(hit);
        }
    }

    // Charts are the input of the calculation; they may come from the chart cache.
    let charts = self.charts(interval, range).await?;

    // Do all the number crunching before any lock is taken.
    let computed: Vec<(String, indicators::IndicatorsSummary)> = charts
        .charts
        .iter()
        .map(|(name, chart)| {
            let summary = indicators::summary::calculate_indicators(&chart.candles);
            (name.to_string(), summary)
        })
        .collect();

    // Store the summaries under one short write lock.
    if self.cache_ttl.is_some() {
        let mut store = self.indicators_cache.write().await;
        for (name, summary) in &computed {
            let ticker: Arc<str> = name.as_str().into();
            self.cache_insert(&mut store, key_of(&ticker), summary.clone());
        }
    }

    let mut response = BatchIndicatorsResponse::with_capacity(self.symbols.len());

    for (name, summary) in computed {
        response.indicators.insert(name, summary);
    }

    // Symbols whose chart could not be fetched keep their chart error.
    for (name, message) in &charts.errors {
        response.errors.insert(name.to_string(), message.clone());
    }

    Ok(response)
}
1588
1589    // ========================================================================
1590    // Dynamic Symbol Management
1591    // ========================================================================
1592
1593    /// Add symbols to the watch list
1594    ///
1595    /// Adds new symbols to track without affecting existing cached data.
1596    ///
1597    /// # Example
1598    ///
1599    /// ```no_run
1600    /// use finance_query::Tickers;
1601    ///
1602    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1603    /// let mut tickers = Tickers::new(["AAPL"]).await?;
1604    /// tickers.add_symbols(&["MSFT", "GOOGL"]);
1605    /// assert_eq!(tickers.len(), 3);
1606    /// # Ok(())
1607    /// # }
1608    /// ```
1609    pub fn add_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1610        // Use HashSet for O(n+m) deduplication instead of O(n*m) linear search
1611        use std::collections::HashSet;
1612
1613        let existing: HashSet<&str> = self.symbols.iter().map(|s| &**s).collect();
1614        let to_add: Vec<Arc<str>> = symbols
1615            .iter()
1616            .map(|s| s.as_ref())
1617            .filter(|s| !existing.contains(s))
1618            .map(|s| s.into())
1619            .collect();
1620
1621        self.symbols.extend(to_add);
1622    }
1623
1624    // ========================================================================
1625    // Portfolio Backtesting
1626    // ========================================================================
1627
1628    /// Run a multi-symbol portfolio backtest across all tracked symbols.
1629    ///
1630    /// Fetches charts and dividends for each symbol concurrently, then runs
1631    /// the portfolio engine with the given strategy factory. Capital is shared
1632    /// across all symbols according to the [`PortfolioConfig`] allocation rules.
1633    ///
1634    /// `factory` is called once per symbol to produce an independent strategy
1635    /// instance:
1636    ///
1637    /// ```no_run
1638    /// use finance_query::{Tickers, Interval, TimeRange};
1639    /// use finance_query::backtesting::{SmaCrossover, BacktestConfig};
1640    /// use finance_query::backtesting::portfolio::{PortfolioConfig, RebalanceMode};
1641    ///
1642    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1643    /// let tickers = Tickers::new(["AAPL", "MSFT", "NVDA"]).await?;
1644    ///
1645    /// let config = PortfolioConfig::new(BacktestConfig::default())
1646    ///     .max_total_positions(2)
1647    ///     .rebalance(RebalanceMode::EqualWeight);
1648    ///
1649    /// let result = tickers.backtest(
1650    ///     Interval::OneDay,
1651    ///     TimeRange::TwoYears,
1652    ///     Some(config),
1653    ///     |_sym| SmaCrossover::new(10, 50),
1654    /// ).await?;
1655    ///
1656    /// println!("Portfolio return: {:.2}%", result.portfolio_metrics.total_return_pct);
1657    /// # Ok(())
1658    /// # }
1659    /// ```
1660    ///
1661    /// [`PortfolioConfig`]: backtesting::portfolio::PortfolioConfig
1662    #[cfg(feature = "backtesting")]
1663    pub async fn backtest<S, F>(
1664        &self,
1665        interval: Interval,
1666        range: TimeRange,
1667        config: Option<backtesting::portfolio::PortfolioConfig>,
1668        factory: F,
1669    ) -> backtesting::Result<backtesting::portfolio::PortfolioResult>
1670    where
1671        S: backtesting::Strategy,
1672        F: Fn(&str) -> S,
1673    {
1674        use crate::backtesting::portfolio::{PortfolioEngine, SymbolData};
1675
1676        let config = config.unwrap_or_default();
1677        config.validate(self.symbols.len())?;
1678
1679        // Fetch charts for all symbols (uses the batch chart cache)
1680        let charts = self
1681            .charts(interval, range)
1682            .await
1683            .map_err(|e| backtesting::BacktestError::ChartError(e.to_string()))?;
1684
1685        // Fetch dividends for all symbols (events cache is already warm after charts())
1686        // Treat errors as "no dividends" — dividend processing is best-effort
1687        let dividends_map = self
1688            .dividends(range)
1689            .await
1690            .map(|b| b.dividends)
1691            .unwrap_or_default();
1692
1693        // Assemble SymbolData slices — skip symbols with no chart data
1694        let symbol_data: Vec<SymbolData> = self
1695            .symbols
1696            .iter()
1697            .filter_map(|sym| {
1698                charts.charts.get(sym.as_ref()).map(|chart| {
1699                    let divs = dividends_map.get(sym.as_ref()).cloned().unwrap_or_default();
1700                    SymbolData::new(sym.as_ref(), chart.candles.clone()).with_dividends(divs)
1701                })
1702            })
1703            .collect();
1704
1705        let engine = PortfolioEngine::new(config);
1706        engine.run(&symbol_data, factory)
1707    }
1708
1709    /// Remove symbols from the watch list
1710    ///
1711    /// Removes symbols and clears their cached data to free memory.
1712    ///
1713    /// # Example
1714    ///
1715    /// ```no_run
1716    /// use finance_query::Tickers;
1717    ///
1718    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1719    /// let mut tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await?;
1720    /// tickers.remove_symbols(&["MSFT"]);
1721    /// assert_eq!(tickers.len(), 2);
1722    /// # Ok(())
1723    /// # }
1724    /// ```
1725    pub async fn remove_symbols(&mut self, symbols: &[impl AsRef<str>]) {
1726        use std::collections::HashSet;
1727        let to_remove: HashSet<&str> = symbols.iter().map(|s| s.as_ref()).collect();
1728
1729        // Remove from symbol list — O(1) lookup per element
1730        self.symbols.retain(|s| !to_remove.contains(&**s));
1731
1732        // Acquire all independent write locks in parallel
1733        let (
1734            mut quote_cache,
1735            mut chart_cache,
1736            mut events_cache,
1737            mut financials_cache,
1738            mut news_cache,
1739            mut recommendations_cache,
1740            mut options_cache,
1741            mut spark_cache,
1742        ) = tokio::join!(
1743            self.quote_cache.write(),
1744            self.chart_cache.write(),
1745            self.events_cache.write(),
1746            self.financials_cache.write(),
1747            self.news_cache.write(),
1748            self.recommendations_cache.write(),
1749            self.options_cache.write(),
1750            self.spark_cache.write(),
1751        );
1752
1753        // Simple key caches — O(1) per removal
1754        for symbol in &to_remove {
1755            let key: Arc<str> = (*symbol).into();
1756            quote_cache.remove(&key);
1757            events_cache.remove(&key);
1758            news_cache.remove(&key);
1759        }
1760
1761        // Composite key caches — O(n) retain but O(1) contains check
1762        chart_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1763        financials_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1764        recommendations_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1765        options_cache.retain(|(sym, _), _| !to_remove.contains(&**sym));
1766        spark_cache.retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1767
1768        // Drop all guards before cfg-gated lock
1769        drop((
1770            quote_cache,
1771            chart_cache,
1772            events_cache,
1773            financials_cache,
1774            news_cache,
1775            recommendations_cache,
1776            options_cache,
1777            spark_cache,
1778        ));
1779
1780        #[cfg(feature = "indicators")]
1781        self.indicators_cache
1782            .write()
1783            .await
1784            .retain(|(sym, _, _), _| !to_remove.contains(&**sym));
1785    }
1786
1787    /// Clear all cached data and fetch guards, forcing fresh fetches on next access.
1788    ///
1789    /// Use this when you need up-to-date data from a long-lived `Tickers` instance.
1790    /// Also clears fetch guard maps to prevent unbounded growth.
1791    pub async fn clear_cache(&self) {
1792        tokio::join!(
1793            // Data caches
1794            async { self.quote_cache.write().await.clear() },
1795            async { self.chart_cache.write().await.clear() },
1796            async { self.events_cache.write().await.clear() },
1797            async { self.financials_cache.write().await.clear() },
1798            async { self.news_cache.write().await.clear() },
1799            async { self.recommendations_cache.write().await.clear() },
1800            async { self.options_cache.write().await.clear() },
1801            async { self.spark_cache.write().await.clear() },
1802            async {
1803                #[cfg(feature = "indicators")]
1804                self.indicators_cache.write().await.clear();
1805            },
1806            // Fetch guard maps (prevent unbounded growth)
1807            async { self.charts_fetch.write().await.clear() },
1808            async { self.financials_fetch.write().await.clear() },
1809            async { self.recommendations_fetch.write().await.clear() },
1810            async { self.options_fetch.write().await.clear() },
1811            async { self.spark_fetch.write().await.clear() },
1812            async {
1813                #[cfg(feature = "indicators")]
1814                self.indicators_fetch.write().await.clear();
1815            },
1816        );
1817    }
1818
1819    /// Clear only the cached quote data.
1820    ///
1821    /// The next call to `quotes()` or `quote()` will re-fetch from the API.
1822    pub async fn clear_quote_cache(&self) {
1823        self.quote_cache.write().await.clear();
1824    }
1825
1826    /// Clear only the cached chart, spark, and events data.
1827    ///
1828    /// The next call to `charts()`, `spark()`, `dividends()`, `splits()`,
1829    /// or `capital_gains()` will re-fetch from the API.
1830    pub async fn clear_chart_cache(&self) {
1831        tokio::join!(
1832            async { self.chart_cache.write().await.clear() },
1833            async { self.events_cache.write().await.clear() },
1834            async { self.spark_cache.write().await.clear() },
1835            async {
1836                #[cfg(feature = "indicators")]
1837                self.indicators_cache.write().await.clear();
1838            },
1839        );
1840    }
1841}
1842
#[cfg(test)]
mod tests {
    //! Tests for `Tickers`. Those marked `#[ignore]` need network access and
    //! only run when explicitly requested.

    use super::*;

    /// A batch quote request yields at least one successful quote.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_quotes() {
        let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
        let batch = tickers.quotes().await.unwrap();

        assert!(batch.success_count() > 0);
    }

    /// A batch chart request yields at least one successful chart.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_charts() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let fetched = tickers.charts(Interval::OneDay, TimeRange::FiveDays).await;
        let batch = fetched.unwrap();

        assert!(batch.success_count() > 0);
    }

    /// Spark data comes back populated and its helper methods work.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_spark() {
        let tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
        let fetched = tickers.spark(Interval::FiveMinutes, TimeRange::OneDay).await;
        let batch = fetched.unwrap();

        assert!(batch.success_count() > 0);

        // Only inspect the AAPL entry when it is present in the response.
        if let Some(aapl) = batch.sparks.get("AAPL") {
            assert!(!aapl.closes.is_empty());
            assert_eq!(aapl.symbol, "AAPL");
            // Helper methods should produce a value for non-empty data.
            assert!(aapl.percent_change().is_some());
        }
    }

    /// Dividend entries, when present, carry a timestamp and a positive amount.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_dividends() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let batch = tickers.dividends(TimeRange::OneYear).await.unwrap();

        assert!(batch.success_count() > 0);

        // Check the first AAPL dividend, if there is one.
        let first_aapl = batch.dividends.get("AAPL").and_then(|list| list.first());
        if let Some(div) = first_aapl {
            assert!(div.timestamp > 0);
            assert!(div.amount > 0.0);
        }
    }

    /// Split entries, when present, are well-formed.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_splits() {
        let tickers = Tickers::new(["NVDA", "TSLA"]).await.unwrap();
        let batch = tickers.splits(TimeRange::FiveYears).await.unwrap();

        // Not every symbol has splits, so only require a successful response.
        assert!(batch.success_count() > 0);

        // Every split of every symbol must have a sane structure.
        for split in batch.splits.values().flatten() {
            assert!(split.timestamp > 0);
            assert!(split.numerator > 0.0);
            assert!(split.denominator > 0.0);
            assert!(!split.ratio.is_empty());
        }
    }

    /// Capital gain entries, when present, are well-formed.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_capital_gains() {
        let tickers = Tickers::new(["VFIAX", "VTI"]).await.unwrap();
        let batch = tickers.capital_gains(TimeRange::TwoYears).await.unwrap();

        // Not every symbol has capital gains distributions.
        assert!(batch.success_count() > 0);

        // Every distribution of every symbol must have a sane structure.
        for gain in batch.capital_gains.values().flatten() {
            assert!(gain.timestamp > 0);
            assert!(gain.amount >= 0.0);
        }
    }

    /// Annual income statements come back labelled and non-empty.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_financials() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let fetched = tickers
            .financials(StatementType::Income, Frequency::Annual)
            .await;
        let batch = fetched.unwrap();

        assert!(batch.success_count() > 0);

        // Each statement must be keyed by its own symbol and describe itself.
        for (symbol, statement) in &batch.financials {
            assert_eq!(statement.symbol, *symbol);
            assert_eq!(statement.statement_type, "income");
            assert_eq!(statement.frequency, "annual");
            assert!(!statement.statement.is_empty());

            // A common income statement field; only checked when reported.
            if let Some(revenue) = statement.statement.get("TotalRevenue") {
                assert!(!revenue.is_empty());
            }
        }
    }

    /// News articles, when present, have a title, a link and a source.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_news() {
        let tickers = Tickers::new(["AAPL", "TSLA"]).await.unwrap();
        let batch = tickers.news().await.unwrap();

        assert!(batch.success_count() > 0);

        // Inspect the first article of each symbol that has any.
        for articles in batch.news.values() {
            if let Some(article) = articles.first() {
                assert!(!article.title.is_empty());
                assert!(!article.link.is_empty());
                assert!(!article.source.is_empty());
            }
        }
    }

    /// Recommendations are keyed by their own symbol and are non-empty.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_recommendations() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let batch = tickers.recommendations(5).await.unwrap();

        assert!(batch.success_count() > 0);

        for (symbol, recommendation) in &batch.recommendations {
            assert_eq!(recommendation.symbol, *symbol);
            assert!(recommendation.count() > 0);
            for peer in &recommendation.recommendations {
                assert!(!peer.symbol.is_empty());
            }
        }
    }

    /// Option chains list at least one expiration date.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_options() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let batch = tickers.options(None).await.unwrap();

        assert!(batch.success_count() > 0);

        for chain in batch.options.values() {
            assert!(!chain.expiration_dates().is_empty());
        }
    }

    /// Indicator summaries contain at least one of the expected indicators.
    #[tokio::test]
    #[ignore = "requires network access"]
    #[cfg(feature = "indicators")]
    async fn test_tickers_indicators() {
        let tickers = Tickers::new(["AAPL", "MSFT"]).await.unwrap();
        let fetched = tickers
            .indicators(Interval::OneDay, TimeRange::ThreeMonths)
            .await;
        let batch = fetched.unwrap();

        assert!(batch.success_count() > 0);

        for summary in batch.indicators.values() {
            // At least some indicators should be present.
            assert!(summary.rsi_14.is_some() || summary.sma_20.is_some());
        }
    }

    /// Adding symbols grows the list; re-adding a tracked symbol does not.
    #[tokio::test]
    async fn test_tickers_add_symbols() {
        let mut tickers = Tickers::new(["AAPL"]).await.unwrap();
        assert_eq!(tickers.len(), 1);
        assert_eq!(tickers.symbols(), &["AAPL"]);

        tickers.add_symbols(&["MSFT", "GOOGL"]);
        assert_eq!(tickers.len(), 3);
        assert!(tickers.symbols().contains(&"AAPL"));
        assert!(tickers.symbols().contains(&"MSFT"));
        assert!(tickers.symbols().contains(&"GOOGL"));

        // A symbol that is already tracked must not be added again.
        tickers.add_symbols(&["AAPL"]);
        assert_eq!(tickers.len(), 3);
    }

    /// Removing a symbol drops it from the list and from later quote results.
    #[tokio::test]
    #[ignore = "requires network access"]
    async fn test_tickers_remove_symbols() {
        let mut tickers = Tickers::new(["AAPL", "MSFT", "GOOGL"]).await.unwrap();
        assert_eq!(tickers.len(), 3);

        // Warm the quote cache so the removal has something to evict.
        let _ = tickers.quotes().await;

        // Drop a single symbol and check the remaining list.
        tickers.remove_symbols(&["MSFT"]).await;
        assert_eq!(tickers.len(), 2);
        assert!(tickers.symbols().contains(&"AAPL"));
        assert!(!tickers.symbols().contains(&"MSFT"));
        assert!(tickers.symbols().contains(&"GOOGL"));

        // The removed symbol must no longer show up in quote results.
        let remaining = tickers.quotes().await.unwrap();
        assert!(!remaining.quotes.contains_key("MSFT"));
        assert_eq!(remaining.quotes.len(), 2);
    }
}