Skip to main content

tvdata_rs/history/
mod.rs

1mod fetch;
2mod request;
3
4use futures_util::stream::{self, StreamExt as FuturesStreamExt};
5use request::HistorySeriesMap;
6use time::{Date, OffsetDateTime};
7#[cfg(feature = "tracing")]
8use tracing::debug;
9
10use crate::batch::{BatchResult, SymbolFailure};
11use crate::client::{ClientEvent, HistoryBatchCompletedEvent, HistoryBatchMode, TradingViewClient};
12use crate::error::Result;
13use crate::scanner::{InstrumentRef, Ticker};
14
15pub use request::{
16    Adjustment, Bar, BarSelectionPolicy, DailyBarRangeRequest, DailyBarRequest,
17    HistoryBatchRequest, HistoryProvenance, HistoryRequest, HistorySeries, Interval,
18    TradingSession,
19};
20
21fn estimated_daily_bars_since(date: Date) -> u32 {
22    let today = OffsetDateTime::now_utc().date();
23    let days = if date <= today {
24        (today - date).whole_days().max(0) as u32
25    } else {
26        0
27    };
28
29    days.saturating_add(32).max(64)
30}
31
32fn daily_batch_request(
33    symbols: &[InstrumentRef],
34    start: Date,
35    session: TradingSession,
36    adjustment: Adjustment,
37    concurrency: usize,
38) -> HistoryBatchRequest {
39    HistoryBatchRequest::new(
40        symbols.iter().cloned().map(Into::<Ticker>::into),
41        Interval::Day1,
42        estimated_daily_bars_since(start),
43    )
44    .session(session)
45    .adjustment(adjustment)
46    .concurrency(concurrency)
47}
48
/// Picks how many symbols each chunk of a daily-bar fetch should contain.
///
/// Returns 0 when there are no symbols. Otherwise the symbols are divided evenly (rounding
/// up) across `socket_concurrency` lanes, where a concurrency of 0 is treated as 1, and the
/// result is kept within the inclusive range 16..=64.
fn daily_bar_socket_chunk_size(symbols: usize, socket_concurrency: usize) -> usize {
    const MIN_CHUNK: usize = 16;
    const MAX_CHUNK: usize = 64;

    match symbols {
        0 => 0,
        count => {
            let lanes = if socket_concurrency == 0 {
                1
            } else {
                socket_concurrency
            };
            let per_lane = count.div_ceil(lanes);
            per_lane.max(MIN_CHUNK).min(MAX_CHUNK)
        }
    }
}
57
58impl TradingViewClient {
59    /// Downloads multiple OHLCV history series with bounded concurrency.
60    ///
61    /// # Examples
62    ///
63    /// ```no_run
64    /// use tvdata_rs::{HistoryBatchRequest, Interval, Result, TradingViewClient};
65    ///
66    /// #[tokio::main]
67    /// async fn main() -> Result<()> {
68    ///     let client = TradingViewClient::builder().build()?;
69    ///     let request = HistoryBatchRequest::new(["NASDAQ:AAPL", "NASDAQ:MSFT"], Interval::Day1, 30);
70    ///     let series = client.history_batch(&request).await?;
71    ///
72    ///     println!("series: {}", series.len());
73    ///     Ok(())
74    /// }
75    /// ```
76    pub async fn history_batch(&self, request: &HistoryBatchRequest) -> Result<Vec<HistorySeries>> {
77        let effective_concurrency = self.effective_history_batch_concurrency(request.concurrency);
78
79        #[cfg(feature = "tracing")]
80        debug!(
81            target: "tvdata_rs::history",
82            requested = request.symbols.len(),
83            interval = request.interval.as_code(),
84            bars = request.bars,
85            requested_concurrency = request.concurrency,
86            effective_concurrency,
87            "starting history batch",
88        );
89
90        let series = fetch::fetch_history_batch_with(
91            request.to_requests(),
92            effective_concurrency,
93            |request| async move { self.history(&request).await },
94        )
95        .await?;
96
97        self.emit_event(ClientEvent::HistoryBatchCompleted(
98            HistoryBatchCompletedEvent {
99                requested: request.symbols.len(),
100                successes: series.len(),
101                missing: 0,
102                failures: 0,
103                concurrency: effective_concurrency,
104                mode: HistoryBatchMode::Strict,
105            },
106        ));
107
108        Ok(series)
109    }
110
111    /// Downloads multiple OHLCV history series and returns successes, missing symbols, and
112    /// failures separately.
113    pub async fn history_batch_detailed(
114        &self,
115        request: &HistoryBatchRequest,
116    ) -> Result<BatchResult<HistorySeries>> {
117        let effective_concurrency = self.effective_history_batch_concurrency(request.concurrency);
118
119        #[cfg(feature = "tracing")]
120        debug!(
121            target: "tvdata_rs::history",
122            requested = request.symbols.len(),
123            interval = request.interval.as_code(),
124            bars = request.bars,
125            requested_concurrency = request.concurrency,
126            effective_concurrency,
127            "starting detailed history batch",
128        );
129
130        let batch = fetch::fetch_history_batch_detailed_with(
131            request.to_requests(),
132            effective_concurrency,
133            |request| async move { self.history(&request).await },
134        )
135        .await?;
136
137        self.emit_event(ClientEvent::HistoryBatchCompleted(
138            HistoryBatchCompletedEvent {
139                requested: request.symbols.len(),
140                successes: batch.successes.len(),
141                missing: batch.missing.len(),
142                failures: batch.failures.len(),
143                concurrency: effective_concurrency,
144                mode: HistoryBatchMode::Detailed,
145            },
146        ));
147
148        Ok(batch)
149    }
150
151    /// Downloads the maximum history currently available for multiple symbols.
152    ///
153    /// The crate keeps requesting older bars over the chart websocket until
154    /// TradingView stops returning new history.
155    ///
156    /// # Examples
157    ///
158    /// ```no_run
159    /// use tvdata_rs::{Interval, Result, TradingViewClient};
160    ///
161    /// #[tokio::main]
162    /// async fn main() -> Result<()> {
163    ///     let client = TradingViewClient::builder().build()?;
164    ///     let series = client
165    ///         .download_history_max(["NASDAQ:AAPL", "NASDAQ:MSFT"], Interval::Day1)
166    ///         .await?;
167    ///
168    ///     println!("series: {}", series.len());
169    ///     Ok(())
170    /// }
171    /// ```
172    pub async fn download_history_max<I, T>(
173        &self,
174        symbols: I,
175        interval: Interval,
176    ) -> Result<Vec<HistorySeries>>
177    where
178        I: IntoIterator<Item = T>,
179        T: Into<Ticker>,
180    {
181        let defaults = self.history_config();
182        let request = HistoryBatchRequest::max(symbols, interval)
183            .session(defaults.default_session)
184            .adjustment(defaults.default_adjustment)
185            .concurrency(defaults.default_batch_concurrency);
186        self.history_batch(&request).await
187    }
188
189    /// Convenience wrapper around [`TradingViewClient::history_batch`] for a list of symbols.
190    pub async fn download_history<I, T>(
191        &self,
192        symbols: I,
193        interval: Interval,
194        bars: u32,
195    ) -> Result<Vec<HistorySeries>>
196    where
197        I: IntoIterator<Item = T>,
198        T: Into<Ticker>,
199    {
200        let defaults = self.history_config();
201        let request = HistoryBatchRequest::new(symbols, interval, bars)
202            .session(defaults.default_session)
203            .adjustment(defaults.default_adjustment)
204            .concurrency(defaults.default_batch_concurrency);
205        self.history_batch(&request).await
206    }
207
208    /// Downloads multiple history series and returns them keyed by symbol.
209    pub async fn download_history_map<I, T>(
210        &self,
211        symbols: I,
212        interval: Interval,
213        bars: u32,
214    ) -> Result<HistorySeriesMap>
215    where
216        I: IntoIterator<Item = T>,
217        T: Into<Ticker>,
218    {
219        let series = self.download_history(symbols, interval, bars).await?;
220        Ok(series
221            .into_iter()
222            .map(|series| (series.symbol.clone(), series))
223            .collect())
224    }
225
226    /// Downloads the maximum history available and returns it keyed by symbol.
227    pub async fn download_history_map_max<I, T>(
228        &self,
229        symbols: I,
230        interval: Interval,
231    ) -> Result<HistorySeriesMap>
232    where
233        I: IntoIterator<Item = T>,
234        T: Into<Ticker>,
235    {
236        let series = self.download_history_max(symbols, interval).await?;
237        Ok(series
238            .into_iter()
239            .map(|series| (series.symbol.clone(), series))
240            .collect())
241    }
242
243    /// Downloads daily bars for a set of instruments and selects the best bar for the requested
244    /// trading date.
245    pub async fn daily_bars_on(&self, request: &DailyBarRequest) -> Result<BatchResult<Bar>> {
246        if request.symbols.is_empty() {
247            return Ok(BatchResult::default());
248        }
249
250        let effective_concurrency = self.effective_history_batch_concurrency(request.concurrency);
251
252        #[cfg(feature = "tracing")]
253        debug!(
254            target: "tvdata_rs::history",
255            symbols = request.symbols.len(),
256            asof = %request.asof,
257            selection = ?request.selection,
258            requested_concurrency = request.concurrency,
259            effective_concurrency,
260            "starting daily bar selection",
261        );
262
263        let tickers = request
264            .symbols
265            .iter()
266            .cloned()
267            .map(Into::<Ticker>::into)
268            .collect::<Vec<_>>();
269        let chunk_size = daily_bar_socket_chunk_size(tickers.len(), effective_concurrency);
270
271        let mut outcomes = stream::iter(
272            tickers
273                .chunks(chunk_size)
274                .map(|chunk| chunk.to_vec())
275                .enumerate()
276                .map(|(index, chunk)| async move {
277                    let outcome = fetch::fetch_daily_bars_batch_with_timeout_for_client(
278                        self,
279                        &chunk,
280                        request.asof,
281                        request.selection,
282                        request.session,
283                        request.adjustment,
284                        self.history_config().session_timeout,
285                    )
286                    .await;
287                    (index, chunk, outcome)
288                }),
289        )
290        .buffer_unordered(effective_concurrency)
291        .collect::<Vec<_>>()
292        .await;
293
294        outcomes.sort_by_key(|(index, _, _)| *index);
295
296        let mut selected = BatchResult::default();
297        for (_, chunk, outcome) in outcomes {
298            match outcome {
299                Ok(batch) => {
300                    selected.successes.extend(batch.successes);
301                    selected.missing.extend(batch.missing);
302                    selected.failures.extend(batch.failures);
303                }
304                Err(error) if error.is_symbol_error() => selected.missing.extend(chunk),
305                Err(error) => {
306                    let kind = error.kind();
307                    let retryable = error.is_retryable();
308                    let message = error.to_string();
309                    selected
310                        .failures
311                        .extend(chunk.into_iter().map(|ticker| SymbolFailure {
312                            symbol: ticker,
313                            kind,
314                            message: message.clone(),
315                            retryable,
316                        }));
317                }
318            }
319        }
320
321        self.emit_event(ClientEvent::HistoryBatchCompleted(
322            HistoryBatchCompletedEvent {
323                requested: request.symbols.len(),
324                successes: selected.successes.len(),
325                missing: selected.missing.len(),
326                failures: selected.failures.len(),
327                concurrency: effective_concurrency,
328                mode: HistoryBatchMode::Detailed,
329            },
330        ));
331
332        #[cfg(feature = "tracing")]
333        debug!(
334            target: "tvdata_rs::history",
335            asof = %request.asof,
336            successes = selected.successes.len(),
337            missing = selected.missing.len(),
338            failures = selected.failures.len(),
339            "daily bar selection completed",
340        );
341
342        Ok(selected)
343    }
344
345    /// Downloads daily history and trims each successful series to the requested date window.
346    pub async fn daily_bars_range(
347        &self,
348        request: &DailyBarRangeRequest,
349    ) -> Result<BatchResult<HistorySeries>> {
350        if request.start > request.end {
351            return Ok(BatchResult::default());
352        }
353
354        #[cfg(feature = "tracing")]
355        debug!(
356            target: "tvdata_rs::history",
357            symbols = request.symbols.len(),
358            start = %request.start,
359            end = %request.end,
360            concurrency = request.concurrency,
361            "starting daily history range selection",
362        );
363
364        let history_request = daily_batch_request(
365            &request.symbols,
366            request.start,
367            request.session,
368            request.adjustment,
369            request.concurrency,
370        );
371        let batch = self.history_batch_detailed(&history_request).await?;
372
373        let mut selected = BatchResult {
374            missing: batch.missing,
375            failures: batch.failures,
376            ..BatchResult::default()
377        };
378
379        for (ticker, mut series) in batch.successes {
380            series
381                .bars
382                .retain(|bar| bar.time.date() >= request.start && bar.time.date() <= request.end);
383
384            if series.bars.is_empty() {
385                selected.missing.push(ticker);
386            } else {
387                selected.successes.insert(ticker, series);
388            }
389        }
390
391        #[cfg(feature = "tracing")]
392        debug!(
393            target: "tvdata_rs::history",
394            start = %request.start,
395            end = %request.end,
396            successes = selected.successes.len(),
397            missing = selected.missing.len(),
398            failures = selected.failures.len(),
399            "daily history range selection completed",
400        );
401
402        Ok(selected)
403    }
404}
405
406pub(crate) use fetch::fetch_history_with_timeout_for_client;