1use binance::{api::*, futures::market::*, futures::rest_model::*};
2use futures::stream::{StreamExt, TryStreamExt};
3use polars::prelude::*;
4use std::{error::Error, io::Cursor, sync::Arc};
5use tokio::runtime::Runtime;
6
7pub fn dataframe<S1, S2>(
8 ts: Vec<i64>,
9 symbol: S1,
10 interval: S2,
11 limit: u16,
12) -> Result<DataFrame, Box<dyn Error>>
13where
14 S1: AsRef<str>,
15 S2: AsRef<str>,
16{
17 let market: Arc<FuturesMarket> = Arc::new(Binance::new(None, None));
18 let stream = futures::stream::iter(ts.windows(2).into_iter())
19 .map(|ts| {
20 let market = market.clone();
21 let symbol = symbol.as_ref();
22 let interval = interval.as_ref();
23 request(market, symbol, ts[0] as u64, ts[1] as u64, interval, limit)
24 })
25 .buffer_unordered(50)
26 .map_ok(|rq| futures::stream::iter(rq.into_iter().map(Ok)))
27 .try_flatten();
28 let rt = Runtime::new()?;
29 let candlesticks: Result<Vec<KlineSummary>, Box<dyn Error>> =
30 rt.block_on(async move { stream.try_collect().await });
31
32 let candlesticks = candlesticks?;
33
34 let json = serde_json::to_string(&candlesticks).unwrap();
35 let cursor = Cursor::new(json);
36 let df = JsonReader::new(cursor)
37 .finish()?
38 .lazy()
39 .select([
40 col("openTime").cast(DataType::Datetime(TimeUnit::Milliseconds, None)),
41 all().exclude(["openTime"]),
42 ])
43 .sort("openTime", Default::default())
44 .collect()?;
45 Ok(df)
46}
47
48async fn request<S1, S2>(
49 client: Arc<FuturesMarket>,
50 symbol: S1,
51 start: u64,
52 end: u64,
53 interval: S2,
54 limit: u16,
55) -> Result<Vec<KlineSummary>, Box<dyn std::error::Error>>
56where
57 S1: AsRef<str>,
58 S2: AsRef<str>,
59{
60 let klines = client
61 .get_klines(
62 symbol.as_ref(),
63 interval.as_ref(),
64 limit,
65 Some(start),
66 Some(end),
67 )
68 .await?;
69 let KlineSummaries::AllKlineSummaries(klines) = klines;
70 Ok(klines)
71}