1use std::collections::{HashMap, HashSet};
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use bon::{Builder, bon};
8use reqwest::header::{COOKIE, HeaderValue, ORIGIN, REFERER, USER_AGENT};
9use reqwest_middleware::{
10 ClientBuilder as MiddlewareClientBuilder, ClientWithMiddleware, RequestBuilder,
11};
12use reqwest_retry::{Jitter, RetryTransientMiddleware, policies::ExponentialBackoff};
13use serde::de::DeserializeOwned;
14use tokio::sync::{Mutex, OwnedSemaphorePermit, RwLock, Semaphore};
15use tokio::time::{Instant as TokioInstant, sleep_until};
16#[cfg(feature = "tracing")]
17use tracing::{debug, warn};
18use url::Url;
19
20#[cfg(feature = "calendar")]
21use crate::calendar::{
22 CalendarWindowRequest, DividendCalendarEntry, DividendCalendarRequest, EarningsCalendarEntry,
23 IpoCalendarEntry,
24};
25#[cfg(feature = "economics")]
26use crate::economics::{
27 EconomicCalendarRequest, EconomicCalendarResponse, RawEconomicCalendarResponse,
28 sanitize_calendar,
29};
30use crate::error::{Error, Result};
31use crate::history::{
32 Adjustment, HistoryRequest, HistorySeries, TradingSession,
33 fetch_history_with_timeout_for_client,
34};
35use crate::scanner::{
36 Market, PartiallySupportedColumn, RawScanResponse, ScanQuery, ScanResponse,
37 ScanValidationReport, ScannerMetainfo, ScreenerKind, embedded_registry,
38};
39#[cfg(feature = "search")]
40use crate::search::{
41 RawSearchResponse, SearchHit, SearchRequest, SearchResponse, sanitize_response,
42};
43use crate::transport::websocket::{TradingViewWebSocket, connect_socket};
44
/// User-Agent string used when the caller does not configure one.
const DEFAULT_USER_AGENT: &str =
    "tvdata-rs/0.1 (+https://github.com/deepentropy/tvscreener reference)";
/// Auth token used for anonymous access and as the fallback when no token is configured.
const DEFAULT_AUTH_TOKEN: &str = "unauthorized_user_token";
48
49fn default_scanner_base_url() -> Url {
50 Url::parse("https://scanner.tradingview.com").expect("default scanner endpoint must be valid")
51}
52
53fn default_symbol_search_base_url() -> Url {
54 Url::parse("https://symbol-search.tradingview.com/symbol_search/v3/")
55 .expect("default symbol search endpoint must be valid")
56}
57
58fn default_calendar_base_url() -> Url {
59 Url::parse("https://chartevents-reuters.tradingview.com/events")
60 .expect("default calendar endpoint must be valid")
61}
62
63fn default_websocket_url() -> Url {
64 Url::parse("wss://data.tradingview.com/socket.io/websocket")
65 .expect("default websocket endpoint must be valid")
66}
67
68fn default_site_origin() -> Url {
69 Url::parse("https://www.tradingview.com").expect("default site origin must be valid")
70}
71
72fn default_data_origin() -> Url {
73 Url::parse("https://data.tradingview.com").expect("default data origin must be valid")
74}
75
/// Default HTTP timeout: 30 seconds.
fn default_timeout() -> Duration {
    const SECONDS: u64 = 30;
    Duration::from_secs(SECONDS)
}

/// Default history session timeout: 30 seconds.
fn default_history_session_timeout() -> Duration {
    const SECONDS: u64 = 30;
    Duration::from_secs(SECONDS)
}

/// Default concurrency for history batches.
fn default_history_batch_concurrency() -> usize {
    const CONCURRENCY: usize = 6;
    CONCURRENCY
}

/// HTTP concurrency budget of the backend-history preset.
fn default_backend_http_budget_concurrency() -> usize {
    const CONCURRENCY: usize = 8;
    CONCURRENCY
}

/// History batch concurrency of the backend-history preset.
fn default_backend_history_batch_concurrency() -> usize {
    const CONCURRENCY: usize = 6;
    CONCURRENCY
}

/// Websocket session budget of the backend-history preset.
fn default_backend_websocket_budget_concurrency() -> usize {
    const CONCURRENCY: usize = 6;
    CONCURRENCY
}

/// Minimum HTTP interval of the backend-history preset: 50 milliseconds.
fn default_backend_http_min_interval() -> Duration {
    const MILLIS: u64 = 50;
    Duration::from_millis(MILLIS)
}

/// HTTP concurrency budget of the research preset.
fn default_research_http_budget_concurrency() -> usize {
    const CONCURRENCY: usize = 4;
    CONCURRENCY
}

/// Websocket session budget of the research preset.
fn default_research_websocket_budget_concurrency() -> usize {
    const CONCURRENCY: usize = 4;
    CONCURRENCY
}

/// Minimum HTTP interval of the research preset: 25 milliseconds.
fn default_research_http_min_interval() -> Duration {
    const MILLIS: u64 = 25;
    Duration::from_millis(MILLIS)
}

/// HTTP concurrency budget of the interactive preset.
fn default_interactive_http_budget_concurrency() -> usize {
    const CONCURRENCY: usize = 2;
    CONCURRENCY
}

/// Websocket session budget of the interactive preset.
fn default_interactive_websocket_budget_concurrency() -> usize {
    const CONCURRENCY: usize = 2;
    CONCURRENCY
}
123
/// Upper bound on the chunk size chosen by the automatic snapshot batching strategy.
#[cfg(any(
    feature = "calendar",
    feature = "crypto",
    feature = "equity",
    feature = "forex"
))]
fn default_snapshot_chunk_size() -> usize {
    const CHUNK_SIZE: usize = 250;
    CHUNK_SIZE
}

/// Concurrency used for chunked snapshot requests when no HTTP budget is configured.
#[cfg(any(
    feature = "calendar",
    feature = "crypto",
    feature = "equity",
    feature = "forex"
))]
fn default_snapshot_chunk_concurrency() -> usize {
    const CONCURRENCY: usize = 4;
    CONCURRENCY
}

/// Largest symbol count the automatic strategy may still send as a single request.
#[cfg(any(
    feature = "calendar",
    feature = "crypto",
    feature = "equity",
    feature = "forex"
))]
fn default_snapshot_auto_single_request_limit() -> usize {
    const SYMBOL_LIMIT: usize = 1_000;
    SYMBOL_LIMIT
}

/// Target number of cells (symbols x columns) per request for the automatic strategy.
#[cfg(any(
    feature = "calendar",
    feature = "crypto",
    feature = "equity",
    feature = "forex"
))]
fn default_snapshot_auto_target_cells() -> usize {
    const TARGET_CELLS: usize = 25_000;
    TARGET_CELLS
}
163
164fn default_user_agent() -> String {
165 DEFAULT_USER_AGENT.to_owned()
166}
167
168fn default_auth_token() -> String {
169 DEFAULT_AUTH_TOKEN.to_owned()
170}
171
172fn default_anonymous_auth_token() -> String {
173 DEFAULT_AUTH_TOKEN.to_owned()
174}
175
176fn cookie_header_value(session_id: &str) -> Result<HeaderValue> {
177 HeaderValue::from_str(&format!("sessionid={session_id}"))
178 .map_err(|_| Error::Protocol("invalid session id configured for cookie header"))
179}
180
/// Default lower bound for the retry backoff: 250 milliseconds.
fn default_min_retry_interval() -> Duration {
    const MILLIS: u64 = 250;
    Duration::from_millis(MILLIS)
}

