1use std::fs::File;
9use std::io::{BufReader, Cursor, Read};
10use std::path::Path;
11
12use crate::error::MinParseError;
13use byteorder::{LittleEndian, ReadBytesExt};
14#[cfg(feature = "polars")]
15use polars::datatypes::PlSmallStr;
16#[cfg(feature = "polars")]
17use polars::prelude::*;
18
/// Size in bytes of one on-disk record; `MinReader` consumes exactly this many per item.
const RECORD_SIZE: usize = 64;
/// Divisor applied to the raw integer price fields of a record to obtain prices.
const PRICE_SCALE: f64 = 1_000.0;

/// Column names, in order, of the frame produced by `parse_min_to_dataframe`.
pub const MIN_DATAFRAME_COLUMN_NAMES: [&str; 11] = [
    "time",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "amount",
    "settlementPrice",
    "openInterest",
    "preClose",
    "suspendFlag",
];

/// Returns [`MIN_DATAFRAME_COLUMN_NAMES`] as a `'static` slice.
pub fn min_dataframe_column_names() -> &'static [&'static str] {
    const NAMES: &[&str] = &MIN_DATAFRAME_COLUMN_NAMES;
    NAMES
}
41
/// One decoded minute bar.
///
/// Price fields are the raw little-endian `u32` values from the record divided by
/// `PRICE_SCALE`; the other numeric fields are taken from the record without scaling.
/// All fields are plain numbers, so the type is `Copy` and can be compared with `==`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct MinKlineData {
    /// Bar timestamp in milliseconds, built from the record's `u32` seconds field × 1000.
    /// `parse_min_to_dataframe` treats this value as a UTC epoch timestamp.
    pub timestamp_ms: i64,
    /// Open price (scaled by `PRICE_SCALE`).
    pub open: f64,
    /// High price (scaled by `PRICE_SCALE`).
    pub high: f64,
    /// Low price (scaled by `PRICE_SCALE`).
    pub low: f64,
    /// Close price (scaled by `PRICE_SCALE`).
    pub close: f64,
    /// Volume field, as stored (unscaled).
    pub volume: u32,
    /// Amount field: the stored `u64` converted to `f64`, NOT divided by `PRICE_SCALE`.
    pub amount: f64,
    /// Open-interest field, as stored (unscaled).
    pub open_interest: u32,
    /// Previous close price (scaled by `PRICE_SCALE`).
    pub pre_close: f64,
}
64
65pub struct MinReader<R: Read> {
67 reader: BufReader<R>,
68 buffer: [u8; RECORD_SIZE],
69}
70
71impl MinReader<File> {
72 pub fn from_path(path: impl AsRef<Path>) -> Result<Self, MinParseError> {
74 let path = path.as_ref();
75 validate_dat_path(path)?;
76 let file = File::open(path)?;
77 Ok(Self::new(file))
78 }
79}
80
81impl<R: Read> MinReader<R> {
82 pub fn new(reader: R) -> Self {
84 MinReader {
85 reader: BufReader::new(reader),
86 buffer: [0u8; RECORD_SIZE],
87 }
88 }
89}
90
91impl<R: Read> Iterator for MinReader<R> {
92 type Item = Result<MinKlineData, MinParseError>;
93
94 fn next(&mut self) -> Option<Self::Item> {
95 if let Err(err) = self.reader.read_exact(&mut self.buffer) {
96 if err.kind() == std::io::ErrorKind::UnexpectedEof {
97 return None;
98 }
99 return Some(Err(MinParseError::Io(err)));
100 }
101
102 let mut cursor = Cursor::new(&self.buffer[..]);
103 Some(parse_record(&mut cursor).map_err(MinParseError::Io))
104 }
105}
106
107pub fn parse_min_to_structs(path: impl AsRef<Path>) -> Result<Vec<MinKlineData>, MinParseError> {
123 let path_ref = path.as_ref();
124 let mut reader = MinReader::from_path(path_ref)?;
125 let estimated_rows = estimate_rows(path_ref)?;
126 let mut out = Vec::with_capacity(estimated_rows);
127 for item in &mut reader {
128 out.push(item?);
129 }
130 Ok(out)
131}
132
/// Parses a minute-bar `.dat` file into a polars [`DataFrame`].
///
/// The returned frame always has exactly the columns of [`MIN_DATAFRAME_COLUMN_NAMES`],
/// in that order:
/// - `time`: the record timestamp as a millisecond `Datetime`. The tz-naive value is
///   passed to `convert_time_zone`, which treats naive input as UTC, and is converted
///   to `Asia/Shanghai`.
/// - `open`, `high`, `low`, `close`, `amount`, `preClose`: `f64`.
/// - `volume`, `openInterest`: `u32`.
/// - `settlementPrice`: always `0.0` (not read from the file).
/// - `suspendFlag`: always `0` as `i32` (not read from the file).
///
/// A file with no complete records yields a zero-row frame with this same schema.
///
/// # Errors
///
/// Fails in these cases:
/// - the path is rejected by validation;
/// - the file or its metadata cannot be read;
/// - a record cannot be decoded;
/// - a polars operation fails.
#[cfg(feature = "polars")]
pub fn parse_min_to_dataframe(path: impl AsRef<Path>) -> Result<DataFrame, MinParseError> {
    let path_ref = path.as_ref();
    let reader = MinReader::from_path(path_ref)?;
    let estimated_rows = estimate_rows(path_ref)?;

    let mut timestamps = Vec::with_capacity(estimated_rows);
    let mut opens = Vec::with_capacity(estimated_rows);
    let mut highs = Vec::with_capacity(estimated_rows);
    let mut lows = Vec::with_capacity(estimated_rows);
    let mut closes = Vec::with_capacity(estimated_rows);
    let mut volumes = Vec::with_capacity(estimated_rows);
    let mut amounts = Vec::with_capacity(estimated_rows);
    let mut open_interests = Vec::with_capacity(estimated_rows);
    let mut pre_closes = Vec::with_capacity(estimated_rows);

    for item in reader {
        let record = item?;
        timestamps.push(record.timestamp_ms);
        opens.push(record.open);
        highs.push(record.high);
        lows.push(record.low);
        closes.push(record.close);
        volumes.push(record.volume);
        amounts.push(record.amount);
        open_interests.push(record.open_interest);
        pre_closes.push(record.pre_close);
    }

    // Deliberately no early return for an empty file. Building the frame from
    // zero-length columns keeps the documented names and dtypes; `DataFrame::empty()`
    // would return a frame with no columns at all.
    let len = timestamps.len();
    let settlement_prices = Series::new("settlementPrice".into(), vec![0.0f64; len]);
    let suspend_flags = Series::new("suspendFlag".into(), vec![0i32; len]);

    let df = df![
        "timestamp_ms" => timestamps,
        "open" => opens,
        "high" => highs,
        "low" => lows,
        "close" => closes,
        "volume" => volumes,
        "amount" => amounts,
        "settlementPrice" => settlement_prices,
        "openInterest" => open_interests,
        "preClose" => pre_closes,
        "suspendFlag" => suspend_flags,
    ]?;

    let raw_tz = TimeZone::opt_try_new(None::<PlSmallStr>)?;
    let china_tz = TimeZone::opt_try_new(Some("Asia/Shanghai"))?
        .expect("opt_try_new yields Some(TimeZone) for a Some(..) input");

    let df = df
        .lazy()
        .with_column(
            col("timestamp_ms")
                .cast(DataType::Datetime(TimeUnit::Milliseconds, raw_tz))
                .dt()
                .convert_time_zone(china_tz)
                .alias("time"),
        )
        // Select via the shared constant so the output order can never drift from it.
        .select(MIN_DATAFRAME_COLUMN_NAMES.map(col))
        .collect()?;

    Ok(df)
}
231
232fn validate_dat_path(path: &Path) -> Result<(), MinParseError> {
233 if path.as_os_str().is_empty() {
234 return Err(MinParseError::EmptyPath);
235 }
236 let ext = path
237 .extension()
238 .and_then(|s| s.to_str())
239 .unwrap_or_default()
240 .to_ascii_lowercase();
241 if ext != "dat" {
242 return Err(MinParseError::InvalidExtension(path.display().to_string()));
243 }
244 Ok(())
245}
246
247fn estimate_rows(path: &Path) -> Result<usize, MinParseError> {
248 let file_len = std::fs::metadata(path)?.len();
249 Ok((file_len as usize) / RECORD_SIZE + 1)
250}
251
252fn parse_record(cursor: &mut Cursor<&[u8]>) -> std::io::Result<MinKlineData> {
253 cursor.set_position(8);
254 let ts_seconds = cursor.read_u32::<LittleEndian>()?;
255 let open = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
256 let high = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
257 let low = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
258 let close = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
259
260 cursor.set_position(32);
261 let volume = cursor.read_u32::<LittleEndian>()?;
262
263 cursor.set_position(40);
264 let amount = cursor.read_u64::<LittleEndian>()? as f64;
265
266 let open_interest = cursor.read_u32::<LittleEndian>()?;
267
268 cursor.set_position(60);
269 let pre_close = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
270
271 Ok(MinKlineData {
272 timestamp_ms: ts_seconds as i64 * 1000,
273 open,
274 high,
275 low,
276 close,
277 volume,
278 amount,
279 open_interest,
280 pre_close,
281 })
282}
283
#[cfg(all(test, feature = "polars"))]
mod tests {
    use super::*;
    use std::path::Path;

    /// Parses the sample file under `data/` and checks that the resulting frame's
    /// columns are exactly `min_dataframe_column_names()`, in order.
    /// NOTE(review): the path is relative to the working directory; this assumes tests
    /// run from the crate root.
    #[test]
    fn test_parse_min_dataframe() -> Result<(), MinParseError> {
        let sample = Path::new("data/000000-1m.dat");
        let frame = parse_min_to_dataframe(sample)?;

        // Show the last rows for manual inspection (visible with `--nocapture`).
        println!("--- Tail ---");
        println!("{}", frame.tail(Some(5)));

        let actual = frame.get_column_names_str();
        assert_eq!(actual.as_slice(), min_dataframe_column_names());
        Ok(())
    }
}