tesser_data/
download.rs

1use std::collections::HashSet;
2use std::fs::File as StdFile;
3use std::io::{BufRead as StdBufRead, BufReader as StdBufReader};
4use std::path::{Path, PathBuf};
5
6use anyhow::{anyhow, Context, Result};
7use async_trait::async_trait;
8use chrono::{DateTime, Days, Utc};
9use futures::StreamExt;
10use reqwest::{Client, StatusCode};
11use rust_decimal::Decimal;
12use serde::Deserialize;
13use serde_json::Value as JsonValue;
14use tesser_core::{Candle, Interval, Side, Symbol, Tick};
15use tokio::fs::{self, OpenOptions};
16use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
17use tokio::task;
18use tracing::{debug, info};
19use zip::ZipArchive;
20
/// Upper bound applied to every per-request page size in this module.
const MAX_LIMIT: usize = 1000;
/// Default base URL for Bybit's public trade archives (overridable per request).
const BYBIT_PUBLIC_BASE_URL: &str = "https://public.bybit.com/trading";
/// Default base URL for Binance's public daily aggTrades archives (overridable per request).
const BINANCE_PUBLIC_BASE_URL: &str = "https://data.binance.vision/data/futures/um/daily/aggTrades";
/// Nanoseconds in one second; used to convert fractional seconds into a nanosecond component.
const NANOS_PER_SECOND: i64 = 1_000_000_000;
25
/// Common asynchronous interface implemented by the exchange downloaders in this file.
#[async_trait]
pub trait MarketDataDownloader {
    /// Downloads candles for the symbol, interval and time window described by `req`.
    async fn download_klines(&self, req: &KlineRequest<'_>) -> Result<Vec<Candle>>;
    /// Downloads trades for the symbol, time window and source described by `req`.
    async fn download_trades(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>>;
}
31
/// Selects where trades are fetched from.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TradeSource {
    /// The exchange's REST API.
    Rest,
    /// Bybit's public archive files; the Binance downloader rejects this source.
    BybitPublicArchive,
    /// Binance's public archive files; the Bybit downloader rejects this source.
    BinancePublicArchive,
}
38
39/// Parameters for a trade download request.
40#[derive(Clone)]
41pub struct TradeRequest<'a> {
42    pub symbol: &'a str,
43    pub category: Option<&'a str>,
44    pub start: DateTime<Utc>,
45    pub end: DateTime<Utc>,
46    pub limit: usize,
47    pub source: TradeSource,
48    pub public_data_url: Option<&'a str>,
49    pub archive_cache_dir: Option<PathBuf>,
50    pub resume_archives: bool,
51}
52
53impl<'a> TradeRequest<'a> {
54    pub fn new(symbol: &'a str, start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
55        Self {
56            symbol,
57            category: None,
58            start,
59            end,
60            limit: MAX_LIMIT,
61            source: TradeSource::Rest,
62            public_data_url: None,
63            archive_cache_dir: None,
64            resume_archives: false,
65        }
66    }
67
68    #[must_use]
69    pub fn with_category(mut self, category: &'a str) -> Self {
70        self.category = Some(category);
71        self
72    }
73
74    #[must_use]
75    pub fn with_limit(mut self, limit: usize) -> Self {
76        self.limit = limit.clamp(1, MAX_LIMIT);
77        self
78    }
79
80    #[must_use]
81    pub fn with_source(mut self, source: TradeSource) -> Self {
82        self.source = source;
83        self
84    }
85
86    #[must_use]
87    pub fn with_public_data_url(mut self, url: &'a str) -> Self {
88        self.public_data_url = Some(url);
89        self
90    }
91
92    #[must_use]
93    pub fn with_archive_cache_dir(mut self, dir: PathBuf) -> Self {
94        self.archive_cache_dir = Some(dir);
95        self
96    }
97
98    #[must_use]
99    pub fn with_resume_archives(mut self, resume: bool) -> Self {
100        self.resume_archives = resume;
101        self
102    }
103}
104
105/// Normalized trade enriched with the exchange-provided identifier.
106#[derive(Clone, Debug)]
107pub struct NormalizedTrade {
108    pub tick: Tick,
109    pub trade_id: Option<String>,
110}
111
112impl NormalizedTrade {
113    pub fn new(tick: Tick, trade_id: Option<String>) -> Self {
114        Self { tick, trade_id }
115    }
116}
117
118/// Parameters for a kline download request.
119pub struct KlineRequest<'a> {
120    pub category: &'a str,
121    pub symbol: &'a str,
122    pub interval: Interval,
123    pub start: DateTime<Utc>,
124    pub end: DateTime<Utc>,
125    pub limit: usize,
126}
127
128impl<'a> KlineRequest<'a> {
129    pub fn new(
130        category: &'a str,
131        symbol: &'a str,
132        interval: Interval,
133        start: DateTime<Utc>,
134        end: DateTime<Utc>,
135    ) -> Self {
136        Self {
137            category,
138            symbol,
139            interval,
140            start,
141            end,
142            limit: MAX_LIMIT,
143        }
144    }
145}
146
147/// Simple Bybit REST downloader for kline data.
148pub struct BybitDownloader {
149    client: Client,
150    base_url: String,
151}
152
153impl BybitDownloader {
154    pub fn new(base_url: impl Into<String>) -> Self {
155        Self {
156            client: Client::new(),
157            base_url: base_url.into(),
158        }
159    }
160
161    fn endpoint(&self, path: &str) -> String {
162        let base = self.base_url.trim_end_matches('/');
163        format!("{base}/{path}")
164    }
165
166    /// Download klines from Bybit, returning a chronologically sorted list of candles.
167    pub async fn download_klines(&self, req: &KlineRequest<'_>) -> Result<Vec<Candle>> {
168        <Self as MarketDataDownloader>::download_klines(self, req).await
169    }
170
171    /// Download historical trades from Bybit within the requested range.
172    pub async fn download_trades(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
173        <Self as MarketDataDownloader>::download_trades(self, req).await
174    }
175}
176
177#[async_trait]
178impl MarketDataDownloader for BybitDownloader {
179    async fn download_klines(&self, req: &KlineRequest<'_>) -> Result<Vec<Candle>> {
180        let mut cursor = req.start.timestamp_millis();
181        let end_ms = req.end.timestamp_millis();
182        if cursor >= end_ms {
183            return Err(anyhow!("start must be earlier than end"));
184        }
185
186        let mut candles = Vec::new();
187        let interval_ms = req.interval.as_duration().num_milliseconds();
188
189        while cursor < end_ms {
190            let limit = req.limit.min(MAX_LIMIT).to_string();
191            let response = self
192                .client
193                .get(self.endpoint("v5/market/kline"))
194                .query(&[
195                    ("category", req.category),
196                    ("symbol", req.symbol),
197                    ("interval", req.interval.to_bybit()),
198                    ("start", &cursor.to_string()),
199                    ("end", &end_ms.to_string()),
200                    ("limit", &limit),
201                ])
202                .send()
203                .await
204                .context("request to Bybit failed")?;
205
206            let status = response.status();
207            let body = response
208                .text()
209                .await
210                .context("failed to read Bybit response body")?;
211            debug!(
212                "bybit kline response (status {}): {}",
213                status,
214                truncate(&body, 512)
215            );
216            if !status.is_success() {
217                return Err(anyhow!(
218                    "Bybit responded with status {}: {}",
219                    status,
220                    truncate(&body, 256)
221                ));
222            }
223
224            let response: BybitKlineResponse = serde_json::from_str(&body).map_err(|err| {
225                anyhow!(
226                    "failed to parse Bybit response: {} (body snippet: {})",
227                    err,
228                    truncate(&body, 256)
229                )
230            })?;
231
232            if response.ret_code != 0 {
233                return Err(anyhow!(
234                    "Bybit returned error {}: {}",
235                    response.ret_code,
236                    response.ret_msg
237                ));
238            }
239
240            let result = match response.result {
241                Some(result) => result,
242                None => break,
243            };
244            if result.list.is_empty() {
245                break;
246            }
247
248            let mut batch = Vec::new();
249            for entry in result.list {
250                if let Some(candle) = parse_entry(&entry, req.symbol, req.interval) {
251                    if candle.timestamp.timestamp_millis() >= cursor
252                        && candle.timestamp.timestamp_millis() <= end_ms
253                    {
254                        batch.push(candle);
255                    }
256                }
257            }
258
259            if batch.is_empty() {
260                break;
261            }
262
263            batch.sort_by_key(|c| c.timestamp);
264            cursor = batch
265                .last()
266                .map(|c| c.timestamp.timestamp_millis() + interval_ms)
267                .unwrap_or(end_ms);
268            candles.extend(batch);
269        }
270
271        candles.sort_by_key(|c| c.timestamp);
272        candles.dedup_by_key(|c| c.timestamp);
273        Ok(candles)
274    }
275
276    async fn download_trades(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
277        match req.source {
278            TradeSource::Rest => self.download_trades_rest(req).await,
279            TradeSource::BybitPublicArchive => self.download_trades_public(req).await,
280            TradeSource::BinancePublicArchive => Err(anyhow!(
281                "binance public archive source is invalid for Bybit requests"
282            )),
283        }
284    }
285}
286
287impl BybitDownloader {
288    async fn download_trades_rest(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
289        let start_ms = req.start.timestamp_millis();
290        let end_ms = req.end.timestamp_millis();
291        if start_ms >= end_ms {
292            return Err(anyhow!("start must be earlier than end"));
293        }
294
295        let mut trades = Vec::new();
296        let mut seen_ids = HashSet::new();
297        let mut cursor: Option<String> = None;
298        let limit = req.limit.min(MAX_LIMIT);
299
300        loop {
301            let mut params = Vec::with_capacity(6);
302            if let Some(category) = req.category {
303                params.push(("category", category.to_string()));
304            }
305            params.push(("symbol", req.symbol.to_string()));
306            params.push(("start", start_ms.to_string()));
307            params.push(("end", end_ms.to_string()));
308            params.push(("limit", limit.to_string()));
309            if let Some(token) = &cursor {
310                params.push(("cursor", token.clone()));
311            }
312            let params_ref: Vec<(&str, &str)> =
313                params.iter().map(|(k, v)| (*k, v.as_str())).collect();
314
315            let response = self
316                .client
317                .get(self.endpoint("v5/market/history-trade"))
318                .query(&params_ref)
319                .send()
320                .await
321                .context("request to Bybit failed")?;
322
323            let status = response.status();
324            let body = response
325                .text()
326                .await
327                .context("failed to read Bybit response body")?;
328            debug!(
329                "bybit trades response (status {}): {}",
330                status,
331                truncate(&body, 512)
332            );
333            if !status.is_success() {
334                return Err(anyhow!(
335                    "Bybit responded with status {}: {}",
336                    status,
337                    truncate(&body, 256)
338                ));
339            }
340
341            let response: BybitTradeResponse = serde_json::from_str(&body).map_err(|err| {
342                anyhow!(
343                    "failed to parse Bybit response: {} (body snippet: {})",
344                    err,
345                    truncate(&body, 256)
346                )
347            })?;
348
349            if response.ret_code != 0 {
350                return Err(anyhow!(
351                    "Bybit returned error {}: {}",
352                    response.ret_code,
353                    response.ret_msg
354                ));
355            }
356            let Some(result) = response.result else {
357                break;
358            };
359            if result.list.is_empty() {
360                break;
361            }
362
363            for entry in result.list {
364                if !seen_ids.insert(entry.exec_id.clone()) {
365                    continue;
366                }
367                if let Some(trade) = parse_bybit_trade(req.symbol, entry) {
368                    if trade.tick.exchange_timestamp.timestamp_millis() < start_ms
369                        || trade.tick.exchange_timestamp.timestamp_millis() > end_ms
370                    {
371                        continue;
372                    }
373                    trades.push(trade);
374                }
375            }
376
377            if let Some(next_cursor) = result.next_page_cursor {
378                cursor = Some(next_cursor);
379            } else {
380                break;
381            }
382        }
383
384        trades.sort_by_key(|trade| trade.tick.exchange_timestamp);
385        trades.dedup_by(|a, b| {
386            a.tick.exchange_timestamp == b.tick.exchange_timestamp
387                && a.tick.price == b.tick.price
388                && a.tick.size == b.tick.size
389                && a.tick.side == b.tick.side
390        });
391        Ok(trades)
392    }
393
394    async fn download_trades_public(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
395        let mut cursor_date = req.start.date_naive();
396        let end_date = req.end.date_naive();
397        let mut trades = Vec::new();
398        let mut seen_ids = HashSet::new();
399        let base_url = req.public_data_url.unwrap_or(BYBIT_PUBLIC_BASE_URL);
400        let cache_root = resolve_archive_cache_dir(req, "bybit", req.symbol);
401        let total_days = (end_date
402            .signed_duration_since(cursor_date)
403            .num_days()
404            .max(0)
405            + 1)
406        .try_into()
407        .unwrap_or(0u32);
408        info!(
409            symbol = req.symbol,
410            "downloading {} day(s) from Bybit public archive", total_days
411        );
412
413        while cursor_date <= end_date {
414            let next_date = cursor_date
415                .checked_add_days(Days::new(1))
416                .unwrap_or(cursor_date);
417            let day_start = DateTime::<Utc>::from_naive_utc_and_offset(
418                cursor_date
419                    .and_hms_opt(0, 0, 0)
420                    .ok_or_else(|| anyhow!("invalid day {}", cursor_date))?,
421                Utc,
422            )
423            .max(req.start);
424            let day_end = DateTime::<Utc>::from_naive_utc_and_offset(
425                next_date
426                    .and_hms_opt(0, 0, 0)
427                    .ok_or_else(|| anyhow!("invalid day {}", cursor_date))?,
428                Utc,
429            )
430            .min(req.end);
431            if day_start >= day_end {
432                if next_date == cursor_date {
433                    break;
434                }
435                cursor_date = next_date;
436                continue;
437            }
438
439            let filename = format!("{}_{}.csv.gz", req.symbol, cursor_date.format("%Y-%m-%d"));
440            let cache_path = cache_root.join(&filename);
441            let url = format!(
442                "{}/{symbol}/{symbol}{}.csv.gz",
443                base_url,
444                cursor_date.format("%Y-%m-%d"),
445                symbol = req.symbol
446            );
447            if download_archive_file(&self.client, &url, &cache_path, req.resume_archives)
448                .await?
449                .is_none()
450            {
451                if next_date == cursor_date {
452                    break;
453                }
454                cursor_date = next_date;
455                continue;
456            }
457            let mut day_trades = read_bybit_archive(
458                &cache_path,
459                req.symbol,
460                day_start.timestamp_millis(),
461                day_end.timestamp_millis(),
462                &mut seen_ids,
463            )
464            .await?;
465            trades.append(&mut day_trades);
466
467            if next_date == cursor_date {
468                break;
469            }
470            cursor_date = next_date;
471        }
472
473        trades.sort_by_key(|trade| trade.tick.exchange_timestamp);
474        trades.dedup_by(|a, b| {
475            a.tick.exchange_timestamp == b.tick.exchange_timestamp
476                && a.tick.price == b.tick.price
477                && a.tick.size == b.tick.size
478                && a.tick.side == b.tick.side
479        });
480        Ok(trades)
481    }
482}
483
484fn parse_entry(entry: &[String], symbol: &str, interval: Interval) -> Option<Candle> {
485    if entry.len() < 6 {
486        return None;
487    }
488    let ts = entry.first()?.parse::<i64>().ok()?;
489    let timestamp = DateTime::<Utc>::from_timestamp_millis(ts)?;
490    let open = entry.get(1)?.parse::<Decimal>().ok()?;
491    let high = entry.get(2)?.parse::<Decimal>().ok()?;
492    let low = entry.get(3)?.parse::<Decimal>().ok()?;
493    let close = entry.get(4)?.parse::<Decimal>().ok()?;
494    let volume = entry.get(5)?.parse::<Decimal>().ok()?;
495    Some(Candle {
496        symbol: Symbol::from(symbol),
497        interval,
498        open,
499        high,
500        low,
501        close,
502        volume,
503        timestamp,
504    })
505}
506
507#[derive(Debug, Deserialize)]
508struct BybitKlineResponse {
509    #[serde(rename = "retCode")]
510    ret_code: i64,
511    #[serde(rename = "retMsg")]
512    ret_msg: String,
513    result: Option<KlineResult>,
514}
515
/// Payload of a Bybit kline response.
#[derive(Debug, Deserialize)]
struct KlineResult {
    /// Kline rows, each a list of string fields (decoded by `parse_entry`).
    list: Vec<Vec<String>>,
}
520
521#[derive(Debug, Deserialize)]
522struct BybitTradeResponse {
523    #[serde(rename = "retCode")]
524    ret_code: i64,
525    #[serde(rename = "retMsg")]
526    ret_msg: String,
527    result: Option<BybitTradeResult>,
528}
529
530#[derive(Debug, Deserialize)]
531struct BybitTradeResult {
532    list: Vec<BybitTradeEntry>,
533    #[serde(rename = "nextPageCursor")]
534    next_page_cursor: Option<String>,
535}
536
537#[derive(Debug, Deserialize)]
538struct BybitTradeEntry {
539    #[serde(rename = "execId")]
540    exec_id: String,
541    price: String,
542    size: String,
543    side: String,
544    #[serde(rename = "time", alias = "execTime", alias = "tradeTime")]
545    time: String,
546}
547
548/// Simple Binance REST downloader for kline data.
549pub struct BinanceDownloader {
550    client: Client,
551    base_url: String,
552}
553
554impl BinanceDownloader {
555    pub fn new(base_url: impl Into<String>) -> Self {
556        Self {
557            client: Client::new(),
558            base_url: base_url.into(),
559        }
560    }
561
562    fn endpoint(&self, path: &str) -> String {
563        let base = self.base_url.trim_end_matches('/');
564        format!("{base}/{path}")
565    }
566
567    pub async fn download_klines(&self, req: &KlineRequest<'_>) -> Result<Vec<Candle>> {
568        <Self as MarketDataDownloader>::download_klines(self, req).await
569    }
570
571    /// Download aggregated trades via Binance's `aggTrades` endpoint.
572    pub async fn download_agg_trades(
573        &self,
574        req: &TradeRequest<'_>,
575    ) -> Result<Vec<NormalizedTrade>> {
576        self.fetch_agg_trades(req).await
577    }
578
579    /// Exchange-agnostic wrapper for parity with Bybit downloader.
580    pub async fn download_trades(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
581        <Self as MarketDataDownloader>::download_trades(self, req).await
582    }
583
584    async fn fetch_agg_trades(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
585        let mut cursor = req.start.timestamp_millis();
586        let end_ms = req.end.timestamp_millis();
587        if cursor >= end_ms {
588            return Err(anyhow!("start must be earlier than end"));
589        }
590        let limit = req.limit.min(MAX_LIMIT);
591        let mut trades = Vec::new();
592        let mut seen_ids = HashSet::new();
593        while cursor < end_ms {
594            let response = self
595                .client
596                .get(self.endpoint("fapi/v1/aggTrades"))
597                .query(&[
598                    ("symbol", req.symbol),
599                    ("startTime", &cursor.to_string()),
600                    ("endTime", &end_ms.to_string()),
601                    ("limit", &limit.to_string()),
602                ])
603                .send()
604                .await
605                .context("request to Binance failed")?;
606            let status = response.status();
607            let body = response
608                .text()
609                .await
610                .context("failed to read Binance response body")?;
611            debug!(
612                "binance aggTrades response (status {}): {}",
613                status,
614                truncate(&body, 512)
615            );
616            if !status.is_success() {
617                return Err(anyhow!(
618                    "Binance responded with status {}: {}",
619                    status,
620                    truncate(&body, 256)
621                ));
622            }
623            let entries: Vec<BinanceAggTrade> = serde_json::from_str(&body).map_err(|err| {
624                anyhow!(
625                    "failed to parse Binance response: {} (body snippet: {})",
626                    err,
627                    truncate(&body, 256)
628                )
629            })?;
630            if entries.is_empty() {
631                break;
632            }
633            let mut last_ts: Option<i64> = None;
634            for entry in entries {
635                if !seen_ids.insert(entry.agg_id) {
636                    continue;
637                }
638                if let Some(trade) = parse_binance_trade(req.symbol, entry) {
639                    let ts = trade.tick.exchange_timestamp.timestamp_millis();
640                    last_ts = Some(last_ts.map_or(ts, |prev| prev.max(ts)));
641                    trades.push(trade);
642                }
643            }
644            if let Some(ts) = last_ts {
645                cursor = ts + 1;
646            } else {
647                break;
648            }
649        }
650        trades.sort_by_key(|trade| trade.tick.exchange_timestamp);
651        trades.dedup_by(|a, b| {
652            a.tick.exchange_timestamp == b.tick.exchange_timestamp
653                && a.tick.price == b.tick.price
654                && a.tick.size == b.tick.size
655                && a.tick.side == b.tick.side
656        });
657        Ok(trades)
658    }
659
660    async fn download_trades_public(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
661        let mut cursor_date = req.start.date_naive();
662        let end_date = req.end.date_naive();
663        let mut trades = Vec::new();
664        let mut seen_ids = HashSet::new();
665        let base_url = req.public_data_url.unwrap_or(BINANCE_PUBLIC_BASE_URL);
666        let cache_root = resolve_archive_cache_dir(req, "binance", req.symbol);
667
668        while cursor_date <= end_date {
669            let next_date = cursor_date
670                .checked_add_days(Days::new(1))
671                .unwrap_or(cursor_date);
672            let day_start = DateTime::<Utc>::from_naive_utc_and_offset(
673                cursor_date
674                    .and_hms_opt(0, 0, 0)
675                    .ok_or_else(|| anyhow!("invalid date {}", cursor_date))?,
676                Utc,
677            )
678            .max(req.start);
679            let day_end = DateTime::<Utc>::from_naive_utc_and_offset(
680                next_date
681                    .and_hms_opt(0, 0, 0)
682                    .ok_or_else(|| anyhow!("invalid date {}", next_date))?,
683                Utc,
684            )
685            .min(req.end);
686            if day_start >= day_end {
687                if next_date == cursor_date {
688                    break;
689                }
690                cursor_date = next_date;
691                continue;
692            }
693
694            let filename = format!(
695                "{}-aggTrades-{}.zip",
696                req.symbol,
697                cursor_date.format("%Y-%m-%d")
698            );
699            let cache_path = cache_root.join(&filename);
700            let url = format!("{}/{symbol}/{filename}", base_url, symbol = req.symbol);
701            if download_archive_file(&self.client, &url, &cache_path, req.resume_archives)
702                .await?
703                .is_none()
704            {
705                if next_date == cursor_date {
706                    break;
707                }
708                cursor_date = next_date;
709                continue;
710            }
711            let parsed = read_binance_archive(cache_path.clone(), req.symbol.to_string()).await?;
712            let start_ms = day_start.timestamp_millis();
713            let end_ms = day_end.timestamp_millis();
714            for trade in parsed {
715                let ts = trade.tick.exchange_timestamp.timestamp_millis();
716                if ts < start_ms || ts > end_ms {
717                    continue;
718                }
719                if let Some(id) = trade.trade_id.as_ref() {
720                    if !seen_ids.insert(id.clone()) {
721                        continue;
722                    }
723                }
724                trades.push(trade);
725            }
726
727            if next_date == cursor_date {
728                break;
729            }
730            cursor_date = next_date;
731        }
732
733        trades.sort_by_key(|trade| trade.tick.exchange_timestamp);
734        trades.dedup_by(|a, b| {
735            a.tick.exchange_timestamp == b.tick.exchange_timestamp
736                && a.tick.price == b.tick.price
737                && a.tick.size == b.tick.size
738                && a.tick.side == b.tick.side
739        });
740        Ok(trades)
741    }
742}
743
744#[async_trait]
745impl MarketDataDownloader for BinanceDownloader {
746    async fn download_klines(&self, req: &KlineRequest<'_>) -> Result<Vec<Candle>> {
747        let mut cursor = req.start.timestamp_millis();
748        let end_ms = req.end.timestamp_millis();
749        if cursor >= end_ms {
750            return Err(anyhow!("start must be earlier than end"));
751        }
752        let mut candles = Vec::new();
753        let interval_ms = req.interval.as_duration().num_milliseconds();
754        while cursor < end_ms {
755            let response = self
756                .client
757                .get(self.endpoint("fapi/v1/klines"))
758                .query(&[
759                    ("symbol", req.symbol),
760                    ("interval", req.interval.to_binance()),
761                    ("startTime", &cursor.to_string()),
762                    ("endTime", &end_ms.to_string()),
763                    ("limit", &req.limit.min(MAX_LIMIT).to_string()),
764                ])
765                .send()
766                .await
767                .context("request to Binance failed")?;
768            let status = response.status();
769            let body = response
770                .text()
771                .await
772                .context("failed to read Binance response body")?;
773            debug!(
774                "binance kline response (status {}): {}",
775                status,
776                truncate(&body, 512)
777            );
778            if !status.is_success() {
779                return Err(anyhow!(
780                    "Binance responded with status {}: {}",
781                    status,
782                    truncate(&body, 256)
783                ));
784            }
785            let entries: Vec<Vec<JsonValue>> = serde_json::from_str(&body).map_err(|err| {
786                anyhow!(
787                    "failed to parse Binance response: {} (body snippet: {})",
788                    err,
789                    truncate(&body, 256)
790                )
791            })?;
792            if entries.is_empty() {
793                break;
794            }
795            let mut batch = Vec::new();
796            for entry in entries {
797                if let Some(candle) = parse_binance_entry(&entry, req.symbol, req.interval) {
798                    if candle.timestamp.timestamp_millis() >= cursor
799                        && candle.timestamp.timestamp_millis() <= end_ms
800                    {
801                        batch.push(candle);
802                    }
803                }
804            }
805            if batch.is_empty() {
806                break;
807            }
808            batch.sort_by_key(|c| c.timestamp);
809            cursor = batch
810                .last()
811                .map(|c| c.timestamp.timestamp_millis() + interval_ms)
812                .unwrap_or(end_ms);
813            candles.extend(batch);
814        }
815        candles.sort_by_key(|c| c.timestamp);
816        candles.dedup_by_key(|c| c.timestamp);
817        Ok(candles)
818    }
819
820    async fn download_trades(&self, req: &TradeRequest<'_>) -> Result<Vec<NormalizedTrade>> {
821        match req.source {
822            TradeSource::Rest => self.fetch_agg_trades(req).await,
823            TradeSource::BinancePublicArchive => self.download_trades_public(req).await,
824            TradeSource::BybitPublicArchive => Err(anyhow!(
825                "bybit public archive source is invalid for Binance requests"
826            )),
827        }
828    }
829}
830
831fn parse_binance_entry(entry: &[JsonValue], symbol: &str, interval: Interval) -> Option<Candle> {
832    if entry.len() < 6 {
833        return None;
834    }
835    let ts = entry.first()?.as_i64()?;
836    let timestamp = DateTime::<Utc>::from_timestamp_millis(ts)?;
837    let open = entry.get(1)?.as_str()?.parse::<Decimal>().ok()?;
838    let high = entry.get(2)?.as_str()?.parse::<Decimal>().ok()?;
839    let low = entry.get(3)?.as_str()?.parse::<Decimal>().ok()?;
840    let close = entry.get(4)?.as_str()?.parse::<Decimal>().ok()?;
841    let volume = entry.get(5)?.as_str()?.parse::<Decimal>().ok()?;
842    Some(Candle {
843        symbol: Symbol::from(symbol),
844        interval,
845        open,
846        high,
847        low,
848        close,
849        volume,
850        timestamp,
851    })
852}
853
/// One aggregated trade as returned by the Binance `aggTrades` REST endpoint.
/// Field names map to the single-letter JSON keys given in the `rename` attributes.
#[derive(Debug, Deserialize)]
struct BinanceAggTrade {
    /// Aggregate trade id (`a`); used for de-duplication across pages.
    #[serde(rename = "a")]
    agg_id: u64,
    /// Price as a decimal string (`p`).
    #[serde(rename = "p")]
    price: String,
    /// Quantity as a decimal string (`q`).
    #[serde(rename = "q")]
    quantity: String,
    /// Trade timestamp (`T`); presumably epoch milliseconds — confirm in `parse_binance_trade`.
    #[serde(rename = "T")]
    timestamp: i64,
    /// Buyer-is-maker flag (`m`).
    #[serde(rename = "m")]
    is_buyer_maker: bool,
}
867
868fn parse_bybit_public_line(symbol: &str, line: &str) -> Option<NormalizedTrade> {
869    let mut columns = line.split(',');
870    let timestamp = parse_public_timestamp(columns.next()?.trim())?;
871    let _symbol = columns.next()?;
872    let side = parse_side(columns.next()?.trim())?;
873    let size = columns.next()?.trim().parse::<Decimal>().ok()?;
874    let price = columns.next()?.trim().parse::<Decimal>().ok()?;
875    columns.next()?; // tickDirection
876    let trade_id = columns.next().map(|value| value.trim().to_string());
877
878    let tick = Tick {
879        symbol: Symbol::from(symbol),
880        price,
881        size,
882        side,
883        exchange_timestamp: timestamp,
884        received_at: timestamp,
885    };
886    Some(NormalizedTrade::new(tick, trade_id))
887}
888
889fn parse_binance_public_line(symbol: &str, line: &str) -> Option<NormalizedTrade> {
890    let mut columns = line.split(',');
891    let agg_id = columns.next()?.trim().parse::<u64>().ok()?;
892    let price = columns.next()?.trim().parse::<Decimal>().ok()?;
893    let size = columns.next()?.trim().parse::<Decimal>().ok()?;
894    columns.next()?; // firstTradeId
895    columns.next()?; // lastTradeId
896    let timestamp = columns
897        .next()?
898        .trim()
899        .parse::<i64>()
900        .ok()
901        .and_then(DateTime::<Utc>::from_timestamp_millis)?;
902    let maker_flag = columns.next()?.trim();
903    let is_buyer_maker = match maker_flag {
904        "true" | "True" | "1" => true,
905        "false" | "False" | "0" => false,
906        _ => return None,
907    };
908    let _ = columns.next(); // ignore bestPriceMatch flag
909    let side = if is_buyer_maker {
910        Side::Sell
911    } else {
912        Side::Buy
913    };
914    let tick = Tick {
915        symbol: Symbol::from(symbol),
916        price,
917        size,
918        side,
919        exchange_timestamp: timestamp,
920        received_at: timestamp,
921    };
922    Some(NormalizedTrade::new(tick, Some(agg_id.to_string())))
923}
924
925fn parse_public_timestamp(value: &str) -> Option<DateTime<Utc>> {
926    let seconds = value.parse::<f64>().ok()?;
927    let secs = seconds.trunc() as i64;
928    let fractional = seconds - secs as f64;
929    let nanos = (fractional * NANOS_PER_SECOND as f64).round() as i64;
930    let clamped = nanos.clamp(0, NANOS_PER_SECOND - 1) as u32;
931    DateTime::<Utc>::from_timestamp(secs, clamped)
932}
933
934fn parse_bybit_trade(symbol: &str, entry: BybitTradeEntry) -> Option<NormalizedTrade> {
935    let timestamp = entry
936        .time
937        .parse::<i64>()
938        .ok()
939        .and_then(DateTime::<Utc>::from_timestamp_millis)?;
940    let price = entry.price.parse::<Decimal>().ok()?;
941    let size = entry.size.parse::<Decimal>().ok()?;
942    let side = parse_side(&entry.side)?;
943    let tick = Tick {
944        symbol: Symbol::from(symbol),
945        price,
946        size,
947        side,
948        exchange_timestamp: timestamp,
949        received_at: timestamp,
950    };
951    Some(NormalizedTrade::new(tick, Some(entry.exec_id)))
952}
953
954fn parse_binance_trade(symbol: &str, entry: BinanceAggTrade) -> Option<NormalizedTrade> {
955    let timestamp = DateTime::<Utc>::from_timestamp_millis(entry.timestamp)?;
956    let price = entry.price.parse::<Decimal>().ok()?;
957    let size = entry.quantity.parse::<Decimal>().ok()?;
958    let side = if entry.is_buyer_maker {
959        Side::Sell
960    } else {
961        Side::Buy
962    };
963    let tick = Tick {
964        symbol: Symbol::from(symbol),
965        price,
966        size,
967        side,
968        exchange_timestamp: timestamp,
969        received_at: timestamp,
970    };
971    Some(NormalizedTrade::new(tick, Some(entry.agg_id.to_string())))
972}
973
974fn parse_side(value: &str) -> Option<Side> {
975    match value.to_ascii_lowercase().as_str() {
976        "buy" => Some(Side::Buy),
977        "sell" => Some(Side::Sell),
978        _ => None,
979    }
980}
981
/// Returns `body` shortened to at most `max` bytes, followed by `…` when it was cut.
///
/// Strings that already fit are returned unchanged. The cut point is moved back to
/// the nearest UTF-8 character boundary, so a `max` that lands inside a multi-byte
/// character no longer panics; the kept prefix may then be slightly shorter than `max`.
fn truncate(body: &str, max: usize) -> String {
    if body.len() <= max {
        return body.to_string();
    }
    // Index 0 is always a boundary, so this loop terminates.
    let mut cut = max;
    while !body.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}…", &body[..cut])
}
989
990fn resolve_archive_cache_dir(req: &TradeRequest<'_>, exchange: &str, symbol: &str) -> PathBuf {
991    req.archive_cache_dir.clone().unwrap_or_else(|| {
992        std::env::temp_dir()
993            .join("tesser-data")
994            .join(exchange)
995            .join(symbol)
996    })
997}
998
999async fn download_archive_file(
1000    client: &Client,
1001    url: &str,
1002    cache_path: &Path,
1003    resume: bool,
1004) -> Result<Option<()>> {
1005    if let Some(parent) = cache_path.parent() {
1006        fs::create_dir_all(parent)
1007            .await
1008            .with_context(|| format!("failed to create {}", parent.display()))?;
1009    }
1010    let mut start = 0;
1011    if resume {
1012        if let Ok(meta) = fs::metadata(cache_path).await {
1013            start = meta.len();
1014        }
1015    } else if fs::try_exists(cache_path).await? {
1016        fs::remove_file(cache_path).await?;
1017    }
1018    let mut request = client.get(url);
1019    if resume && start > 0 {
1020        request = request.header(reqwest::header::RANGE, format!("bytes={start}-"));
1021    }
1022    let response = request
1023        .send()
1024        .await
1025        .with_context(|| format!("failed to fetch archive {url}"))?;
1026    let status = response.status();
1027    if status == StatusCode::NOT_FOUND {
1028        debug!("archive missing {}", url);
1029        return Ok(None);
1030    }
1031    if resume && status == StatusCode::RANGE_NOT_SATISFIABLE {
1032        debug!("archive already complete {}", url);
1033        return Ok(Some(()));
1034    }
1035    if !(status.is_success() || status == StatusCode::PARTIAL_CONTENT) {
1036        return Err(anyhow!(
1037            "archive request {} failed with status {}",
1038            url,
1039            status
1040        ));
1041    }
1042
1043    let mut file = if start > 0 {
1044        OpenOptions::new()
1045            .create(true)
1046            .append(true)
1047            .open(cache_path)
1048            .await?
1049    } else {
1050        OpenOptions::new()
1051            .create(true)
1052            .write(true)
1053            .truncate(true)
1054            .open(cache_path)
1055            .await?
1056    };
1057    let mut stream = response.bytes_stream();
1058    while let Some(chunk) = stream.next().await {
1059        let bytes = chunk.context("failed to read archive chunk")?;
1060        file.write_all(&bytes).await?;
1061    }
1062    file.flush().await?;
1063    Ok(Some(()))
1064}
1065
1066async fn read_bybit_archive(
1067    cache_path: &Path,
1068    symbol: &str,
1069    start_ms: i64,
1070    end_ms: i64,
1071    seen_ids: &mut HashSet<String>,
1072) -> Result<Vec<NormalizedTrade>> {
1073    let file = tokio::fs::File::open(cache_path)
1074        .await
1075        .with_context(|| format!("failed to open {}", cache_path.display()))?;
1076    let reader = BufReader::new(file);
1077    let decoder = async_compression::tokio::bufread::GzipDecoder::new(reader);
1078    let reader = BufReader::new(decoder);
1079    let mut lines = reader.lines();
1080    let mut trades = Vec::new();
1081    while let Some(line) = lines.next_line().await? {
1082        if line.starts_with("timestamp") {
1083            continue;
1084        }
1085        let Some(trade) = parse_bybit_public_line(symbol, line.trim()) else {
1086            continue;
1087        };
1088        let ts = trade.tick.exchange_timestamp.timestamp_millis();
1089        if ts < start_ms || ts > end_ms {
1090            continue;
1091        }
1092        if let Some(id) = trade.trade_id.as_ref() {
1093            if !seen_ids.insert(id.clone()) {
1094                continue;
1095            }
1096        }
1097        trades.push(trade);
1098    }
1099    Ok(trades)
1100}
1101
1102async fn read_binance_archive(cache_path: PathBuf, symbol: String) -> Result<Vec<NormalizedTrade>> {
1103    task::spawn_blocking(move || -> Result<Vec<NormalizedTrade>> {
1104        let file = StdFile::open(&cache_path)
1105            .with_context(|| format!("failed to open {}", cache_path.display()))?;
1106        let mut archive = ZipArchive::new(file)
1107            .with_context(|| format!("failed to open zip {}", cache_path.display()))?;
1108        let mut trades = Vec::new();
1109        for index in 0..archive.len() {
1110            let file = archive.by_index(index)?;
1111            if !file.name().ends_with(".csv") {
1112                continue;
1113            }
1114            let reader = StdBufReader::new(file);
1115            for line in reader.lines() {
1116                let line = line?;
1117                if line.starts_with("aggTradeId") || line.trim().is_empty() {
1118                    continue;
1119                }
1120                if let Some(trade) = parse_binance_public_line(&symbol, line.trim()) {
1121                    trades.push(trade);
1122                }
1123            }
1124        }
1125        Ok(trades)
1126    })
1127    .await?
1128}
1129
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use super::*;

    /// Builds the expected `Decimal` for an assertion.
    fn decimal(text: &str) -> Decimal {
        Decimal::from_str(text).unwrap()
    }

    // A full Bybit archive row: the columns after the trade id are ignored.
    #[test]
    fn parses_public_trade_line() {
        let row = "1585180700.0647,BTCUSDT,Buy,0.042,6698.5,PlusTick,08ff9568-cb50-55d6-b497-13727eec09dc,28133700000.0,0.042,281.337";
        let parsed = parse_bybit_public_line("BTCUSDT", row).expect("trade");
        assert_eq!(parsed.tick.symbol.code(), "BTCUSDT");
        assert_eq!(
            parsed.trade_id.as_deref(),
            Some("08ff9568-cb50-55d6-b497-13727eec09dc")
        );
        assert_eq!(parsed.tick.side, Side::Buy);
        assert_eq!(parsed.tick.price, decimal("6698.5"));
        assert_eq!(parsed.tick.size, decimal("0.042"));
    }

    // The fractional part of the seconds value must survive as sub-second nanos.
    #[test]
    fn parses_public_timestamp_fractional_seconds() {
        let parsed = parse_public_timestamp("1585180700.0647").expect("timestamp");
        assert_eq!(parsed.timestamp(), 1_585_180_700);
        assert!(parsed.timestamp_subsec_nanos() > 0);
    }

    // Buyer-is-maker `true` means the aggressor sold.
    #[test]
    fn parses_binance_public_line() {
        let row = "1001,51234.5,0.010,200,205,1585180700064,true,false";
        let parsed = parse_binance_public_line("BTCUSDT", row).expect("trade");
        assert_eq!(parsed.trade_id.as_deref(), Some("1001"));
        assert_eq!(parsed.tick.price, decimal("51234.5"));
        assert_eq!(parsed.tick.side, Side::Sell);
    }
}