fetch_candlesticks_traits/
fetch_candlesticks.rs1use crate::chrono_range::TimestampBuilder;
2use binance::{api::*, futures::market::*, futures::rest_model::*};
3use chrono::Duration;
4use futures::stream::{StreamExt, TryStreamExt};
5use polars::prelude::*;
6use std::{error::Error, io::Cursor, sync::Arc};
7use tokio::runtime::Runtime;
8
/// Capability of fetching candlestick data for a trading symbol.
///
/// The four type parameters let each string-like argument be passed as any
/// type that can be viewed as a `&str` (`&str`, `String`, ...), independently
/// of the others.
pub trait FetchCandleSticks<S1: AsRef<str>, S2: AsRef<str>, S3: AsRef<str>, S4: AsRef<str>> {
    /// Value handed back by a successful fetch.
    type Output;

    /// Consumes `self` and fetches candlesticks for `symbol` at the given
    /// `interval`.
    ///
    /// `start` and `end` are optional bounds of the requested time range; how
    /// a missing bound is treated is decided by the implementor.
    ///
    /// # Errors
    ///
    /// Returns whatever boxed error the implementor produces while fetching.
    fn fetch_candlesticks(
        self,
        symbol: S1,
        start: Option<S2>,
        end: Option<S3>,
        interval: S4,
    ) -> Result<Self::Output, Box<dyn Error>>;
}
26
27impl<S1, S2, S3, S4> FetchCandleSticks<S1, S2, S3, S4> for DataFrame
28where
29 S1: AsRef<str>,
30 S2: AsRef<str>,
31 S3: AsRef<str>,
32 S4: AsRef<str>,
33{
34 type Output = DataFrame;
35
36 fn fetch_candlesticks(
37 mut self,
38 symbol: S1,
39 start: Option<S2>,
40 end: Option<S3>,
41 interval: S4,
42 ) -> Result<Self::Output, Box<dyn Error>> {
43 if self.is_empty() {
44 let ts_builder =
45 TimestampBuilder::new(start.ok_or("no start time")?, end, interval.as_ref())?;
46 let df = dataframe(
47 ts_builder.build(),
48 symbol.as_ref(),
49 interval.as_ref(),
50 ts_builder.limit as u16,
51 );
52 return df;
53 };
54 let start: String = (self["openTime"]
55 .datetime()?
56 .as_datetime_iter()
57 .last()
58 .unwrap()
59 .unwrap()
60 + Duration::minutes(15))
61 .format("%Y-%m-%d %H:%M")
62 .to_string();
63 let ts_builder = TimestampBuilder::new(start, end, interval.as_ref())?;
64 let df = dataframe(
65 ts_builder.build(),
66 symbol.as_ref(),
67 interval.as_ref(),
68 ts_builder.limit as u16,
69 )?;
70 if df.is_empty() {
71 return Ok(self);
72 }
73 self.vstack_mut(&df)?;
74 self.as_single_chunk_par();
75 Ok(self)
76 }
77}
78
79async fn request<S1, S2>(
80 client: Arc<FuturesMarket>,
81 symbol: S1,
82 start: u64,
83 end: u64,
84 interval: S2,
85 limit: u16,
86) -> Result<Vec<KlineSummary>, Box<dyn std::error::Error>>
87where
88 S1: AsRef<str>,
89 S2: AsRef<str>,
90{
91 let klines = client
92 .get_klines(
93 symbol.as_ref(),
94 interval.as_ref(),
95 limit,
96 Some(start),
97 Some(end),
98 )
99 .await?;
100 let KlineSummaries::AllKlineSummaries(klines) = klines;
101 Ok(klines)
102}
103
104fn dataframe<S1, S2>(
105 ts: Vec<i64>,
106 symbol: S1,
107 interval: S2,
108 limit: u16,
109) -> Result<DataFrame, Box<dyn Error>>
110where
111 S1: AsRef<str>,
112 S2: AsRef<str>,
113{
114 let market: Arc<FuturesMarket> = Arc::new(Binance::new(None, None));
115 let stream = futures::stream::iter(ts.windows(2).into_iter())
116 .map(|ts| {
117 let market = market.clone();
118 let symbol = symbol.as_ref();
119 let interval = interval.as_ref();
120 request(market, symbol, ts[0] as u64, ts[1] as u64, interval, limit)
121 })
122 .buffer_unordered(50)
123 .map_ok(|rq| futures::stream::iter(rq.into_iter().map(Ok)))
124 .try_flatten();
125 let rt = Runtime::new()?;
126 let candlesticks: Result<Vec<KlineSummary>, Box<dyn Error>> =
127 rt.block_on(async move { stream.try_collect().await });
128
129 let json = serde_json::to_string(&candlesticks?).unwrap();
130 let cursor = Cursor::new(json);
131 let df = JsonReader::new(cursor)
132 .finish()?
133 .lazy()
134 .select([
135 col("openTime").cast(DataType::Datetime(TimeUnit::Milliseconds, None)),
136 all().exclude(["openTime"]),
137 ])
138 .sort("openTime", Default::default())
139 .collect()?;
140 Ok(df)
141}
142
#[cfg(test)]
mod tests {
    use super::*;

    /// Symbol and interval shared by every test below.
    const SYMBOL: &str = "btcusdt";
    const INTERVAL: &str = "15m";

    /// Fills an empty frame with the 2022-02-01 00:00..15:00 window.
    fn seed_frame() -> DataFrame {
        DataFrame::empty()
            .fetch_candlesticks(
                SYMBOL,
                Some("2022-02-01 00:00"),
                Some("2022-02-01 15:00"),
                INTERVAL,
            )
            .unwrap()
    }

    /// Builds the timestamps for `start..end`, downloads the frame and prints it.
    fn frame_between(start: &str, end: &str) -> Result<DataFrame, Box<dyn std::error::Error>> {
        let builder = TimestampBuilder::new(start, Some(end), INTERVAL)?;
        let frame = dataframe(builder.build(), SYMBOL, INTERVAL, builder.limit as u16)?;
        println!("{}", frame);
        Ok(frame)
    }

    #[test]
    fn df_fetch_from_empty() -> Result<(), Box<dyn Error>> {
        assert_eq!(seed_frame().height(), 61);
        Ok(())
    }

    #[test]
    fn df_update_new_fetchs() -> Result<(), Box<dyn Error>> {
        let initial = seed_frame();
        assert_eq!(initial.height(), 61);

        // No explicit start: the fetch continues after the last stored candle.
        let extended = initial
            .fetch_candlesticks(SYMBOL, None::<&str>, Some("2022-02-01 23:59"), INTERVAL)
            .unwrap();
        assert_eq!(extended.height(), 96);
        Ok(())
    }

    #[test]
    fn df_ts_short() -> Result<(), Box<dyn std::error::Error>> {
        let frame = frame_between("2023-02-21 00:00", "2023-02-21 23:59")?;
        assert_eq!(frame.height(), 96);
        Ok(())
    }

    #[test]
    fn df_ts_bound() -> Result<(), Box<dyn std::error::Error>> {
        let frame = frame_between("2023-02-21 00:00", "2023-02-22 00:00")?;
        assert_eq!(frame.height(), 97);
        Ok(())
    }

    #[ignore]
    #[test]
    fn df_ts_long() -> Result<(), Box<dyn std::error::Error>> {
        let frame = frame_between("2020-01-01 00:00", "2023-02-22 23:59")?;
        assert_eq!(frame.height(), 110304);
        Ok(())
    }
}
203}