1mod fetch;
2mod request;
3
4use futures_util::stream::{self, StreamExt as FuturesStreamExt};
5use request::HistorySeriesMap;
6use time::{Date, OffsetDateTime};
7#[cfg(feature = "tracing")]
8use tracing::debug;
9
10use crate::batch::{BatchResult, SymbolFailure};
11use crate::client::{ClientEvent, HistoryBatchCompletedEvent, HistoryBatchMode, TradingViewClient};
12use crate::error::Result;
13use crate::scanner::{InstrumentRef, Ticker};
14
15pub use request::{
16 Adjustment, Bar, BarSelectionPolicy, DailyBarRangeRequest, DailyBarRequest,
17 HistoryBatchRequest, HistoryProvenance, HistoryRequest, HistorySeries, Interval,
18 TradingSession,
19};
20
21fn estimated_daily_bars_since(date: Date) -> u32 {
22 let today = OffsetDateTime::now_utc().date();
23 let days = if date <= today {
24 (today - date).whole_days().max(0) as u32
25 } else {
26 0
27 };
28
29 days.saturating_add(32).max(64)
30}
31
32fn daily_batch_request(
33 symbols: &[InstrumentRef],
34 start: Date,
35 session: TradingSession,
36 adjustment: Adjustment,
37 concurrency: usize,
38) -> HistoryBatchRequest {
39 HistoryBatchRequest::new(
40 symbols.iter().cloned().map(Into::<Ticker>::into),
41 Interval::Day1,
42 estimated_daily_bars_since(start),
43 )
44 .session(session)
45 .adjustment(adjustment)
46 .concurrency(concurrency)
47}
48
/// Compute how many tickers to place in each chunk when fanning a daily-bar
/// request out over `socket_concurrency` workers.
///
/// Returns 0 only for an empty symbol list; otherwise the even split
/// (`ceil(symbols / socket_concurrency)`) is kept within `[16, 64]`. A
/// concurrency of 0 is treated as 1.
fn daily_bar_socket_chunk_size(symbols: usize, socket_concurrency: usize) -> usize {
    match symbols {
        0 => 0,
        total => total.div_ceil(socket_concurrency.max(1)).clamp(16, 64),
    }
}
57
impl TradingViewClient {
    /// Fetch a history series for every symbol in `request` concurrently,
    /// in "strict" mode: the first symbol error aborts the whole batch.
    ///
    /// On success, emits a `HistoryBatchCompleted` event. Because errors
    /// short-circuit via `?` before the event is built, the event always
    /// reports zero `missing` and zero `failures` in this mode.
    pub async fn history_batch(&self, request: &HistoryBatchRequest) -> Result<Vec<HistorySeries>> {
        // The caller-requested concurrency is normalized against client
        // configuration before use.
        let effective_concurrency = self.effective_history_batch_concurrency(request.concurrency);

        #[cfg(feature = "tracing")]
        debug!(
            target: "tvdata_rs::history",
            requested = request.symbols.len(),
            interval = request.interval.as_code(),
            bars = request.bars,
            requested_concurrency = request.concurrency,
            effective_concurrency,
            "starting history batch",
        );

        // Expand the batch into per-symbol requests and run them through the
        // shared fetch helper, each one served by `self.history`.
        let series = fetch::fetch_history_batch_with(
            request.to_requests(),
            effective_concurrency,
            |request| async move { self.history(&request).await },
        )
        .await?;

        self.emit_event(ClientEvent::HistoryBatchCompleted(
            HistoryBatchCompletedEvent {
                requested: request.symbols.len(),
                successes: series.len(),
                missing: 0,
                failures: 0,
                concurrency: effective_concurrency,
                mode: HistoryBatchMode::Strict,
            },
        ));

        Ok(series)
    }

    /// Fetch history for every symbol in `request`, collecting per-symbol
    /// outcomes instead of failing fast: the returned `BatchResult` carries
    /// successes, missing symbols, and failures separately.
    ///
    /// Returns `Err` only when the batch as a whole could not run (per-symbol
    /// problems are recorded inside the `BatchResult`). Emits a
    /// `HistoryBatchCompleted` event with the detailed counts.
    pub async fn history_batch_detailed(
        &self,
        request: &HistoryBatchRequest,
    ) -> Result<BatchResult<HistorySeries>> {
        let effective_concurrency = self.effective_history_batch_concurrency(request.concurrency);

        #[cfg(feature = "tracing")]
        debug!(
            target: "tvdata_rs::history",
            requested = request.symbols.len(),
            interval = request.interval.as_code(),
            bars = request.bars,
            requested_concurrency = request.concurrency,
            effective_concurrency,
            "starting detailed history batch",
        );

        let batch = fetch::fetch_history_batch_detailed_with(
            request.to_requests(),
            effective_concurrency,
            |request| async move { self.history(&request).await },
        )
        .await?;

        self.emit_event(ClientEvent::HistoryBatchCompleted(
            HistoryBatchCompletedEvent {
                requested: request.symbols.len(),
                successes: batch.successes.len(),
                missing: batch.missing.len(),
                failures: batch.failures.len(),
                concurrency: effective_concurrency,
                mode: HistoryBatchMode::Detailed,
            },
        ));

        Ok(batch)
    }

