borsa_yfinance/
lib.rs

1//! borsa-yfinance
2//!
3//! Public connector that implements `BorsaConnector` on top of the `yfinance-rs`
4//! client library. Exposes quotes, history, search, fundamentals, options,
5//! analysis, holders, ESG, news, and streaming where available.
6#![warn(missing_docs)]
7
8/// Adapter definitions and the production adapter backed by `yfinance-rs`.
9pub mod adapter;
10
11use std::sync::Arc;
12
13#[cfg(feature = "test-adapters")]
14use adapter::CloneArcAdapters;
15use adapter::{
16    RealAdapter, YfAnalysis, YfEsg, YfFundamentals, YfHistory, YfHolders, YfNews, YfOptions,
17    YfProfile, YfQuotes, YfSearch, YfStream,
18};
19use async_trait::async_trait;
20use borsa_core::{
21    AssetKind, BorsaError, HistoryRequest, HistoryResponse, Instrument, Quote, SearchRequest,
22    SearchResponse,
23    connector::{
24        AnalystPriceTargetProvider, BalanceSheetProvider, BorsaConnector, CalendarProvider,
25        CashflowProvider, ConnectorKey, EarningsProvider, EsgProvider, HistoryProvider,
26        IncomeStatementProvider, InsiderRosterHoldersProvider, InsiderTransactionsProvider,
27        InstitutionalHoldersProvider, IsinProvider, MajorHoldersProvider,
28        MutualFundHoldersProvider, NetSharePurchaseActivityProvider, NewsProvider,
29        OptionChainProvider, OptionsExpirationsProvider, ProfileProvider, QuoteProvider,
30        RecommendationsProvider, RecommendationsSummaryProvider, SearchProvider,
31        UpgradesDowngradesProvider,
32    },
33};
34
35#[cfg(not(feature = "test-adapters"))]
36type AdapterArc = Arc<RealAdapter>;
37
38#[cfg(feature = "test-adapters")]
39type HistoryAdapter = Arc<dyn YfHistory>;
40#[cfg(not(feature = "test-adapters"))]
41type HistoryAdapter = AdapterArc;
42
43#[cfg(feature = "test-adapters")]
44type QuotesAdapter = Arc<dyn YfQuotes>;
45#[cfg(not(feature = "test-adapters"))]
46type QuotesAdapter = AdapterArc;
47
48#[cfg(feature = "test-adapters")]
49type SearchAdapter = Arc<dyn YfSearch>;
50#[cfg(not(feature = "test-adapters"))]
51type SearchAdapter = AdapterArc;
52
53#[cfg(feature = "test-adapters")]
54type ProfileAdapter = Arc<dyn YfProfile>;
55#[cfg(not(feature = "test-adapters"))]
56type ProfileAdapter = AdapterArc;
57
58#[cfg(feature = "test-adapters")]
59type FundamentalsAdapter = Arc<dyn YfFundamentals>;
60#[cfg(not(feature = "test-adapters"))]
61type FundamentalsAdapter = AdapterArc;
62
63#[cfg(feature = "test-adapters")]
64type OptionsAdapter = Arc<dyn YfOptions>;
65#[cfg(not(feature = "test-adapters"))]
66type OptionsAdapter = AdapterArc;
67
68#[cfg(feature = "test-adapters")]
69type AnalysisAdapter = Arc<dyn YfAnalysis>;
70#[cfg(not(feature = "test-adapters"))]
71type AnalysisAdapter = AdapterArc;
72
73#[cfg(feature = "test-adapters")]
74type HoldersAdapter = Arc<dyn YfHolders>;
75#[cfg(not(feature = "test-adapters"))]
76type HoldersAdapter = AdapterArc;
77
78#[cfg(feature = "test-adapters")]
79type EsgAdapter = Arc<dyn YfEsg>;
80#[cfg(not(feature = "test-adapters"))]
81type EsgAdapter = AdapterArc;
82
83#[cfg(feature = "test-adapters")]
84type NewsAdapter = Arc<dyn YfNews>;
85#[cfg(not(feature = "test-adapters"))]
86type NewsAdapter = AdapterArc;
87
88#[cfg(feature = "test-adapters")]
89type StreamAdapter = Arc<dyn YfStream>;
90#[cfg(not(feature = "test-adapters"))]
91type StreamAdapter = AdapterArc;
92
93/// Public connector type. Production users will construct with `YfConnector::new_default()`.
94pub struct YfConnector {
95    history: HistoryAdapter,
96    quotes: QuotesAdapter,
97    search: SearchAdapter,
98    profile: ProfileAdapter,
99    fundamentals: FundamentalsAdapter,
100    options: OptionsAdapter,
101    analysis: AnalysisAdapter,
102    holders: HoldersAdapter,
103    esg: EsgAdapter,
104    news: NewsAdapter,
105    stream: StreamAdapter,
106}
107
108impl YfConnector {
109    /// Static connector key for orchestrator priority configuration.
110    #[must_use]
111    pub const fn key_static() -> ConnectorKey {
112        ConnectorKey::new("borsa-yfinance")
113    }
114
115    fn looks_like_not_found(msg: &str) -> bool {
116        let m = msg.to_ascii_lowercase();
117        m.contains("not found") || m.contains("no data") || m.contains("no matches")
118    }
119
120    fn normalize_error(e: BorsaError, what: &str) -> BorsaError {
121        match e {
122            BorsaError::Connector { connector: _, msg } => {
123                if Self::looks_like_not_found(&msg) {
124                    BorsaError::not_found(what.to_string())
125                } else {
126                    BorsaError::connector("borsa-yfinance", msg)
127                }
128            }
129            BorsaError::Other(msg) => BorsaError::connector("borsa-yfinance", msg),
130            other => other,
131        }
132    }
133    /// Build with a fresh `yfinance_rs::YfClient` inside.
134    #[must_use]
135    pub fn new_default() -> Self {
136        let a = RealAdapter::new_default();
137        Self::from_adapter(&a)
138    }
139
140    /// Build from an existing `yfinance_rs::YfClient`.
141    #[must_use]
142    pub fn new_with_client(client: yfinance_rs::YfClient) -> Self {
143        let a = RealAdapter::new(client);
144        Self::from_adapter(&a)
145    }
146
147    /// Build from a provided `reqwest::Client` by constructing a `yfinance_rs::YfClient`.
148    ///
149    /// Note: The provided client should enable a cookie store for yfinance auth/crumb flow.
150    ///
151    /// # Errors
152    /// Returns an error if the internal `YfClient` cannot be constructed from the provided HTTP client.
153    pub fn try_new_with_reqwest_client(
154        http: reqwest::Client,
155    ) -> Result<Self, borsa_core::BorsaError> {
156        let yf = yfinance_rs::YfClient::builder()
157            .custom_client(http)
158            .user_agent("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36")
159            .build()
160            .map_err(|e| borsa_core::BorsaError::Other(e.to_string()))?;
161        Ok(Self::new_with_client(yf))
162    }
163
164    /// For tests/injection (requires the `test-adapters` feature).
165    ///
166    /// Accepts a borrowed adapter to avoid unnecessary moves.
167    #[cfg(feature = "test-adapters")]
168    pub fn from_adapter<A: CloneArcAdapters + 'static>(adapter: &A) -> Self {
169        Self {
170            history: adapter.clone_arc_history(),
171            quotes: adapter.clone_arc_quotes(),
172            search: adapter.clone_arc_search(),
173            profile: adapter.clone_arc_profile(),
174            fundamentals: adapter.clone_arc_fundamentals(),
175            options: adapter.clone_arc_options(),
176            analysis: adapter.clone_arc_analysis(),
177            holders: adapter.clone_arc_holders(),
178            esg: adapter.clone_arc_esg(),
179            news: adapter.clone_arc_news(),
180            stream: adapter.clone_arc_stream(),
181        }
182    }
183
184    #[cfg(not(feature = "test-adapters"))]
185    /// Build from a concrete `RealAdapter` by cloning it into shared handles.
186    pub fn from_adapter(adapter: &RealAdapter) -> Self {
187        let shared = Arc::new(adapter.clone());
188        Self {
189            history: Arc::clone(&shared),
190            quotes: Arc::clone(&shared),
191            search: Arc::clone(&shared),
192            profile: Arc::clone(&shared),
193            fundamentals: Arc::clone(&shared),
194            options: Arc::clone(&shared),
195            analysis: Arc::clone(&shared),
196            holders: Arc::clone(&shared),
197            esg: Arc::clone(&shared),
198            news: Arc::clone(&shared),
199            stream: shared,
200        }
201    }
202}
203
204#[async_trait]
205impl QuoteProvider for YfConnector {
206    #[cfg_attr(
207        feature = "tracing",
208        tracing::instrument(
209            name = "borsa_yfinance::quote",
210            skip(self, instrument),
211            fields(symbol = %instrument.symbol()),
212        )
213    )]
214    async fn quote(&self, instrument: &Instrument) -> Result<Quote, BorsaError> {
215        let raw = self
216            .quotes
217            .fetch(std::slice::from_ref(&instrument.symbol().to_string()))
218            .await
219            .map_err(|e| Self::normalize_error(e, &format!("quote for {}", instrument.symbol())))?;
220        let first = raw
221            .into_iter()
222            .next()
223            .ok_or_else(|| BorsaError::not_found(format!("quote for {}", instrument.symbol())))?;
224        Ok(first)
225    }
226}
227
228#[async_trait]
229impl HistoryProvider for YfConnector {
230    #[cfg_attr(
231        feature = "tracing",
232        tracing::instrument(
233            name = "borsa_yfinance::history",
234            skip(self, instrument, req),
235            fields(symbol = %instrument.symbol(), interval = ?req.interval(), range = ?req.range(), include_prepost = req.include_prepost(), include_actions = req.include_actions(), auto_adjust = req.auto_adjust(), keepna = req.keepna()),
236        )
237    )]
238    async fn history(
239        &self,
240        instrument: &Instrument,
241        req: HistoryRequest,
242    ) -> Result<HistoryResponse, BorsaError> {
243        let yf_req = yfinance_rs::core::services::HistoryRequest {
244            range: req.range(),
245            period: req.period().map(|(s, e)| (s.timestamp(), e.timestamp())),
246            interval: req.interval(),
247            include_prepost: req.include_prepost(),
248            include_actions: req.include_actions(),
249            auto_adjust: req.auto_adjust(),
250            keepna: req.keepna(),
251        };
252        let symbol = instrument.symbol_str();
253        let raw = self.history.fetch_full(symbol, yf_req).await?;
254        Ok(raw)
255    }
256
257    fn supported_history_intervals(
258        &self,
259        _kind: AssetKind,
260    ) -> &'static [borsa_core::types::Interval] {
261        use borsa_core::types::Interval as I;
262        const YF_INTERVALS: &[I] = &[
263            I::I1m,
264            I::I2m,
265            I::I5m,
266            I::I15m,
267            I::I30m,
268            I::I1h,
269            I::I90m,
270            I::D1,
271            I::D5,
272            I::W1,
273            I::M1,
274            I::M3,
275        ];
276        YF_INTERVALS
277    }
278}
279
280#[async_trait]
281impl ProfileProvider for YfConnector {
282    #[cfg_attr(
283        feature = "tracing",
284        tracing::instrument(
285            name = "borsa_yfinance::profile",
286            skip(self, instrument),
287            fields(symbol = %instrument.symbol()),
288        )
289    )]
290    async fn profile(&self, instrument: &Instrument) -> Result<borsa_core::Profile, BorsaError> {
291        let symbol = instrument.symbol_str();
292        let raw = self.profile.load(symbol).await?;
293        Ok(raw)
294    }
295}
296
297#[async_trait]
298impl IsinProvider for YfConnector {
299    #[cfg_attr(
300        feature = "tracing",
301        tracing::instrument(
302            name = "borsa_yfinance::isin",
303            skip(self, instrument),
304            fields(symbol = %instrument.symbol()),
305        )
306    )]
307    async fn isin(&self, instrument: &Instrument) -> Result<Option<borsa_core::Isin>, BorsaError> {
308        let symbol = instrument.symbol_str();
309        match self.profile.isin(symbol).await {
310            Ok(Some(isin_str)) => match borsa_core::Isin::new(&isin_str) {
311                Ok(isin) => Ok(Some(isin)),
312                Err(e) => Err(BorsaError::Data(format!(
313                    "Provider returned invalid ISIN '{isin_str}': {e}"
314                ))),
315            },
316            Ok(None) => Ok(None),
317            Err(e) => Err(Self::normalize_error(e, &format!("isin for {symbol}"))),
318        }
319    }
320}
321
322#[async_trait]
323impl SearchProvider for YfConnector {
324    #[cfg_attr(
325        feature = "tracing",
326        tracing::instrument(
327            name = "borsa_yfinance::search",
328            skip(self, req),
329            fields(kind = ?req.kind(), limit = req.limit()),
330        )
331    )]
332    async fn search(&self, req: SearchRequest) -> Result<SearchResponse, BorsaError> {
333        self.search.search(&req).await
334    }
335}
336
337#[async_trait]
338impl EarningsProvider for YfConnector {
339    #[cfg_attr(
340        feature = "tracing",
341        tracing::instrument(
342            name = "borsa_yfinance::earnings",
343            skip(self, instrument),
344            fields(symbol = %instrument.symbol()),
345        )
346    )]
347    async fn earnings(&self, instrument: &Instrument) -> Result<borsa_core::Earnings, BorsaError> {
348        let symbol = instrument.symbol_str();
349        let raw = self.fundamentals.earnings(symbol).await?;
350        Ok(raw)
351    }
352}
353
354#[async_trait]
355impl IncomeStatementProvider for YfConnector {
356    #[cfg_attr(
357        feature = "tracing",
358        tracing::instrument(
359            name = "borsa_yfinance::income_statement",
360            skip(self, instrument),
361            fields(symbol = %instrument.symbol(), quarterly = quarterly),
362        )
363    )]
364    async fn income_statement(
365        &self,
366        instrument: &Instrument,
367        quarterly: bool,
368    ) -> Result<Vec<borsa_core::IncomeStatementRow>, BorsaError> {
369        let raw = self
370            .fundamentals
371            .income_statement(instrument.symbol_str(), quarterly)
372            .await?;
373        Ok(raw)
374    }
375}
376
377#[async_trait]
378impl BalanceSheetProvider for YfConnector {
379    #[cfg_attr(
380        feature = "tracing",
381        tracing::instrument(
382            name = "borsa_yfinance::balance_sheet",
383            skip(self, instrument),
384            fields(symbol = %instrument.symbol(), quarterly = quarterly),
385        )
386    )]
387    async fn balance_sheet(
388        &self,
389        instrument: &Instrument,
390        quarterly: bool,
391    ) -> Result<Vec<borsa_core::BalanceSheetRow>, BorsaError> {
392        let raw = self
393            .fundamentals
394            .balance_sheet(instrument.symbol_str(), quarterly)
395            .await?;
396        Ok(raw)
397    }
398}
399
400#[async_trait]
401impl CashflowProvider for YfConnector {
402    #[cfg_attr(
403        feature = "tracing",
404        tracing::instrument(
405            name = "borsa_yfinance::cashflow",
406            skip(self, instrument),
407            fields(symbol = %instrument.symbol(), quarterly = quarterly),
408        )
409    )]
410    async fn cashflow(
411        &self,
412        instrument: &Instrument,
413        quarterly: bool,
414    ) -> Result<Vec<borsa_core::CashflowRow>, BorsaError> {
415        let raw = self
416            .fundamentals
417            .cashflow(instrument.symbol_str(), quarterly)
418            .await?;
419        Ok(raw)
420    }
421}
422
423#[async_trait]
424impl CalendarProvider for YfConnector {
425    #[cfg_attr(
426        feature = "tracing",
427        tracing::instrument(
428            name = "borsa_yfinance::calendar",
429            skip(self, instrument),
430            fields(symbol = %instrument.symbol()),
431        )
432    )]
433    async fn calendar(&self, instrument: &Instrument) -> Result<borsa_core::Calendar, BorsaError> {
434        let raw = self.fundamentals.calendar(instrument.symbol_str()).await?;
435        Ok(raw)
436    }
437}
438
439#[async_trait]
440impl OptionsExpirationsProvider for YfConnector {
441    #[cfg_attr(
442        feature = "tracing",
443        tracing::instrument(
444            name = "borsa_yfinance::options_expirations",
445            skip(self, instrument),
446            fields(symbol = %instrument.symbol()),
447        )
448    )]
449    async fn options_expirations(&self, instrument: &Instrument) -> Result<Vec<i64>, BorsaError> {
450        self.options.expirations(instrument.symbol_str()).await
451    }
452}
453
454#[async_trait]
455impl OptionChainProvider for YfConnector {
456    #[cfg_attr(
457        feature = "tracing",
458        tracing::instrument(
459            name = "borsa_yfinance::option_chain",
460            skip(self, instrument),
461            fields(symbol = %instrument.symbol(), date = ?date),
462        )
463    )]
464    async fn option_chain(
465        &self,
466        instrument: &Instrument,
467        date: Option<i64>,
468    ) -> Result<borsa_core::OptionChain, BorsaError> {
469        let raw = self.options.chain(instrument.symbol_str(), date).await?;
470        Ok(raw)
471    }
472}
473
474#[async_trait]
475impl RecommendationsProvider for YfConnector {
476    #[cfg_attr(
477        feature = "tracing",
478        tracing::instrument(
479            name = "borsa_yfinance::recommendations",
480            skip(self, instrument),
481            fields(symbol = %instrument.symbol()),
482        )
483    )]
484    async fn recommendations(
485        &self,
486        instrument: &Instrument,
487    ) -> Result<Vec<borsa_core::RecommendationRow>, BorsaError> {
488        let rows = self
489            .analysis
490            .recommendations(instrument.symbol_str())
491            .await?;
492        Ok(rows)
493    }
494}
495
496#[async_trait]
497impl RecommendationsSummaryProvider for YfConnector {
498    #[cfg_attr(
499        feature = "tracing",
500        tracing::instrument(
501            name = "borsa_yfinance::recommendations_summary",
502            skip(self, instrument),
503            fields(symbol = %instrument.symbol()),
504        )
505    )]
506    async fn recommendations_summary(
507        &self,
508        instrument: &Instrument,
509    ) -> Result<borsa_core::RecommendationSummary, BorsaError> {
510        let s = self
511            .analysis
512            .recommendations_summary(instrument.symbol_str())
513            .await?;
514        Ok(s)
515    }
516}
517
518#[async_trait]
519impl UpgradesDowngradesProvider for YfConnector {
520    #[cfg_attr(
521        feature = "tracing",
522        tracing::instrument(
523            name = "borsa_yfinance::upgrades_downgrades",
524            skip(self, instrument),
525            fields(symbol = %instrument.symbol()),
526        )
527    )]
528    async fn upgrades_downgrades(
529        &self,
530        instrument: &Instrument,
531    ) -> Result<Vec<borsa_core::UpgradeDowngradeRow>, BorsaError> {
532        let v = self
533            .analysis
534            .upgrades_downgrades(instrument.symbol_str())
535            .await?;
536        Ok(v)
537    }
538}
539
540#[async_trait]
541impl AnalystPriceTargetProvider for YfConnector {
542    #[cfg_attr(
543        feature = "tracing",
544        tracing::instrument(
545            name = "borsa_yfinance::analyst_price_target",
546            skip(self, instrument),
547            fields(symbol = %instrument.symbol()),
548        )
549    )]
550    async fn analyst_price_target(
551        &self,
552        instrument: &Instrument,
553    ) -> Result<borsa_core::PriceTarget, BorsaError> {
554        let p = self
555            .analysis
556            .analyst_price_target(instrument.symbol_str())
557            .await?;
558        Ok(p)
559    }
560}
561
562#[async_trait]
563impl MajorHoldersProvider for YfConnector {
564    #[cfg_attr(
565        feature = "tracing",
566        tracing::instrument(
567            name = "borsa_yfinance::major_holders",
568            skip(self, instrument),
569            fields(symbol = %instrument.symbol()),
570        )
571    )]
572    async fn major_holders(
573        &self,
574        instrument: &Instrument,
575    ) -> Result<Vec<borsa_core::MajorHolder>, BorsaError> {
576        let rows = self.holders.major_holders(instrument.symbol_str()).await?;
577        let mapped = rows.into_iter().collect();
578        Ok(mapped)
579    }
580}
581
582#[async_trait]
583impl InstitutionalHoldersProvider for YfConnector {
584    #[cfg_attr(
585        feature = "tracing",
586        tracing::instrument(
587            name = "borsa_yfinance::institutional_holders",
588            skip(self, instrument),
589            fields(symbol = %instrument.symbol()),
590        )
591    )]
592    async fn institutional_holders(
593        &self,
594        instrument: &Instrument,
595    ) -> Result<Vec<borsa_core::InstitutionalHolder>, BorsaError> {
596        let rows = self
597            .holders
598            .institutional_holders(instrument.symbol_str())
599            .await?;
600        Ok(rows)
601    }
602}
603
604#[async_trait]
605impl MutualFundHoldersProvider for YfConnector {
606    #[cfg_attr(
607        feature = "tracing",
608        tracing::instrument(
609            name = "borsa_yfinance::mutual_fund_holders",
610            skip(self, instrument),
611            fields(symbol = %instrument.symbol()),
612        )
613    )]
614    async fn mutual_fund_holders(
615        &self,
616        instrument: &Instrument,
617    ) -> Result<Vec<borsa_core::InstitutionalHolder>, BorsaError> {
618        let rows = self
619            .holders
620            .mutual_fund_holders(instrument.symbol_str())
621            .await?;
622        Ok(rows)
623    }
624}
625
626#[async_trait]
627impl InsiderTransactionsProvider for YfConnector {
628    #[cfg_attr(
629        feature = "tracing",
630        tracing::instrument(
631            name = "borsa_yfinance::insider_transactions",
632            skip(self, instrument),
633            fields(symbol = %instrument.symbol()),
634        )
635    )]
636    async fn insider_transactions(
637        &self,
638        instrument: &Instrument,
639    ) -> Result<Vec<borsa_core::InsiderTransaction>, BorsaError> {
640        let rows = self
641            .holders
642            .insider_transactions(instrument.symbol_str())
643            .await?;
644        Ok(rows)
645    }
646}
647
648#[async_trait]
649impl InsiderRosterHoldersProvider for YfConnector {
650    #[cfg_attr(
651        feature = "tracing",
652        tracing::instrument(
653            name = "borsa_yfinance::insider_roster_holders",
654            skip(self, instrument),
655            fields(symbol = %instrument.symbol()),
656        )
657    )]
658    async fn insider_roster_holders(
659        &self,
660        instrument: &Instrument,
661    ) -> Result<Vec<borsa_core::InsiderRosterHolder>, BorsaError> {
662        let rows = self
663            .holders
664            .insider_roster_holders(instrument.symbol_str())
665            .await?;
666        Ok(rows)
667    }
668}
669
670#[async_trait]
671impl NetSharePurchaseActivityProvider for YfConnector {
672    #[cfg_attr(
673        feature = "tracing",
674        tracing::instrument(
675            name = "borsa_yfinance::net_share_purchase_activity",
676            skip(self, instrument),
677            fields(symbol = %instrument.symbol()),
678        )
679    )]
680    async fn net_share_purchase_activity(
681        &self,
682        instrument: &Instrument,
683    ) -> Result<Option<borsa_core::NetSharePurchaseActivity>, BorsaError> {
684        let activity = self
685            .holders
686            .net_share_purchase_activity(instrument.symbol_str())
687            .await?;
688        Ok(activity)
689    }
690}
691
692#[async_trait]
693impl EsgProvider for YfConnector {
694    #[cfg_attr(
695        feature = "tracing",
696        tracing::instrument(
697            name = "borsa_yfinance::sustainability",
698            skip(self, instrument),
699            fields(symbol = %instrument.symbol()),
700        )
701    )]
702    async fn sustainability(
703        &self,
704        instrument: &Instrument,
705    ) -> Result<borsa_core::EsgScores, BorsaError> {
706        let scores = self.esg.sustainability(instrument.symbol_str()).await?;
707        Ok(scores)
708    }
709}
710
711#[async_trait]
712impl NewsProvider for YfConnector {
713    #[cfg_attr(
714        feature = "tracing",
715        tracing::instrument(
716            name = "borsa_yfinance::news",
717            skip(self, instrument, req),
718            fields(symbol = %instrument.symbol()),
719        )
720    )]
721    async fn news(
722        &self,
723        instrument: &Instrument,
724        req: borsa_core::NewsRequest,
725    ) -> Result<Vec<borsa_core::types::NewsArticle>, BorsaError> {
726        let articles = self.news.news(instrument.symbol_str(), req).await?;
727        Ok(articles)
728    }
729}
730
731#[async_trait]
732impl borsa_core::connector::StreamProvider for YfConnector {
733    #[cfg_attr(
734        feature = "tracing",
735        tracing::instrument(
736            name = "borsa_yfinance::stream_quotes",
737            skip(self, instruments),
738            fields(num_symbols = instruments.len()),
739        )
740    )]
741    async fn stream_quotes(
742        &self,
743        instruments: &[Instrument],
744    ) -> Result<
745        (
746            borsa_core::stream::StreamHandle,
747            tokio::sync::mpsc::Receiver<borsa_core::QuoteUpdate>,
748        ),
749        BorsaError,
750    > {
751        let symbols: Vec<String> = instruments.iter().map(|i| i.symbol().to_string()).collect();
752        // Keep the upstream handle and ensure our returned handle can stop it.
753        let (upstream_handle, rx_raw) = self.stream.start(&symbols).await?;
754        let (tx, rx_core) = tokio::sync::mpsc::channel::<borsa_core::QuoteUpdate>(1024);
755        let forward = tokio::spawn(async move {
756            let mut rx = rx_raw;
757            while let Some(u) = rx.recv().await {
758                if tx.send(u).await.is_err() {
759                    break;
760                }
761            }
762        });
763        let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
764        let join = tokio::spawn(async move {
765            let _ = stop_rx.await;
766            // Stop upstream stream, then abort forwarder to unblock quickly if needed.
767            upstream_handle.stop().await;
768            forward.abort();
769        });
770        Ok((
771            borsa_core::stream::StreamHandle::new(join, stop_tx),
772            rx_core,
773        ))
774    }
775}
776
777#[async_trait]
778impl BorsaConnector for YfConnector {
779    fn name(&self) -> &'static str {
780        "borsa-yfinance"
781    }
782    fn vendor(&self) -> &'static str {
783        "Yahoo Finance"
784    }
785
786    // capabilities removed; capability directory via as_*_provider
787
788    fn as_history_provider(&self) -> Option<&dyn borsa_core::connector::HistoryProvider> {
789        Some(self as &dyn HistoryProvider)
790    }
791
792    fn as_quote_provider(&self) -> Option<&dyn borsa_core::connector::QuoteProvider> {
793        Some(self as &dyn QuoteProvider)
794    }
795
796    fn as_stream_provider(&self) -> Option<&dyn borsa_core::connector::StreamProvider> {
797        Some(self as &dyn borsa_core::connector::StreamProvider)
798    }
799
800    fn as_profile_provider(&self) -> Option<&dyn borsa_core::connector::ProfileProvider> {
801        Some(self as &dyn ProfileProvider)
802    }
803
804    fn as_isin_provider(&self) -> Option<&dyn borsa_core::connector::IsinProvider> {
805        Some(self as &dyn IsinProvider)
806    }
807
808    fn as_search_provider(&self) -> Option<&dyn borsa_core::connector::SearchProvider> {
809        Some(self as &dyn SearchProvider)
810    }
811
812    fn as_esg_provider(&self) -> Option<&dyn borsa_core::connector::EsgProvider> {
813        Some(self as &dyn EsgProvider)
814    }
815
816    fn as_news_provider(&self) -> Option<&dyn borsa_core::connector::NewsProvider> {
817        Some(self as &dyn NewsProvider)
818    }
819
820    fn as_earnings_provider(&self) -> Option<&dyn borsa_core::connector::EarningsProvider> {
821        Some(self as &dyn EarningsProvider)
822    }
823    fn as_income_statement_provider(
824        &self,
825    ) -> Option<&dyn borsa_core::connector::IncomeStatementProvider> {
826        Some(self as &dyn IncomeStatementProvider)
827    }
828    fn as_balance_sheet_provider(
829        &self,
830    ) -> Option<&dyn borsa_core::connector::BalanceSheetProvider> {
831        Some(self as &dyn BalanceSheetProvider)
832    }
833    fn as_cashflow_provider(&self) -> Option<&dyn borsa_core::connector::CashflowProvider> {
834        Some(self as &dyn CashflowProvider)
835    }
836    fn as_calendar_provider(&self) -> Option<&dyn borsa_core::connector::CalendarProvider> {
837        Some(self as &dyn CalendarProvider)
838    }
839
840    fn as_recommendations_provider(
841        &self,
842    ) -> Option<&dyn borsa_core::connector::RecommendationsProvider> {
843        Some(self as &dyn RecommendationsProvider)
844    }
845    fn as_recommendations_summary_provider(
846        &self,
847    ) -> Option<&dyn borsa_core::connector::RecommendationsSummaryProvider> {
848        Some(self as &dyn RecommendationsSummaryProvider)
849    }
850    fn as_upgrades_downgrades_provider(
851        &self,
852    ) -> Option<&dyn borsa_core::connector::UpgradesDowngradesProvider> {
853        Some(self as &dyn UpgradesDowngradesProvider)
854    }
855    fn as_analyst_price_target_provider(
856        &self,
857    ) -> Option<&dyn borsa_core::connector::AnalystPriceTargetProvider> {
858        Some(self as &dyn AnalystPriceTargetProvider)
859    }
860
861    fn as_major_holders_provider(
862        &self,
863    ) -> Option<&dyn borsa_core::connector::MajorHoldersProvider> {
864        Some(self as &dyn MajorHoldersProvider)
865    }
866    fn as_institutional_holders_provider(
867        &self,
868    ) -> Option<&dyn borsa_core::connector::InstitutionalHoldersProvider> {
869        Some(self as &dyn InstitutionalHoldersProvider)
870    }
871    fn as_mutual_fund_holders_provider(
872        &self,
873    ) -> Option<&dyn borsa_core::connector::MutualFundHoldersProvider> {
874        Some(self as &dyn MutualFundHoldersProvider)
875    }
876    fn as_insider_transactions_provider(
877        &self,
878    ) -> Option<&dyn borsa_core::connector::InsiderTransactionsProvider> {
879        Some(self as &dyn InsiderTransactionsProvider)
880    }
881    fn as_insider_roster_holders_provider(
882        &self,
883    ) -> Option<&dyn borsa_core::connector::InsiderRosterHoldersProvider> {
884        Some(self as &dyn InsiderRosterHoldersProvider)
885    }
886    fn as_net_share_purchase_activity_provider(
887        &self,
888    ) -> Option<&dyn borsa_core::connector::NetSharePurchaseActivityProvider> {
889        Some(self as &dyn NetSharePurchaseActivityProvider)
890    }
891
892    fn as_options_expirations_provider(
893        &self,
894    ) -> Option<&dyn borsa_core::connector::OptionsExpirationsProvider> {
895        Some(self as &dyn OptionsExpirationsProvider)
896    }
897    fn as_option_chain_provider(&self) -> Option<&dyn borsa_core::connector::OptionChainProvider> {
898        Some(self as &dyn OptionChainProvider)
899    }
900
901    /// yfinance is fairly broad; we default to `true` and let router priorities steer quality.
902    fn supports_kind(&self, kind: AssetKind) -> bool {
903        matches!(
904            kind,
905            AssetKind::Equity
906                | AssetKind::Fund
907                | AssetKind::Index
908                | AssetKind::Crypto
909                | AssetKind::Forex
910        )
911    }
912}