/// Default upper bound for the retry backoff: 2 seconds.
fn default_max_retry_interval() -> Duration {
    const SECONDS: u64 = 2;
    Duration::from_secs(SECONDS)
}
188
189fn parse_url(value: impl AsRef<str>) -> Result<Url> {
190 Url::parse(value.as_ref()).map_err(Into::into)
191}
192
193fn referer(origin: &Url) -> String {
194 format!("{}/", origin.as_str().trim_end_matches('/'))
195}
196
197fn request_preview(request: &RequestBuilder) -> Option<(String, String)> {
198 request.try_clone().and_then(|builder| {
199 builder
200 .build()
201 .ok()
202 .map(|request| (request.method().to_string(), request.url().to_string()))
203 })
204}
205
/// Jitter mode for retry backoff; each variant converts to the same-named
/// `reqwest_retry::Jitter` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RetryJitter {
    /// Maps to `Jitter::None`.
    None,
    /// Maps to `Jitter::Full`.
    Full,
    /// Maps to `Jitter::Bounded`; this is the default.
    #[default]
    Bounded,
}
213
214impl From<RetryJitter> for Jitter {
215 fn from(value: RetryJitter) -> Self {
216 match value {
217 RetryJitter::None => Self::None,
218 RetryJitter::Full => Self::Full,
219 RetryJitter::Bounded => Self::Bounded,
220 }
221 }
222}
223
/// Retry settings used to build the exponential backoff policy for HTTP requests.
#[derive(Debug, Clone, PartialEq, Eq, Builder)]
pub struct RetryConfig {
    /// Maximum number of retries; defaults to 2.
    #[builder(default = 2)]
    pub max_retries: u32,
    /// Lower retry bound; defaults to 250 ms.
    #[builder(default = default_min_retry_interval())]
    pub min_retry_interval: Duration,
    /// Upper retry bound; defaults to 2 s.
    #[builder(default = default_max_retry_interval())]
    pub max_retry_interval: Duration,
    /// Jitter mode; defaults to [`RetryJitter::Bounded`].
    #[builder(default)]
    pub jitter: RetryJitter,
}

impl Default for RetryConfig {
    /// Equivalent to a builder with every field left at its default.
    fn default() -> Self {
        Self::builder().build()
    }
}
241
242impl RetryConfig {
243 pub fn disabled() -> Self {
244 Self {
245 max_retries: 0,
246 ..Self::default()
247 }
248 }
249
250 fn validate(&self) -> Result<()> {
251 if self.min_retry_interval > self.max_retry_interval {
252 return Err(Error::InvalidRetryBounds {
253 min: self.min_retry_interval,
254 max: self.max_retry_interval,
255 });
256 }
257
258 Ok(())
259 }
260
261 fn to_policy(&self) -> ExponentialBackoff {
262 ExponentialBackoff::builder()
263 .retry_bounds(self.min_retry_interval, self.max_retry_interval)
264 .jitter(self.jitter.into())
265 .build_with_max_retries(self.max_retries)
266 }
267}
268
/// Which credentials an [`AuthConfig`] contributes when it is resolved.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AuthMode {
    /// Default auth token, no session id.
    #[default]
    Anonymous,
    /// Configured auth token, no session id.
    Token,
    /// Default auth token plus the configured session id.
    Session,
    /// Configured auth token plus the configured session id.
    SessionAndToken,
}
278
/// Authentication settings; built through the constructors on the type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuthConfig {
    // Selects which of the two optional credentials are used on resolve.
    mode: AuthMode,
    auth_token: Option<String>,
    session_id: Option<String>,
}
290
291impl AuthConfig {
292 pub fn anonymous() -> Self {
293 Self {
294 mode: AuthMode::Anonymous,
295 auth_token: None,
296 session_id: None,
297 }
298 }
299
300 pub fn token(auth_token: impl Into<String>) -> Self {
301 Self {
302 mode: AuthMode::Token,
303 auth_token: Some(auth_token.into()),
304 session_id: None,
305 }
306 }
307
308 pub fn session(session_id: impl Into<String>) -> Self {
309 Self {
310 mode: AuthMode::Session,
311 auth_token: None,
312 session_id: Some(session_id.into()),
313 }
314 }
315
316 pub fn session_and_token(session_id: impl Into<String>, auth_token: impl Into<String>) -> Self {
317 Self {
318 mode: AuthMode::SessionAndToken,
319 auth_token: Some(auth_token.into()),
320 session_id: Some(session_id.into()),
321 }
322 }
323
324 pub fn mode(&self) -> AuthMode {
325 self.mode
326 }
327
328 fn resolve(self) -> (String, Option<String>) {
329 match self.mode {
330 AuthMode::Anonymous => (default_anonymous_auth_token(), None),
331 AuthMode::Token => (
332 self.auth_token.unwrap_or_else(default_anonymous_auth_token),
333 None,
334 ),
335 AuthMode::Session => (
336 default_anonymous_auth_token(),
337 self.session_id.filter(|value| !value.is_empty()),
338 ),
339 AuthMode::SessionAndToken => (
340 self.auth_token.unwrap_or_else(default_anonymous_auth_token),
341 self.session_id.filter(|value| !value.is_empty()),
342 ),
343 }
344 }
345}
346
impl Default for AuthConfig {
    /// The default configuration is [`AuthConfig::anonymous`].
    fn default() -> Self {
        Self::anonymous()
    }
}
352
/// Client-level defaults for history requests.
// NOTE(review): how each field is consumed is outside this section — confirm in the history module.
#[derive(Debug, Clone, PartialEq, Eq, Builder)]
pub struct HistoryClientConfig {
    /// Defaults to 30 s.
    #[builder(default = default_history_session_timeout())]
    pub session_timeout: Duration,
    /// Defaults to 6.
    #[builder(default = default_history_batch_concurrency())]
    pub default_batch_concurrency: usize,
    /// Defaults to `TradingSession::default()`.
    #[builder(default)]
    pub default_session: TradingSession,
    /// Defaults to `Adjustment::default()`.
    #[builder(default)]
    pub default_adjustment: Adjustment,
}

impl Default for HistoryClientConfig {
    /// Equivalent to a builder with every field left at its default.
    fn default() -> Self {
        Self::builder().build()
    }
}
370
/// Optional request limits. A field left as `None` means the corresponding limiter or
/// pacer is not created for the client.
#[derive(Debug, Clone, PartialEq, Eq, Builder)]
pub struct RequestBudget {
    /// Cap on concurrent HTTP requests; `Some(0)` is rejected by validation.
    pub max_concurrent_http_requests: Option<usize>,
    /// Cap on concurrent websocket sessions; `Some(0)` is rejected by validation.
    pub max_concurrent_websocket_sessions: Option<usize>,
    /// When set, an HTTP pacer is created for the client.
    pub min_http_interval: Option<Duration>,
}

impl Default for RequestBudget {
    /// Equivalent to a builder with no limits set.
    fn default() -> Self {
        Self::builder().build()
    }
}
383
384impl RequestBudget {
385 pub fn disabled() -> Self {
386 Self::default()
387 }
388
389 fn validate(&self) -> Result<()> {
390 if self.max_concurrent_http_requests == Some(0) {
391 return Err(Error::InvalidRequestBudget {
392 field: "max_concurrent_http_requests",
393 });
394 }
395
396 if self.max_concurrent_websocket_sessions == Some(0) {
397 return Err(Error::InvalidRequestBudget {
398 field: "max_concurrent_websocket_sessions",
399 });
400 }
401
402 Ok(())
403 }
404}
405
/// How snapshot requests are split into batches.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum SnapshotBatchStrategy {
    /// Let the client choose between a single request and chunking; this is the default.
    #[default]
    Auto,
    /// Always plan one request covering all symbols.
    SingleRequest,
    /// Use caller-provided chunking parameters; both values must be non-zero.
    Chunked {
        chunk_size: usize,
        max_concurrent_requests: usize,
    },
}
416
/// Snapshot batching configuration.
#[derive(Debug, Clone, PartialEq, Eq, Builder)]
pub struct SnapshotBatchConfig {
    /// Batching strategy; defaults to [`SnapshotBatchStrategy::Auto`].
    #[builder(default)]
    pub strategy: SnapshotBatchStrategy,
}

