1#![warn(missing_docs)]
7
8pub mod adapter;
10
11use std::sync::Arc;
12
13#[cfg(feature = "test-adapters")]
14use adapter::CloneArcAdapters;
15use adapter::{
16 RealAdapter, YfAnalysis, YfEsg, YfFundamentals, YfHistory, YfHolders, YfNews, YfOptions,
17 YfProfile, YfQuotes, YfSearch, YfStream,
18};
19use async_trait::async_trait;
20use borsa_core::{
21 AssetKind, BorsaError, HistoryRequest, HistoryResponse, Instrument, Quote, SearchRequest,
22 SearchResponse,
23 connector::{
24 AnalystPriceTargetProvider, BalanceSheetProvider, BorsaConnector, CalendarProvider,
25 CashflowProvider, ConnectorKey, EarningsProvider, EsgProvider, HistoryProvider,
26 IncomeStatementProvider, InsiderRosterHoldersProvider, InsiderTransactionsProvider,
27 InstitutionalHoldersProvider, IsinProvider, MajorHoldersProvider,
28 MutualFundHoldersProvider, NetSharePurchaseActivityProvider, NewsProvider,
29 OptionChainProvider, OptionsExpirationsProvider, ProfileProvider, QuoteProvider,
30 RecommendationsProvider, RecommendationsSummaryProvider, SearchProvider,
31 UpgradesDowngradesProvider,
32 },
33};
34
35#[cfg(not(feature = "test-adapters"))]
36type AdapterArc = Arc<RealAdapter>;
37
38#[cfg(feature = "test-adapters")]
39type HistoryAdapter = Arc<dyn YfHistory>;
40#[cfg(not(feature = "test-adapters"))]
41type HistoryAdapter = AdapterArc;
42
43#[cfg(feature = "test-adapters")]
44type QuotesAdapter = Arc<dyn YfQuotes>;
45#[cfg(not(feature = "test-adapters"))]
46type QuotesAdapter = AdapterArc;
47
48#[cfg(feature = "test-adapters")]
49type SearchAdapter = Arc<dyn YfSearch>;
50#[cfg(not(feature = "test-adapters"))]
51type SearchAdapter = AdapterArc;
52
53#[cfg(feature = "test-adapters")]
54type ProfileAdapter = Arc<dyn YfProfile>;
55#[cfg(not(feature = "test-adapters"))]
56type ProfileAdapter = AdapterArc;
57
58#[cfg(feature = "test-adapters")]
59type FundamentalsAdapter = Arc<dyn YfFundamentals>;
60#[cfg(not(feature = "test-adapters"))]
61type FundamentalsAdapter = AdapterArc;
62
63#[cfg(feature = "test-adapters")]
64type OptionsAdapter = Arc<dyn YfOptions>;
65#[cfg(not(feature = "test-adapters"))]
66type OptionsAdapter = AdapterArc;
67
68#[cfg(feature = "test-adapters")]
69type AnalysisAdapter = Arc<dyn YfAnalysis>;
70#[cfg(not(feature = "test-adapters"))]
71type AnalysisAdapter = AdapterArc;
72
73#[cfg(feature = "test-adapters")]
74type HoldersAdapter = Arc<dyn YfHolders>;
75#[cfg(not(feature = "test-adapters"))]
76type HoldersAdapter = AdapterArc;
77
78#[cfg(feature = "test-adapters")]
79type EsgAdapter = Arc<dyn YfEsg>;
80#[cfg(not(feature = "test-adapters"))]
81type EsgAdapter = AdapterArc;
82
83#[cfg(feature = "test-adapters")]
84type NewsAdapter = Arc<dyn YfNews>;
85#[cfg(not(feature = "test-adapters"))]
86type NewsAdapter = AdapterArc;
87
88#[cfg(feature = "test-adapters")]
89type StreamAdapter = Arc<dyn YfStream>;
90#[cfg(not(feature = "test-adapters"))]
91type StreamAdapter = AdapterArc;
92
93pub struct YfConnector {
95 history: HistoryAdapter,
96 quotes: QuotesAdapter,
97 search: SearchAdapter,
98 profile: ProfileAdapter,
99 fundamentals: FundamentalsAdapter,
100 options: OptionsAdapter,
101 analysis: AnalysisAdapter,
102 holders: HoldersAdapter,
103 esg: EsgAdapter,
104 news: NewsAdapter,
105 stream: StreamAdapter,
106}
107
108impl YfConnector {
109 #[must_use]
111 pub const fn key_static() -> ConnectorKey {
112 ConnectorKey::new("borsa-yfinance")
113 }
114
115 fn looks_like_not_found(msg: &str) -> bool {
116 let m = msg.to_ascii_lowercase();
117 m.contains("not found") || m.contains("no data") || m.contains("no matches")
118 }
119
120 fn normalize_error(e: BorsaError, what: &str) -> BorsaError {
121 match e {
122 BorsaError::Connector { connector: _, msg } => {
123 if Self::looks_like_not_found(&msg) {
124 BorsaError::not_found(what.to_string())
125 } else {
126 BorsaError::connector("borsa-yfinance", msg)
127 }
128 }
129 BorsaError::Other(msg) => BorsaError::connector("borsa-yfinance", msg),
130 other => other,
131 }
132 }
133 #[must_use]
135 pub fn new_default() -> Self {
136 let a = RealAdapter::new_default();
137 Self::from_adapter(&a)
138 }
139
140 #[must_use]
142 pub fn new_with_client(client: yfinance_rs::YfClient) -> Self {
143 let a = RealAdapter::new(client);
144 Self::from_adapter(&a)
145 }
146
147 pub fn try_new_with_reqwest_client(
154 http: reqwest::Client,
155 ) -> Result<Self, borsa_core::BorsaError> {
156 let yf = yfinance_rs::YfClient::builder()
157 .custom_client(http)
158 .user_agent("Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36")
159 .build()
160 .map_err(|e| borsa_core::BorsaError::Other(e.to_string()))?;
161 Ok(Self::new_with_client(yf))
162 }
163
164 #[cfg(feature = "test-adapters")]
168 pub fn from_adapter<A: CloneArcAdapters + 'static>(adapter: &A) -> Self {
169 Self {
170 history: adapter.clone_arc_history(),
171 quotes: adapter.clone_arc_quotes(),
172 search: adapter.clone_arc_search(),
173 profile: adapter.clone_arc_profile(),
174 fundamentals: adapter.clone_arc_fundamentals(),
175 options: adapter.clone_arc_options(),
176 analysis: adapter.clone_arc_analysis(),
177 holders: adapter.clone_arc_holders(),
178 esg: adapter.clone_arc_esg(),
179 news: adapter.clone_arc_news(),
180 stream: adapter.clone_arc_stream(),
181 }
182 }
183
184 #[cfg(not(feature = "test-adapters"))]
185 pub fn from_adapter(adapter: &RealAdapter) -> Self {
187 let shared = Arc::new(adapter.clone());
188 Self {
189 history: Arc::clone(&shared),
190 quotes: Arc::clone(&shared),
191 search: Arc::clone(&shared),
192 profile: Arc::clone(&shared),
193 fundamentals: Arc::clone(&shared),
194 options: Arc::clone(&shared),
195 analysis: Arc::clone(&shared),
196 holders: Arc::clone(&shared),
197 esg: Arc::clone(&shared),
198 news: Arc::clone(&shared),
199 stream: shared,
200 }
201 }
202}
203
204#[async_trait]
205impl QuoteProvider for YfConnector {
206 #[cfg_attr(
207 feature = "tracing",
208 tracing::instrument(
209 name = "borsa_yfinance::quote",
210 skip(self, instrument),
211 fields(symbol = %instrument.symbol()),
212 )
213 )]
214 async fn quote(&self, instrument: &Instrument) -> Result<Quote, BorsaError> {
215 let raw = self
216 .quotes
217 .fetch(std::slice::from_ref(&instrument.symbol().to_string()))
218 .await
219 .map_err(|e| Self::normalize_error(e, &format!("quote for {}", instrument.symbol())))?;
220 let first = raw
221 .into_iter()
222 .next()
223 .ok_or_else(|| BorsaError::not_found(format!("quote for {}", instrument.symbol())))?;
224 Ok(first)
225 }
226}
227
228#[async_trait]
229impl HistoryProvider for YfConnector {
230 #[cfg_attr(
231 feature = "tracing",
232 tracing::instrument(
233 name = "borsa_yfinance::history",
234 skip(self, instrument, req),
235 fields(symbol = %instrument.symbol(), interval = ?req.interval(), range = ?req.range(), include_prepost = req.include_prepost(), include_actions = req.include_actions(), auto_adjust = req.auto_adjust(), keepna = req.keepna()),
236 )
237 )]
238 async fn history(
239 &self,
240 instrument: &Instrument,
241 req: HistoryRequest,
242 ) -> Result<HistoryResponse, BorsaError> {
243 let yf_req = yfinance_rs::core::services::HistoryRequest {
244 range: req.range(),
245 period: req.period().map(|(s, e)| (s.timestamp(), e.timestamp())),
246 interval: req.interval(),
247 include_prepost: req.include_prepost(),
248 include_actions: req.include_actions(),
249 auto_adjust: req.auto_adjust(),
250 keepna: req.keepna(),
251 };
252 let symbol = instrument.symbol_str();
253 let raw = self.history.fetch_full(symbol, yf_req).await?;
254 Ok(raw)
255 }
256
257 fn supported_history_intervals(
258 &self,
259 _kind: AssetKind,
260 ) -> &'static [borsa_core::types::Interval] {
261 use borsa_core::types::Interval as I;
262 const YF_INTERVALS: &[I] = &[
263 I::I1m,
264 I::I2m,
265 I::I5m,
266 I::I15m,
267 I::I30m,
268 I::I1h,
269 I::I90m,
270 I::D1,
271 I::D5,
272 I::W1,
273 I::M1,
274 I::M3,
275 ];
276 YF_INTERVALS
277 }
278}
279
280#[async_trait]
281impl ProfileProvider for YfConnector {
282 #[cfg_attr(
283 feature = "tracing",
284 tracing::instrument(
285 name = "borsa_yfinance::profile",
286 skip(self, instrument),
287 fields(symbol = %instrument.symbol()),
288 )
289 )]
290 async fn profile(&self, instrument: &Instrument) -> Result<borsa_core::Profile, BorsaError> {
291 let symbol = instrument.symbol_str();
292 let raw = self.profile.load(symbol).await?;
293 Ok(raw)
294 }
295}
296
297#[async_trait]
298impl IsinProvider for YfConnector {
299 #[cfg_attr(
300 feature = "tracing",
301 tracing::instrument(
302 name = "borsa_yfinance::isin",
303 skip(self, instrument),
304 fields(symbol = %instrument.symbol()),
305 )
306 )]
307 async fn isin(&self, instrument: &Instrument) -> Result<Option<borsa_core::Isin>, BorsaError> {
308 let symbol = instrument.symbol_str();
309 match self.profile.isin(symbol).await {
310 Ok(Some(isin_str)) => match borsa_core::Isin::new(&isin_str) {
311 Ok(isin) => Ok(Some(isin)),
312 Err(e) => Err(BorsaError::Data(format!(
313 "Provider returned invalid ISIN '{isin_str}': {e}"
314 ))),
315 },
316 Ok(None) => Ok(None),
317 Err(e) => Err(Self::normalize_error(e, &format!("isin for {symbol}"))),
318 }
319 }
320}
321
322#[async_trait]
323impl SearchProvider for YfConnector {
324 #[cfg_attr(
325 feature = "tracing",
326 tracing::instrument(
327 name = "borsa_yfinance::search",
328 skip(self, req),
329 fields(kind = ?req.kind(), limit = req.limit()),
330 )
331 )]
332 async fn search(&self, req: SearchRequest) -> Result<SearchResponse, BorsaError> {
333 self.search.search(&req).await
334 }
335}
336
337#[async_trait]
338impl EarningsProvider for YfConnector {
339 #[cfg_attr(
340 feature = "tracing",
341 tracing::instrument(
342 name = "borsa_yfinance::earnings",
343 skip(self, instrument),
344 fields(symbol = %instrument.symbol()),
345 )
346 )]
347 async fn earnings(&self, instrument: &Instrument) -> Result<borsa_core::Earnings, BorsaError> {
348 let symbol = instrument.symbol_str();
349 let raw = self.fundamentals.earnings(symbol).await?;
350 Ok(raw)
351 }
352}
353
354#[async_trait]
355impl IncomeStatementProvider for YfConnector {
356 #[cfg_attr(
357 feature = "tracing",
358 tracing::instrument(
359 name = "borsa_yfinance::income_statement",
360 skip(self, instrument),
361 fields(symbol = %instrument.symbol(), quarterly = quarterly),
362 )
363 )]
364 async fn income_statement(
365 &self,
366 instrument: &Instrument,
367 quarterly: bool,
368 ) -> Result<Vec<borsa_core::IncomeStatementRow>, BorsaError> {
369 let raw = self
370 .fundamentals
371 .income_statement(instrument.symbol_str(), quarterly)
372 .await?;
373 Ok(raw)
374 }
375}
376
377#[async_trait]
378impl BalanceSheetProvider for YfConnector {
379 #[cfg_attr(
380 feature = "tracing",
381 tracing::instrument(
382 name = "borsa_yfinance::balance_sheet",
383 skip(self, instrument),
384 fields(symbol = %instrument.symbol(), quarterly = quarterly),
385 )
386 )]
387 async fn balance_sheet(
388 &self,
389 instrument: &Instrument,
390 quarterly: bool,
391 ) -> Result<Vec<borsa_core::BalanceSheetRow>, BorsaError> {
392 let raw = self
393 .fundamentals
394 .balance_sheet(instrument.symbol_str(), quarterly)
395 .await?;
396 Ok(raw)
397 }
398}
399
400#[async_trait]
401impl CashflowProvider for YfConnector {
402 #[cfg_attr(
403 feature = "tracing",
404 tracing::instrument(
405 name = "borsa_yfinance::cashflow",
406 skip(self, instrument),
407 fields(symbol = %instrument.symbol(), quarterly = quarterly),
408 )
409 )]
410 async fn cashflow(
411 &self,
412 instrument: &Instrument,
413 quarterly: bool,
414 ) -> Result<Vec<borsa_core::CashflowRow>, BorsaError> {
415 let raw = self
416 .fundamentals
417 .cashflow(instrument.symbol_str(), quarterly)
418 .await?;
419 Ok(raw)
420 }
421}
422
423#[async_trait]
424impl CalendarProvider for YfConnector {
425 #[cfg_attr(
426 feature = "tracing",
427 tracing::instrument(
428 name = "borsa_yfinance::calendar",
429 skip(self, instrument),
430 fields(symbol = %instrument.symbol()),
431 )
432 )]
433 async fn calendar(&self, instrument: &Instrument) -> Result<borsa_core::Calendar, BorsaError> {
434 let raw = self.fundamentals.calendar(instrument.symbol_str()).await?;
435 Ok(raw)
436 }
437}
438
439#[async_trait]
440impl OptionsExpirationsProvider for YfConnector {
441 #[cfg_attr(
442 feature = "tracing",
443 tracing::instrument(
444 name = "borsa_yfinance::options_expirations",
445 skip(self, instrument),
446 fields(symbol = %instrument.symbol()),
447 )
448 )]
449 async fn options_expirations(&self, instrument: &Instrument) -> Result<Vec<i64>, BorsaError> {
450 self.options.expirations(instrument.symbol_str()).await
451 }
452}
453
454#[async_trait]
455impl OptionChainProvider for YfConnector {
456 #[cfg_attr(
457 feature = "tracing",
458 tracing::instrument(
459 name = "borsa_yfinance::option_chain",
460 skip(self, instrument),
461 fields(symbol = %instrument.symbol(), date = ?date),
462 )
463 )]
464 async fn option_chain(
465 &self,
466 instrument: &Instrument,
467 date: Option<i64>,
468 ) -> Result<borsa_core::OptionChain, BorsaError> {
469 let raw = self.options.chain(instrument.symbol_str(), date).await?;
470 Ok(raw)
471 }
472}
473
474#[async_trait]
475impl RecommendationsProvider for YfConnector {
476 #[cfg_attr(
477 feature = "tracing",
478 tracing::instrument(
479 name = "borsa_yfinance::recommendations",
480 skip(self, instrument),
481 fields(symbol = %instrument.symbol()),
482 )
483 )]
484 async fn recommendations(
485 &self,
486 instrument: &Instrument,
487 ) -> Result<Vec<borsa_core::RecommendationRow>, BorsaError> {
488 let rows = self
489 .analysis
490 .recommendations(instrument.symbol_str())
491 .await?;
492 Ok(rows)
493 }
494}
495
496#[async_trait]
497impl RecommendationsSummaryProvider for YfConnector {
498 #[cfg_attr(
499 feature = "tracing",
500 tracing::instrument(
501 name = "borsa_yfinance::recommendations_summary",
502 skip(self, instrument),
503 fields(symbol = %instrument.symbol()),
504 )
505 )]
506 async fn recommendations_summary(
507 &self,
508 instrument: &Instrument,
509 ) -> Result<borsa_core::RecommendationSummary, BorsaError> {
510 let s = self
511 .analysis
512 .recommendations_summary(instrument.symbol_str())
513 .await?;
514 Ok(s)
515 }
516}
517
518#[async_trait]
519impl UpgradesDowngradesProvider for YfConnector {
520 #[cfg_attr(
521 feature = "tracing",
522 tracing::instrument(
523 name = "borsa_yfinance::upgrades_downgrades",
524 skip(self, instrument),
525 fields(symbol = %instrument.symbol()),
526 )
527 )]
528 async fn upgrades_downgrades(
529 &self,
530 instrument: &Instrument,
531 ) -> Result<Vec<borsa_core::UpgradeDowngradeRow>, BorsaError> {
532 let v = self
533 .analysis
534 .upgrades_downgrades(instrument.symbol_str())
535 .await?;
536 Ok(v)
537 }
538}
539
540#[async_trait]
541impl AnalystPriceTargetProvider for YfConnector {
542 #[cfg_attr(
543 feature = "tracing",
544 tracing::instrument(
545 name = "borsa_yfinance::analyst_price_target",
546 skip(self, instrument),
547 fields(symbol = %instrument.symbol()),
548 )
549 )]
550 async fn analyst_price_target(
551 &self,
552 instrument: &Instrument,
553 ) -> Result<borsa_core::PriceTarget, BorsaError> {
554 let p = self
555 .analysis
556 .analyst_price_target(instrument.symbol_str())
557 .await?;
558 Ok(p)
559 }
560}
561
562#[async_trait]
563impl MajorHoldersProvider for YfConnector {
564 #[cfg_attr(
565 feature = "tracing",
566 tracing::instrument(
567 name = "borsa_yfinance::major_holders",
568 skip(self, instrument),
569 fields(symbol = %instrument.symbol()),
570 )
571 )]
572 async fn major_holders(
573 &self,
574 instrument: &Instrument,
575 ) -> Result<Vec<borsa_core::MajorHolder>, BorsaError> {
576 let rows = self.holders.major_holders(instrument.symbol_str()).await?;
577 let mapped = rows.into_iter().collect();
578 Ok(mapped)
579 }
580}
581
582#[async_trait]
583impl InstitutionalHoldersProvider for YfConnector {
584 #[cfg_attr(
585 feature = "tracing",
586 tracing::instrument(
587 name = "borsa_yfinance::institutional_holders",
588 skip(self, instrument),
589 fields(symbol = %instrument.symbol()),
590 )
591 )]
592 async fn institutional_holders(
593 &self,
594 instrument: &Instrument,
595 ) -> Result<Vec<borsa_core::InstitutionalHolder>, BorsaError> {
596 let rows = self
597 .holders
598 .institutional_holders(instrument.symbol_str())
599 .await?;
600 Ok(rows)
601 }
602}
603
604#[async_trait]
605impl MutualFundHoldersProvider for YfConnector {
606 #[cfg_attr(
607 feature = "tracing",
608 tracing::instrument(
609 name = "borsa_yfinance::mutual_fund_holders",
610 skip(self, instrument),
611 fields(symbol = %instrument.symbol()),
612 )
613 )]
614 async fn mutual_fund_holders(
615 &self,
616 instrument: &Instrument,
617 ) -> Result<Vec<borsa_core::InstitutionalHolder>, BorsaError> {
618 let rows = self
619 .holders
620 .mutual_fund_holders(instrument.symbol_str())
621 .await?;
622 Ok(rows)
623 }
624}
625
626#[async_trait]
627impl InsiderTransactionsProvider for YfConnector {
628 #[cfg_attr(
629 feature = "tracing",
630 tracing::instrument(
631 name = "borsa_yfinance::insider_transactions",
632 skip(self, instrument),
633 fields(symbol = %instrument.symbol()),
634 )
635 )]
636 async fn insider_transactions(
637 &self,
638 instrument: &Instrument,
639 ) -> Result<Vec<borsa_core::InsiderTransaction>, BorsaError> {
640 let rows = self
641 .holders
642 .insider_transactions(instrument.symbol_str())
643 .await?;
644 Ok(rows)
645 }
646}
647
648#[async_trait]
649impl InsiderRosterHoldersProvider for YfConnector {
650 #[cfg_attr(
651 feature = "tracing",
652 tracing::instrument(
653 name = "borsa_yfinance::insider_roster_holders",
654 skip(self, instrument),
655 fields(symbol = %instrument.symbol()),
656 )
657 )]
658 async fn insider_roster_holders(
659 &self,
660 instrument: &Instrument,
661 ) -> Result<Vec<borsa_core::InsiderRosterHolder>, BorsaError> {
662 let rows = self
663 .holders
664 .insider_roster_holders(instrument.symbol_str())
665 .await?;
666 Ok(rows)
667 }
668}
669
670#[async_trait]
671impl NetSharePurchaseActivityProvider for YfConnector {
672 #[cfg_attr(
673 feature = "tracing",
674 tracing::instrument(
675 name = "borsa_yfinance::net_share_purchase_activity",
676 skip(self, instrument),
677 fields(symbol = %instrument.symbol()),
678 )
679 )]
680 async fn net_share_purchase_activity(
681 &self,
682 instrument: &Instrument,
683 ) -> Result<Option<borsa_core::NetSharePurchaseActivity>, BorsaError> {
684 let activity = self
685 .holders
686 .net_share_purchase_activity(instrument.symbol_str())
687 .await?;
688 Ok(activity)
689 }
690}
691
692#[async_trait]
693impl EsgProvider for YfConnector {
694 #[cfg_attr(
695 feature = "tracing",
696 tracing::instrument(
697 name = "borsa_yfinance::sustainability",
698 skip(self, instrument),
699 fields(symbol = %instrument.symbol()),
700 )
701 )]
702 async fn sustainability(
703 &self,
704 instrument: &Instrument,
705 ) -> Result<borsa_core::EsgScores, BorsaError> {
706 let scores = self.esg.sustainability(instrument.symbol_str()).await?;
707 Ok(scores)
708 }
709}
710
711#[async_trait]
712impl NewsProvider for YfConnector {
713 #[cfg_attr(
714 feature = "tracing",
715 tracing::instrument(
716 name = "borsa_yfinance::news",
717 skip(self, instrument, req),
718 fields(symbol = %instrument.symbol()),
719 )
720 )]
721 async fn news(
722 &self,
723 instrument: &Instrument,
724 req: borsa_core::NewsRequest,
725 ) -> Result<Vec<borsa_core::types::NewsArticle>, BorsaError> {
726 let articles = self.news.news(instrument.symbol_str(), req).await?;
727 Ok(articles)
728 }
729}
730
731#[async_trait]
732impl borsa_core::connector::StreamProvider for YfConnector {
733 #[cfg_attr(
734 feature = "tracing",
735 tracing::instrument(
736 name = "borsa_yfinance::stream_quotes",
737 skip(self, instruments),
738 fields(num_symbols = instruments.len()),
739 )
740 )]
741 async fn stream_quotes(
742 &self,
743 instruments: &[Instrument],
744 ) -> Result<
745 (
746 borsa_core::stream::StreamHandle,
747 tokio::sync::mpsc::Receiver<borsa_core::QuoteUpdate>,
748 ),
749 BorsaError,
750 > {
751 let symbols: Vec<String> = instruments.iter().map(|i| i.symbol().to_string()).collect();
752 let (upstream_handle, rx_raw) = self.stream.start(&symbols).await?;
754 let (tx, rx_core) = tokio::sync::mpsc::channel::<borsa_core::QuoteUpdate>(1024);
755 let forward = tokio::spawn(async move {
756 let mut rx = rx_raw;
757 while let Some(u) = rx.recv().await {
758 if tx.send(u).await.is_err() {
759 break;
760 }
761 }
762 });
763 let (stop_tx, stop_rx) = tokio::sync::oneshot::channel::<()>();
764 let join = tokio::spawn(async move {
765 let _ = stop_rx.await;
766 upstream_handle.stop().await;
768 forward.abort();
769 });
770 Ok((
771 borsa_core::stream::StreamHandle::new(join, stop_tx),
772 rx_core,
773 ))
774 }
775}
776
777#[async_trait]
778impl BorsaConnector for YfConnector {
779 fn name(&self) -> &'static str {
780 "borsa-yfinance"
781 }
782 fn vendor(&self) -> &'static str {
783 "Yahoo Finance"
784 }
785
786 fn as_history_provider(&self) -> Option<&dyn borsa_core::connector::HistoryProvider> {
789 Some(self as &dyn HistoryProvider)
790 }
791
792 fn as_quote_provider(&self) -> Option<&dyn borsa_core::connector::QuoteProvider> {
793 Some(self as &dyn QuoteProvider)
794 }
795
796 fn as_stream_provider(&self) -> Option<&dyn borsa_core::connector::StreamProvider> {
797 Some(self as &dyn borsa_core::connector::StreamProvider)
798 }
799
800 fn as_profile_provider(&self) -> Option<&dyn borsa_core::connector::ProfileProvider> {
801 Some(self as &dyn ProfileProvider)
802 }
803
804 fn as_isin_provider(&self) -> Option<&dyn borsa_core::connector::IsinProvider> {
805 Some(self as &dyn IsinProvider)
806 }
807
808 fn as_search_provider(&self) -> Option<&dyn borsa_core::connector::SearchProvider> {
809 Some(self as &dyn SearchProvider)
810 }
811
812 fn as_esg_provider(&self) -> Option<&dyn borsa_core::connector::EsgProvider> {
813 Some(self as &dyn EsgProvider)
814 }
815
816 fn as_news_provider(&self) -> Option<&dyn borsa_core::connector::NewsProvider> {
817 Some(self as &dyn NewsProvider)
818 }
819
820 fn as_earnings_provider(&self) -> Option<&dyn borsa_core::connector::EarningsProvider> {
821 Some(self as &dyn EarningsProvider)
822 }
823 fn as_income_statement_provider(
824 &self,
825 ) -> Option<&dyn borsa_core::connector::IncomeStatementProvider> {
826 Some(self as &dyn IncomeStatementProvider)
827 }
828 fn as_balance_sheet_provider(
829 &self,
830 ) -> Option<&dyn borsa_core::connector::BalanceSheetProvider> {
831 Some(self as &dyn BalanceSheetProvider)
832 }
833 fn as_cashflow_provider(&self) -> Option<&dyn borsa_core::connector::CashflowProvider> {
834 Some(self as &dyn CashflowProvider)
835 }
836 fn as_calendar_provider(&self) -> Option<&dyn borsa_core::connector::CalendarProvider> {
837 Some(self as &dyn CalendarProvider)
838 }
839
840 fn as_recommendations_provider(
841 &self,
842 ) -> Option<&dyn borsa_core::connector::RecommendationsProvider> {
843 Some(self as &dyn RecommendationsProvider)
844 }
845 fn as_recommendations_summary_provider(
846 &self,
847 ) -> Option<&dyn borsa_core::connector::RecommendationsSummaryProvider> {
848 Some(self as &dyn RecommendationsSummaryProvider)
849 }
850 fn as_upgrades_downgrades_provider(
851 &self,
852 ) -> Option<&dyn borsa_core::connector::UpgradesDowngradesProvider> {
853 Some(self as &dyn UpgradesDowngradesProvider)
854 }
855 fn as_analyst_price_target_provider(
856 &self,
857 ) -> Option<&dyn borsa_core::connector::AnalystPriceTargetProvider> {
858 Some(self as &dyn AnalystPriceTargetProvider)
859 }
860
861 fn as_major_holders_provider(
862 &self,
863 ) -> Option<&dyn borsa_core::connector::MajorHoldersProvider> {
864 Some(self as &dyn MajorHoldersProvider)
865 }
866 fn as_institutional_holders_provider(
867 &self,
868 ) -> Option<&dyn borsa_core::connector::InstitutionalHoldersProvider> {
869 Some(self as &dyn InstitutionalHoldersProvider)
870 }
871 fn as_mutual_fund_holders_provider(
872 &self,
873 ) -> Option<&dyn borsa_core::connector::MutualFundHoldersProvider> {
874 Some(self as &dyn MutualFundHoldersProvider)
875 }
876 fn as_insider_transactions_provider(
877 &self,
878 ) -> Option<&dyn borsa_core::connector::InsiderTransactionsProvider> {
879 Some(self as &dyn InsiderTransactionsProvider)
880 }
881 fn as_insider_roster_holders_provider(
882 &self,
883 ) -> Option<&dyn borsa_core::connector::InsiderRosterHoldersProvider> {
884 Some(self as &dyn InsiderRosterHoldersProvider)
885 }
886 fn as_net_share_purchase_activity_provider(
887 &self,
888 ) -> Option<&dyn borsa_core::connector::NetSharePurchaseActivityProvider> {
889 Some(self as &dyn NetSharePurchaseActivityProvider)
890 }
891
892 fn as_options_expirations_provider(
893 &self,
894 ) -> Option<&dyn borsa_core::connector::OptionsExpirationsProvider> {
895 Some(self as &dyn OptionsExpirationsProvider)
896 }
897 fn as_option_chain_provider(&self) -> Option<&dyn borsa_core::connector::OptionChainProvider> {
898 Some(self as &dyn OptionChainProvider)
899 }
900
901 fn supports_kind(&self, kind: AssetKind) -> bool {
903 matches!(
904 kind,
905 AssetKind::Equity
906 | AssetKind::Fund
907 | AssetKind::Index
908 | AssetKind::Crypto
909 | AssetKind::Forex
910 )
911 }
912}