fetch_candlesticks_traits/
fetch_candlesticks.rs

1use crate::chrono_range::TimestampBuilder;
2use binance::{api::*, futures::market::*, futures::rest_model::*};
3use chrono::Duration;
4use futures::stream::{StreamExt, TryStreamExt};
5use polars::prelude::*;
6use std::{error::Error, io::Cursor, sync::Arc};
7use tokio::runtime::Runtime;
8
9pub trait FetchCandleSticks<S1, S2, S3, S4>
10where
11    S1: AsRef<str>,
12    S2: AsRef<str>,
13    S3: AsRef<str>,
14    S4: AsRef<str>,
15{
16    type Output;
17
18    fn fetch_candlesticks(
19        self,
20        symbol: S1,
21        start: Option<S2>,
22        end: Option<S3>,
23        interval: S4,
24    ) -> Result<Self::Output, Box<dyn Error>>;
25}
26
27impl<S1, S2, S3, S4> FetchCandleSticks<S1, S2, S3, S4> for DataFrame
28where
29    S1: AsRef<str>,
30    S2: AsRef<str>,
31    S3: AsRef<str>,
32    S4: AsRef<str>,
33{
34    type Output = DataFrame;
35
36    fn fetch_candlesticks(
37        mut self,
38        symbol: S1,
39        start: Option<S2>,
40        end: Option<S3>,
41        interval: S4,
42    ) -> Result<Self::Output, Box<dyn Error>> {
43        if self.is_empty() {
44            let ts_builder =
45                TimestampBuilder::new(start.ok_or("no start time")?, end, interval.as_ref())?;
46            let df = dataframe(
47                ts_builder.build(),
48                symbol.as_ref(),
49                interval.as_ref(),
50                ts_builder.limit as u16,
51            );
52            return df;
53        };
54        let start: String = (self["openTime"]
55            .datetime()?
56            .as_datetime_iter()
57            .last()
58            .unwrap()
59            .unwrap()
60            + Duration::minutes(15))
61        .format("%Y-%m-%d %H:%M")
62        .to_string();
63        let ts_builder = TimestampBuilder::new(start, end, interval.as_ref())?;
64        let df = dataframe(
65            ts_builder.build(),
66            symbol.as_ref(),
67            interval.as_ref(),
68            ts_builder.limit as u16,
69        )?;
70        if df.is_empty() {
71            return Ok(self);
72        }
73        self.vstack_mut(&df)?;
74        self.as_single_chunk_par();
75        Ok(self)
76    }
77}
78
79async fn request<S1, S2>(
80    client: Arc<FuturesMarket>,
81    symbol: S1,
82    start: u64,
83    end: u64,
84    interval: S2,
85    limit: u16,
86) -> Result<Vec<KlineSummary>, Box<dyn std::error::Error>>
87where
88    S1: AsRef<str>,
89    S2: AsRef<str>,
90{
91    let klines = client
92        .get_klines(
93            symbol.as_ref(),
94            interval.as_ref(),
95            limit,
96            Some(start),
97            Some(end),
98        )
99        .await?;
100    let KlineSummaries::AllKlineSummaries(klines) = klines;
101    Ok(klines)
102}
103
104fn dataframe<S1, S2>(
105    ts: Vec<i64>,
106    symbol: S1,
107    interval: S2,
108    limit: u16,
109) -> Result<DataFrame, Box<dyn Error>>
110where
111    S1: AsRef<str>,
112    S2: AsRef<str>,
113{
114    let market: Arc<FuturesMarket> = Arc::new(Binance::new(None, None));
115    let stream = futures::stream::iter(ts.windows(2).into_iter())
116        .map(|ts| {
117            let market = market.clone();
118            let symbol = symbol.as_ref();
119            let interval = interval.as_ref();
120            request(market, symbol, ts[0] as u64, ts[1] as u64, interval, limit)
121        })
122        .buffer_unordered(50)
123        .map_ok(|rq| futures::stream::iter(rq.into_iter().map(Ok)))
124        .try_flatten();
125    let rt = Runtime::new()?;
126    let candlesticks: Result<Vec<KlineSummary>, Box<dyn Error>> =
127        rt.block_on(async move { stream.try_collect().await });
128
129    let json = serde_json::to_string(&candlesticks?).unwrap();
130    let cursor = Cursor::new(json);
131    let df = JsonReader::new(cursor)
132        .finish()?
133        .lazy()
134        .select([
135            col("openTime").cast(DataType::Datetime(TimeUnit::Milliseconds, None)),
136            all().exclude(["openTime"]),
137        ])
138        .sort("openTime", Default::default())
139        .collect()?;
140    Ok(df)
141}
142
143#[cfg(test)]
144mod tests {
145    use super::*;
146
147    #[test]
148    fn df_fetch_from_empty() -> Result<(), Box<dyn Error>> {
149        let df = DataFrame::empty()
150            .fetch_candlesticks(
151                "btcusdt",
152                Some("2022-02-01 00:00"),
153                Some("2022-02-01 15:00"),
154                "15m",
155            )
156            .unwrap();
157        assert_eq!(df.height(), 61);
158        Ok(())
159    }
160    #[test]
161    fn df_update_new_fetchs() -> Result<(), Box<dyn Error>> {
162        let df = DataFrame::empty()
163            .fetch_candlesticks(
164                "btcusdt",
165                Some("2022-02-01 00:00"),
166                Some("2022-02-01 15:00"),
167                "15m",
168            )
169            .unwrap();
170        assert_eq!(df.height(), 61);
171        let df = df
172            .fetch_candlesticks("btcusdt", None::<&str>, Some("2022-02-01 23:59"), "15m")
173            .unwrap();
174        assert_eq!(df.height(), 96);
175        Ok(())
176    }
177
178    #[test]
179    fn df_ts_short() -> Result<(), Box<dyn std::error::Error>> {
180        let ts = TimestampBuilder::new("2023-02-21 00:00", Some("2023-02-21 23:59"), "15m")?;
181        let df = dataframe(ts.build(), "btcusdt", "15m", ts.limit as u16)?;
182        println!("{}", df);
183        assert_eq!(df.height(), 96);
184        Ok(())
185    }
186    #[test]
187    fn df_ts_bound() -> Result<(), Box<dyn std::error::Error>> {
188        let ts = TimestampBuilder::new("2023-02-21 00:00", Some("2023-02-22 00:00"), "15m")?;
189        let df = dataframe(ts.build(), "btcusdt", "15m", ts.limit as u16)?;
190        println!("{}", df);
191        assert_eq!(df.height(), 97);
192        Ok(())
193    }
194    #[ignore]
195    #[test]
196    fn df_ts_long() -> Result<(), Box<dyn std::error::Error>> {
197        let ts = TimestampBuilder::new("2020-01-01 00:00", Some("2023-02-22 23:59"), "15m")?;
198        let df = dataframe(ts.build(), "btcusdt", "15m", ts.limit as u16)?;
199        println!("{}", df);
200        assert_eq!(df.height(), 110304);
201        Ok(())
202    }
203}