impl Default for SnapshotBatchConfig {
    /// Equivalent to a builder with every field left at its default.
    fn default() -> Self {
        Self::builder().build()
    }
}
428
429impl SnapshotBatchConfig {
430 fn validate(&self) -> Result<()> {
431 match self.strategy {
432 SnapshotBatchStrategy::Auto | SnapshotBatchStrategy::SingleRequest => Ok(()),
433 SnapshotBatchStrategy::Chunked {
434 chunk_size,
435 max_concurrent_requests,
436 } => {
437 if chunk_size == 0 {
438 return Err(Error::InvalidSnapshotBatchConfig {
439 field: "chunk_size",
440 });
441 }
442 if max_concurrent_requests == 0 {
443 return Err(Error::InvalidSnapshotBatchConfig {
444 field: "max_concurrent_requests",
445 });
446 }
447 Ok(())
448 }
449 }
450 }
451}
452
/// Result of snapshot batch planning: chunk size and concurrency.
#[cfg(any(
    feature = "calendar",
    feature = "crypto",
    feature = "equity",
    feature = "forex"
))]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct SnapshotBatchPlan {
    pub(crate) chunk_size: usize,
    pub(crate) concurrency: usize,
}
464
/// Boxed, pinned, `Send` future returned by [`WebSocketConnector::connect`]; resolves to
/// a [`TradingViewWebSocket`] or an error.
pub type WebSocketConnectFuture<'a> =
    Pin<Box<dyn Future<Output = Result<TradingViewWebSocket>> + Send + 'a>>;
467
/// Payload of [`ClientEvent::HttpRequestCompleted`].
// NOTE(review): the code that fills these fields is outside this section; `elapsed_ms`
// is presumably milliseconds — confirm against the emitter.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpRequestCompletedEvent {
    pub method: String,
    pub url: String,
    pub status: u16,
    pub elapsed_ms: u64,
    pub authenticated: bool,
}

/// Payload of [`ClientEvent::HttpRequestFailed`].
// NOTE(review): the code that fills these fields is outside this section — confirm there.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpRequestFailedEvent {
    pub method: String,
    pub url: String,
    pub elapsed_ms: u64,
    pub authenticated: bool,
    pub kind: crate::error::ErrorKind,
}
485
/// Payload of [`ClientEvent::WebSocketConnected`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WebSocketConnectedEvent {
    pub url: String,
    pub authenticated: bool,
}

/// Payload of [`ClientEvent::WebSocketConnectionFailed`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WebSocketConnectionFailedEvent {
    pub url: String,
    pub authenticated: bool,
    pub kind: crate::error::ErrorKind,
}
498
/// Mode tag recorded on [`HistoryBatchCompletedEvent`].
// NOTE(review): the behavioral difference between the modes is defined outside this section.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HistoryBatchMode {
    Strict,
    Detailed,
}

/// Payload of [`ClientEvent::HistoryBatchCompleted`]: per-batch counters, the
/// concurrency value, and the batch mode.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HistoryBatchCompletedEvent {
    pub requested: usize,
    pub successes: usize,
    pub missing: usize,
    pub failures: usize,
    pub concurrency: usize,
    pub mode: HistoryBatchMode,
}
514
/// Events delivered to a [`ClientObserver`]; each variant wraps its payload struct.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientEvent {
    HttpRequestCompleted(HttpRequestCompletedEvent),
    HttpRequestFailed(HttpRequestFailedEvent),
    WebSocketConnected(WebSocketConnectedEvent),
    WebSocketConnectionFailed(WebSocketConnectionFailedEvent),
    HistoryBatchCompleted(HistoryBatchCompletedEvent),
}
523
/// Callback interface for receiving [`ClientEvent`]s. Implementors must be `Debug`,
/// `Send` and `Sync`.
pub trait ClientObserver: std::fmt::Debug + Send + Sync {
    /// Called with a reference to the event.
    fn on_event(&self, event: &ClientEvent);
}
527
/// Pluggable strategy for opening a websocket connection. Implementors must be `Debug`,
/// `Send` and `Sync`.
pub trait WebSocketConnector: std::fmt::Debug + Send + Sync {
    /// Starts a connection using the given endpoints, user agent and optional session
    /// id, returning a boxed future that may borrow all of its arguments for `'a`.
    fn connect<'a>(
        &'a self,
        endpoints: &'a Endpoints,
        user_agent: &'a str,
        session_id: Option<&'a str>,
    ) -> WebSocketConnectFuture<'a>;
}
536
/// Connector used when no custom [`WebSocketConnector`] is supplied; delegates to
/// `connect_socket` from the websocket transport module.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultWebSocketConnector;

impl WebSocketConnector for DefaultWebSocketConnector {
    /// Boxes and pins the future returned by `connect_socket`, forwarding all arguments.
    fn connect<'a>(
        &'a self,
        endpoints: &'a Endpoints,
        user_agent: &'a str,
        session_id: Option<&'a str>,
    ) -> WebSocketConnectFuture<'a> {
        Box::pin(connect_socket(endpoints, user_agent, session_id))
    }
}
550
/// Transport-level settings for the client.
#[derive(Debug, Clone, Builder)]
pub struct TransportConfig {
    /// Timeout applied to the internally built HTTP client; defaults to 30 s.
    #[builder(default = default_timeout())]
    pub timeout: Duration,
    /// Retry settings for the internally built HTTP client.
    #[builder(default = RetryConfig::default())]
    pub retry: RetryConfig,
    /// User-Agent string; defaults to the crate's built-in value.
    #[builder(default = default_user_agent(), into)]
    pub user_agent: String,
    /// Pre-built HTTP client. When set, the client constructor uses it as-is and does
    /// not apply `timeout` or `retry`.
    pub http_client: Option<ClientWithMiddleware>,
    /// Custom websocket connector; [`DefaultWebSocketConnector`] is used when absent.
    pub websocket_connector: Option<Arc<dyn WebSocketConnector>>,
}

impl Default for TransportConfig {
    /// Equivalent to a builder with every field left at its default.
    fn default() -> Self {
        Self::builder().build()
    }
}
568
/// Runtime state derived from a [`RequestBudget`]; each member exists only when the
/// matching budget field is set.
#[derive(Debug)]
struct RequestBudgetState {
    http_limiter: Option<Arc<Semaphore>>,
    websocket_limiter: Option<Arc<Semaphore>>,
    // Initialised to the construction time.
    // NOTE(review): presumably tracks when the next HTTP request may start — confirm at the use site.
    http_pacer: Option<Arc<Mutex<TokioInstant>>>,
}
575
576impl RequestBudgetState {
577 fn new(config: &RequestBudget) -> Self {
578 Self {
579 http_limiter: config
580 .max_concurrent_http_requests
581 .map(|limit| Arc::new(Semaphore::new(limit))),
582 websocket_limiter: config
583 .max_concurrent_websocket_sessions
584 .map(|limit| Arc::new(Semaphore::new(limit))),
585 http_pacer: config
586 .min_http_interval
587 .map(|_| Arc::new(Mutex::new(TokioInstant::now()))),
588 }
589 }
590}
591
/// Aggregate configuration consumed by `TradingViewClient::from_config`.
#[derive(Debug, Clone, Builder)]
pub struct TradingViewClientConfig {
    /// Service endpoints; defaults to [`Endpoints::default`].
    #[builder(default = Endpoints::default())]
    pub endpoints: Endpoints,
    /// Transport settings; defaults to [`TransportConfig::default`].
    #[builder(default = TransportConfig::default())]
    pub transport: TransportConfig,
    /// Authentication; defaults to anonymous.
    #[builder(default = AuthConfig::default())]
    pub auth: AuthConfig,
    /// History defaults.
    #[builder(default = HistoryClientConfig::default())]
    pub history: HistoryClientConfig,
    /// Snapshot batching.
    #[builder(default = SnapshotBatchConfig::default())]
    pub snapshot_batch: SnapshotBatchConfig,
    /// Request limits; defaults to no limits.
    #[builder(default = RequestBudget::default())]
    pub request_budget: RequestBudget,
    /// Optional event observer.
    pub observer: Option<Arc<dyn ClientObserver>>,
}

