Skip to main content

qmt_parser/
min.rs

//! 1 分钟 K 线解析。
//!
//! 这个模块提供两层对外接口：
//!
//! - [`MinReader`]：按记录流式迭代读取
//! - [`parse_min_to_structs`] / [`parse_min_to_dataframe`]：一次性读完整个文件

8use std::fs::File;
9use std::io::{BufReader, Cursor, Read};
10use std::path::Path;
11
12use crate::error::MinParseError;
13use byteorder::{LittleEndian, ReadBytesExt};
14#[cfg(feature = "polars")]
15use polars::datatypes::PlSmallStr;
16#[cfg(feature = "polars")]
17use polars::prelude::*;
18
/// Size in bytes of one fixed-width record in a minute-bar file.
const RECORD_SIZE: usize = 64;
/// Divisor that turns the file's integer price fields into floating-point prices.
const PRICE_SCALE: f64 = 1000.0;

/// Column names of the minute-bar `DataFrame`, in output order.
pub const MIN_DATAFRAME_COLUMN_NAMES: [&str; 11] = [
    "time",
    "open",
    "high",
    "low",
    "close",
    "volume",
    "amount",
    "settlementPrice",
    "openInterest",
    "preClose",
    "suspendFlag",
];

/// Returns the minute-bar `DataFrame` column names, in output order.
///
/// This is [`MIN_DATAFRAME_COLUMN_NAMES`] viewed as a slice.
pub fn min_dataframe_column_names() -> &'static [&'static str] {
    &MIN_DATAFRAME_COLUMN_NAMES
}

/// One decoded 1-minute K-line (bar) record.
///
/// Price fields have already been divided by the file's integer price scale.
#[derive(Debug, Clone, PartialEq)]
pub struct MinKlineData {
    /// Millisecond timestamp of the bar (described as Beijing time by the original docs).
    /// The file stores whole seconds, so this is always a multiple of 1000.
    pub timestamp_ms: i64,
    /// Opening price.
    pub open: f64,
    /// Highest price.
    pub high: f64,
    /// Lowest price.
    pub low: f64,
    /// Closing price.
    pub close: f64,
    /// Traded volume.
    pub volume: u32,
    /// Traded amount (turnover).
    pub amount: f64,
    /// Open interest.
    pub open_interest: u32,
    /// Previous close price as recorded in the file.
    pub pre_close: f64,
}