    /// Convenience wrapper: download the maximum available history for
    /// `symbols` at `interval`, using the client's configured default
    /// session, adjustment, and batch concurrency. Strict mode (first error
    /// aborts).
    pub async fn download_history_max<I, T>(
        &self,
        symbols: I,
        interval: Interval,
    ) -> Result<Vec<HistorySeries>>
    where
        I: IntoIterator<Item = T>,
        T: Into<Ticker>,
    {
        let defaults = self.history_config();
        let request = HistoryBatchRequest::max(symbols, interval)
            .session(defaults.default_session)
            .adjustment(defaults.default_adjustment)
            .concurrency(defaults.default_batch_concurrency);
        self.history_batch(&request).await
    }

    /// Convenience wrapper: download up to `bars` bars of history for
    /// `symbols` at `interval`, using the client's configured defaults.
    /// Strict mode (first error aborts).
    pub async fn download_history<I, T>(
        &self,
        symbols: I,
        interval: Interval,
        bars: u32,
    ) -> Result<Vec<HistorySeries>>
    where
        I: IntoIterator<Item = T>,
        T: Into<Ticker>,
    {
        let defaults = self.history_config();
        let request = HistoryBatchRequest::new(symbols, interval, bars)
            .session(defaults.default_session)
            .adjustment(defaults.default_adjustment)
            .concurrency(defaults.default_batch_concurrency);
        self.history_batch(&request).await
    }

    /// Like [`TradingViewClient::download_history`], but returns the series
    /// keyed by symbol instead of as a flat vector.
    pub async fn download_history_map<I, T>(
        &self,
        symbols: I,
        interval: Interval,
        bars: u32,
    ) -> Result<HistorySeriesMap>
    where
        I: IntoIterator<Item = T>,
        T: Into<Ticker>,
    {
        let series = self.download_history(symbols, interval, bars).await?;
        Ok(series
            .into_iter()
            .map(|series| (series.symbol.clone(), series))
            .collect())
    }

    /// Like [`TradingViewClient::download_history_max`], but returns the
    /// series keyed by symbol instead of as a flat vector.
    pub async fn download_history_map_max<I, T>(
        &self,
        symbols: I,
        interval: Interval,
    ) -> Result<HistorySeriesMap>
    where
        I: IntoIterator<Item = T>,
        T: Into<Ticker>,
    {
        let series = self.download_history_max(symbols, interval).await?;
        Ok(series
            .into_iter()
            .map(|series| (series.symbol.clone(), series))
            .collect())
    }

