Skip to main content

qmt_parser/
day.rs

1//! 日线解析。
2//!
3//! 这个模块同时提供:
4//!
5//! - 原始日线记录读取
6//! - 按 `NaiveDate` 或字符串日期范围过滤
7//! - 在 `polars` feature 下生成带业务派生列的 `DataFrame`
8
9use std::fs::File;
10use std::io::{BufReader, Cursor, Read};
11use std::path::Path;
12
13use crate::error::DailyParseError;
14use byteorder::{LittleEndian, ReadBytesExt};
15use chrono::{DateTime, FixedOffset, NaiveDate, TimeZone};
16#[cfg(feature = "polars")]
17use polars::datatypes::PlSmallStr;
18#[cfg(feature = "polars")]
19use polars::prelude::*;
20
/// Size in bytes of one on-disk daily record; the reader consumes the file in chunks of this size.
const RECORD_SIZE: usize = 64;
/// Divisor applied to the raw `u32` price fields (open/high/low/close/pre-close).
const PRICE_SCALE: f64 = 1000.0;
/// Divisor applied to the raw `u64` amount field.
const AMOUNT_SCALE: f64 = 100.0;
24
/// Column names of the daily-bar `DataFrame` output, in output order.
pub const DAILY_DATAFRAME_COLUMN_NAMES: [&str; 11] = [
    "time",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "amount",
    "settlementPrice",
    "openInterest",
    "preClose",
    "suspendFlag",
];
39
40/// 返回当前日线 `DataFrame` 输出列名。
41pub fn daily_dataframe_column_names() -> &'static [&'static str] {
42    &DAILY_DATAFRAME_COLUMN_NAMES
43}
44
/// A single raw daily-bar record as decoded from the file.
#[derive(Debug, Clone)]
pub struct DailyKlineData {
    /// Record timestamp in milliseconds (the on-disk seconds value multiplied by 1000);
    /// the parser derives the trading date from it in UTC+8 (Beijing time).
    pub timestamp_ms: i64,
    /// Opening price.
    pub open: f64,
    /// Highest price.
    pub high: f64,
    /// Lowest price.
    pub low: f64,
    /// Closing price.
    pub close: f64,
    /// Traded volume.
    pub volume: u32,
    /// Turnover amount, already scaled by the QMT daily-bar factor.
    pub amount: f64,
    /// Open interest.
    pub open_interest: u32,
    /// Previous close exactly as stored in the file.
    pub file_pre_close: f64,
}
67
68/// 流式读取日线文件的迭代器。
69///
70/// 这个 reader 只负责原始记录读取和日期过滤,不做 `preClose`、`suspendFlag`
71/// 等衍生逻辑。
72pub struct DailyReader<R: Read> {
73    reader: BufReader<R>,
74    buffer: [u8; RECORD_SIZE],
75    start: Option<NaiveDate>,
76    end: Option<NaiveDate>,
77    tz_offset: FixedOffset,
78}
79
80impl DailyReader<File> {
81    /// 从 `.dat` 文件路径创建日线读取器。
82    pub fn from_path(
83        path: impl AsRef<Path>,
84        start: Option<NaiveDate>,
85        end: Option<NaiveDate>,
86    ) -> Result<Self, DailyParseError> {
87        let path = path.as_ref();
88        validate_dat_path(path)?;
89        let file = File::open(path)?;
90        Ok(Self::new(file, start, end))
91    }
92}
93
94impl<R: Read> DailyReader<R> {
95    /// 从任意 `Read` 实例构造日线读取器。
96    pub fn new(reader: R, start: Option<NaiveDate>, end: Option<NaiveDate>) -> Self {
97        DailyReader {
98            reader: BufReader::new(reader),
99            buffer: [0u8; RECORD_SIZE],
100            start,
101            end,
102            tz_offset: FixedOffset::east_opt(8 * 3600).expect("valid offset"),
103        }
104    }
105}
106
107impl<R: Read> Iterator for DailyReader<R> {
108    type Item = Result<DailyKlineData, DailyParseError>;
109
110    fn next(&mut self) -> Option<Self::Item> {
111        loop {
112            if let Err(err) = self.reader.read_exact(&mut self.buffer) {
113                if err.kind() == std::io::ErrorKind::UnexpectedEof {
114                    return None;
115                }
116                return Some(Err(DailyParseError::Io(err)));
117            }
118
119            let mut cursor = Cursor::new(&self.buffer[..]);
120            match parse_record(&mut cursor, self.start, self.end, self.tz_offset) {
121                Ok(Some(record)) => return Some(Ok(record)),
122                Ok(None) => continue,
123                Err(e) => return Some(Err(e)),
124            }
125        }
126    }
127}
128
129/// 按字符串日期范围解析日线文件为结构体。
130///
131/// `start_date_str` 和 `end_date_str` 必须使用 `YYYYMMDD` 格式。
132///
133/// # Examples
134///
135/// ```no_run
136/// use qmt_parser::parse_daily_to_structs;
137///
138/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
139/// let rows = parse_daily_to_structs("data/day/000001.dat", "20230101", "20231231")?;
140/// println!("rows = {}", rows.len());
141/// # Ok(())
142/// # }
143/// ```
144pub fn parse_daily_to_structs(
145    path: impl AsRef<Path>,
146    start_date_str: &str,
147    end_date_str: &str,
148) -> Result<Vec<DailyKlineData>, DailyParseError> {
149    let start = parse_date(start_date_str).map_err(DailyParseError::InvalidStartDate)?;
150    let end = parse_date(end_date_str).map_err(DailyParseError::InvalidEndDate)?;
151    parse_daily_to_structs_in_range(path, Some(start), Some(end))
152}
153
154/// 解析整个日线文件为结构体,不做日期过滤。
155pub fn parse_daily_file_to_structs(
156    path: impl AsRef<Path>,
157) -> Result<Vec<DailyKlineData>, DailyParseError> {
158    parse_daily_to_structs_in_range(path, None, None)
159}
160
161/// 按 typed 日期范围解析日线文件为结构体。
162pub fn parse_daily_to_structs_in_range(
163    path: impl AsRef<Path>,
164    start: Option<NaiveDate>,
165    end: Option<NaiveDate>,
166) -> Result<Vec<DailyKlineData>, DailyParseError> {
167    let path_ref = path.as_ref();
168    let mut reader = DailyReader::from_path(path_ref, start, end)?;
169    let mut out = Vec::with_capacity(estimate_rows(path_ref)?);
170    for item in &mut reader {
171        out.push(item?);
172    }
173    Ok(out)
174}
175
/// Parses a daily-bar file into a `DataFrame`, restricted to a string date range.
///
/// Both date strings must use the `YYYYMMDD` format. On top of the raw fields the
/// `DataFrame` output additionally fills in:
///
/// - `suspendFlag`
/// - `preClose`
/// - `settlementPrice`
///
/// # Examples
///
/// ```no_run
/// # #[cfg(feature = "polars")]
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use qmt_parser::parse_daily_to_dataframe;
///
/// let df = parse_daily_to_dataframe("data/day/000001.dat", "20230101", "20231231")?;
/// println!("{:?}", df.shape());
/// # Ok(())
/// # }
/// #
/// # #[cfg(not(feature = "polars"))]
/// # fn main() {}
/// ```
#[cfg(feature = "polars")]
pub fn parse_daily_to_dataframe(
    path: impl AsRef<Path>,
    start_date_str: &str,
    end_date_str: &str,
) -> Result<DataFrame, DailyParseError> {
    let lower = parse_date(start_date_str)
        .map(Some)
        .map_err(DailyParseError::InvalidStartDate)?;
    let upper = parse_date(end_date_str)
        .map(Some)
        .map_err(DailyParseError::InvalidEndDate)?;
    parse_daily_to_dataframe_in_range(path, lower, upper)
}
209
/// Parses an entire daily-bar file into a `DataFrame` without any date filtering.
#[cfg(feature = "polars")]
pub fn parse_daily_file_to_dataframe(path: impl AsRef<Path>) -> Result<DataFrame, DailyParseError> {
    let unbounded: Option<NaiveDate> = None;
    parse_daily_to_dataframe_in_range(path, unbounded, unbounded)
}
215
/// Parses a daily-bar file into a `DataFrame`, restricted to a typed date range.
///
/// `start` and `end` are inclusive; `None` leaves that side of the range open.
///
/// The resulting frame always carries the columns listed in
/// [`DAILY_DATAFRAME_COLUMN_NAMES`], even when no record matches the range (the
/// frame then simply has zero rows). Rows are sorted by timestamp and the
/// following columns are derived:
///
/// - `suspendFlag`: `1` when both `volume` and `amount` are zero, otherwise `0`.
/// - `preClose`: the row's own `close` on suspended rows; otherwise the previous
///   row's `close`, falling back to the row's own `close` for the first row.
/// - `settlementPrice`: constant `0.0`.
/// - `time`: the millisecond timestamp converted to the `Asia/Shanghai` time zone.
///
/// # Errors
///
/// Returns an error when the path is invalid, the file cannot be read, a record
/// fails to parse, or the `polars` pipeline fails.
#[cfg(feature = "polars")]
pub fn parse_daily_to_dataframe_in_range(
    path: impl AsRef<Path>,
    start: Option<NaiveDate>,
    end: Option<NaiveDate>,
) -> Result<DataFrame, DailyParseError> {
    let path_ref = path.as_ref();
    let mut reader = DailyReader::from_path(path_ref, start, end)?;

    // One column vector per raw field, sized from the file length.
    let estimated_rows = estimate_rows(path_ref)?;
    let mut timestamps = Vec::with_capacity(estimated_rows);
    let mut opens = Vec::with_capacity(estimated_rows);
    let mut highs = Vec::with_capacity(estimated_rows);
    let mut lows = Vec::with_capacity(estimated_rows);
    let mut closes = Vec::with_capacity(estimated_rows);
    let mut volumes = Vec::with_capacity(estimated_rows);
    let mut amounts = Vec::with_capacity(estimated_rows);
    let mut open_interests = Vec::with_capacity(estimated_rows);
    let mut file_pre_closes = Vec::with_capacity(estimated_rows);

    for item in &mut reader {
        let record = item?;
        timestamps.push(record.timestamp_ms);
        opens.push(record.open);
        highs.push(record.high);
        lows.push(record.low);
        closes.push(record.close);
        volumes.push(record.volume);
        amounts.push(record.amount);
        open_interests.push(record.open_interest);
        file_pre_closes.push(record.file_pre_close);
    }

    // No early return for an empty selection: running the pipeline on zero rows
    // keeps the output schema identical to the non-empty case, so callers can
    // look up columns or stack frames without special-casing empty ranges.
    let df = df![
        "timestamp_ms" => timestamps,
        "open" => opens,
        "high" => highs,
        "low" => lows,
        "close" => closes,
        "volume" => volumes,
        "amount" => amounts,
        "openInterest" => open_interests,
        "file_preClose" => file_pre_closes,
    ]?;

    let raw_tz = polars::prelude::TimeZone::opt_try_new(None::<PlSmallStr>)?;
    let china_tz = polars::prelude::TimeZone::opt_try_new(Some("Asia/Shanghai"))?;
    let df_final = df
        .lazy()
        // `preClose` relies on the previous row, so order by time first.
        .sort(["timestamp_ms"], Default::default())
        .with_column(
            (col("volume").eq(lit(0)).and(col("amount").eq(lit(0.0))))
                .cast(DataType::Int32)
                .alias("suspendFlag"),
        )
        .with_column(col("close").shift(lit(1)).alias("calc_pre_close"))
        .with_column(
            when(col("suspendFlag").eq(lit(1)))
                .then(col("close"))
                // The first row has no predecessor; fall back to its own close.
                .otherwise(col("calc_pre_close").fill_null(col("close")))
                .alias("preClose"),
        )
        .with_columns(vec![
            col("timestamp_ms")
                .cast(DataType::Datetime(TimeUnit::Milliseconds, raw_tz))
                .dt()
                .convert_time_zone(china_tz.unwrap())
                .alias("time"),
            lit(0.0).alias("settlementPrice"),
        ])
        // Helper columns (`timestamp_ms`, `calc_pre_close`, `file_preClose`) are dropped here.
        .select([
            col("time"),
            col("open"),
            col("high"),
            col("low"),
            col("close"),
            col("volume"),
            col("amount"),
            col("settlementPrice"),
            col("openInterest"),
            col("preClose"),
            col("suspendFlag"),
        ])
        .collect()?;

    Ok(df_final)
}
308
309fn validate_dat_path(path: &Path) -> Result<(), DailyParseError> {
310    if path.as_os_str().is_empty() {
311        return Err(DailyParseError::EmptyPath);
312    }
313    let ext = path
314        .extension()
315        .and_then(|s| s.to_str())
316        .unwrap_or_default()
317        .to_ascii_lowercase();
318    if ext != "dat" {
319        return Err(DailyParseError::InvalidExtension(
320            path.display().to_string(),
321        ));
322    }
323    Ok(())
324}
325
326fn estimate_rows(path: &Path) -> Result<usize, DailyParseError> {
327    let file_len = std::fs::metadata(path)?.len();
328    Ok((file_len as usize) / RECORD_SIZE + 1)
329}
330
331fn parse_date(date: &str) -> std::result::Result<NaiveDate, String> {
332    NaiveDate::parse_from_str(date, "%Y%m%d").map_err(|e| e.to_string())
333}
334
335fn parse_record(
336    cursor: &mut Cursor<&[u8]>,
337    start: Option<NaiveDate>,
338    end: Option<NaiveDate>,
339    tz_offset: FixedOffset,
340) -> Result<Option<DailyKlineData>, DailyParseError> {
341    cursor.set_position(8);
342    let ts_seconds = cursor.read_u32::<LittleEndian>()?;
343    let dt_utc = DateTime::from_timestamp(ts_seconds as i64, 0)
344        .ok_or(DailyParseError::InvalidTimestamp)?
345        .naive_utc();
346
347    let current_date = tz_offset.from_utc_datetime(&dt_utc).date_naive();
348    if let Some(start) = start
349        && current_date < start
350    {
351        return Ok(None);
352    }
353    if let Some(end) = end
354        && current_date > end
355    {
356        return Ok(None);
357    }
358
359    let open = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
360    let high = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
361    let low = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
362    let close = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
363
364    cursor.set_position(32);
365    let volume = cursor.read_u32::<LittleEndian>()?;
366
367    cursor.set_position(40);
368    let raw_amount = cursor.read_u64::<LittleEndian>()?;
369    let amount = raw_amount as f64 / AMOUNT_SCALE;
370
371    let open_interest = cursor.read_u32::<LittleEndian>()?;
372
373    cursor.set_position(60);
374    let file_pre_close = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
375
376    Ok(Some(DailyKlineData {
377        timestamp_ms: ts_seconds as i64 * 1000,
378        open,
379        high,
380        low,
381        close,
382        volume,
383        amount,
384        open_interest,
385        file_pre_close,
386    }))
387}
388
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, FixedOffset};
    use std::path::PathBuf;

    /// Smoke test against a local QMT data file; skipped when the file is absent.
    #[test]
    #[cfg(feature = "polars")]
    fn test_parse_daily_dataframe() -> Result<(), DailyParseError> {
        let daily_path = PathBuf::from("/mnt/data/trade/qmtdata/datadir/SZ/86400/000001.DAT");
        if !daily_path.exists() {
            println!("测试文件不存在,跳过测试: {:?}", daily_path);
            return Ok(());
        }

        let df = parse_daily_to_dataframe(&daily_path, "19910401", "19910425")?;

        println!("--- Daily DataFrame (Shape: {:?}) ---", df.shape());
        println!("{}", df);

        // Nothing to verify when the range selects no rows.
        if df.height() == 0 {
            return Ok(());
        }

        assert_eq!(
            df.get_column_names_str().as_slice(),
            daily_dataframe_column_names()
        );
        let cols = df.get_column_names();
        for required in ["suspendFlag", "preClose"] {
            assert!(cols.iter().any(|c| c.as_str() == required));
        }

        // The preClose check needs a previous row to compare against.
        if df.height() < 2 {
            return Ok(());
        }

        let close_0 = df.column("close")?.f64()?.get(0).unwrap();
        let pre_1 = df.column("preClose")?.f64()?.get(1).unwrap();
        let suspend_1 = df.column("suspendFlag")?.i32()?.get(1).unwrap();

        // On a non-suspended day, preClose must equal the previous row's close.
        if suspend_1 == 0 {
            assert!(
                (pre_1 - close_0).abs() < 0.001,
                "PreClose calculation logic error"
            );
        }

        Ok(())
    }

    /// The typed-range API and the string-range API must select the same rows.
    #[test]
    fn test_parse_daily_full_file_and_typed_range_api() -> Result<(), DailyParseError> {
        let daily_path = PathBuf::from("data/000001-1d.dat");

        let full = parse_daily_file_to_structs(&daily_path)?;
        assert!(!full.is_empty());

        // Convert a record timestamp to its Beijing-time calendar date.
        let bj = FixedOffset::east_opt(8 * 3600).expect("valid offset");
        let beijing_date = |ts_ms: i64| {
            DateTime::from_timestamp_millis(ts_ms)
                .unwrap()
                .with_timezone(&bj)
                .date_naive()
        };
        let start = beijing_date(full.first().unwrap().timestamp_ms);
        let end = beijing_date(full.last().unwrap().timestamp_ms);

        let typed = parse_daily_to_structs_in_range(&daily_path, Some(start), Some(end))?;
        let start_str = start.format("%Y%m%d").to_string();
        let end_str = end.format("%Y%m%d").to_string();
        let legacy = parse_daily_to_structs(&daily_path, &start_str, &end_str)?;

        assert_eq!(typed.len(), legacy.len());
        Ok(())
    }
}