impl Default for TradingViewClientConfig {
    /// Equivalent to a builder with every field left at its default.
    fn default() -> Self {
        Self::builder().build()
    }
}
614
615impl TradingViewClientConfig {
616 pub fn backend_history() -> Self {
617 Self::builder()
618 .transport(
619 TransportConfig::builder()
620 .timeout(Duration::from_secs(60))
621 .retry(
622 RetryConfig::builder()
623 .max_retries(4)
624 .min_retry_interval(Duration::from_millis(500))
625 .max_retry_interval(Duration::from_secs(5))
626 .build(),
627 )
628 .build(),
629 )
630 .history(
631 HistoryClientConfig::builder()
632 .session_timeout(Duration::from_secs(60))
633 .default_batch_concurrency(default_backend_history_batch_concurrency())
634 .default_session(TradingSession::Regular)
635 .default_adjustment(Adjustment::Splits)
636 .build(),
637 )
638 .request_budget(
639 RequestBudget::builder()
640 .max_concurrent_http_requests(default_backend_http_budget_concurrency())
641 .max_concurrent_websocket_sessions(
642 default_backend_websocket_budget_concurrency(),
643 )
644 .min_http_interval(default_backend_http_min_interval())
645 .build(),
646 )
647 .build()
648 }
649
650 pub fn research() -> Self {
651 Self::builder()
652 .transport(
653 TransportConfig::builder()
654 .timeout(Duration::from_secs(45))
655 .retry(
656 RetryConfig::builder()
657 .max_retries(2)
658 .min_retry_interval(Duration::from_millis(250))
659 .max_retry_interval(Duration::from_secs(2))
660 .build(),
661 )
662 .build(),
663 )
664 .request_budget(
665 RequestBudget::builder()
666 .max_concurrent_http_requests(default_research_http_budget_concurrency())
667 .max_concurrent_websocket_sessions(
668 default_research_websocket_budget_concurrency(),
669 )
670 .min_http_interval(default_research_http_min_interval())
671 .build(),
672 )
673 .build()
674 }
675
676 pub fn interactive() -> Self {
677 Self::builder()
678 .transport(
679 TransportConfig::builder()
680 .timeout(Duration::from_secs(15))
681 .retry(
682 RetryConfig::builder()
683 .max_retries(1)
684 .min_retry_interval(Duration::from_millis(100))
685 .max_retry_interval(Duration::from_millis(500))
686 .build(),
687 )
688 .build(),
689 )
690 .request_budget(
691 RequestBudget::builder()
692 .max_concurrent_http_requests(default_interactive_http_budget_concurrency())
693 .max_concurrent_websocket_sessions(
694 default_interactive_websocket_budget_concurrency(),
695 )
696 .build(),
697 )
698 .build()
699 }
700}
701
/// Set of service URLs used by the client. Every field defaults to the built-in
/// TradingView endpoint for that service.
#[derive(Debug, Clone, PartialEq, Eq, Builder)]
pub struct Endpoints {
    #[builder(default = default_scanner_base_url())]
    scanner_base_url: Url,
    #[builder(default = default_symbol_search_base_url())]
    symbol_search_base_url: Url,
    #[builder(default = default_calendar_base_url())]
    calendar_base_url: Url,
    #[builder(default = default_websocket_url())]
    websocket_url: Url,
    #[builder(default = default_site_origin())]
    site_origin: Url,
    #[builder(default = default_data_origin())]
    data_origin: Url,
}

