1use std::fs::File;
10use std::io::{BufReader, Cursor, Read};
11use std::path::Path;
12
13use crate::error::DailyParseError;
14use byteorder::{LittleEndian, ReadBytesExt};
15use chrono::{DateTime, FixedOffset, NaiveDate, TimeZone};
16#[cfg(feature = "polars")]
17use polars::datatypes::PlSmallStr;
18#[cfg(feature = "polars")]
19use polars::prelude::*;
20
/// Size in bytes of one fixed-width record; also the length of the reader's scratch buffer.
const RECORD_SIZE: usize = 64;
/// Divisor that turns a raw integer price field into an `f64` price.
const PRICE_SCALE: f64 = 1_000.0;
/// Divisor that turns the raw integer amount field into an `f64` amount.
const AMOUNT_SCALE: f64 = 100.0;
24
/// Column names of the daily DataFrame, in output order.
///
/// The sequence mirrors the final `select` list in
/// `parse_daily_to_dataframe_in_range`.
pub const DAILY_DATAFRAME_COLUMN_NAMES: [&str; 11] = [
    "time",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "amount",
    "settlementPrice",
    "openInterest",
    "preClose",
    "suspendFlag",
];
39
40pub fn daily_dataframe_column_names() -> &'static [&'static str] {
42 &DAILY_DATAFRAME_COLUMN_NAMES
43}
44
/// One decoded daily record, as produced by `parse_record`.
#[derive(Debug, Clone)]
pub struct DailyKlineData {
    /// Record timestamp in milliseconds: the `u32` seconds value stored at byte offset 8,
    /// multiplied by 1000.
    pub timestamp_ms: i64,
    /// Opening price: raw `u32` field divided by `PRICE_SCALE`.
    pub open: f64,
    /// Highest price: raw `u32` field divided by `PRICE_SCALE`.
    pub high: f64,
    /// Lowest price: raw `u32` field divided by `PRICE_SCALE`.
    pub low: f64,
    /// Closing price: raw `u32` field divided by `PRICE_SCALE`.
    pub close: f64,
    /// Volume: raw `u32` stored at byte offset 32, unscaled.
    pub volume: u32,
    /// Amount: raw `u64` stored at byte offset 40, divided by `AMOUNT_SCALE`.
    pub amount: f64,
    /// Open interest: raw `u32` that directly follows the amount field, unscaled.
    pub open_interest: u32,
    /// Previous close as stored in the file: raw `u32` at byte offset 60 divided by
    /// `PRICE_SCALE`.
    pub file_pre_close: f64,
}
67
/// Streaming reader that decodes fixed-size records from any [`Read`] source.
///
/// Its `Iterator` implementation yields one [`DailyKlineData`] per record that lies
/// inside the optional date range.
pub struct DailyReader<R: Read> {
    /// Buffered wrapper around the underlying byte source.
    reader: BufReader<R>,
    /// Scratch space that holds exactly one record (`RECORD_SIZE` bytes) between reads.
    buffer: [u8; RECORD_SIZE],
    /// Earliest date to keep; `None` leaves the range open at the start.
    start: Option<NaiveDate>,
    /// Latest date to keep; `None` leaves the range open at the end.
    end: Option<NaiveDate>,
    /// Offset used to turn a record's UTC timestamp into a calendar date for filtering.
    tz_offset: FixedOffset,
}
79
80impl DailyReader<File> {
81 pub fn from_path(
83 path: impl AsRef<Path>,
84 start: Option<NaiveDate>,
85 end: Option<NaiveDate>,
86 ) -> Result<Self, DailyParseError> {
87 let path = path.as_ref();
88 validate_dat_path(path)?;
89 let file = File::open(path)?;
90 Ok(Self::new(file, start, end))
91 }
92}
93
94impl<R: Read> DailyReader<R> {
95 pub fn new(reader: R, start: Option<NaiveDate>, end: Option<NaiveDate>) -> Self {
97 DailyReader {
98 reader: BufReader::new(reader),
99 buffer: [0u8; RECORD_SIZE],
100 start,
101 end,
102 tz_offset: FixedOffset::east_opt(8 * 3600).expect("valid offset"),
103 }
104 }
105}
106
107impl<R: Read> Iterator for DailyReader<R> {
108 type Item = Result<DailyKlineData, DailyParseError>;
109
110 fn next(&mut self) -> Option<Self::Item> {
111 loop {
112 if let Err(err) = self.reader.read_exact(&mut self.buffer) {
113 if err.kind() == std::io::ErrorKind::UnexpectedEof {
114 return None;
115 }
116 return Some(Err(DailyParseError::Io(err)));
117 }
118
119 let mut cursor = Cursor::new(&self.buffer[..]);
120 match parse_record(&mut cursor, self.start, self.end, self.tz_offset) {
121 Ok(Some(record)) => return Some(Ok(record)),
122 Ok(None) => continue,
123 Err(e) => return Some(Err(e)),
124 }
125 }
126 }
127}
128
129pub fn parse_daily_to_structs(
145 path: impl AsRef<Path>,
146 start_date_str: &str,
147 end_date_str: &str,
148) -> Result<Vec<DailyKlineData>, DailyParseError> {
149 let start = parse_date(start_date_str).map_err(DailyParseError::InvalidStartDate)?;
150 let end = parse_date(end_date_str).map_err(DailyParseError::InvalidEndDate)?;
151 parse_daily_to_structs_in_range(path, Some(start), Some(end))
152}
153
154pub fn parse_daily_file_to_structs(
156 path: impl AsRef<Path>,
157) -> Result<Vec<DailyKlineData>, DailyParseError> {
158 parse_daily_to_structs_in_range(path, None, None)
159}
160
161pub fn parse_daily_to_structs_in_range(
163 path: impl AsRef<Path>,
164 start: Option<NaiveDate>,
165 end: Option<NaiveDate>,
166) -> Result<Vec<DailyKlineData>, DailyParseError> {
167 let path_ref = path.as_ref();
168 let mut reader = DailyReader::from_path(path_ref, start, end)?;
169 let mut out = Vec::with_capacity(estimate_rows(path_ref)?);
170 for item in &mut reader {
171 out.push(item?);
172 }
173 Ok(out)
174}
175
/// Builds the daily DataFrame for the records of `path` between two `%Y%m%d` date strings.
///
/// Both strings are parsed with `parse_date`; the actual work is delegated to
/// [`parse_daily_to_dataframe_in_range`].
///
/// # Errors
/// `DailyParseError::InvalidStartDate` / `DailyParseError::InvalidEndDate` when the
/// respective string does not parse, plus any error from the range variant.
#[cfg(feature = "polars")]
pub fn parse_daily_to_dataframe(
    path: impl AsRef<Path>,
    start_date_str: &str,
    end_date_str: &str,
) -> Result<DataFrame, DailyParseError> {
    // The start date is checked first, so it is the one reported when both are invalid.
    let first_day = match parse_date(start_date_str) {
        Ok(day) => day,
        Err(reason) => return Err(DailyParseError::InvalidStartDate(reason)),
    };
    let last_day = match parse_date(end_date_str) {
        Ok(day) => day,
        Err(reason) => return Err(DailyParseError::InvalidEndDate(reason)),
    };
    parse_daily_to_dataframe_in_range(path, Some(first_day), Some(last_day))
}
209
/// Builds the daily DataFrame for every record of `path`, without date filtering.
///
/// Equivalent to calling [`parse_daily_to_dataframe_in_range`] with both bounds open.
#[cfg(feature = "polars")]
pub fn parse_daily_file_to_dataframe(path: impl AsRef<Path>) -> Result<DataFrame, DailyParseError> {
    let unbounded: Option<NaiveDate> = None;
    parse_daily_to_dataframe_in_range(path, unbounded, unbounded)
}
215
/// Reads the records of `path` dated within `start..=end` and assembles them into a
/// polars `DataFrame`.
///
/// A non-empty result carries the columns `time`, `open`, `high`, `low`, `close`,
/// `volume`, `amount`, `settlementPrice`, `openInterest`, `preClose` and `suspendFlag`,
/// in that order, sorted by timestamp. When no record passes the date filter,
/// `DataFrame::empty()` is returned instead.
///
/// # Errors
/// Propagates failures from opening or sizing the file, from decoding a record, and
/// from polars while building or collecting the frame.
#[cfg(feature = "polars")]
pub fn parse_daily_to_dataframe_in_range(
    path: impl AsRef<Path>,
    start: Option<NaiveDate>,
    end: Option<NaiveDate>,
) -> Result<DataFrame, DailyParseError> {
    let path_ref = path.as_ref();
    let mut reader = DailyReader::from_path(path_ref, start, end)?;

    // One column vector per field, each sized for the whole file up front.
    let estimated_rows = estimate_rows(path_ref)?;
    let mut timestamps = Vec::with_capacity(estimated_rows);
    let mut opens = Vec::with_capacity(estimated_rows);
    let mut highs = Vec::with_capacity(estimated_rows);
    let mut lows = Vec::with_capacity(estimated_rows);
    let mut closes = Vec::with_capacity(estimated_rows);
    let mut volumes = Vec::with_capacity(estimated_rows);
    let mut amounts = Vec::with_capacity(estimated_rows);
    let mut open_interests = Vec::with_capacity(estimated_rows);
    let mut file_pre_closes = Vec::with_capacity(estimated_rows);

    // Transpose the record stream into columns; the first failing record aborts.
    for item in &mut reader {
        let record = item?;
        timestamps.push(record.timestamp_ms);
        opens.push(record.open);
        highs.push(record.high);
        lows.push(record.low);
        closes.push(record.close);
        volumes.push(record.volume);
        amounts.push(record.amount);
        open_interests.push(record.open_interest);
        file_pre_closes.push(record.file_pre_close);
    }

    // NOTE(review): `DataFrame::empty()` presumably has no columns, so an empty result
    // would not share the schema of a non-empty one — confirm callers cope with that.
    if timestamps.is_empty() {
        return Ok(DataFrame::empty());
    }

    // Intermediate frame; `file_preClose` is built here but not part of the final select.
    let df = df![
        "timestamp_ms" => timestamps,
        "open" => opens,
        "high" => highs,
        "low" => lows,
        "close" => closes,
        "volume" => volumes,
        "amount" => amounts,
        "openInterest" => open_interests,
        "file_preClose" => file_pre_closes,
    ]?;

    // `raw_tz` is built from `None` (no zone); `china_tz` is built from "Asia/Shanghai".
    let raw_tz = polars::prelude::TimeZone::opt_try_new(None::<PlSmallStr>)?;
    let china_tz = polars::prelude::TimeZone::opt_try_new(Some("Asia/Shanghai"))?;
    let df_final = df
        .lazy()
        .sort(["timestamp_ms"], Default::default())
        // suspendFlag = 1 when both volume and amount are zero, else 0.
        .with_column(
            (col("volume").eq(lit(0)).and(col("amount").eq(lit(0.0))))
                .cast(DataType::Int32)
                .alias("suspendFlag"),
        )
        // Previous row's close; null on the first row.
        .with_column(col("close").shift(lit(1)).alias("calc_pre_close"))
        // preClose: the row's own close when suspended; otherwise the previous close,
        // falling back to the row's own close where no previous row exists.
        .with_column(
            when(col("suspendFlag").eq(lit(1)))
                .then(col("close"))
                .otherwise(col("calc_pre_close").fill_null(col("close")))
                .alias("preClose"),
        )
        .with_columns(vec![
            // Millisecond timestamps cast to a datetime, then converted to `china_tz`.
            col("timestamp_ms")
                .cast(DataType::Datetime(TimeUnit::Milliseconds, raw_tz))
                .dt()
                .convert_time_zone(china_tz.unwrap())
                .alias("time"),
            // settlementPrice is not read from the file; it is a constant 0.0 column.
            lit(0.0).alias("settlementPrice"),
        ])
        // Final column order; matches DAILY_DATAFRAME_COLUMN_NAMES.
        .select([
            col("time"),
            col("open"),
            col("high"),
            col("low"),
            col("close"),
            col("volume"),
            col("amount"),
            col("settlementPrice"),
            col("openInterest"),
            col("preClose"),
            col("suspendFlag"),
        ])
        .collect()?;

    Ok(df_final)
}
308
309fn validate_dat_path(path: &Path) -> Result<(), DailyParseError> {
310 if path.as_os_str().is_empty() {
311 return Err(DailyParseError::EmptyPath);
312 }
313 let ext = path
314 .extension()
315 .and_then(|s| s.to_str())
316 .unwrap_or_default()
317 .to_ascii_lowercase();
318 if ext != "dat" {
319 return Err(DailyParseError::InvalidExtension(
320 path.display().to_string(),
321 ));
322 }
323 Ok(())
324}
325
326fn estimate_rows(path: &Path) -> Result<usize, DailyParseError> {
327 let file_len = std::fs::metadata(path)?.len();
328 Ok((file_len as usize) / RECORD_SIZE + 1)
329}
330
331fn parse_date(date: &str) -> std::result::Result<NaiveDate, String> {
332 NaiveDate::parse_from_str(date, "%Y%m%d").map_err(|e| e.to_string())
333}
334
335fn parse_record(
336 cursor: &mut Cursor<&[u8]>,
337 start: Option<NaiveDate>,
338 end: Option<NaiveDate>,
339 tz_offset: FixedOffset,
340) -> Result<Option<DailyKlineData>, DailyParseError> {
341 cursor.set_position(8);
342 let ts_seconds = cursor.read_u32::<LittleEndian>()?;
343 let dt_utc = DateTime::from_timestamp(ts_seconds as i64, 0)
344 .ok_or(DailyParseError::InvalidTimestamp)?
345 .naive_utc();
346
347 let current_date = tz_offset.from_utc_datetime(&dt_utc).date_naive();
348 if let Some(start) = start
349 && current_date < start
350 {
351 return Ok(None);
352 }
353 if let Some(end) = end
354 && current_date > end
355 {
356 return Ok(None);
357 }
358
359 let open = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
360 let high = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
361 let low = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
362 let close = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
363
364 cursor.set_position(32);
365 let volume = cursor.read_u32::<LittleEndian>()?;
366
367 cursor.set_position(40);
368 let raw_amount = cursor.read_u64::<LittleEndian>()?;
369 let amount = raw_amount as f64 / AMOUNT_SCALE;
370
371 let open_interest = cursor.read_u32::<LittleEndian>()?;
372
373 cursor.set_position(60);
374 let file_pre_close = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
375
376 Ok(Some(DailyKlineData {
377 timestamp_ms: ts_seconds as i64 * 1000,
378 open,
379 high,
380 low,
381 close,
382 volume,
383 amount,
384 open_interest,
385 file_pre_close,
386 }))
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, FixedOffset};
    use std::path::PathBuf;

    /// Parses a date range from a machine-local data file and checks the DataFrame schema
    /// and the `preClose` derivation. Skipped (passes) when the file is not present.
    #[test]
    #[cfg(feature = "polars")]
    fn test_parse_daily_dataframe() -> Result<(), DailyParseError> {
        let daily_path = PathBuf::from("/mnt/data/trade/qmtdata/datadir/SZ/86400/000001.DAT");

        if !daily_path.exists() {
            println!("测试文件不存在,跳过测试: {:?}", daily_path);
            return Ok(());
        }

        let df = parse_daily_to_dataframe(&daily_path, "19910401", "19910425")?;

        println!("--- Daily DataFrame (Shape: {:?}) ---", df.shape());
        println!("{}", df);

        // Nothing to verify on an empty frame.
        if df.height() == 0 {
            return Ok(());
        }

        assert_eq!(
            df.get_column_names_str().as_slice(),
            daily_dataframe_column_names()
        );
        let column_names = df.get_column_names();
        assert!(column_names.iter().any(|name| name.as_str() == "suspendFlag"));
        assert!(column_names.iter().any(|name| name.as_str() == "preClose"));

        // The preClose check needs a previous row to compare against.
        if df.height() < 2 {
            return Ok(());
        }

        let close_col = df.column("close")?;
        let pre_close_col = df.column("preClose")?;
        let suspend_col = df.column("suspendFlag")?;

        let first_close = close_col.f64()?.get(0).unwrap();
        let second_pre_close = pre_close_col.f64()?.get(1).unwrap();
        let second_suspended = suspend_col.i32()?.get(1).unwrap();

        // On a trading row, preClose must equal the previous row's close.
        if second_suspended == 0 {
            assert!(
                (second_pre_close - first_close).abs() < 0.001,
                "PreClose calculation logic error"
            );
        }

        Ok(())
    }

    /// Checks that the typed range API and the string-based API return the same number
    /// of records when asked for the full span of the file.
    #[test]
    fn test_parse_daily_full_file_and_typed_range_api() -> Result<(), DailyParseError> {
        let daily_path = PathBuf::from("data/000001-1d.dat");

        let all_records = parse_daily_file_to_structs(&daily_path)?;
        assert!(!all_records.is_empty());

        let beijing = FixedOffset::east_opt(8 * 3600).expect("valid offset");
        let beijing_date = |timestamp_ms: i64| {
            DateTime::from_timestamp_millis(timestamp_ms)
                .unwrap()
                .with_timezone(&beijing)
                .date_naive()
        };
        let first_day = beijing_date(all_records.first().unwrap().timestamp_ms);
        let last_day = beijing_date(all_records.last().unwrap().timestamp_ms);

        let typed =
            parse_daily_to_structs_in_range(&daily_path, Some(first_day), Some(last_day))?;

        let start_text = first_day.format("%Y%m%d").to_string();
        let end_text = last_day.format("%Y%m%d").to_string();
        let legacy = parse_daily_to_structs(&daily_path, &start_text, &end_text)?;

        assert_eq!(typed.len(), legacy.len());
        Ok(())
    }
}