cs_fetcher/
lib.rs

1use binance::{api::*, futures::market::*, futures::rest_model::*};
2use futures::stream::{StreamExt, TryStreamExt};
3use polars::prelude::*;
4use std::{error::Error, io::Cursor, sync::Arc};
5use tokio::runtime::Runtime;
6
7pub fn dataframe<S1, S2>(
8    ts: Vec<i64>,
9    symbol: S1,
10    interval: S2,
11    limit: u16,
12) -> Result<DataFrame, Box<dyn Error>>
13where
14    S1: AsRef<str>,
15    S2: AsRef<str>,
16{
17    let market: Arc<FuturesMarket> = Arc::new(Binance::new(None, None));
18    let stream = futures::stream::iter(ts.windows(2).into_iter())
19        .map(|ts| {
20            let market = market.clone();
21            let symbol = symbol.as_ref();
22            let interval = interval.as_ref();
23            request(market, symbol, ts[0] as u64, ts[1] as u64, interval, limit)
24        })
25        .buffer_unordered(50)
26        .map_ok(|rq| futures::stream::iter(rq.into_iter().map(Ok)))
27        .try_flatten();
28    let rt = Runtime::new()?;
29    let candlesticks: Result<Vec<KlineSummary>, Box<dyn Error>> =
30        rt.block_on(async move { stream.try_collect().await });
31
32    let candlesticks = candlesticks?;
33
34    let json = serde_json::to_string(&candlesticks).unwrap();
35    let cursor = Cursor::new(json);
36    let df = JsonReader::new(cursor)
37        .finish()?
38        .lazy()
39        .select([
40            col("openTime").cast(DataType::Datetime(TimeUnit::Milliseconds, None)),
41            all().exclude(["openTime"]),
42        ])
43        .sort("openTime", Default::default())
44        .collect()?;
45    Ok(df)
46}
47
48async fn request<S1, S2>(
49    client: Arc<FuturesMarket>,
50    symbol: S1,
51    start: u64,
52    end: u64,
53    interval: S2,
54    limit: u16,
55) -> Result<Vec<KlineSummary>, Box<dyn std::error::Error>>
56where
57    S1: AsRef<str>,
58    S2: AsRef<str>,
59{
60    let klines = client
61        .get_klines(
62            symbol.as_ref(),
63            interval.as_ref(),
64            limit,
65            Some(start),
66            Some(end),
67        )
68        .await?;
69    let KlineSummaries::AllKlineSummaries(klines) = klines;
70    Ok(klines)
71}