65/// 流式读取分钟线文件的迭代器。
66pub struct MinReader<R: Read> {
67    reader: BufReader<R>,
68    buffer: [u8; RECORD_SIZE],
69}
70
71impl MinReader<File> {
72    /// 从 `.dat` 文件路径创建分钟线读取器。
73    pub fn from_path(path: impl AsRef<Path>) -> Result<Self, MinParseError> {
74        let path = path.as_ref();
75        validate_dat_path(path)?;
76        let file = File::open(path)?;
77        Ok(Self::new(file))
78    }
79}
80
81impl<R: Read> MinReader<R> {
82    /// 从任意 `Read` 实例构造分钟线读取器。
83    pub fn new(reader: R) -> Self {
84        MinReader {
85            reader: BufReader::new(reader),
86            buffer: [0u8; RECORD_SIZE],
87        }
88    }
89}
90
91impl<R: Read> Iterator for MinReader<R> {
92    type Item = Result<MinKlineData, MinParseError>;
93
94    fn next(&mut self) -> Option<Self::Item> {
95        if let Err(err) = self.reader.read_exact(&mut self.buffer) {
96            if err.kind() == std::io::ErrorKind::UnexpectedEof {
97                return None;
98            }
99            return Some(Err(MinParseError::Io(err)));
100        }
101
102        let mut cursor = Cursor::new(&self.buffer[..]);
103        Some(parse_record(&mut cursor).map_err(MinParseError::Io))
104    }
105}
106
107/// 把分钟线文件完整解析为 `Vec<MinKlineData>`。
108///
109/// 适合应用层直接消费 typed 结构体。
110///
111/// # Examples
112///
113/// ```no_run
114/// use qmt_parser::parse_min_to_structs;
115///
116/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
117/// let rows = parse_min_to_structs("data/000001-1m.dat")?;
118/// println!("rows = {}", rows.len());
119/// # Ok(())
120/// # }
121/// ```
122pub fn parse_min_to_structs(path: impl AsRef<Path>) -> Result<Vec<MinKlineData>, MinParseError> {
123    let path_ref = path.as_ref();
124    let mut reader = MinReader::from_path(path_ref)?;
125    let estimated_rows = estimate_rows(path_ref)?;
126    let mut out = Vec::with_capacity(estimated_rows);
127    for item in &mut reader {
128        out.push(item?);
129    }
130    Ok(out)
131}
132
/// Parses an entire 1-minute K-line file into a Polars `DataFrame`.
///
/// The output columns and their order are given by [`min_dataframe_column_names`].
/// `time` is built from each record's millisecond timestamp and converted to the
/// `Asia/Shanghai` time zone. `settlementPrice` and `suspendFlag` are not read from the
/// file; they are filled with `0.0` and `0` respectively.
///
/// A file with no complete records yields a zero-row frame that still carries the full
/// column schema, so callers can select columns without special-casing it.
///
/// # Errors
///
/// Returns an error if the path fails validation, the file cannot be read, a record
/// cannot be decoded, or a Polars operation fails.
///
/// # Examples
///
/// ```no_run
/// # #[cfg(feature = "polars")]
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// use qmt_parser::parse_min_to_dataframe;
///
/// let df = parse_min_to_dataframe("data/000001-1m.dat")?;
/// println!("{:?}", df.shape());
/// # Ok(())
/// # }
/// #
/// # #[cfg(not(feature = "polars"))]
/// # fn main() {}
/// ```
#[cfg(feature = "polars")]
pub fn parse_min_to_dataframe(path: impl AsRef<Path>) -> Result<DataFrame, MinParseError> {
    let path_ref = path.as_ref();
    let mut reader = MinReader::from_path(path_ref)?;
    let estimated_rows = estimate_rows(path_ref)?;

    let mut timestamps = Vec::with_capacity(estimated_rows);
    let mut opens = Vec::with_capacity(estimated_rows);
    let mut highs = Vec::with_capacity(estimated_rows);
    let mut lows = Vec::with_capacity(estimated_rows);
    let mut closes = Vec::with_capacity(estimated_rows);
    let mut volumes = Vec::with_capacity(estimated_rows);
    let mut amounts = Vec::with_capacity(estimated_rows);
    let mut open_interests = Vec::with_capacity(estimated_rows);
    let mut pre_closes = Vec::with_capacity(estimated_rows);

    for item in &mut reader {
        let record = item?;
        timestamps.push(record.timestamp_ms);
        opens.push(record.open);
        highs.push(record.high);
        lows.push(record.low);
        closes.push(record.close);
        volumes.push(record.volume);
        amounts.push(record.amount);
        open_interests.push(record.open_interest);
        pre_closes.push(record.pre_close);
    }

    // Deliberately no early return for an empty file: building the frame from
    // zero-length columns keeps the documented schema and column dtypes.
    let len = timestamps.len();
    // These columns have no source field in the record; fill them with zeros.
    let settlement_prices = Series::new("settlementPrice".into(), vec![0.0f64; len]);
    let suspend_flags = Series::new("suspendFlag".into(), vec![0i32; len]);

    let df = df![
        "timestamp_ms" => timestamps,
        "open" => opens,
        "high" => highs,
        "low" => lows,
        "close" => closes,
        "volume" => volumes,
        "amount" => amounts,
        "settlementPrice" => settlement_prices,
        "openInterest" => open_interests,
        "preClose" => pre_closes,
        "suspendFlag" => suspend_flags,
    ]?;

    // A `None` zone yields a time-zone-naive datetime. Polars treats naive values as UTC
    // when converting, so the epoch milliseconds are rendered in Beijing time.
    let raw_tz = TimeZone::opt_try_new(None::<PlSmallStr>)?;
    let china_tz = TimeZone::opt_try_new(Some("Asia/Shanghai"))?
        .expect("opt_try_new with a Some zone name returns Some");
    let df = df
        .lazy()
        .with_column(
            col("timestamp_ms")
                .cast(DataType::Datetime(TimeUnit::Milliseconds, raw_tz))
                .dt()
                .convert_time_zone(china_tz)
                .alias("time"),
        )
        .select([
            col("time"),
            col("open"),
            col("high"),
            col("low"),
            col("close"),
            col("volume"),
            col("amount"),
            col("settlementPrice"),
            col("openInterest"),
            col("preClose"),
            col("suspendFlag"),
        ])
        .collect()?;

    Ok(df)
}

232fn validate_dat_path(path: &Path) -> Result<(), MinParseError> {
233    if path.as_os_str().is_empty() {
234        return Err(MinParseError::EmptyPath);
235    }
236    let ext = path
237        .extension()
238        .and_then(|s| s.to_str())
239        .unwrap_or_default()
240        .to_ascii_lowercase();
241    if ext != "dat" {
242        return Err(MinParseError::InvalidExtension(path.display().to_string()));
243    }
244    Ok(())
245}
246
247fn estimate_rows(path: &Path) -> Result<usize, MinParseError> {
248    let file_len = std::fs::metadata(path)?.len();
249    Ok((file_len as usize) / RECORD_SIZE + 1)
250}
251
252fn parse_record(cursor: &mut Cursor<&[u8]>) -> std::io::Result<MinKlineData> {
253    cursor.set_position(8);
254    let ts_seconds = cursor.read_u32::<LittleEndian>()?;
255    let open = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
256    let high = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
257    let low = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
258    let close = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
259
260    cursor.set_position(32);
261    let volume = cursor.read_u32::<LittleEndian>()?;
262
263    cursor.set_position(40);
264    let amount = cursor.read_u64::<LittleEndian>()? as f64;
265
266    let open_interest = cursor.read_u32::<LittleEndian>()?;
267
268    cursor.set_position(60);
269    let pre_close = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
270
271    Ok(MinKlineData {
272        timestamp_ms: ts_seconds as i64 * 1000,
273        open,
274        high,
275        low,
276        close,
277        volume,
278        amount,
279        open_interest,
280        pre_close,
281    })
282}
283
#[cfg(all(test, feature = "polars"))]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Parses the bundled sample minute file and checks the output column schema.
    #[test]
    fn test_parse_min_dataframe() -> Result<(), MinParseError> {
        let sample = PathBuf::from("data/000000-1m.dat");
        let df = parse_min_to_dataframe(&sample)?;

        println!("--- Tail ---");
        println!("{}", df.tail(Some(5)));

        let actual = df.get_column_names_str();
        let expected = min_dataframe_column_names();
        assert_eq!(actual.as_slice(), expected);
        Ok(())
    }
}