1use std::collections::HashSet;
2use std::fs::File as StdFile;
3use std::io::{BufRead as StdBufRead, BufReader as StdBufReader};
4use std::path::{Path, PathBuf};
5
6use anyhow::{anyhow, Context, Result};
7use async_trait::async_trait;
8use chrono::{DateTime, Days, Utc};
9use futures::StreamExt;
10use reqwest::{Client, StatusCode};
11use rust_decimal::Decimal;
12use serde::Deserialize;
13use serde_json::Value as JsonValue;
14use tesser_core::{Candle, Interval, Side, Symbol, Tick};
15use tokio::fs::{self, OpenOptions};
16use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
17use tokio::task;
18use tracing::{debug, info};
19use zip::ZipArchive;
20
/// Upper bound applied to the `limit` (page size) parameter of REST requests.
const MAX_LIMIT: usize = 1_000;
/// Default base URL of Bybit's public trade archive.
const BYBIT_PUBLIC_BASE_URL: &str = "https://public.bybit.com/trading";
/// Default base URL of Binance's USD-M futures daily aggTrades archive.
const BINANCE_PUBLIC_BASE_URL: &str =
    "https://data.binance.vision/data/futures/um/daily/aggTrades";
/// Nanoseconds in one second.
const NANOS_PER_SECOND: i64 = 1_000_000_000;
25
26#[async_trait]
27pub trait MarketDataDownloader {
28 async fn download_klines(&self, req: &KlineRequest<'_>) -> Result<Vec<Candle>>;
29 async fn download_trades(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>>;
30}
31
/// Backend from which trade history is fetched.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum TradeSource {
    /// The exchange's paginated REST API.
    Rest,
    /// Bybit's daily gzip-compressed CSV archives.
    BybitPublicArchive,
    /// Binance's daily zipped aggTrades CSV archives.
    BinancePublicArchive,
}
38
39#[derive(Clone)]
41pub struct TradeRequest<'a> {
42 pub symbol: &'a str,
43 pub category: Option<&'a str>,
44 pub start: DateTime<Utc>,
45 pub end: DateTime<Utc>,
46 pub limit: usize,
47 pub source: TradeSource,
48 pub public_data_url: Option<&'a str>,
49 pub archive_cache_dir: Option<PathBuf>,
50 pub resume_archives: bool,
51}
52
53impl<'a> TradeRequest<'a> {
54 pub fn new(symbol: &'a str, start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
55 Self {
56 symbol,
57 category: None,
58 start,
59 end,
60 limit: MAX_LIMIT,
61 source: TradeSource::Rest,
62 public_data_url: None,
63 archive_cache_dir: None,
64 resume_archives: false,
65 }
66 }
67
68 #[must_use]
69 pub fn with_category(mut self, category: &'a str) -> Self {
70 self.category = Some(category);
71 self
72 }
73
74 #[must_use]
75 pub fn with_limit(mut self, limit: usize) -> Self {
76 self.limit = limit.clamp(1, MAX_LIMIT);
77 self
78 }
79
80 #[must_use]
81 pub fn with_source(mut self, source: TradeSource) -> Self {
82 self.source = source;
83 self
84 }
85
86 #[must_use]
87 pub fn with_public_data_url(mut self, url: &'a str) -> Self {
88 self.public_data_url = Some(url);
89 self
90 }
91
92 #[must_use]
93 pub fn with_archive_cache_dir(mut self, dir: PathBuf) -> Self {
94 self.archive_cache_dir = Some(dir);
95 self
96 }
97
98 #[must_use]
99 pub fn with_resume_archives(mut self, resume: bool) -> Self {
100 self.resume_archives = resume;
101 self
102 }
103}
104
105#[derive(Clone, Debug)]
107pub struct NormalizedTrade {
108 pub tick: Tick,
109 pub trade_id: Option<String>,
110}
111
112impl NormalizedTrade {
113 pub fn new(tick: Tick, trade_id: Option<String>) -> Self {
114 Self { tick, trade_id }
115 }
116}
117
118pub struct KlineRequest<'a> {
120 pub category: &'a str,
121 pub symbol: &'a str,
122 pub interval: Interval,
123 pub start: DateTime<Utc>,
124 pub end: DateTime<Utc>,
125 pub limit: usize,
126}
127
128impl<'a> KlineRequest<'a> {
129 pub fn new(
130 category: &'a str,
131 symbol: &'a str,
132 interval: Interval,
133 start: DateTime<Utc>,
134 end: DateTime<Utc>,
135 ) -> Self {
136 Self {
137 category,
138 symbol,
139 interval,
140 start,
141 end,
142 limit: MAX_LIMIT,
143 }
144 }
145}
146
147pub struct BybitDownloader {
149 client: Client,
150 base_url: String,
151}
152
153impl BybitDownloader {
154 pub fn new(base_url: impl Into<String>) -> Self {
155 Self {
156 client: Client::new(),
157 base_url: base_url.into(),
158 }
159 }
160
161 fn endpoint(&self, path: &str) -> String {
162 let base = self.base_url.trim_end_matches('/');
163 format!("{base}/{path}")
164 }
165
166 pub async fn download_klines(&self, req: &KlineRequest<'_>) -> Result<Vec<Candle>> {
168 <Self as MarketDataDownloader>::download_klines(self, req).await
169 }
170
171 pub async fn download_trades(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
173 <Self as MarketDataDownloader>::download_trades(self, req).await
174 }
175}
176
177#[async_trait]
178impl MarketDataDownloader for BybitDownloader {
179 async fn download_klines(&self, req: &KlineRequest<'_>) -> Result<Vec<Candle>> {
180 let mut cursor = req.start.timestamp_millis();
181 let end_ms = req.end.timestamp_millis();
182 if cursor >= end_ms {
183 return Err(anyhow!("start must be earlier than end"));
184 }
185
186 let mut candles = Vec::new();
187 let interval_ms = req.interval.as_duration().num_milliseconds();
188
189 while cursor < end_ms {
190 let limit = req.limit.min(MAX_LIMIT).to_string();
191 let response = self
192 .client
193 .get(self.endpoint("v5/market/kline"))
194 .query(&[
195 ("category", req.category),
196 ("symbol", req.symbol),
197 ("interval", req.interval.to_bybit()),
198 ("start", &cursor.to_string()),
199 ("end", &end_ms.to_string()),
200 ("limit", &limit),
201 ])
202 .send()
203 .await
204 .context("request to Bybit failed")?;
205
206 let status = response.status();
207 let body = response
208 .text()
209 .await
210 .context("failed to read Bybit response body")?;
211 debug!(
212 "bybit kline response (status {}): {}",
213 status,
214 truncate(&body, 512)
215 );
216 if !status.is_success() {
217 return Err(anyhow!(
218 "Bybit responded with status {}: {}",
219 status,
220 truncate(&body, 256)
221 ));
222 }
223
224 let response: BybitKlineResponse = serde_json::from_str(&body).map_err(|err| {
225 anyhow!(
226 "failed to parse Bybit response: {} (body snippet: {})",
227 err,
228 truncate(&body, 256)
229 )
230 })?;
231
232 if response.ret_code != 0 {
233 return Err(anyhow!(
234 "Bybit returned error {}: {}",
235 response.ret_code,
236 response.ret_msg
237 ));
238 }
239
240 let result = match response.result {
241 Some(result) => result,
242 None => break,
243 };
244 if result.list.is_empty() {
245 break;
246 }
247
248 let mut batch = Vec::new();
249 for entry in result.list {
250 if let Some(candle) = parse_entry(&entry, req.symbol, req.interval) {
251 if candle.timestamp.timestamp_millis() >= cursor
252 && candle.timestamp.timestamp_millis() <= end_ms
253 {
254 batch.push(candle);
255 }
256 }
257 }
258
259 if batch.is_empty() {
260 break;
261 }
262
263 batch.sort_by_key(|c| c.timestamp);
264 cursor = batch
265 .last()
266 .map(|c| c.timestamp.timestamp_millis() + interval_ms)
267 .unwrap_or(end_ms);
268 candles.extend(batch);
269 }
270
271 candles.sort_by_key(|c| c.timestamp);
272 candles.dedup_by_key(|c| c.timestamp);
273 Ok(candles)
274 }
275
276 async fn download_trades(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
277 match req.source {
278 TradeSource::Rest => self.download_trades_rest(req).await,
279 TradeSource::BybitPublicArchive => self.download_trades_public(req).await,
280 TradeSource::BinancePublicArchive => Err(anyhow!(
281 "binance public archive source is invalid for Bybit requests"
282 )),
283 }
284 }
285}
286
287impl BybitDownloader {
288 async fn download_trades_rest(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
289 let start_ms = req.start.timestamp_millis();
290 let end_ms = req.end.timestamp_millis();
291 if start_ms >= end_ms {
292 return Err(anyhow!("start must be earlier than end"));
293 }
294
295 let mut trades = Vec::new();
296 let mut seen_ids = HashSet::new();
297 let mut cursor: Option<String> = None;
298 let limit = req.limit.min(MAX_LIMIT);
299
300 loop {
301 let mut params = Vec::with_capacity(6);
302 if let Some(category) = req.category {
303 params.push(("category", category.to_string()));
304 }
305 params.push(("symbol", req.symbol.to_string()));
306 params.push(("start", start_ms.to_string()));
307 params.push(("end", end_ms.to_string()));
308 params.push(("limit", limit.to_string()));
309 if let Some(token) = &cursor {
310 params.push(("cursor", token.clone()));
311 }
312 let params_ref: Vec<(&str, &str)> =
313 params.iter().map(|(k, v)| (*k, v.as_str())).collect();
314
315 let response = self
316 .client
317 .get(self.endpoint("v5/market/history-trade"))
318 .query(¶ms_ref)
319 .send()
320 .await
321 .context("request to Bybit failed")?;
322
323 let status = response.status();
324 let body = response
325 .text()
326 .await
327 .context("failed to read Bybit response body")?;
328 debug!(
329 "bybit trades response (status {}): {}",
330 status,
331 truncate(&body, 512)
332 );
333 if !status.is_success() {
334 return Err(anyhow!(
335 "Bybit responded with status {}: {}",
336 status,
337 truncate(&body, 256)
338 ));
339 }
340
341 let response: BybitTradeResponse = serde_json::from_str(&body).map_err(|err| {
342 anyhow!(
343 "failed to parse Bybit response: {} (body snippet: {})",
344 err,
345 truncate(&body, 256)
346 )
347 })?;
348
349 if response.ret_code != 0 {
350 return Err(anyhow!(
351 "Bybit returned error {}: {}",
352 response.ret_code,
353 response.ret_msg
354 ));
355 }
356 let Some(result) = response.result else {
357 break;
358 };
359 if result.list.is_empty() {
360 break;
361 }
362
363 for entry in result.list {
364 if !seen_ids.insert(entry.exec_id.clone()) {
365 continue;
366 }
367 if let Some(trade) = parse_bybit_trade(req.symbol, entry) {
368 if trade.tick.exchange_timestamp.timestamp_millis() < start_ms
369 || trade.tick.exchange_timestamp.timestamp_millis() > end_ms
370 {
371 continue;
372 }
373 trades.push(trade);
374 }
375 }
376
377 if let Some(next_cursor) = result.next_page_cursor {
378 cursor = Some(next_cursor);
379 } else {
380 break;
381 }
382 }
383
384 trades.sort_by_key(|trade| trade.tick.exchange_timestamp);
385 trades.dedup_by(|a, b| {
386 a.tick.exchange_timestamp == b.tick.exchange_timestamp
387 && a.tick.price == b.tick.price
388 && a.tick.size == b.tick.size
389 && a.tick.side == b.tick.side
390 });
391 Ok(trades)
392 }
393
394 async fn download_trades_public(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
395 let mut cursor_date = req.start.date_naive();
396 let end_date = req.end.date_naive();
397 let mut trades = Vec::new();
398 let mut seen_ids = HashSet::new();
399 let base_url = req.public_data_url.unwrap_or(BYBIT_PUBLIC_BASE_URL);
400 let cache_root = resolve_archive_cache_dir(req, "bybit", req.symbol);
401 let total_days = (end_date
402 .signed_duration_since(cursor_date)
403 .num_days()
404 .max(0)
405 + 1)
406 .try_into()
407 .unwrap_or(0u32);
408 info!(
409 symbol = req.symbol,
410 "downloading {} day(s) from Bybit public archive", total_days
411 );
412
413 while cursor_date <= end_date {
414 let next_date = cursor_date
415 .checked_add_days(Days::new(1))
416 .unwrap_or(cursor_date);
417 let day_start = DateTime::<Utc>::from_naive_utc_and_offset(
418 cursor_date
419 .and_hms_opt(0, 0, 0)
420 .ok_or_else(|| anyhow!("invalid day {}", cursor_date))?,
421 Utc,
422 )
423 .max(req.start);
424 let day_end = DateTime::<Utc>::from_naive_utc_and_offset(
425 next_date
426 .and_hms_opt(0, 0, 0)
427 .ok_or_else(|| anyhow!("invalid day {}", cursor_date))?,
428 Utc,
429 )
430 .min(req.end);
431 if day_start >= day_end {
432 if next_date == cursor_date {
433 break;
434 }
435 cursor_date = next_date;
436 continue;
437 }
438
439 let filename = format!("{}_{}.csv.gz", req.symbol, cursor_date.format("%Y-%m-%d"));
440 let cache_path = cache_root.join(&filename);
441 let url = format!(
442 "{}/{symbol}/{symbol}{}.csv.gz",
443 base_url,
444 cursor_date.format("%Y-%m-%d"),
445 symbol = req.symbol
446 );
447 if download_archive_file(&self.client, &url, &cache_path, req.resume_archives)
448 .await?
449 .is_none()
450 {
451 if next_date == cursor_date {
452 break;
453 }
454 cursor_date = next_date;
455 continue;
456 }
457 let mut day_trades = read_bybit_archive(
458 &cache_path,
459 req.symbol,
460 day_start.timestamp_millis(),
461 day_end.timestamp_millis(),
462 &mut seen_ids,
463 )
464 .await?;
465 trades.append(&mut day_trades);
466
467 if next_date == cursor_date {
468 break;
469 }
470 cursor_date = next_date;
471 }
472
473 trades.sort_by_key(|trade| trade.tick.exchange_timestamp);
474 trades.dedup_by(|a, b| {
475 a.tick.exchange_timestamp == b.tick.exchange_timestamp
476 && a.tick.price == b.tick.price
477 && a.tick.size == b.tick.size
478 && a.tick.side == b.tick.side
479 });
480 Ok(trades)
481 }
482}
483
484fn parse_entry(entry: &[String], symbol: &str, interval: Interval) -> Option<Candle> {
485 if entry.len() < 6 {
486 return None;
487 }
488 let ts = entry.first()?.parse::<i64>().ok()?;
489 let timestamp = DateTime::<Utc>::from_timestamp_millis(ts)?;
490 let open = entry.get(1)?.parse::<Decimal>().ok()?;
491 let high = entry.get(2)?.parse::<Decimal>().ok()?;
492 let low = entry.get(3)?.parse::<Decimal>().ok()?;
493 let close = entry.get(4)?.parse::<Decimal>().ok()?;
494 let volume = entry.get(5)?.parse::<Decimal>().ok()?;
495 Some(Candle {
496 symbol: Symbol::from(symbol),
497 interval,
498 open,
499 high,
500 low,
501 close,
502 volume,
503 timestamp,
504 })
505}
506
507#[derive(Debug, Deserialize)]
508struct BybitKlineResponse {
509 #[serde(rename = "retCode")]
510 ret_code: i64,
511 #[serde(rename = "retMsg")]
512 ret_msg: String,
513 result: Option<KlineResult>,
514}
515
516#[derive(Debug, Deserialize)]
517struct KlineResult {
518 list: Vec<Vec<String>>,
519}
520
521#[derive(Debug, Deserialize)]
522struct BybitTradeResponse {
523 #[serde(rename = "retCode")]
524 ret_code: i64,
525 #[serde(rename = "retMsg")]
526 ret_msg: String,
527 result: Option<BybitTradeResult>,
528}
529
530#[derive(Debug, Deserialize)]
531struct BybitTradeResult {
532 list: Vec<BybitTradeEntry>,
533 #[serde(rename = "nextPageCursor")]
534 next_page_cursor: Option<String>,
535}
536
537#[derive(Debug, Deserialize)]
538struct BybitTradeEntry {
539 #[serde(rename = "execId")]
540 exec_id: String,
541 price: String,
542 size: String,
543 side: String,
544 #[serde(rename = "time", alias = "execTime", alias = "tradeTime")]
545 time: String,
546}
547
548pub struct BinanceDownloader {
550 client: Client,
551 base_url: String,
552}
553
554impl BinanceDownloader {
555 pub fn new(base_url: impl Into<String>) -> Self {
556 Self {
557 client: Client::new(),
558 base_url: base_url.into(),
559 }
560 }
561
562 fn endpoint(&self, path: &str) -> String {
563 let base = self.base_url.trim_end_matches('/');
564 format!("{base}/{path}")
565 }
566
567 pub async fn download_klines(&self, req: &KlineRequest<'_>) -> Result<Vec<Candle>> {
568 <Self as MarketDataDownloader>::download_klines(self, req).await
569 }
570
571 pub async fn download_agg_trades(
573 &self,
574 req: &TradeRequest<'_>,
575 ) -> Result<Vec<NormalizedTrade>> {
576 self.fetch_agg_trades(req).await
577 }
578
579 pub async fn download_trades(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
581 <Self as MarketDataDownloader>::download_trades(self, req).await
582 }
583
584 async fn fetch_agg_trades(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
585 let mut cursor = req.start.timestamp_millis();
586 let end_ms = req.end.timestamp_millis();
587 if cursor >= end_ms {
588 return Err(anyhow!("start must be earlier than end"));
589 }
590 let limit = req.limit.min(MAX_LIMIT);
591 let mut trades = Vec::new();
592 let mut seen_ids = HashSet::new();
593 while cursor < end_ms {
594 let response = self
595 .client
596 .get(self.endpoint("fapi/v1/aggTrades"))
597 .query(&[
598 ("symbol", req.symbol),
599 ("startTime", &cursor.to_string()),
600 ("endTime", &end_ms.to_string()),
601 ("limit", &limit.to_string()),
602 ])
603 .send()
604 .await
605 .context("request to Binance failed")?;
606 let status = response.status();
607 let body = response
608 .text()
609 .await
610 .context("failed to read Binance response body")?;
611 debug!(
612 "binance aggTrades response (status {}): {}",
613 status,
614 truncate(&body, 512)
615 );
616 if !status.is_success() {
617 return Err(anyhow!(
618 "Binance responded with status {}: {}",
619 status,
620 truncate(&body, 256)
621 ));
622 }
623 let entries: Vec<BinanceAggTrade> = serde_json::from_str(&body).map_err(|err| {
624 anyhow!(
625 "failed to parse Binance response: {} (body snippet: {})",
626 err,
627 truncate(&body, 256)
628 )
629 })?;
630 if entries.is_empty() {
631 break;
632 }
633 let mut last_ts: Option<i64> = None;
634 for entry in entries {
635 if !seen_ids.insert(entry.agg_id) {
636 continue;
637 }
638 if let Some(trade) = parse_binance_trade(req.symbol, entry) {
639 let ts = trade.tick.exchange_timestamp.timestamp_millis();
640 last_ts = Some(last_ts.map_or(ts, |prev| prev.max(ts)));
641 trades.push(trade);
642 }
643 }
644 if let Some(ts) = last_ts {
645 cursor = ts + 1;
646 } else {
647 break;
648 }
649 }
650 trades.sort_by_key(|trade| trade.tick.exchange_timestamp);
651 trades.dedup_by(|a, b| {
652 a.tick.exchange_timestamp == b.tick.exchange_timestamp
653 && a.tick.price == b.tick.price
654 && a.tick.size == b.tick.size
655 && a.tick.side == b.tick.side
656 });
657 Ok(trades)
658 }
659
660 async fn download_trades_public(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
661 let mut cursor_date = req.start.date_naive();
662 let end_date = req.end.date_naive();
663 let mut trades = Vec::new();
664 let mut seen_ids = HashSet::new();
665 let base_url = req.public_data_url.unwrap_or(BINANCE_PUBLIC_BASE_URL);
666 let cache_root = resolve_archive_cache_dir(req, "binance", req.symbol);
667
668 while cursor_date <= end_date {
669 let next_date = cursor_date
670 .checked_add_days(Days::new(1))
671 .unwrap_or(cursor_date);
672 let day_start = DateTime::<Utc>::from_naive_utc_and_offset(
673 cursor_date
674 .and_hms_opt(0, 0, 0)
675 .ok_or_else(|| anyhow!("invalid date {}", cursor_date))?,
676 Utc,
677 )
678 .max(req.start);
679 let day_end = DateTime::<Utc>::from_naive_utc_and_offset(
680 next_date
681 .and_hms_opt(0, 0, 0)
682 .ok_or_else(|| anyhow!("invalid date {}", next_date))?,
683 Utc,
684 )
685 .min(req.end);
686 if day_start >= day_end {
687 if next_date == cursor_date {
688 break;
689 }
690 cursor_date = next_date;
691 continue;
692 }
693
694 let filename = format!(
695 "{}-aggTrades-{}.zip",
696 req.symbol,
697 cursor_date.format("%Y-%m-%d")
698 );
699 let cache_path = cache_root.join(&filename);
700 let url = format!("{}/{symbol}/{filename}", base_url, symbol = req.symbol);
701 if download_archive_file(&self.client, &url, &cache_path, req.resume_archives)
702 .await?
703 .is_none()
704 {
705 if next_date == cursor_date {
706 break;
707 }
708 cursor_date = next_date;
709 continue;
710 }
711 let parsed = read_binance_archive(cache_path.clone(), req.symbol.to_string()).await?;
712 let start_ms = day_start.timestamp_millis();
713 let end_ms = day_end.timestamp_millis();
714 for trade in parsed {
715 let ts = trade.tick.exchange_timestamp.timestamp_millis();
716 if ts < start_ms || ts > end_ms {
717 continue;
718 }
719 if let Some(id) = trade.trade_id.as_ref() {
720 if !seen_ids.insert(id.clone()) {
721 continue;
722 }
723 }
724 trades.push(trade);
725 }
726
727 if next_date == cursor_date {
728 break;
729 }
730 cursor_date = next_date;
731 }
732
733 trades.sort_by_key(|trade| trade.tick.exchange_timestamp);
734 trades.dedup_by(|a, b| {
735 a.tick.exchange_timestamp == b.tick.exchange_timestamp
736 && a.tick.price == b.tick.price
737 && a.tick.size == b.tick.size
738 && a.tick.side == b.tick.side
739 });
740 Ok(trades)
741 }
742}
743
744#[async_trait]
745impl MarketDataDownloader for BinanceDownloader {
746 async fn download_klines(&self, req: &KlineRequest<'_>) -> Result<Vec<Candle>> {
747 let mut cursor = req.start.timestamp_millis();
748 let end_ms = req.end.timestamp_millis();
749 if cursor >= end_ms {
750 return Err(anyhow!("start must be earlier than end"));
751 }
752 let mut candles = Vec::new();
753 let interval_ms = req.interval.as_duration().num_milliseconds();
754 while cursor < end_ms {
755 let response = self
756 .client
757 .get(self.endpoint("fapi/v1/klines"))
758 .query(&[
759 ("symbol", req.symbol),
760 ("interval", req.interval.to_binance()),
761 ("startTime", &cursor.to_string()),
762 ("endTime", &end_ms.to_string()),
763 ("limit", &req.limit.min(MAX_LIMIT).to_string()),
764 ])
765 .send()
766 .await
767 .context("request to Binance failed")?;
768 let status = response.status();
769 let body = response
770 .text()
771 .await
772 .context("failed to read Binance response body")?;
773 debug!(
774 "binance kline response (status {}): {}",
775 status,
776 truncate(&body, 512)
777 );
778 if !status.is_success() {
779 return Err(anyhow!(
780 "Binance responded with status {}: {}",
781 status,
782 truncate(&body, 256)
783 ));
784 }
785 let entries: Vec<Vec<JsonValue>> = serde_json::from_str(&body).map_err(|err| {
786 anyhow!(
787 "failed to parse Binance response: {} (body snippet: {})",
788 err,
789 truncate(&body, 256)
790 )
791 })?;
792 if entries.is_empty() {
793 break;
794 }
795 let mut batch = Vec::new();
796 for entry in entries {
797 if let Some(candle) = parse_binance_entry(&entry, req.symbol, req.interval) {
798 if candle.timestamp.timestamp_millis() >= cursor
799 && candle.timestamp.timestamp_millis() <= end_ms
800 {
801 batch.push(candle);
802 }
803 }
804 }
805 if batch.is_empty() {
806 break;
807 }
808 batch.sort_by_key(|c| c.timestamp);
809 cursor = batch
810 .last()
811 .map(|c| c.timestamp.timestamp_millis() + interval_ms)
812 .unwrap_or(end_ms);
813 candles.extend(batch);
814 }
815 candles.sort_by_key(|c| c.timestamp);
816 candles.dedup_by_key(|c| c.timestamp);
817 Ok(candles)
818 }
819
820 async fn download_trades(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
821 match req.source {
822 TradeSource::Rest => self.fetch_agg_trades(req).await,
823 TradeSource::BinancePublicArchive => self.download_trades_public(req).await,
824 TradeSource::BybitPublicArchive => Err(anyhow!(
825 "bybit public archive source is invalid for Binance requests"
826 )),
827 }
828 }
829}
830
831fn parse_binance_entry(entry: &[JsonValue], symbol: &str, interval: Interval) -> Option<Candle> {
832 if entry.len() < 6 {
833 return None;
834 }
835 let ts = entry.first()?.as_i64()?;
836 let timestamp = DateTime::<Utc>::from_timestamp_millis(ts)?;
837 let open = entry.get(1)?.as_str()?.parse::<Decimal>().ok()?;
838 let high = entry.get(2)?.as_str()?.parse::<Decimal>().ok()?;
839 let low = entry.get(3)?.as_str()?.parse::<Decimal>().ok()?;
840 let close = entry.get(4)?.as_str()?.parse::<Decimal>().ok()?;
841 let volume = entry.get(5)?.as_str()?.parse::<Decimal>().ok()?;
842 Some(Candle {
843 symbol: Symbol::from(symbol),
844 interval,
845 open,
846 high,
847 low,
848 close,
849 volume,
850 timestamp,
851 })
852}
853
/// One element of the `fapi/v1/aggTrades` JSON response, which uses single-letter keys.
#[derive(Debug, Deserialize)]
struct BinanceAggTrade {
    /// Aggregate trade id (`a`); used as the de-duplication key.
    #[serde(rename = "a")]
    agg_id: u64,
    /// Price as a decimal string (`p`).
    #[serde(rename = "p")]
    price: String,
    /// Quantity as a decimal string (`q`).
    #[serde(rename = "q")]
    quantity: String,
    /// Trade time in epoch milliseconds (`T`).
    #[serde(rename = "T")]
    timestamp: i64,
    /// Whether the buyer was the maker (`m`); `true` is mapped to a sell-side tick.
    #[serde(rename = "m")]
    is_buyer_maker: bool,
}
867
868fn parse_bybit_public_line(symbol: &str, line: &str) -> Option<NormalizedTrade> {
869 let mut columns = line.split(',');
870 let timestamp = parse_public_timestamp(columns.next()?.trim())?;
871 let _symbol = columns.next()?;
872 let side = parse_side(columns.next()?.trim())?;
873 let size = columns.next()?.trim().parse::<Decimal>().ok()?;
874 let price = columns.next()?.trim().parse::<Decimal>().ok()?;
875 columns.next()?; let trade_id = columns.next().map(|value| value.trim().to_string());
877
878 let tick = Tick {
879 symbol: Symbol::from(symbol),
880 price,
881 size,
882 side,
883 exchange_timestamp: timestamp,
884 received_at: timestamp,
885 };
886 Some(NormalizedTrade::new(tick, trade_id))
887}
888
889fn parse_binance_public_line(symbol: &str, line: &str) -> Option<NormalizedTrade> {
890 let mut columns = line.split(',');
891 let agg_id = columns.next()?.trim().parse::<u64>().ok()?;
892 let price = columns.next()?.trim().parse::<Decimal>().ok()?;
893 let size = columns.next()?.trim().parse::<Decimal>().ok()?;
894 columns.next()?; columns.next()?; let timestamp = columns
897 .next()?
898 .trim()
899 .parse::<i64>()
900 .ok()
901 .and_then(DateTime::<Utc>::from_timestamp_millis)?;
902 let maker_flag = columns.next()?.trim();
903 let is_buyer_maker = match maker_flag {
904 "true" | "True" | "1" => true,
905 "false" | "False" | "0" => false,
906 _ => return None,
907 };
908 let _ = columns.next(); let side = if is_buyer_maker {
910 Side::Sell
911 } else {
912 Side::Buy
913 };
914 let tick = Tick {
915 symbol: Symbol::from(symbol),
916 price,
917 size,
918 side,
919 exchange_timestamp: timestamp,
920 received_at: timestamp,
921 };
922 Some(NormalizedTrade::new(tick, Some(agg_id.to_string())))
923}
924
925fn parse_public_timestamp(value: &str) -> Option<DateTime<Utc>> {
926 let seconds = value.parse::<f64>().ok()?;
927 let secs = seconds.trunc() as i64;
928 let fractional = seconds - secs as f64;
929 let nanos = (fractional * NANOS_PER_SECOND as f64).round() as i64;
930 let clamped = nanos.clamp(0, NANOS_PER_SECOND - 1) as u32;
931 DateTime::<Utc>::from_timestamp(secs, clamped)
932}
933
934fn parse_bybit_trade(symbol: &str, entry: BybitTradeEntry) -> Option<NormalizedTrade> {
935 let timestamp = entry
936 .time
937 .parse::<i64>()
938 .ok()
939 .and_then(DateTime::<Utc>::from_timestamp_millis)?;
940 let price = entry.price.parse::<Decimal>().ok()?;
941 let size = entry.size.parse::<Decimal>().ok()?;
942 let side = parse_side(&entry.side)?;
943 let tick = Tick {
944 symbol: Symbol::from(symbol),
945 price,
946 size,
947 side,
948 exchange_timestamp: timestamp,
949 received_at: timestamp,
950 };
951 Some(NormalizedTrade::new(tick, Some(entry.exec_id)))
952}
953
954fn parse_binance_trade(symbol: &str, entry: BinanceAggTrade) -> Option<NormalizedTrade> {
955 let timestamp = DateTime::<Utc>::from_timestamp_millis(entry.timestamp)?;
956 let price = entry.price.parse::<Decimal>().ok()?;
957 let size = entry.quantity.parse::<Decimal>().ok()?;
958 let side = if entry.is_buyer_maker {
959 Side::Sell
960 } else {
961 Side::Buy
962 };
963 let tick = Tick {
964 symbol: Symbol::from(symbol),
965 price,
966 size,
967 side,
968 exchange_timestamp: timestamp,
969 received_at: timestamp,
970 };
971 Some(NormalizedTrade::new(tick, Some(entry.agg_id.to_string())))
972}
973
974fn parse_side(value: &str) -> Option<Side> {
975 match value.to_ascii_lowercase().as_str() {
976 "buy" => Some(Side::Buy),
977 "sell" => Some(Side::Sell),
978 _ => None,
979 }
980}
981
/// Shortens `body` for logging and error messages.
///
/// Returns `body` unchanged when it is at most `max` bytes long. Otherwise returns the longest
/// prefix of at most `max` bytes that ends on a UTF-8 character boundary, followed by `…`.
/// Slicing at a raw byte offset would panic when `max` falls inside a multi-byte character
/// (e.g. in a non-ASCII error body), so the cut point is moved back to a boundary.
fn truncate(body: &str, max: usize) -> String {
    if body.len() <= max {
        return body.to_string();
    }
    let mut cut = max;
    // Index 0 is always a boundary, so this terminates.
    while !body.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}…", &body[..cut])
}
989
990fn resolve_archive_cache_dir(req: &TradeRequest<'_>, exchange: &str, symbol: &str) -> PathBuf {
991 req.archive_cache_dir.clone().unwrap_or_else(|| {
992 std::env::temp_dir()
993 .join("tesser-data")
994 .join(exchange)
995 .join(symbol)
996 })
997}
998
999async fn download_archive_file(
1000 client: &Client,
1001 url: &str,
1002 cache_path: &Path,
1003 resume: bool,
1004) -> Result<Option<()>> {
1005 if let Some(parent) = cache_path.parent() {
1006 fs::create_dir_all(parent)
1007 .await
1008 .with_context(|| format!("failed to create {}", parent.display()))?;
1009 }
1010 let mut start = 0;
1011 if resume {
1012 if let Ok(meta) = fs::metadata(cache_path).await {
1013 start = meta.len();
1014 }
1015 } else if fs::try_exists(cache_path).await? {
1016 fs::remove_file(cache_path).await?;
1017 }
1018 let mut request = client.get(url);
1019 if resume && start > 0 {
1020 request = request.header(reqwest::header::RANGE, format!("bytes={start}-"));
1021 }
1022 let response = request
1023 .send()
1024 .await
1025 .with_context(|| format!("failed to fetch archive {url}"))?;
1026 let status = response.status();
1027 if status == StatusCode::NOT_FOUND {
1028 debug!("archive missing {}", url);
1029 return Ok(None);
1030 }
1031 if resume && status == StatusCode::RANGE_NOT_SATISFIABLE {
1032 debug!("archive already complete {}", url);
1033 return Ok(Some(()));
1034 }
1035 if !(status.is_success() || status == StatusCode::PARTIAL_CONTENT) {
1036 return Err(anyhow!(
1037 "archive request {} failed with status {}",
1038 url,
1039 status
1040 ));
1041 }
1042
1043 let mut file = if start > 0 {
1044 OpenOptions::new()
1045 .create(true)
1046 .append(true)
1047 .open(cache_path)
1048 .await?
1049 } else {
1050 OpenOptions::new()
1051 .create(true)
1052 .write(true)
1053 .truncate(true)
1054 .open(cache_path)
1055 .await?
1056 };
1057 let mut stream = response.bytes_stream();
1058 while let Some(chunk) = stream.next().await {
1059 let bytes = chunk.context("failed to read archive chunk")?;
1060 file.write_all(&bytes).await?;
1061 }
1062 file.flush().await?;
1063 Ok(Some(()))
1064}
1065
1066async fn read_bybit_archive(
1067 cache_path: &Path,
1068 symbol: &str,
1069 start_ms: i64,
1070 end_ms: i64,
1071 seen_ids: &mut HashSet<String>,
1072) -> Result<Vec<NormalizedTrade>> {
1073 let file = tokio::fs::File::open(cache_path)
1074 .await
1075 .with_context(|| format!("failed to open {}", cache_path.display()))?;
1076 let reader = BufReader::new(file);
1077 let decoder = async_compression::tokio::bufread::GzipDecoder::new(reader);
1078 let reader = BufReader::new(decoder);
1079 let mut lines = reader.lines();
1080 let mut trades = Vec::new();
1081 while let Some(line) = lines.next_line().await? {
1082 if line.starts_with("timestamp") {
1083 continue;
1084 }
1085 let Some(trade) = parse_bybit_public_line(symbol, line.trim()) else {
1086 continue;
1087 };
1088 let ts = trade.tick.exchange_timestamp.timestamp_millis();
1089 if ts < start_ms || ts > end_ms {
1090 continue;
1091 }
1092 if let Some(id) = trade.trade_id.as_ref() {
1093 if !seen_ids.insert(id.clone()) {
1094 continue;
1095 }
1096 }
1097 trades.push(trade);
1098 }
1099 Ok(trades)
1100}
1101
1102async fn read_binance_archive(cache_path: PathBuf, symbol: String) -> Result<Vec<NormalizedTrade>> {
1103 task::spawn_blocking(move || -> Result<Vec<NormalizedTrade>> {
1104 let file = StdFile::open(&cache_path)
1105 .with_context(|| format!("failed to open {}", cache_path.display()))?;
1106 let mut archive = ZipArchive::new(file)
1107 .with_context(|| format!("failed to open zip {}", cache_path.display()))?;
1108 let mut trades = Vec::new();
1109 for index in 0..archive.len() {
1110 let file = archive.by_index(index)?;
1111 if !file.name().ends_with(".csv") {
1112 continue;
1113 }
1114 let reader = StdBufReader::new(file);
1115 for line in reader.lines() {
1116 let line = line?;
1117 if line.starts_with("aggTradeId") || line.trim().is_empty() {
1118 continue;
1119 }
1120 if let Some(trade) = parse_binance_public_line(&symbol, line.trim()) {
1121 trades.push(trade);
1122 }
1123 }
1124 }
1125 Ok(trades)
1126 })
1127 .await?
1128}
1129
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    /// Parses a decimal literal used in expectations.
    fn dec(literal: &str) -> Decimal {
        Decimal::from_str(literal).unwrap()
    }

    #[test]
    fn parses_public_trade_line() {
        let trade = parse_bybit_public_line(
            "BTCUSDT",
            "1585180700.0647,BTCUSDT,Buy,0.042,6698.5,PlusTick,08ff9568-cb50-55d6-b497-13727eec09dc,28133700000.0,0.042,281.337",
        )
        .expect("trade");
        let tick = &trade.tick;
        assert_eq!(tick.symbol.code(), "BTCUSDT");
        assert_eq!(
            trade.trade_id.as_deref(),
            Some("08ff9568-cb50-55d6-b497-13727eec09dc")
        );
        assert_eq!(tick.side, Side::Buy);
        assert_eq!(tick.price, dec("6698.5"));
        assert_eq!(tick.size, dec("0.042"));
    }

    #[test]
    fn parses_public_timestamp_fractional_seconds() {
        let parsed = parse_public_timestamp("1585180700.0647").expect("timestamp");
        assert_eq!(parsed.timestamp(), 1_585_180_700);
        assert!(parsed.timestamp_subsec_nanos() > 0);
    }

    #[test]
    fn parses_binance_public_line() {
        let trade = parse_binance_public_line("BTCUSDT", "1001,51234.5,0.010,200,205,1585180700064,true,false")
            .expect("trade");
        assert_eq!(trade.trade_id.as_deref(), Some("1001"));
        assert_eq!(trade.tick.price, dec("51234.5"));
        assert_eq!(trade.tick.side, Side::Sell);
    }
}