impl Default for Endpoints {
    /// Equivalent to a builder with every URL left at its default.
    fn default() -> Self {
        Self::builder().build()
    }
}
724
725impl Endpoints {
726 pub fn scanner_base_url(&self) -> &Url {
727 &self.scanner_base_url
728 }
729
730 pub fn symbol_search_base_url(&self) -> &Url {
731 &self.symbol_search_base_url
732 }
733
734 pub fn calendar_base_url(&self) -> &Url {
735 &self.calendar_base_url
736 }
737
738 pub fn websocket_url(&self) -> &Url {
739 &self.websocket_url
740 }
741
742 pub fn site_origin(&self) -> &Url {
743 &self.site_origin
744 }
745
746 pub fn data_origin(&self) -> &Url {
747 &self.data_origin
748 }
749
750 pub fn with_scanner_base_url(mut self, url: impl AsRef<str>) -> Result<Self> {
751 self.scanner_base_url = parse_url(url)?;
752 Ok(self)
753 }
754
755 pub fn with_symbol_search_base_url(mut self, url: impl AsRef<str>) -> Result<Self> {
756 self.symbol_search_base_url = parse_url(url)?;
757 Ok(self)
758 }
759
760 pub fn with_calendar_base_url(mut self, url: impl AsRef<str>) -> Result<Self> {
761 self.calendar_base_url = parse_url(url)?;
762 Ok(self)
763 }
764
765 pub fn with_websocket_url(mut self, url: impl AsRef<str>) -> Result<Self> {
766 self.websocket_url = parse_url(url)?;
767 Ok(self)
768 }
769
770 pub fn with_site_origin(mut self, url: impl AsRef<str>) -> Result<Self> {
771 self.site_origin = parse_url(url)?;
772 Ok(self)
773 }
774
775 pub fn with_data_origin(mut self, url: impl AsRef<str>) -> Result<Self> {
776 self.data_origin = parse_url(url)?;
777 Ok(self)
778 }
779
780 pub fn scanner_url(&self, route: &str) -> Result<Url> {
781 self.scanner_base_url
782 .join(route.trim_start_matches('/'))
783 .map_err(Into::into)
784 }
785
786 pub fn scanner_metainfo_url(&self, market: &Market) -> Result<Url> {
787 self.scanner_url(&format!("{}/metainfo", market.as_str()))
788 }
789}
790
/// TradingView API client. Cloning is supported; the budget state, websocket connector,
/// observer and metainfo cache are held behind `Arc`.
#[derive(Debug, Clone)]
pub struct TradingViewClient {
    // Either the caller-supplied client or one built from the timeout/retry settings.
    http: ClientWithMiddleware,
    endpoints: Endpoints,
    user_agent: String,
    auth_token: String,
    session_id: Option<String>,
    history_config: HistoryClientConfig,
    snapshot_batch_config: SnapshotBatchConfig,
    // The validated budget as configured, plus the limiter/pacer state derived from it.
    request_budget: RequestBudget,
    request_budget_state: Arc<RequestBudgetState>,
    websocket_connector: Arc<dyn WebSocketConnector>,
    observer: Option<Arc<dyn ClientObserver>>,
    // NOTE(review): keys are presumably market identifiers — confirm in `cached_metainfo`.
    metainfo_cache: Arc<RwLock<HashMap<String, ScannerMetainfo>>>,
}
827
828#[bon]
829impl TradingViewClient {
830 #[builder]
832 pub fn new(
833 #[builder(default = Endpoints::default())] endpoints: Endpoints,
834 #[builder(default = default_timeout())] timeout: Duration,
835 #[builder(default = RetryConfig::default())] retry: RetryConfig,
836 #[builder(default = HistoryClientConfig::default())] history_config: HistoryClientConfig,
837 #[builder(default = SnapshotBatchConfig::default())]
838 snapshot_batch_config: SnapshotBatchConfig,
839 #[builder(default = RequestBudget::default())] request_budget: RequestBudget,
840 #[builder(default = default_user_agent(), into)] user_agent: String,
841 #[builder(default = default_auth_token(), into)] auth_token: String,
842 #[builder(into)] session_id: Option<String>,
843 auth: Option<AuthConfig>,
844 transport_config: Option<TransportConfig>,
845 http_client: Option<ClientWithMiddleware>,
846 websocket_connector: Option<Arc<dyn WebSocketConnector>>,
847 observer: Option<Arc<dyn ClientObserver>>,
848 ) -> Result<Self> {
849 let transport_config = transport_config.unwrap_or(TransportConfig {
850 timeout,
851 retry,
852 user_agent,
853 http_client,
854 websocket_connector,
855 });
856 let TransportConfig {
857 timeout,
858 retry,
859 user_agent,
860 http_client,
861 websocket_connector,
862 } = transport_config;
863
864 let (auth_token, session_id) = auth
865 .map(AuthConfig::resolve)
866 .unwrap_or((auth_token, session_id));
867
868 request_budget.validate()?;
869 snapshot_batch_config.validate()?;
870
871 let http = if let Some(http_client) = http_client {
872 http_client
873 } else {
874 retry.validate()?;
875
876 let base_http = reqwest::Client::builder()
877 .timeout(timeout)
878 .build()
879 .map_err(Error::from)?;
880
881 if retry.max_retries == 0 {
882 ClientWithMiddleware::from(base_http)
883 } else {
884 MiddlewareClientBuilder::new(base_http)
885 .with(RetryTransientMiddleware::new_with_policy(retry.to_policy()))
886 .build()
887 }
888 };
889
890 Ok(Self {
891 http,
892 endpoints,
893 user_agent,
894 auth_token,
895 session_id,
896 history_config,
897 snapshot_batch_config,
898 request_budget: request_budget.clone(),
899 request_budget_state: Arc::new(RequestBudgetState::new(&request_budget)),
900 websocket_connector: websocket_connector
901 .unwrap_or_else(|| Arc::new(DefaultWebSocketConnector)),
902 observer,
903 metainfo_cache: Arc::new(RwLock::new(HashMap::new())),
904 })
905 }
906
907 pub fn for_backend_history() -> Result<Self> {
908 Self::from_config(TradingViewClientConfig::backend_history())
909 }
910
911 pub fn for_research() -> Result<Self> {
913 Self::from_config(TradingViewClientConfig::research())
914 }
915
916 pub fn for_interactive() -> Result<Self> {
918 Self::from_config(TradingViewClientConfig::interactive())
919 }
920
921 pub fn from_config(config: TradingViewClientConfig) -> Result<Self> {
922 match config.observer {
923 Some(observer) => Self::builder()
924 .endpoints(config.endpoints)
925 .transport_config(config.transport)
926 .auth(config.auth)
927 .history_config(config.history)
928 .snapshot_batch_config(config.snapshot_batch)
929 .request_budget(config.request_budget)
930 .observer(observer)
931 .build(),
932 None => Self::builder()
933 .endpoints(config.endpoints)
934 .transport_config(config.transport)
935 .auth(config.auth)
936 .history_config(config.history)
937 .snapshot_batch_config(config.snapshot_batch)
938 .request_budget(config.request_budget)
939 .build(),
940 }
941 }
942
    /// Endpoints this client was built with.
    pub fn endpoints(&self) -> &Endpoints {
        &self.endpoints
    }

    /// Resolved auth token.
    pub(crate) fn auth_token(&self) -> &str {
        &self.auth_token
    }

    /// Resolved session id, if any.
    pub(crate) fn session_id(&self) -> Option<&str> {
        self.session_id.as_deref()
    }

    /// History defaults this client was built with.
    pub fn history_config(&self) -> &HistoryClientConfig {
        &self.history_config
    }

    /// Snapshot batching configuration this client was built with.
    pub fn snapshot_batch_config(&self) -> &SnapshotBatchConfig {
        &self.snapshot_batch_config
    }

    /// Request budget this client was built with.
    pub fn request_budget(&self) -> &RequestBudget {
        &self.request_budget
    }
966
967 pub(crate) fn effective_history_batch_concurrency(&self, requested: usize) -> usize {
968 if requested == 0 {
969 return 0;
970 }
971
972 self.request_budget
973 .max_concurrent_websocket_sessions
974 .map(|cap| requested.min(cap))
975 .unwrap_or(requested)
976 }
977
978 #[cfg(any(
979 feature = "calendar",
980 feature = "crypto",
981 feature = "equity",
982 feature = "forex"
983 ))]
984 pub(crate) fn plan_snapshot_batch(&self, symbols: usize, columns: usize) -> SnapshotBatchPlan {
985 let effective_http_cap = self
986 .request_budget
987 .max_concurrent_http_requests
988 .unwrap_or(default_snapshot_chunk_concurrency());
989 let cap = effective_http_cap.max(1);
990
991 match self.snapshot_batch_config.strategy {
992 SnapshotBatchStrategy::SingleRequest => SnapshotBatchPlan {
993 chunk_size: symbols.max(1),
994 concurrency: 1,
995 },
996 SnapshotBatchStrategy::Chunked {
997 chunk_size,
998 max_concurrent_requests,
999 } => SnapshotBatchPlan {
1000 chunk_size: chunk_size.max(1),
1001 concurrency: max_concurrent_requests.min(cap).max(1),
1002 },
1003 SnapshotBatchStrategy::Auto => {
1004 let cells = symbols.saturating_mul(columns.max(1));
1005 let auto_chunk_size = (default_snapshot_auto_target_cells() / columns.max(1))
1006 .clamp(100, default_snapshot_chunk_size());
1007
1008 if symbols <= default_snapshot_chunk_size()
1009 || (symbols <= default_snapshot_auto_single_request_limit()
1010 && cells <= default_snapshot_auto_target_cells())
1011 {
1012 SnapshotBatchPlan {
1013 chunk_size: symbols.max(1),
1014 concurrency: 1,
1015 }
1016 } else {
1017 SnapshotBatchPlan {
1018 chunk_size: auto_chunk_size.max(1),
1019 concurrency: default_snapshot_chunk_concurrency().min(cap).max(1),
1020 }
1021 }
1022 }
1023 }
1024 }
1025
1026 pub async fn scan(&self, query: &ScanQuery) -> Result<ScanResponse> {
1053 let route = query.route_segment();
1054 #[cfg(feature = "tracing")]
1055 debug!(
1056 target: "tvdata_rs::scan",
1057 route = %route,
1058 columns = query.columns.len(),
1059 markets = query.markets.len(),
1060 "executing scanner query",
1061 );
1062
1063 let raw: RawScanResponse = self
1064 .execute_json(
1065 self.request(self.http.post(self.endpoints.scanner_url(&route)?))?
1066 .json(query),
1067 )
1068 .await?;
1069
1070 let response = raw.into_response()?;
1071 #[cfg(feature = "tracing")]
1072 debug!(
1073 target: "tvdata_rs::scan",
1074 route = %route,
1075 rows = response.rows.len(),
1076 "scanner query completed",
1077 );
1078 Ok(response)
1079 }
1080
1081 pub async fn validate_scan_query(&self, query: &ScanQuery) -> Result<ScanValidationReport> {
1106 let route_segment = query.route_segment();
1107 let markets = validation_markets(query)?;
1108 #[cfg(feature = "tracing")]
1109 debug!(
1110 target: "tvdata_rs::scan",
1111 route = %route_segment,
1112 columns = query.columns.len(),
1113 markets = markets.len(),
1114 "validating scanner query against live metainfo",
1115 );
1116 let mut market_metainfo = Vec::with_capacity(markets.len());
1117
1118 for market in &markets {
1119 market_metainfo.push((market.clone(), self.cached_metainfo(market).await?));
1120 }
1121
1122 let mut supported_columns = Vec::new();
1123 let mut partially_supported_columns = Vec::new();
1124 let mut unsupported_columns = Vec::new();
1125 let mut seen = HashSet::new();
1126
1127 for column in &query.columns {
1128 if !seen.insert(column.as_str().to_owned()) {
1129 continue;
1130 }
1131
1132 let mut supported_markets = Vec::new();
1133 let mut unsupported_markets = Vec::new();
1134
1135 for (market, metainfo) in &market_metainfo {
1136 if supports_column_for_market(market, metainfo, column.as_str()) {
1137 supported_markets.push(market.clone());
1138 } else {
1139 unsupported_markets.push(market.clone());
1140 }
1141 }
1142
1143 match (supported_markets.is_empty(), unsupported_markets.is_empty()) {
1144 (true, false) => unsupported_columns.push(column.clone()),
1145 (false, true) => supported_columns.push(column.clone()),
1146 (false, false) => partially_supported_columns.push(PartiallySupportedColumn {
1147 column: column.clone(),
1148 supported_markets,
1149 unsupported_markets,
1150 }),
1151 (true, true) => {}
1152 }
1153 }
1154
1155 let report = ScanValidationReport {
1156 route_segment,
1157 requested_markets: markets,
1158 supported_columns,
1159 partially_supported_columns,
1160 unsupported_columns,
1161 };
1162
1163 #[cfg(feature = "tracing")]
1164 debug!(
1165 target: "tvdata_rs::scan",
1166 route = %report.route_segment,
1167 supported = report.supported_columns.len(),
1168 partial = report.partially_supported_columns.len(),
1169 unsupported = report.unsupported_columns.len(),
1170 "scanner validation completed",
1171 );
1172
1173 Ok(report)
1174 }
1175
1176 pub async fn scan_validated(&self, query: &ScanQuery) -> Result<ScanResponse> {
1199 let report = self.validate_scan_query(query).await?;
1200 if !report.is_strictly_supported() {
1201 let fields = report
1202 .strict_violation_column_names()
1203 .into_iter()
1204 .map(str::to_owned)
1205 .collect();
1206 return Err(Error::UnsupportedScanFields {
1207 route: report.route_segment,
1208 fields,
1209 });
1210 }
1211
1212 self.scan(query).await
1213 }
1214
1215 pub async fn filter_scan_query(
1242 &self,
1243 query: &ScanQuery,
1244 ) -> Result<(ScanQuery, ScanValidationReport)> {
1245 let report = self.validate_scan_query(query).await?;
1246 let filtered = report.filtered_query(query);
1247
1248 if filtered.columns.is_empty() {
1249 let fields = report
1250 .strict_violation_column_names()
1251 .into_iter()
1252 .map(str::to_owned)
1253 .collect();
1254 return Err(Error::UnsupportedScanFields {
1255 route: report.route_segment,
1256 fields,
1257 });
1258 }
1259
1260 Ok((filtered, report))
1261 }
1262
1263 pub async fn scan_supported(&self, query: &ScanQuery) -> Result<ScanResponse> {
1286 let (filtered, _) = self.filter_scan_query(query).await?;
1287 self.scan(&filtered).await
1288 }
1289
1290 pub async fn metainfo(&self, market: impl Into<Market>) -> Result<ScannerMetainfo> {
1310 let market = market.into();
1311 self.cached_metainfo(&market).await
1312 }
1313
1314 #[cfg(feature = "search")]
1333 pub async fn search(&self, request: &SearchRequest) -> Result<Vec<SearchHit>> {
1334 Ok(self.search_response(request).await?.hits)
1335 }
1336
1337 #[cfg(feature = "search")]
1339 pub async fn search_equities(&self, text: impl Into<String>) -> Result<Vec<SearchHit>> {
1340 Ok(self.search_equities_response(text).await?.hits)
1341 }
1342
1343 #[cfg(feature = "search")]
1345 pub async fn search_equities_response(
1346 &self,
1347 text: impl Into<String>,
1348 ) -> Result<SearchResponse> {
1349 self.search_response(&SearchRequest::equities(text)).await
1350 }
1351
1352 #[cfg(feature = "search")]
1354 pub async fn search_forex(&self, text: impl Into<String>) -> Result<Vec<SearchHit>> {
1355 Ok(self.search_forex_response(text).await?.hits)
1356 }
1357
/// Searches with a request built by `SearchRequest::forex` and returns the
/// full response.
///
/// # Errors
///
/// Propagates any error from [`Self::search_response`].
#[cfg(feature = "search")]
pub async fn search_forex_response(&self, text: impl Into<String>) -> Result<SearchResponse> {
    let request = SearchRequest::forex(text);
    self.search_response(&request).await
}
1363
/// Searches crypto instruments for `text` and returns only the hits.
///
/// # Errors
///
/// Propagates any error from [`Self::search_crypto_response`].
#[cfg(feature = "search")]
pub async fn search_crypto(&self, text: impl Into<String>) -> Result<Vec<SearchHit>> {
    let response = self.search_crypto_response(text).await?;
    Ok(response.hits)
}
1369
/// Searches with a request built by `SearchRequest::crypto` and returns the
/// full response.
///
/// # Errors
///
/// Propagates any error from [`Self::search_response`].
#[cfg(feature = "search")]
pub async fn search_crypto_response(&self, text: impl Into<String>) -> Result<SearchResponse> {
    let request = SearchRequest::crypto(text);
    self.search_response(&request).await
}
1375
/// Searches options for `text` and returns only the hits.
///
/// # Errors
///
/// Propagates any error from [`Self::search_options_response`].
#[cfg(feature = "search")]
pub async fn search_options(&self, text: impl Into<String>) -> Result<Vec<SearchHit>> {
    self.search_options_response(text)
        .await
        .map(|response| response.hits)
}
1385
/// Searches with a request built by `SearchRequest::options`, then passes the
/// response through `filtered` with `SearchHit::is_option_like` as predicate.
///
/// # Errors
///
/// Propagates any error from [`Self::search_response`].
#[cfg(feature = "search")]
pub async fn search_options_response(&self, text: impl Into<String>) -> Result<SearchResponse> {
    let request = SearchRequest::options(text);
    self.search_response(&request)
        .await
        .map(|response| response.filtered(SearchHit::is_option_like))
}
1392
/// Executes a symbol search against the configured symbol-search endpoint and
/// returns the sanitized response.
///
/// # Errors
///
/// Returns [`Error::EmptySearchQuery`] when the request text is empty after
/// trimming. Otherwise propagates errors from building the request or from
/// executing and decoding it.
#[cfg(feature = "search")]
pub async fn search_response(&self, request: &SearchRequest) -> Result<SearchResponse> {
    // Reject blank queries before any network work is done.
    let query_text = request.text.trim();
    if query_text.is_empty() {
        return Err(Error::EmptySearchQuery);
    }

    #[cfg(feature = "tracing")]
    debug!(
        target: "tvdata_rs::search",
        text_len = request.text.len(),
        exchange = request.exchange.as_deref().unwrap_or(""),
        search_type = request.instrument_type.as_deref().unwrap_or(""),
        start = request.start,
        "executing TradingView symbol search",
    );

    let endpoint = self.endpoints.symbol_search_base_url.clone();
    let builder = self
        .request(self.http.get(endpoint))?
        .query(&request.to_query_pairs());
    let raw: RawSearchResponse = self.execute_json(builder).await?;

    let sanitized = sanitize_response(raw);
    #[cfg(feature = "tracing")]
    debug!(
        target: "tvdata_rs::search",
        hits = sanitized.hits.len(),
        symbols_remaining = sanitized.symbols_remaining,
        "TradingView symbol search completed",
    );
    Ok(sanitized)
}
1448
/// Fetches the economic calendar for `request` from the configured calendar
/// endpoint and returns the sanitized response.
///
/// # Errors
///
/// Propagates errors from building the request, from converting `request`
/// into query pairs, and from executing and decoding the HTTP call.
#[cfg(feature = "economics")]
pub async fn economic_calendar(
    &self,
    request: &EconomicCalendarRequest,
) -> Result<EconomicCalendarResponse> {
    #[cfg(feature = "tracing")]
    debug!(
        target: "tvdata_rs::calendar",
        from = %request.from,
        to = %request.to,
        "executing economic calendar request",
    );

    // The decorated request is built before the query pairs so that a header
    // error takes precedence over a query-pair error.
    let builder = self
        .request(self.http.get(self.endpoints.calendar_base_url().clone()))?
        .query(&request.to_query_pairs()?);
    let raw: RawEconomicCalendarResponse = self.execute_json(builder).await?;

    let calendar = sanitize_calendar(raw);
    #[cfg(feature = "tracing")]
    debug!(
        target: "tvdata_rs::calendar",
        events = calendar.events.len(),
        status = calendar.status.as_deref().unwrap_or(""),
        "economic calendar request completed",
    );
    Ok(calendar)
}
1497
/// Returns earnings calendar entries for `request`.
///
/// Delegates to `corporate_earnings_calendar`.
///
/// # Errors
///
/// Propagates any error from the delegate.
#[cfg(feature = "calendar")]
pub async fn earnings_calendar(
    &self,
    request: &CalendarWindowRequest,
) -> Result<Vec<EarningsCalendarEntry>> {
    let entries = self.corporate_earnings_calendar(request).await?;
    Ok(entries)
}
1527
/// Returns dividend calendar entries for `request`.
///
/// Delegates to `corporate_dividend_calendar`.
///
/// # Errors
///
/// Propagates any error from the delegate.
#[cfg(feature = "calendar")]
pub async fn dividend_calendar(
    &self,
    request: &DividendCalendarRequest,
) -> Result<Vec<DividendCalendarEntry>> {
    let entries = self.corporate_dividend_calendar(request).await?;
    Ok(entries)
}
1556
/// Returns IPO calendar entries for `request`.
///
/// Delegates to `corporate_ipo_calendar`.
///
/// # Errors
///
/// Propagates any error from the delegate.
#[cfg(feature = "calendar")]
pub async fn ipo_calendar(
    &self,
    request: &CalendarWindowRequest,
) -> Result<Vec<IpoCalendarEntry>> {
    let entries = self.corporate_ipo_calendar(request).await?;
    Ok(entries)
}
1582
1583 pub async fn history(&self, request: &HistoryRequest) -> Result<HistorySeries> {
1604 let _websocket_budget = self.acquire_websocket_slot().await?;
1605
1606 #[cfg(feature = "tracing")]
1607 debug!(
1608 target: "tvdata_rs::history",
1609 symbol = %request.symbol.as_str(),
1610 interval = request.interval.as_code(),
1611 bars = request.bars,
1612 fetch_all = request.fetch_all,
1613 session = request.session.as_code(),
1614 adjustment = request.adjustment.as_code(),
1615 authenticated = self.session_id().is_some(),
1616 "fetching TradingView history",
1617 );
1618
1619 let series = fetch_history_with_timeout_for_client(
1620 self,
1621 request,
1622 self.history_config.session_timeout,
1623 )
1624 .await?;
1625
1626 #[cfg(feature = "tracing")]
1627 debug!(
1628 target: "tvdata_rs::history",
1629 symbol = %series.symbol.as_str(),
1630 bars = series.bars.len(),
1631 authenticated = series.provenance.authenticated,
1632 "TradingView history fetch completed",
1633 );
1634
1635 Ok(series)
1636 }
1637
1638 async fn execute_json<T>(&self, request: RequestBuilder) -> Result<T>
1639 where
1640 T: DeserializeOwned,
1641 {
1642 let body = self.execute_text(request).await?;
1643 serde_json::from_str(&body).map_err(Into::into)
1644 }
1645
1646 async fn execute_text(&self, request: RequestBuilder) -> Result<String> {
1647 let _http_budget = self.acquire_http_slot().await?;
1648
1649 let preview = request_preview(&request);
1650 let started_at = Instant::now();
1651 let response = match request.send().await {
1652 Ok(response) => response,
1653 Err(error) => {
1654 self.emit_event(ClientEvent::HttpRequestFailed(HttpRequestFailedEvent {
1655 method: preview
1656 .as_ref()
1657 .map(|(method, _)| method.clone())
1658 .unwrap_or_else(|| "UNKNOWN".to_owned()),
1659 url: preview
1660 .as_ref()
1661 .map(|(_, url)| url.clone())
1662 .unwrap_or_else(|| "<opaque>".to_owned()),
1663 elapsed_ms: started_at.elapsed().as_millis() as u64,
1664 authenticated: self.session_id().is_some(),
1665 kind: crate::error::ErrorKind::Transport,
1666 }));
1667 #[cfg(feature = "tracing")]
1668 warn!(
1669 target: "tvdata_rs::http",
1670 method = preview.as_ref().map(|(method, _)| method.as_str()).unwrap_or("UNKNOWN"),
1671 url = preview.as_ref().map(|(_, url)| url.as_str()).unwrap_or("<opaque>"),
1672 elapsed_ms = started_at.elapsed().as_millis() as u64,
1673 error = %error,
1674 "TradingView HTTP request failed before receiving a response",
1675 );
1676 return Err(Error::from(error));
1677 }
1678 };
1679 let status = response.status();
1680 let body = match response.text().await {
1681 Ok(body) => body,
1682 Err(error) => {
1683 self.emit_event(ClientEvent::HttpRequestFailed(HttpRequestFailedEvent {
1684 method: preview
1685 .as_ref()
1686 .map(|(method, _)| method.clone())
1687 .unwrap_or_else(|| "UNKNOWN".to_owned()),
1688 url: preview
1689 .as_ref()
1690 .map(|(_, url)| url.clone())
1691 .unwrap_or_else(|| "<opaque>".to_owned()),
1692 elapsed_ms: started_at.elapsed().as_millis() as u64,
1693 authenticated: self.session_id().is_some(),
1694 kind: crate::error::ErrorKind::Transport,
1695 }));
1696 #[cfg(feature = "tracing")]
1697 warn!(
1698 target: "tvdata_rs::http",
1699 method = preview.as_ref().map(|(method, _)| method.as_str()).unwrap_or("UNKNOWN"),
1700 url = preview.as_ref().map(|(_, url)| url.as_str()).unwrap_or("<opaque>"),
1701 status = status.as_u16(),
1702 elapsed_ms = started_at.elapsed().as_millis() as u64,
1703 error = %error,
1704 "TradingView HTTP response body could not be read",
1705 );
1706 return Err(Error::from(error));
1707 }
1708 };
1709
1710 self.emit_event(ClientEvent::HttpRequestCompleted(
1711 HttpRequestCompletedEvent {
1712 method: preview
1713 .as_ref()
1714 .map(|(method, _)| method.clone())
1715 .unwrap_or_else(|| "UNKNOWN".to_owned()),
1716 url: preview
1717 .as_ref()
1718 .map(|(_, url)| url.clone())
1719 .unwrap_or_else(|| "<opaque>".to_owned()),
1720 status: status.as_u16(),
1721 elapsed_ms: started_at.elapsed().as_millis() as u64,
1722 authenticated: self.session_id().is_some(),
1723 },
1724 ));
1725
1726 #[cfg(feature = "tracing")]
1727 debug!(
1728 target: "tvdata_rs::http",
1729 method = preview.as_ref().map(|(method, _)| method.as_str()).unwrap_or("UNKNOWN"),
1730 url = preview.as_ref().map(|(_, url)| url.as_str()).unwrap_or("<opaque>"),
1731 status = status.as_u16(),
1732 body_bytes = body.len(),
1733 elapsed_ms = started_at.elapsed().as_millis() as u64,
1734 "TradingView HTTP request completed",
1735 );
1736
1737 if !status.is_success() {
1738 #[cfg(feature = "tracing")]
1739 warn!(
1740 target: "tvdata_rs::http",
1741 method = preview.as_ref().map(|(method, _)| method.as_str()).unwrap_or("UNKNOWN"),
1742 url = preview.as_ref().map(|(_, url)| url.as_str()).unwrap_or("<opaque>"),
1743 status = status.as_u16(),
1744 elapsed_ms = started_at.elapsed().as_millis() as u64,
1745 "TradingView HTTP request returned non-success status",
1746 );
1747 return Err(Error::ApiStatus { status, body });
1748 }
1749
1750 Ok(body)
1751 }
1752
1753 fn request(&self, request: RequestBuilder) -> Result<RequestBuilder> {
1754 let request = request
1755 .header(
1756 ORIGIN,
1757 HeaderValue::from_str(self.endpoints.site_origin.as_str())
1758 .map_err(|_| Error::Protocol("invalid site origin configured for request"))?,
1759 )
1760 .header(
1761 REFERER,
1762 HeaderValue::from_str(&referer(&self.endpoints.site_origin))
1763 .map_err(|_| Error::Protocol("invalid referer configured for request"))?,
1764 )
1765 .header(
1766 USER_AGENT,
1767 HeaderValue::from_str(&self.user_agent)
1768 .map_err(|_| Error::Protocol("invalid user agent configured for request"))?,
1769 );
1770
1771 let request = if let Some(session_id) = self.session_id.as_deref() {
1772 request.header(COOKIE, cookie_header_value(session_id)?)
1773 } else {
1774 request
1775 };
1776
1777 Ok(request)
1778 }
1779
1780 async fn acquire_http_slot(&self) -> Result<Option<OwnedSemaphorePermit>> {
1781 let permit = match self.request_budget_state.http_limiter.as_ref() {
1782 Some(limiter) => Some(
1783 limiter
1784 .clone()
1785 .acquire_owned()
1786 .await
1787 .map_err(|_| Error::Protocol("http request budget closed"))?,
1788 ),
1789 None => None,
1790 };
1791
1792 if let (Some(pacer), Some(min_interval)) = (
1793 self.request_budget_state.http_pacer.as_ref(),
1794 self.request_budget.min_http_interval,
1795 ) {
1796 let mut next_allowed_at = pacer.lock().await;
1797 let now = TokioInstant::now();
1798 if *next_allowed_at > now {
1799 sleep_until(*next_allowed_at).await;
1800 }
1801 *next_allowed_at = TokioInstant::now() + min_interval;
1802 }
1803
1804 Ok(permit)
1805 }
1806
1807 pub(crate) async fn acquire_websocket_slot(&self) -> Result<Option<OwnedSemaphorePermit>> {
1808 match self.request_budget_state.websocket_limiter.as_ref() {
1809 Some(limiter) => limiter
1810 .clone()
1811 .acquire_owned()
1812 .await
1813 .map(Some)
1814 .map_err(|_| Error::Protocol("websocket request budget closed")),
1815 None => Ok(None),
1816 }
1817 }
1818
1819 pub(crate) fn emit_event(&self, event: ClientEvent) {
1820 if let Some(observer) = self.observer.as_ref() {
1821 observer.on_event(&event);
1822 }
1823 }
1824
1825 pub(crate) async fn connect_socket(&self) -> Result<TradingViewWebSocket> {
1826 let authenticated = self.session_id().is_some();
1827 let url = self.endpoints().websocket_url().to_string();
1828 let result = self
1829 .websocket_connector
1830 .connect(self.endpoints(), &self.user_agent, self.session_id())
1831 .await;
1832
1833 match &result {
1834 Ok(_) => self.emit_event(ClientEvent::WebSocketConnected(WebSocketConnectedEvent {
1835 url,
1836 authenticated,
1837 })),
1838 Err(error) => {
1839 self.emit_event(ClientEvent::WebSocketConnectionFailed(
1840 WebSocketConnectionFailedEvent {
1841 url,
1842 authenticated,
1843 kind: error.kind(),
1844 },
1845 ));
1846 }
1847 }
1848
1849 result
1850 }
1851
1852 async fn cached_metainfo(&self, market: &Market) -> Result<ScannerMetainfo> {
1853 if let Some(cached) = self
1854 .metainfo_cache
1855 .read()
1856 .await
1857 .get(market.as_str())
1858 .cloned()
1859 {
1860 #[cfg(feature = "tracing")]
1861 debug!(
1862 target: "tvdata_rs::metainfo",
1863 market = market.as_str(),
1864 "scanner metainfo cache hit",
1865 );
1866 return Ok(cached);
1867 }
1868
1869 #[cfg(feature = "tracing")]
1870 debug!(
1871 target: "tvdata_rs::metainfo",
1872 market = market.as_str(),
1873 "scanner metainfo cache miss",
1874 );
1875
1876 let metainfo: ScannerMetainfo = self
1877 .execute_json(
1878 self.request(self.http.get(self.endpoints.scanner_metainfo_url(market)?))?,
1879 )
1880 .await?;
1881
1882 self.metainfo_cache
1883 .write()
1884 .await
1885 .insert(market.as_str().to_owned(), metainfo.clone());
1886
1887 #[cfg(feature = "tracing")]
1888 debug!(
1889 target: "tvdata_rs::metainfo",
1890 market = market.as_str(),
1891 fields = metainfo.fields.len(),
1892 "scanner metainfo cached",
1893 );
1894
1895 Ok(metainfo)
1896 }
1897}
1898
1899fn validation_markets(query: &ScanQuery) -> Result<Vec<Market>> {
1900 if query.markets.is_empty() {
1901 return Err(Error::ScanValidationUnavailable {
1902 reason: "query does not specify any markets".to_owned(),
1903 });
1904 }
1905
1906 Ok(query.markets.clone())
1907}
1908
1909fn supports_column_for_market(market: &Market, metainfo: &ScannerMetainfo, column: &str) -> bool {
1910 metainfo.supports_field(column)
1911 || market_to_screener_kind(market)
1912 .and_then(|kind| embedded_registry().find_by_api_name(kind, column))
1913 .is_some()
1914}
1915
1916fn market_to_screener_kind(market: &Market) -> Option<ScreenerKind> {
1917 match market.as_str() {
1918 "crypto" => Some(ScreenerKind::Crypto),
1919 "forex" => Some(ScreenerKind::Forex),
1920 "bond" | "bonds" => Some(ScreenerKind::Bond),
1921 "futures" => Some(ScreenerKind::Futures),
1922 "coin" => Some(ScreenerKind::Coin),
1923 "options" | "economics2" | "cfd" => None,
1924 _ => Some(ScreenerKind::Stock),
1925 }
1926}
1927
1928#[cfg(test)]
1929mod tests;