1use std::collections::{HashMap, HashSet};
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use bon::{Builder, bon};
8use reqwest::header::{COOKIE, HeaderValue, ORIGIN, REFERER, USER_AGENT};
9use reqwest_middleware::{
10 ClientBuilder as MiddlewareClientBuilder, ClientWithMiddleware, RequestBuilder,
11};
12use reqwest_retry::{Jitter, RetryTransientMiddleware, policies::ExponentialBackoff};
13use serde::de::DeserializeOwned;
14use tokio::sync::{Mutex, OwnedSemaphorePermit, RwLock, Semaphore};
15use tokio::time::{Instant as TokioInstant, sleep_until};
16#[cfg(feature = "tracing")]
17use tracing::{debug, warn};
18use url::Url;
19
20#[cfg(feature = "calendar")]
21use crate::calendar::{
22 CalendarWindowRequest, DividendCalendarEntry, DividendCalendarRequest, EarningsCalendarEntry,
23 IpoCalendarEntry,
24};
25#[cfg(feature = "economics")]
26use crate::economics::{
27 EconomicCalendarRequest, EconomicCalendarResponse, RawEconomicCalendarResponse,
28 sanitize_calendar,
29};
30use crate::error::{Error, Result};
31use crate::history::{
32 Adjustment, HistoryRequest, HistorySeries, TradingSession,
33 fetch_history_with_timeout_for_client,
34};
35use crate::scanner::{
36 Market, PartiallySupportedColumn, RawScanResponse, ScanQuery, ScanResponse,
37 ScanValidationReport, ScannerMetainfo, ScreenerKind, embedded_registry,
38};
39#[cfg(feature = "search")]
40use crate::search::{
41 RawSearchResponse, SearchHit, SearchRequest, SearchResponse, sanitize_response,
42};
43use crate::transport::websocket::{TradingViewWebSocket, connect_socket};
44
45const DEFAULT_USER_AGENT: &str =
46 "tvdata-rs/0.1 (+https://github.com/deepentropy/tvscreener reference)";
47const DEFAULT_AUTH_TOKEN: &str = "unauthorized_user_token";
48
49fn default_scanner_base_url() -> Url {
50 Url::parse("https://scanner.tradingview.com").expect("default scanner endpoint must be valid")
51}
52
53fn default_symbol_search_base_url() -> Url {
54 Url::parse("https://symbol-search.tradingview.com/symbol_search/v3/")
55 .expect("default symbol search endpoint must be valid")
56}
57
58fn default_calendar_base_url() -> Url {
59 Url::parse("https://chartevents-reuters.tradingview.com/events")
60 .expect("default calendar endpoint must be valid")
61}
62
63fn default_websocket_url() -> Url {
64 Url::parse("wss://data.tradingview.com/socket.io/websocket")
65 .expect("default websocket endpoint must be valid")
66}
67
68fn default_site_origin() -> Url {
69 Url::parse("https://www.tradingview.com").expect("default site origin must be valid")
70}
71
72fn default_data_origin() -> Url {
73 Url::parse("https://data.tradingview.com").expect("default data origin must be valid")
74}
75
76fn default_timeout() -> Duration {
77 Duration::from_secs(30)
78}
79
80fn default_history_session_timeout() -> Duration {
81 Duration::from_secs(30)
82}
83
84fn default_history_batch_concurrency() -> usize {
85 4
86}
87
88fn default_backend_http_budget_concurrency() -> usize {
89 8
90}
91
92fn default_backend_websocket_budget_concurrency() -> usize {
93 8
94}
95
96fn default_backend_http_min_interval() -> Duration {
97 Duration::from_millis(50)
98}
99
100fn default_research_http_budget_concurrency() -> usize {
101 4
102}
103
104fn default_research_websocket_budget_concurrency() -> usize {
105 4
106}
107
108fn default_research_http_min_interval() -> Duration {
109 Duration::from_millis(25)
110}
111
112fn default_interactive_http_budget_concurrency() -> usize {
113 2
114}
115
116fn default_interactive_websocket_budget_concurrency() -> usize {
117 2
118}
119
120fn default_user_agent() -> String {
121 DEFAULT_USER_AGENT.to_owned()
122}
123
124fn default_auth_token() -> String {
125 DEFAULT_AUTH_TOKEN.to_owned()
126}
127
128fn default_anonymous_auth_token() -> String {
129 DEFAULT_AUTH_TOKEN.to_owned()
130}
131
132fn cookie_header_value(session_id: &str) -> Result<HeaderValue> {
133 HeaderValue::from_str(&format!("sessionid={session_id}"))
134 .map_err(|_| Error::Protocol("invalid session id configured for cookie header"))
135}
136
137fn default_min_retry_interval() -> Duration {
138 Duration::from_millis(250)
139}
140
141fn default_max_retry_interval() -> Duration {
142 Duration::from_secs(2)
143}
144
145fn parse_url(value: impl AsRef<str>) -> Result<Url> {
146 Url::parse(value.as_ref()).map_err(Into::into)
147}
148
149fn referer(origin: &Url) -> String {
150 format!("{}/", origin.as_str().trim_end_matches('/'))
151}
152
153fn request_preview(request: &RequestBuilder) -> Option<(String, String)> {
154 request.try_clone().and_then(|builder| {
155 builder
156 .build()
157 .ok()
158 .map(|request| (request.method().to_string(), request.url().to_string()))
159 })
160}
161
162#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
163pub enum RetryJitter {
164 None,
165 Full,
166 #[default]
167 Bounded,
168}
169
170impl From<RetryJitter> for Jitter {
171 fn from(value: RetryJitter) -> Self {
172 match value {
173 RetryJitter::None => Self::None,
174 RetryJitter::Full => Self::Full,
175 RetryJitter::Bounded => Self::Bounded,
176 }
177 }
178}
179
180#[derive(Debug, Clone, PartialEq, Eq, Builder)]
181pub struct RetryConfig {
182 #[builder(default = 2)]
183 pub max_retries: u32,
184 #[builder(default = default_min_retry_interval())]
185 pub min_retry_interval: Duration,
186 #[builder(default = default_max_retry_interval())]
187 pub max_retry_interval: Duration,
188 #[builder(default)]
189 pub jitter: RetryJitter,
190}
191
192impl Default for RetryConfig {
193 fn default() -> Self {
194 Self::builder().build()
195 }
196}
197
198impl RetryConfig {
199 pub fn disabled() -> Self {
200 Self {
201 max_retries: 0,
202 ..Self::default()
203 }
204 }
205
206 fn validate(&self) -> Result<()> {
207 if self.min_retry_interval > self.max_retry_interval {
208 return Err(Error::InvalidRetryBounds {
209 min: self.min_retry_interval,
210 max: self.max_retry_interval,
211 });
212 }
213
214 Ok(())
215 }
216
217 fn to_policy(&self) -> ExponentialBackoff {
218 ExponentialBackoff::builder()
219 .retry_bounds(self.min_retry_interval, self.max_retry_interval)
220 .jitter(self.jitter.into())
221 .build_with_max_retries(self.max_retries)
222 }
223}
224
225#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
227pub enum AuthMode {
228 #[default]
229 Anonymous,
230 Token,
231 Session,
232 SessionAndToken,
233}
234
235#[derive(Debug, Clone, PartialEq, Eq)]
241pub struct AuthConfig {
242 mode: AuthMode,
243 auth_token: Option<String>,
244 session_id: Option<String>,
245}
246
247impl AuthConfig {
248 pub fn anonymous() -> Self {
249 Self {
250 mode: AuthMode::Anonymous,
251 auth_token: None,
252 session_id: None,
253 }
254 }
255
256 pub fn token(auth_token: impl Into<String>) -> Self {
257 Self {
258 mode: AuthMode::Token,
259 auth_token: Some(auth_token.into()),
260 session_id: None,
261 }
262 }
263
264 pub fn session(session_id: impl Into<String>) -> Self {
265 Self {
266 mode: AuthMode::Session,
267 auth_token: None,
268 session_id: Some(session_id.into()),
269 }
270 }
271
272 pub fn session_and_token(session_id: impl Into<String>, auth_token: impl Into<String>) -> Self {
273 Self {
274 mode: AuthMode::SessionAndToken,
275 auth_token: Some(auth_token.into()),
276 session_id: Some(session_id.into()),
277 }
278 }
279
280 pub fn mode(&self) -> AuthMode {
281 self.mode
282 }
283
284 fn resolve(self) -> (String, Option<String>) {
285 match self.mode {
286 AuthMode::Anonymous => (default_anonymous_auth_token(), None),
287 AuthMode::Token => (
288 self.auth_token.unwrap_or_else(default_anonymous_auth_token),
289 None,
290 ),
291 AuthMode::Session => (
292 default_anonymous_auth_token(),
293 self.session_id.filter(|value| !value.is_empty()),
294 ),
295 AuthMode::SessionAndToken => (
296 self.auth_token.unwrap_or_else(default_anonymous_auth_token),
297 self.session_id.filter(|value| !value.is_empty()),
298 ),
299 }
300 }
301}
302
303impl Default for AuthConfig {
304 fn default() -> Self {
305 Self::anonymous()
306 }
307}
308
309#[derive(Debug, Clone, PartialEq, Eq, Builder)]
310pub struct HistoryClientConfig {
311 #[builder(default = default_history_session_timeout())]
312 pub session_timeout: Duration,
313 #[builder(default = default_history_batch_concurrency())]
314 pub default_batch_concurrency: usize,
315 #[builder(default)]
316 pub default_session: TradingSession,
317 #[builder(default)]
318 pub default_adjustment: Adjustment,
319}
320
321impl Default for HistoryClientConfig {
322 fn default() -> Self {
323 Self::builder().build()
324 }
325}
326
327#[derive(Debug, Clone, PartialEq, Eq, Builder)]
328pub struct RequestBudget {
329 pub max_concurrent_http_requests: Option<usize>,
330 pub max_concurrent_websocket_sessions: Option<usize>,
331 pub min_http_interval: Option<Duration>,
332}
333
334impl Default for RequestBudget {
335 fn default() -> Self {
336 Self::builder().build()
337 }
338}
339
340impl RequestBudget {
341 pub fn disabled() -> Self {
342 Self::default()
343 }
344
345 fn validate(&self) -> Result<()> {
346 if self.max_concurrent_http_requests == Some(0) {
347 return Err(Error::InvalidRequestBudget {
348 field: "max_concurrent_http_requests",
349 });
350 }
351
352 if self.max_concurrent_websocket_sessions == Some(0) {
353 return Err(Error::InvalidRequestBudget {
354 field: "max_concurrent_websocket_sessions",
355 });
356 }
357
358 Ok(())
359 }
360}
361
362pub type WebSocketConnectFuture<'a> =
363 Pin<Box<dyn Future<Output = Result<TradingViewWebSocket>> + Send + 'a>>;
364
365#[derive(Debug, Clone, PartialEq, Eq)]
366pub struct HttpRequestCompletedEvent {
367 pub method: String,
368 pub url: String,
369 pub status: u16,
370 pub elapsed_ms: u64,
371 pub authenticated: bool,
372}
373
374#[derive(Debug, Clone, PartialEq, Eq)]
375pub struct HttpRequestFailedEvent {
376 pub method: String,
377 pub url: String,
378 pub elapsed_ms: u64,
379 pub authenticated: bool,
380 pub kind: crate::error::ErrorKind,
381}
382
383#[derive(Debug, Clone, PartialEq, Eq)]
384pub struct WebSocketConnectedEvent {
385 pub url: String,
386 pub authenticated: bool,
387}
388
389#[derive(Debug, Clone, PartialEq, Eq)]
390pub struct WebSocketConnectionFailedEvent {
391 pub url: String,
392 pub authenticated: bool,
393 pub kind: crate::error::ErrorKind,
394}
395
396#[derive(Debug, Clone, Copy, PartialEq, Eq)]
397pub enum HistoryBatchMode {
398 Strict,
399 Detailed,
400}
401
402#[derive(Debug, Clone, PartialEq, Eq)]
403pub struct HistoryBatchCompletedEvent {
404 pub requested: usize,
405 pub successes: usize,
406 pub missing: usize,
407 pub failures: usize,
408 pub concurrency: usize,
409 pub mode: HistoryBatchMode,
410}
411
412#[derive(Debug, Clone, PartialEq, Eq)]
413pub enum ClientEvent {
414 HttpRequestCompleted(HttpRequestCompletedEvent),
415 HttpRequestFailed(HttpRequestFailedEvent),
416 WebSocketConnected(WebSocketConnectedEvent),
417 WebSocketConnectionFailed(WebSocketConnectionFailedEvent),
418 HistoryBatchCompleted(HistoryBatchCompletedEvent),
419}
420
421pub trait ClientObserver: std::fmt::Debug + Send + Sync {
422 fn on_event(&self, event: &ClientEvent);
423}
424
425pub trait WebSocketConnector: std::fmt::Debug + Send + Sync {
426 fn connect<'a>(
427 &'a self,
428 endpoints: &'a Endpoints,
429 user_agent: &'a str,
430 session_id: Option<&'a str>,
431 ) -> WebSocketConnectFuture<'a>;
432}
433
434#[derive(Debug, Clone, Copy, Default)]
435pub struct DefaultWebSocketConnector;
436
437impl WebSocketConnector for DefaultWebSocketConnector {
438 fn connect<'a>(
439 &'a self,
440 endpoints: &'a Endpoints,
441 user_agent: &'a str,
442 session_id: Option<&'a str>,
443 ) -> WebSocketConnectFuture<'a> {
444 Box::pin(connect_socket(endpoints, user_agent, session_id))
445 }
446}
447
448#[derive(Debug, Clone, Builder)]
449pub struct TransportConfig {
450 #[builder(default = default_timeout())]
451 pub timeout: Duration,
452 #[builder(default = RetryConfig::default())]
453 pub retry: RetryConfig,
454 #[builder(default = default_user_agent(), into)]
455 pub user_agent: String,
456 pub http_client: Option<ClientWithMiddleware>,
457 pub websocket_connector: Option<Arc<dyn WebSocketConnector>>,
458}
459
460impl Default for TransportConfig {
461 fn default() -> Self {
462 Self::builder().build()
463 }
464}
465
466#[derive(Debug)]
467struct RequestBudgetState {
468 http_limiter: Option<Arc<Semaphore>>,
469 websocket_limiter: Option<Arc<Semaphore>>,
470 http_pacer: Option<Arc<Mutex<TokioInstant>>>,
471}
472
473impl RequestBudgetState {
474 fn new(config: &RequestBudget) -> Self {
475 Self {
476 http_limiter: config
477 .max_concurrent_http_requests
478 .map(|limit| Arc::new(Semaphore::new(limit))),
479 websocket_limiter: config
480 .max_concurrent_websocket_sessions
481 .map(|limit| Arc::new(Semaphore::new(limit))),
482 http_pacer: config
483 .min_http_interval
484 .map(|_| Arc::new(Mutex::new(TokioInstant::now()))),
485 }
486 }
487}
488
489#[derive(Debug, Clone, Builder)]
490pub struct TradingViewClientConfig {
491 #[builder(default = Endpoints::default())]
492 pub endpoints: Endpoints,
493 #[builder(default = TransportConfig::default())]
494 pub transport: TransportConfig,
495 #[builder(default = AuthConfig::default())]
496 pub auth: AuthConfig,
497 #[builder(default = HistoryClientConfig::default())]
498 pub history: HistoryClientConfig,
499 #[builder(default = RequestBudget::default())]
500 pub request_budget: RequestBudget,
501 pub observer: Option<Arc<dyn ClientObserver>>,
502}
503
504impl Default for TradingViewClientConfig {
505 fn default() -> Self {
506 Self::builder().build()
507 }
508}
509
510impl TradingViewClientConfig {
511 pub fn backend_history() -> Self {
512 Self::builder()
513 .transport(
514 TransportConfig::builder()
515 .timeout(Duration::from_secs(60))
516 .retry(
517 RetryConfig::builder()
518 .max_retries(4)
519 .min_retry_interval(Duration::from_millis(500))
520 .max_retry_interval(Duration::from_secs(5))
521 .build(),
522 )
523 .build(),
524 )
525 .history(
526 HistoryClientConfig::builder()
527 .session_timeout(Duration::from_secs(60))
528 .default_batch_concurrency(8)
529 .default_session(TradingSession::Regular)
530 .default_adjustment(Adjustment::Splits)
531 .build(),
532 )
533 .request_budget(
534 RequestBudget::builder()
535 .max_concurrent_http_requests(default_backend_http_budget_concurrency())
536 .max_concurrent_websocket_sessions(
537 default_backend_websocket_budget_concurrency(),
538 )
539 .min_http_interval(default_backend_http_min_interval())
540 .build(),
541 )
542 .build()
543 }
544
545 pub fn research() -> Self {
546 Self::builder()
547 .transport(
548 TransportConfig::builder()
549 .timeout(Duration::from_secs(45))
550 .retry(
551 RetryConfig::builder()
552 .max_retries(2)
553 .min_retry_interval(Duration::from_millis(250))
554 .max_retry_interval(Duration::from_secs(2))
555 .build(),
556 )
557 .build(),
558 )
559 .request_budget(
560 RequestBudget::builder()
561 .max_concurrent_http_requests(default_research_http_budget_concurrency())
562 .max_concurrent_websocket_sessions(
563 default_research_websocket_budget_concurrency(),
564 )
565 .min_http_interval(default_research_http_min_interval())
566 .build(),
567 )
568 .build()
569 }
570
571 pub fn interactive() -> Self {
572 Self::builder()
573 .transport(
574 TransportConfig::builder()
575 .timeout(Duration::from_secs(15))
576 .retry(
577 RetryConfig::builder()
578 .max_retries(1)
579 .min_retry_interval(Duration::from_millis(100))
580 .max_retry_interval(Duration::from_millis(500))
581 .build(),
582 )
583 .build(),
584 )
585 .request_budget(
586 RequestBudget::builder()
587 .max_concurrent_http_requests(default_interactive_http_budget_concurrency())
588 .max_concurrent_websocket_sessions(
589 default_interactive_websocket_budget_concurrency(),
590 )
591 .build(),
592 )
593 .build()
594 }
595}
596
597#[derive(Debug, Clone, PartialEq, Eq, Builder)]
599pub struct Endpoints {
600 #[builder(default = default_scanner_base_url())]
601 scanner_base_url: Url,
602 #[builder(default = default_symbol_search_base_url())]
603 symbol_search_base_url: Url,
604 #[builder(default = default_calendar_base_url())]
605 calendar_base_url: Url,
606 #[builder(default = default_websocket_url())]
607 websocket_url: Url,
608 #[builder(default = default_site_origin())]
609 site_origin: Url,
610 #[builder(default = default_data_origin())]
611 data_origin: Url,
612}
613
614impl Default for Endpoints {
615 fn default() -> Self {
616 Self::builder().build()
617 }
618}
619
620impl Endpoints {
621 pub fn scanner_base_url(&self) -> &Url {
622 &self.scanner_base_url
623 }
624
625 pub fn symbol_search_base_url(&self) -> &Url {
626 &self.symbol_search_base_url
627 }
628
629 pub fn calendar_base_url(&self) -> &Url {
630 &self.calendar_base_url
631 }
632
633 pub fn websocket_url(&self) -> &Url {
634 &self.websocket_url
635 }
636
637 pub fn site_origin(&self) -> &Url {
638 &self.site_origin
639 }
640
641 pub fn data_origin(&self) -> &Url {
642 &self.data_origin
643 }
644
645 pub fn with_scanner_base_url(mut self, url: impl AsRef<str>) -> Result<Self> {
646 self.scanner_base_url = parse_url(url)?;
647 Ok(self)
648 }
649
650 pub fn with_symbol_search_base_url(mut self, url: impl AsRef<str>) -> Result<Self> {
651 self.symbol_search_base_url = parse_url(url)?;
652 Ok(self)
653 }
654
655 pub fn with_calendar_base_url(mut self, url: impl AsRef<str>) -> Result<Self> {
656 self.calendar_base_url = parse_url(url)?;
657 Ok(self)
658 }
659
660 pub fn with_websocket_url(mut self, url: impl AsRef<str>) -> Result<Self> {
661 self.websocket_url = parse_url(url)?;
662 Ok(self)
663 }
664
665 pub fn with_site_origin(mut self, url: impl AsRef<str>) -> Result<Self> {
666 self.site_origin = parse_url(url)?;
667 Ok(self)
668 }
669
670 pub fn with_data_origin(mut self, url: impl AsRef<str>) -> Result<Self> {
671 self.data_origin = parse_url(url)?;
672 Ok(self)
673 }
674
675 pub fn scanner_url(&self, route: &str) -> Result<Url> {
676 self.scanner_base_url
677 .join(route.trim_start_matches('/'))
678 .map_err(Into::into)
679 }
680
681 pub fn scanner_metainfo_url(&self, market: &Market) -> Result<Url> {
682 self.scanner_url(&format!("{}/metainfo", market.as_str()))
683 }
684}
685
686#[derive(Debug, Clone)]
708pub struct TradingViewClient {
709 http: ClientWithMiddleware,
710 endpoints: Endpoints,
711 user_agent: String,
712 auth_token: String,
713 session_id: Option<String>,
714 history_config: HistoryClientConfig,
715 request_budget: RequestBudget,
716 request_budget_state: Arc<RequestBudgetState>,
717 websocket_connector: Arc<dyn WebSocketConnector>,
718 observer: Option<Arc<dyn ClientObserver>>,
719 metainfo_cache: Arc<RwLock<HashMap<String, ScannerMetainfo>>>,
720}
721
722#[bon]
723impl TradingViewClient {
724 #[builder]
726 pub fn new(
727 #[builder(default = Endpoints::default())] endpoints: Endpoints,
728 #[builder(default = default_timeout())] timeout: Duration,
729 #[builder(default = RetryConfig::default())] retry: RetryConfig,
730 #[builder(default = HistoryClientConfig::default())] history_config: HistoryClientConfig,
731 #[builder(default = RequestBudget::default())] request_budget: RequestBudget,
732 #[builder(default = default_user_agent(), into)] user_agent: String,
733 #[builder(default = default_auth_token(), into)] auth_token: String,
734 #[builder(into)] session_id: Option<String>,
735 auth: Option<AuthConfig>,
736 transport_config: Option<TransportConfig>,
737 http_client: Option<ClientWithMiddleware>,
738 websocket_connector: Option<Arc<dyn WebSocketConnector>>,
739 observer: Option<Arc<dyn ClientObserver>>,
740 ) -> Result<Self> {
741 let transport_config = transport_config.unwrap_or(TransportConfig {
742 timeout,
743 retry,
744 user_agent,
745 http_client,
746 websocket_connector,
747 });
748 let TransportConfig {
749 timeout,
750 retry,
751 user_agent,
752 http_client,
753 websocket_connector,
754 } = transport_config;
755
756 let (auth_token, session_id) = auth
757 .map(AuthConfig::resolve)
758 .unwrap_or((auth_token, session_id));
759
760 request_budget.validate()?;
761
762 let http = if let Some(http_client) = http_client {
763 http_client
764 } else {
765 retry.validate()?;
766
767 let base_http = reqwest::Client::builder()
768 .timeout(timeout)
769 .build()
770 .map_err(Error::from)?;
771
772 if retry.max_retries == 0 {
773 ClientWithMiddleware::from(base_http)
774 } else {
775 MiddlewareClientBuilder::new(base_http)
776 .with(RetryTransientMiddleware::new_with_policy(retry.to_policy()))
777 .build()
778 }
779 };
780
781 Ok(Self {
782 http,
783 endpoints,
784 user_agent,
785 auth_token,
786 session_id,
787 history_config,
788 request_budget: request_budget.clone(),
789 request_budget_state: Arc::new(RequestBudgetState::new(&request_budget)),
790 websocket_connector: websocket_connector
791 .unwrap_or_else(|| Arc::new(DefaultWebSocketConnector)),
792 observer,
793 metainfo_cache: Arc::new(RwLock::new(HashMap::new())),
794 })
795 }
796
797 pub fn for_backend_history() -> Result<Self> {
798 Self::from_config(TradingViewClientConfig::backend_history())
799 }
800
801 pub fn for_research() -> Result<Self> {
803 Self::from_config(TradingViewClientConfig::research())
804 }
805
806 pub fn for_interactive() -> Result<Self> {
808 Self::from_config(TradingViewClientConfig::interactive())
809 }
810
811 pub fn from_config(config: TradingViewClientConfig) -> Result<Self> {
812 match config.observer {
813 Some(observer) => Self::builder()
814 .endpoints(config.endpoints)
815 .transport_config(config.transport)
816 .auth(config.auth)
817 .history_config(config.history)
818 .request_budget(config.request_budget)
819 .observer(observer)
820 .build(),
821 None => Self::builder()
822 .endpoints(config.endpoints)
823 .transport_config(config.transport)
824 .auth(config.auth)
825 .history_config(config.history)
826 .request_budget(config.request_budget)
827 .build(),
828 }
829 }
830
831 pub fn endpoints(&self) -> &Endpoints {
832 &self.endpoints
833 }
834
835 pub(crate) fn auth_token(&self) -> &str {
836 &self.auth_token
837 }
838
839 pub(crate) fn session_id(&self) -> Option<&str> {
840 self.session_id.as_deref()
841 }
842
843 pub fn history_config(&self) -> &HistoryClientConfig {
844 &self.history_config
845 }
846
847 pub fn request_budget(&self) -> &RequestBudget {
848 &self.request_budget
849 }
850
851 pub async fn scan(&self, query: &ScanQuery) -> Result<ScanResponse> {
878 let route = query.route_segment();
879 #[cfg(feature = "tracing")]
880 debug!(
881 target: "tvdata_rs::scan",
882 route = %route,
883 columns = query.columns.len(),
884 markets = query.markets.len(),
885 "executing scanner query",
886 );
887
888 let raw: RawScanResponse = self
889 .execute_json(
890 self.request(self.http.post(self.endpoints.scanner_url(&route)?))?
891 .json(query),
892 )
893 .await?;
894
895 let response = raw.into_response()?;
896 #[cfg(feature = "tracing")]
897 debug!(
898 target: "tvdata_rs::scan",
899 route = %route,
900 rows = response.rows.len(),
901 "scanner query completed",
902 );
903 Ok(response)
904 }
905
906 pub async fn validate_scan_query(&self, query: &ScanQuery) -> Result<ScanValidationReport> {
931 let route_segment = query.route_segment();
932 let markets = validation_markets(query)?;
933 #[cfg(feature = "tracing")]
934 debug!(
935 target: "tvdata_rs::scan",
936 route = %route_segment,
937 columns = query.columns.len(),
938 markets = markets.len(),
939 "validating scanner query against live metainfo",
940 );
941 let mut market_metainfo = Vec::with_capacity(markets.len());
942
943 for market in &markets {
944 market_metainfo.push((market.clone(), self.cached_metainfo(market).await?));
945 }
946
947 let mut supported_columns = Vec::new();
948 let mut partially_supported_columns = Vec::new();
949 let mut unsupported_columns = Vec::new();
950 let mut seen = HashSet::new();
951
952 for column in &query.columns {
953 if !seen.insert(column.as_str().to_owned()) {
954 continue;
955 }
956
957 let mut supported_markets = Vec::new();
958 let mut unsupported_markets = Vec::new();
959
960 for (market, metainfo) in &market_metainfo {
961 if supports_column_for_market(market, metainfo, column.as_str()) {
962 supported_markets.push(market.clone());
963 } else {
964 unsupported_markets.push(market.clone());
965 }
966 }
967
968 match (supported_markets.is_empty(), unsupported_markets.is_empty()) {
969 (true, false) => unsupported_columns.push(column.clone()),
970 (false, true) => supported_columns.push(column.clone()),
971 (false, false) => partially_supported_columns.push(PartiallySupportedColumn {
972 column: column.clone(),
973 supported_markets,
974 unsupported_markets,
975 }),
976 (true, true) => {}
977 }
978 }
979
980 let report = ScanValidationReport {
981 route_segment,
982 requested_markets: markets,
983 supported_columns,
984 partially_supported_columns,
985 unsupported_columns,
986 };
987
988 #[cfg(feature = "tracing")]
989 debug!(
990 target: "tvdata_rs::scan",
991 route = %report.route_segment,
992 supported = report.supported_columns.len(),
993 partial = report.partially_supported_columns.len(),
994 unsupported = report.unsupported_columns.len(),
995 "scanner validation completed",
996 );
997
998 Ok(report)
999 }
1000
1001 pub async fn scan_validated(&self, query: &ScanQuery) -> Result<ScanResponse> {
1024 let report = self.validate_scan_query(query).await?;
1025 if !report.is_strictly_supported() {
1026 let fields = report
1027 .strict_violation_column_names()
1028 .into_iter()
1029 .map(str::to_owned)
1030 .collect();
1031 return Err(Error::UnsupportedScanFields {
1032 route: report.route_segment,
1033 fields,
1034 });
1035 }
1036
1037 self.scan(query).await
1038 }
1039
1040 pub async fn filter_scan_query(
1067 &self,
1068 query: &ScanQuery,
1069 ) -> Result<(ScanQuery, ScanValidationReport)> {
1070 let report = self.validate_scan_query(query).await?;
1071 let filtered = report.filtered_query(query);
1072
1073 if filtered.columns.is_empty() {
1074 let fields = report
1075 .strict_violation_column_names()
1076 .into_iter()
1077 .map(str::to_owned)
1078 .collect();
1079 return Err(Error::UnsupportedScanFields {
1080 route: report.route_segment,
1081 fields,
1082 });
1083 }
1084
1085 Ok((filtered, report))
1086 }
1087
1088 pub async fn scan_supported(&self, query: &ScanQuery) -> Result<ScanResponse> {
1111 let (filtered, _) = self.filter_scan_query(query).await?;
1112 self.scan(&filtered).await
1113 }
1114
1115 pub async fn metainfo(&self, market: impl Into<Market>) -> Result<ScannerMetainfo> {
1135 let market = market.into();
1136 self.cached_metainfo(&market).await
1137 }
1138
1139 #[cfg(feature = "search")]
1158 pub async fn search(&self, request: &SearchRequest) -> Result<Vec<SearchHit>> {
1159 Ok(self.search_response(request).await?.hits)
1160 }
1161
1162 #[cfg(feature = "search")]
1164 pub async fn search_equities(&self, text: impl Into<String>) -> Result<Vec<SearchHit>> {
1165 Ok(self.search_equities_response(text).await?.hits)
1166 }
1167
1168 #[cfg(feature = "search")]
1170 pub async fn search_equities_response(
1171 &self,
1172 text: impl Into<String>,
1173 ) -> Result<SearchResponse> {
1174 self.search_response(&SearchRequest::equities(text)).await
1175 }
1176
1177 #[cfg(feature = "search")]
1179 pub async fn search_forex(&self, text: impl Into<String>) -> Result<Vec<SearchHit>> {
1180 Ok(self.search_forex_response(text).await?.hits)
1181 }
1182
1183 #[cfg(feature = "search")]
1185 pub async fn search_forex_response(&self, text: impl Into<String>) -> Result<SearchResponse> {
1186 self.search_response(&SearchRequest::forex(text)).await
1187 }
1188
1189 #[cfg(feature = "search")]
1191 pub async fn search_crypto(&self, text: impl Into<String>) -> Result<Vec<SearchHit>> {
1192 Ok(self.search_crypto_response(text).await?.hits)
1193 }
1194
1195 #[cfg(feature = "search")]
1197 pub async fn search_crypto_response(&self, text: impl Into<String>) -> Result<SearchResponse> {
1198 self.search_response(&SearchRequest::crypto(text)).await
1199 }
1200
1201 #[cfg(feature = "search")]
1207 pub async fn search_options(&self, text: impl Into<String>) -> Result<Vec<SearchHit>> {
1208 Ok(self.search_options_response(text).await?.hits)
1209 }
1210
1211 #[cfg(feature = "search")]
1213 pub async fn search_options_response(&self, text: impl Into<String>) -> Result<SearchResponse> {
1214 let response = self.search_response(&SearchRequest::options(text)).await?;
1215 Ok(response.filtered(SearchHit::is_option_like))
1216 }
1217
1218 #[cfg(feature = "search")]
1241 pub async fn search_response(&self, request: &SearchRequest) -> Result<SearchResponse> {
1242 if request.text.trim().is_empty() {
1243 return Err(Error::EmptySearchQuery);
1244 }
1245
1246 #[cfg(feature = "tracing")]
1247 debug!(
1248 target: "tvdata_rs::search",
1249 text_len = request.text.len(),
1250 exchange = request.exchange.as_deref().unwrap_or(""),
1251 search_type = request.instrument_type.as_deref().unwrap_or(""),
1252 start = request.start,
1253 "executing TradingView symbol search",
1254 );
1255
1256 let raw: RawSearchResponse = self
1257 .execute_json(
1258 self.request(self.http.get(self.endpoints.symbol_search_base_url.clone()))?
1259 .query(&request.to_query_pairs()),
1260 )
1261 .await?;
1262
1263 let response = sanitize_response(raw);
1264 #[cfg(feature = "tracing")]
1265 debug!(
1266 target: "tvdata_rs::search",
1267 hits = response.hits.len(),
1268 symbols_remaining = response.symbols_remaining,
1269 "TradingView symbol search completed",
1270 );
1271 Ok(response)
1272 }
1273
1274 #[cfg(feature = "economics")]
1293 pub async fn economic_calendar(
1294 &self,
1295 request: &EconomicCalendarRequest,
1296 ) -> Result<EconomicCalendarResponse> {
1297 #[cfg(feature = "tracing")]
1298 debug!(
1299 target: "tvdata_rs::calendar",
1300 from = %request.from,
1301 to = %request.to,
1302 "executing economic calendar request",
1303 );
1304
1305 let raw: RawEconomicCalendarResponse = self
1306 .execute_json(
1307 self.request(self.http.get(self.endpoints.calendar_base_url().clone()))?
1308 .query(&request.to_query_pairs()?),
1309 )
1310 .await?;
1311
1312 let response = sanitize_calendar(raw);
1313 #[cfg(feature = "tracing")]
1314 debug!(
1315 target: "tvdata_rs::calendar",
1316 events = response.events.len(),
1317 status = response.status.as_deref().unwrap_or(""),
1318 "economic calendar request completed",
1319 );
1320 Ok(response)
1321 }
1322
1323 #[cfg(feature = "calendar")]
1346 pub async fn earnings_calendar(
1347 &self,
1348 request: &CalendarWindowRequest,
1349 ) -> Result<Vec<EarningsCalendarEntry>> {
1350 self.corporate_earnings_calendar(request).await
1351 }
1352
1353 #[cfg(feature = "calendar")]
1375 pub async fn dividend_calendar(
1376 &self,
1377 request: &DividendCalendarRequest,
1378 ) -> Result<Vec<DividendCalendarEntry>> {
1379 self.corporate_dividend_calendar(request).await
1380 }
1381
1382 #[cfg(feature = "calendar")]
1401 pub async fn ipo_calendar(
1402 &self,
1403 request: &CalendarWindowRequest,
1404 ) -> Result<Vec<IpoCalendarEntry>> {
1405 self.corporate_ipo_calendar(request).await
1406 }
1407
1408 pub async fn history(&self, request: &HistoryRequest) -> Result<HistorySeries> {
1429 let _websocket_budget = self.acquire_websocket_slot().await?;
1430
1431 #[cfg(feature = "tracing")]
1432 debug!(
1433 target: "tvdata_rs::history",
1434 symbol = %request.symbol.as_str(),
1435 interval = request.interval.as_code(),
1436 bars = request.bars,
1437 fetch_all = request.fetch_all,
1438 session = request.session.as_code(),
1439 adjustment = request.adjustment.as_code(),
1440 authenticated = self.session_id().is_some(),
1441 "fetching TradingView history",
1442 );
1443
1444 let series = fetch_history_with_timeout_for_client(
1445 self,
1446 request,
1447 self.history_config.session_timeout,
1448 )
1449 .await?;
1450
1451 #[cfg(feature = "tracing")]
1452 debug!(
1453 target: "tvdata_rs::history",
1454 symbol = %series.symbol.as_str(),
1455 bars = series.bars.len(),
1456 authenticated = series.provenance.authenticated,
1457 "TradingView history fetch completed",
1458 );
1459
1460 Ok(series)
1461 }
1462
1463 async fn execute_json<T>(&self, request: RequestBuilder) -> Result<T>
1464 where
1465 T: DeserializeOwned,
1466 {
1467 let body = self.execute_text(request).await?;
1468 serde_json::from_str(&body).map_err(Into::into)
1469 }
1470
1471 async fn execute_text(&self, request: RequestBuilder) -> Result<String> {
1472 let _http_budget = self.acquire_http_slot().await?;
1473
1474 let preview = request_preview(&request);
1475 let started_at = Instant::now();
1476 let response = match request.send().await {
1477 Ok(response) => response,
1478 Err(error) => {
1479 self.emit_event(ClientEvent::HttpRequestFailed(HttpRequestFailedEvent {
1480 method: preview
1481 .as_ref()
1482 .map(|(method, _)| method.clone())
1483 .unwrap_or_else(|| "UNKNOWN".to_owned()),
1484 url: preview
1485 .as_ref()
1486 .map(|(_, url)| url.clone())
1487 .unwrap_or_else(|| "<opaque>".to_owned()),
1488 elapsed_ms: started_at.elapsed().as_millis() as u64,
1489 authenticated: self.session_id().is_some(),
1490 kind: crate::error::ErrorKind::Transport,
1491 }));
1492 #[cfg(feature = "tracing")]
1493 warn!(
1494 target: "tvdata_rs::http",
1495 method = preview.as_ref().map(|(method, _)| method.as_str()).unwrap_or("UNKNOWN"),
1496 url = preview.as_ref().map(|(_, url)| url.as_str()).unwrap_or("<opaque>"),
1497 elapsed_ms = started_at.elapsed().as_millis() as u64,
1498 error = %error,
1499 "TradingView HTTP request failed before receiving a response",
1500 );
1501 return Err(Error::from(error));
1502 }
1503 };
1504 let status = response.status();
1505 let body = match response.text().await {
1506 Ok(body) => body,
1507 Err(error) => {
1508 self.emit_event(ClientEvent::HttpRequestFailed(HttpRequestFailedEvent {
1509 method: preview
1510 .as_ref()
1511 .map(|(method, _)| method.clone())
1512 .unwrap_or_else(|| "UNKNOWN".to_owned()),
1513 url: preview
1514 .as_ref()
1515 .map(|(_, url)| url.clone())
1516 .unwrap_or_else(|| "<opaque>".to_owned()),
1517 elapsed_ms: started_at.elapsed().as_millis() as u64,
1518 authenticated: self.session_id().is_some(),
1519 kind: crate::error::ErrorKind::Transport,
1520 }));
1521 #[cfg(feature = "tracing")]
1522 warn!(
1523 target: "tvdata_rs::http",
1524 method = preview.as_ref().map(|(method, _)| method.as_str()).unwrap_or("UNKNOWN"),
1525 url = preview.as_ref().map(|(_, url)| url.as_str()).unwrap_or("<opaque>"),
1526 status = status.as_u16(),
1527 elapsed_ms = started_at.elapsed().as_millis() as u64,
1528 error = %error,
1529 "TradingView HTTP response body could not be read",
1530 );
1531 return Err(Error::from(error));
1532 }
1533 };
1534
1535 self.emit_event(ClientEvent::HttpRequestCompleted(
1536 HttpRequestCompletedEvent {
1537 method: preview
1538 .as_ref()
1539 .map(|(method, _)| method.clone())
1540 .unwrap_or_else(|| "UNKNOWN".to_owned()),
1541 url: preview
1542 .as_ref()
1543 .map(|(_, url)| url.clone())
1544 .unwrap_or_else(|| "<opaque>".to_owned()),
1545 status: status.as_u16(),
1546 elapsed_ms: started_at.elapsed().as_millis() as u64,
1547 authenticated: self.session_id().is_some(),
1548 },
1549 ));
1550
1551 #[cfg(feature = "tracing")]
1552 debug!(
1553 target: "tvdata_rs::http",
1554 method = preview.as_ref().map(|(method, _)| method.as_str()).unwrap_or("UNKNOWN"),
1555 url = preview.as_ref().map(|(_, url)| url.as_str()).unwrap_or("<opaque>"),
1556 status = status.as_u16(),
1557 body_bytes = body.len(),
1558 elapsed_ms = started_at.elapsed().as_millis() as u64,
1559 "TradingView HTTP request completed",
1560 );
1561
1562 if !status.is_success() {
1563 #[cfg(feature = "tracing")]
1564 warn!(
1565 target: "tvdata_rs::http",
1566 method = preview.as_ref().map(|(method, _)| method.as_str()).unwrap_or("UNKNOWN"),
1567 url = preview.as_ref().map(|(_, url)| url.as_str()).unwrap_or("<opaque>"),
1568 status = status.as_u16(),
1569 elapsed_ms = started_at.elapsed().as_millis() as u64,
1570 "TradingView HTTP request returned non-success status",
1571 );
1572 return Err(Error::ApiStatus { status, body });
1573 }
1574
1575 Ok(body)
1576 }
1577
1578 fn request(&self, request: RequestBuilder) -> Result<RequestBuilder> {
1579 let request = request
1580 .header(
1581 ORIGIN,
1582 HeaderValue::from_str(self.endpoints.site_origin.as_str())
1583 .map_err(|_| Error::Protocol("invalid site origin configured for request"))?,
1584 )
1585 .header(
1586 REFERER,
1587 HeaderValue::from_str(&referer(&self.endpoints.site_origin))
1588 .map_err(|_| Error::Protocol("invalid referer configured for request"))?,
1589 )
1590 .header(
1591 USER_AGENT,
1592 HeaderValue::from_str(&self.user_agent)
1593 .map_err(|_| Error::Protocol("invalid user agent configured for request"))?,
1594 );
1595
1596 let request = if let Some(session_id) = self.session_id.as_deref() {
1597 request.header(COOKIE, cookie_header_value(session_id)?)
1598 } else {
1599 request
1600 };
1601
1602 Ok(request)
1603 }
1604
1605 async fn acquire_http_slot(&self) -> Result<Option<OwnedSemaphorePermit>> {
1606 let permit = match self.request_budget_state.http_limiter.as_ref() {
1607 Some(limiter) => Some(
1608 limiter
1609 .clone()
1610 .acquire_owned()
1611 .await
1612 .map_err(|_| Error::Protocol("http request budget closed"))?,
1613 ),
1614 None => None,
1615 };
1616
1617 if let (Some(pacer), Some(min_interval)) = (
1618 self.request_budget_state.http_pacer.as_ref(),
1619 self.request_budget.min_http_interval,
1620 ) {
1621 let mut next_allowed_at = pacer.lock().await;
1622 let now = TokioInstant::now();
1623 if *next_allowed_at > now {
1624 sleep_until(*next_allowed_at).await;
1625 }
1626 *next_allowed_at = TokioInstant::now() + min_interval;
1627 }
1628
1629 Ok(permit)
1630 }
1631
1632 pub(crate) async fn acquire_websocket_slot(&self) -> Result<Option<OwnedSemaphorePermit>> {
1633 match self.request_budget_state.websocket_limiter.as_ref() {
1634 Some(limiter) => limiter
1635 .clone()
1636 .acquire_owned()
1637 .await
1638 .map(Some)
1639 .map_err(|_| Error::Protocol("websocket request budget closed")),
1640 None => Ok(None),
1641 }
1642 }
1643
1644 pub(crate) fn emit_event(&self, event: ClientEvent) {
1645 if let Some(observer) = self.observer.as_ref() {
1646 observer.on_event(&event);
1647 }
1648 }
1649
1650 pub(crate) async fn connect_socket(&self) -> Result<TradingViewWebSocket> {
1651 let authenticated = self.session_id().is_some();
1652 let url = self.endpoints().websocket_url().to_string();
1653 let result = self
1654 .websocket_connector
1655 .connect(self.endpoints(), &self.user_agent, self.session_id())
1656 .await;
1657
1658 match &result {
1659 Ok(_) => self.emit_event(ClientEvent::WebSocketConnected(WebSocketConnectedEvent {
1660 url,
1661 authenticated,
1662 })),
1663 Err(error) => {
1664 self.emit_event(ClientEvent::WebSocketConnectionFailed(
1665 WebSocketConnectionFailedEvent {
1666 url,
1667 authenticated,
1668 kind: error.kind(),
1669 },
1670 ));
1671 }
1672 }
1673
1674 result
1675 }
1676
1677 async fn cached_metainfo(&self, market: &Market) -> Result<ScannerMetainfo> {
1678 if let Some(cached) = self
1679 .metainfo_cache
1680 .read()
1681 .await
1682 .get(market.as_str())
1683 .cloned()
1684 {
1685 #[cfg(feature = "tracing")]
1686 debug!(
1687 target: "tvdata_rs::metainfo",
1688 market = market.as_str(),
1689 "scanner metainfo cache hit",
1690 );
1691 return Ok(cached);
1692 }
1693
1694 #[cfg(feature = "tracing")]
1695 debug!(
1696 target: "tvdata_rs::metainfo",
1697 market = market.as_str(),
1698 "scanner metainfo cache miss",
1699 );
1700
1701 let metainfo: ScannerMetainfo = self
1702 .execute_json(
1703 self.request(self.http.get(self.endpoints.scanner_metainfo_url(market)?))?,
1704 )
1705 .await?;
1706
1707 self.metainfo_cache
1708 .write()
1709 .await
1710 .insert(market.as_str().to_owned(), metainfo.clone());
1711
1712 #[cfg(feature = "tracing")]
1713 debug!(
1714 target: "tvdata_rs::metainfo",
1715 market = market.as_str(),
1716 fields = metainfo.fields.len(),
1717 "scanner metainfo cached",
1718 );
1719
1720 Ok(metainfo)
1721 }
1722}
1723
1724fn validation_markets(query: &ScanQuery) -> Result<Vec<Market>> {
1725 if query.markets.is_empty() {
1726 return Err(Error::ScanValidationUnavailable {
1727 reason: "query does not specify any markets".to_owned(),
1728 });
1729 }
1730
1731 Ok(query.markets.clone())
1732}
1733
1734fn supports_column_for_market(market: &Market, metainfo: &ScannerMetainfo, column: &str) -> bool {
1735 metainfo.supports_field(column)
1736 || market_to_screener_kind(market)
1737 .and_then(|kind| embedded_registry().find_by_api_name(kind, column))
1738 .is_some()
1739}
1740
1741fn market_to_screener_kind(market: &Market) -> Option<ScreenerKind> {
1742 match market.as_str() {
1743 "crypto" => Some(ScreenerKind::Crypto),
1744 "forex" => Some(ScreenerKind::Forex),
1745 "bond" | "bonds" => Some(ScreenerKind::Bond),
1746 "futures" => Some(ScreenerKind::Futures),
1747 "coin" => Some(ScreenerKind::Coin),
1748 "options" | "economics2" | "cfd" => None,
1749 _ => Some(ScreenerKind::Stock),
1750 }
1751}
1752
1753#[cfg(test)]
1754mod tests;