    /// Select one daily bar per symbol as of `request.asof`, according to
    /// `request.selection`.
    ///
    /// Symbols are split into chunks (sized by `daily_bar_socket_chunk_size`,
    /// presumably one connection/session per chunk — see the fetch helper)
    /// and the chunks are fetched with at most `effective_concurrency` in
    /// flight at once. A chunk-level error is fanned back out to its tickers:
    /// symbol errors mark the whole chunk missing; any other error records a
    /// `SymbolFailure` per ticker sharing the same message. Per-symbol
    /// problems never fail the call; `Err` is reserved for batch-level
    /// failures. Emits a `HistoryBatchCompleted` event with the final counts.
    pub async fn daily_bars_on(&self, request: &DailyBarRequest) -> Result<BatchResult<Bar>> {
        // Nothing to do for an empty symbol list (also guarantees a
        // non-zero chunk size below).
        if request.symbols.is_empty() {
            return Ok(BatchResult::default());
        }

        let effective_concurrency = self.effective_history_batch_concurrency(request.concurrency);

        #[cfg(feature = "tracing")]
        debug!(
            target: "tvdata_rs::history",
            symbols = request.symbols.len(),
            asof = %request.asof,
            selection = ?request.selection,
            requested_concurrency = request.concurrency,
            effective_concurrency,
            "starting daily bar selection",
        );

        let tickers = request
            .symbols
            .iter()
            .cloned()
            .map(Into::<Ticker>::into)
            .collect::<Vec<_>>();
        let chunk_size = daily_bar_socket_chunk_size(tickers.len(), effective_concurrency);

        // Fetch all chunks with bounded concurrency. Each future carries its
        // chunk index and the chunk itself so results can be re-ordered and
        // errors attributed to the right tickers afterwards.
        let mut outcomes = stream::iter(
            tickers
                .chunks(chunk_size)
                .map(|chunk| chunk.to_vec())
                .enumerate()
                .map(|(index, chunk)| async move {
                    let outcome = fetch::fetch_daily_bars_batch_with_timeout_for_client(
                        self,
                        &chunk,
                        request.asof,
                        request.selection,
                        request.session,
                        request.adjustment,
                        self.history_config().session_timeout,
                    )
                    .await;
                    (index, chunk, outcome)
                }),
        )
        .buffer_unordered(effective_concurrency)
        .collect::<Vec<_>>()
        .await;

        // `buffer_unordered` yields in completion order; restore submission
        // order so the aggregated result is deterministic.
        outcomes.sort_by_key(|(index, _, _)| *index);

        let mut selected = BatchResult::default();
        for (_, chunk, outcome) in outcomes {
            match outcome {
                Ok(batch) => {
                    selected.successes.extend(batch.successes);
                    selected.missing.extend(batch.missing);
                    selected.failures.extend(batch.failures);
                }
                // A symbol-level error for the chunk: treat every ticker in
                // it as missing rather than failed.
                Err(error) if error.is_symbol_error() => selected.missing.extend(chunk),
                Err(error) => {
                    // Any other error is replicated as one failure per
                    // ticker, all sharing the same kind/message/retryable.
                    let kind = error.kind();
                    let retryable = error.is_retryable();
                    let message = error.to_string();
                    selected
                        .failures
                        .extend(chunk.into_iter().map(|ticker| SymbolFailure {
                            symbol: ticker,
                            kind,
                            message: message.clone(),
                            retryable,
                        }));
                }
            }
        }

        self.emit_event(ClientEvent::HistoryBatchCompleted(
            HistoryBatchCompletedEvent {
                requested: request.symbols.len(),
                successes: selected.successes.len(),
                missing: selected.missing.len(),
                failures: selected.failures.len(),
                concurrency: effective_concurrency,
                mode: HistoryBatchMode::Detailed,
            },
        ));

        #[cfg(feature = "tracing")]
        debug!(
            target: "tvdata_rs::history",
            asof = %request.asof,
            successes = selected.successes.len(),
            missing = selected.missing.len(),
            failures = selected.failures.len(),
            "daily bar selection completed",
        );

        Ok(selected)
    }

    /// Fetch daily history for `request.symbols` and keep only the bars
    /// whose date falls within `[request.start, request.end]` (inclusive).
    ///
    /// Internally issues one detailed daily batch sized to cover the range
    /// (see `daily_batch_request` / `estimated_daily_bars_since`), then
    /// trims each returned series. A series left with no bars after trimming
    /// is reclassified as missing. Returns an empty result when
    /// `start > end`.
    pub async fn daily_bars_range(
        &self,
        request: &DailyBarRangeRequest,
    ) -> Result<BatchResult<HistorySeries>> {
        if request.start > request.end {
            return Ok(BatchResult::default());
        }

        #[cfg(feature = "tracing")]
        debug!(
            target: "tvdata_rs::history",
            symbols = request.symbols.len(),
            start = %request.start,
            end = %request.end,
            concurrency = request.concurrency,
            "starting daily history range selection",
        );

        let history_request = daily_batch_request(
            &request.symbols,
            request.start,
            request.session,
            request.adjustment,
            request.concurrency,
        );
        let batch = self.history_batch_detailed(&history_request).await?;

        // Carry over missing/failures from the fetch as-is; successes are
        // re-examined after trimming below.
        let mut selected = BatchResult {
            missing: batch.missing,
            failures: batch.failures,
            ..BatchResult::default()
        };

        for (ticker, mut series) in batch.successes {
            // Keep only bars dated inside the requested (inclusive) window.
            series
                .bars
                .retain(|bar| bar.time.date() >= request.start && bar.time.date() <= request.end);

            if series.bars.is_empty() {
                // Fetched fine but has no data in-range: report as missing.
                selected.missing.push(ticker);
            } else {
                selected.successes.insert(ticker, series);
            }
        }

        #[cfg(feature = "tracing")]
        debug!(
            target: "tvdata_rs::history",
            start = %request.start,
            end = %request.end,
            successes = selected.successes.len(),
            missing = selected.missing.len(),
            failures = selected.failures.len(),
            "daily history range selection completed",
        );

        Ok(selected)
    }
}
405
406pub(crate) use fetch::fetch_history_with_timeout_for_client;