1use std::fs::File;
15use std::io::{BufReader, Cursor, Read};
16use std::path::Path;
17
18use crate::error::TickParseError;
19use byteorder::{LittleEndian, ReadBytesExt};
20#[cfg(any(test, feature = "polars"))]
21use chrono::{FixedOffset, NaiveDate, TimeZone};
22#[cfg(feature = "polars")]
23use polars::datatypes::PlSmallStr;
24#[cfg(feature = "polars")]
25use polars::prelude::*;
26
/// Size in bytes of one fixed-width tick record in a `.dat` file.
const RECORD_SIZE: usize = 144;
/// Divisor turning the integer prices stored on disk into floating-point prices.
const PRICE_SCALE: f64 = 1_000.0;
/// `market_phase_status` value that marks a record as a call-auction snapshot.
const CALL_AUCTION_PHASE_CODE: u32 = 12;
/// Offset subtracted from a raw non-`BJ` timestamp to obtain milliseconds since midnight.
#[cfg(any(test, feature = "polars"))]
const QMT_TICK_TIME_OFFSET_MS: u32 = 396_300_000;
/// Offset removed (modulo one day) from a raw `BJ` timestamp to obtain milliseconds since midnight.
#[cfg(any(test, feature = "polars"))]
const BJ_TICK_TIME_OFFSET_MS: u32 = 50_400_000;
34
/// Field names of a full tick record as named by the QMT tick API, in API order.
///
/// This mirrors the API's naming and field set. Not every entry is decoded from the binary
/// `.dat` record; for example, `openInt` has no counterpart in `TickData`.
pub const FULL_TICK_API_FIELD_NAMES: [&str; 17] = [
    "lastPrice",
    "amount",
    "volume",
    "pvolume",
    "openInt",
    "stockStatus",
    "lastSettlementPrice",
    "open",
    "high",
    "low",
    "settlementPrice",
    "lastClose",
    // Five-level order book, one list per side.
    "askPrice",
    "bidPrice",
    "askVol",
    "bidVol",
    "timetag",
];
55
/// Column names, in order, of the `DataFrame` built by `parse_ticks_to_dataframe`.
///
/// The order must stay in sync with the `df!` invocation in that function.
pub const TICK_DATAFRAME_COLUMN_NAMES: [&str; 26] = [
    // Identification and timing.
    "market",
    "symbol",
    "date",
    "raw_qmt_timestamp",
    "time",
    // Prices, totals and status words.
    "last_price",
    "open",
    "high",
    "low",
    "last_close",
    "amount",
    "volume",
    "pvolume",
    "tickvol",
    "market_phase_status",
    "stockStatus",
    "qmt_status_field_1_raw",
    "qmt_status_field_2_raw",
    "lastSettlementPrice",
    // Order book list columns.
    "askPrice",
    "bidPrice",
    "askVol",
    "bidVol",
    // Trailing fields.
    "settlementPrice",
    "transactionNum",
    "pe",
];
85
86pub fn tick_api_field_names() -> &'static [&'static str] {
88 &FULL_TICK_API_FIELD_NAMES
89}
90
91pub fn tick_dataframe_column_names() -> &'static [&'static str] {
93 &TICK_DATAFRAME_COLUMN_NAMES
94}
95
/// One decoded tick record.
///
/// Records produced by this module's parser always carry the market/symbol/date they were
/// read under. The remaining fields come from the fixed-width binary record.
#[derive(Debug, Clone, PartialEq)]
pub struct TickData {
    /// Market code the reader was created with. `TickReader::from_path` derives it from an
    /// `SH`/`SZ`/`BJ` path component, and it is `None` when there is none.
    pub market: Option<String>,
    /// Instrument symbol, e.g. `000001`.
    pub symbol: String,
    /// Trading-date string the reader was created with (eight digits when taken from a file name).
    pub date: String,
    /// Still-encoded time value; its decoding depends on the market.
    pub raw_qmt_timestamp: u32,
    /// Raw trading-phase code; `12` is treated as a call-auction snapshot.
    pub market_phase_status: u32,
    /// Last traded price (stored integer / 1000); `Some(0.0)` during call auction.
    pub last_price: Option<f64>,
    /// Previous close price (stored integer / 1000).
    pub last_close: f64,
    /// Traded amount as stored in the record (not price-scaled); `Some(0.0)` during call auction.
    pub amount: Option<f64>,
    /// Traded volume as stored in the record; `Some(0)` during call auction.
    pub volume: Option<u64>,
    /// Five ask price levels. During call auction only level 0 (the reference price) is set.
    pub ask_prices: [Option<f64>; 5],
    /// Five ask volume levels. During call auction only levels 0-1 are read; the rest are `Some(0)`.
    pub ask_vols: [Option<u32>; 5],
    /// Five bid price levels. During call auction only level 0 (the reference price) is set.
    pub bid_prices: [Option<f64>; 5],
    /// Five bid volume levels. During call auction only level 0 is read; the rest are `Some(0)`.
    pub bid_vols: [Option<u32>; 5],
    /// First undecoded status word, kept verbatim.
    pub qmt_status_field_1_raw: u32,
    /// Second undecoded status word, kept verbatim.
    pub qmt_status_field_2_raw: u32,
}
130
131pub struct TickReader<R: Read> {
133 reader: BufReader<R>,
134 market: Option<String>,
135 symbol: String,
136 date: String,
137 buffer: [u8; RECORD_SIZE],
138}
139
140impl TickReader<File> {
141 pub fn from_path(path: impl AsRef<Path>) -> Result<Self, TickParseError> {
145 let path = path.as_ref();
146 validate_dat_path(path)?;
147 let (market, symbol, date) = extract_tick_file_metadata(path)?;
148 let file = File::open(path)?;
149 Ok(Self::new(file, market, symbol, date))
150 }
151}
152
153impl<R: Read> TickReader<R> {
154 pub fn new(
156 reader: R,
157 market: Option<String>,
158 symbol: impl Into<String>,
159 date: impl Into<String>,
160 ) -> Self {
161 TickReader {
162 reader: BufReader::new(reader),
163 market,
164 symbol: symbol.into(),
165 date: date.into(),
166 buffer: [0u8; RECORD_SIZE],
167 }
168 }
169}
170
171impl<R: Read> Iterator for TickReader<R> {
172 type Item = Result<TickData, TickParseError>;
173
174 fn next(&mut self) -> Option<Self::Item> {
175 if let Err(err) = self.reader.read_exact(&mut self.buffer) {
176 if err.kind() == std::io::ErrorKind::UnexpectedEof {
177 return None;
178 }
179 return Some(Err(TickParseError::Io(err)));
180 }
181
182 let mut cursor = Cursor::new(&self.buffer[..]);
183 Some(
184 parse_single_record(
185 &mut cursor,
186 self.market.as_deref(),
187 &self.symbol,
188 &self.date,
189 )
190 .map_err(TickParseError::Io),
191 )
192 }
193}
194
195pub fn parse_ticks_to_structs(path: impl AsRef<Path>) -> Result<Vec<TickData>, TickParseError> {
211 let path_ref = path.as_ref();
212 let estimated_rows = estimate_rows(path_ref)?;
213 let mut reader = TickReader::from_path(path_ref)?;
214 let mut rows = Vec::with_capacity(estimated_rows);
215 for tick in &mut reader {
216 rows.push(tick?);
217 }
218 Ok(rows)
219}
220
/// Parses the `.dat` tick file at `path` into a Polars `DataFrame`.
///
/// Columns follow `TICK_DATAFRAME_COLUMN_NAMES`. The following fields are not carried by the
/// binary record and are emitted as all-null placeholder columns: `open`, `high`, `low`,
/// `pvolume`, `tickvol`, `stockStatus`, `lastSettlementPrice`, `settlementPrice`,
/// `transactionNum` and `pe`. The order book is stored as list columns (`askPrice`,
/// `bidPrice`, `askVol`, `bidVol`) with five entries per row. `time` is a millisecond
/// `Datetime` converted to `Asia/Shanghai`, and is null where the raw timestamp cannot be
/// decoded.
///
/// A file with no complete records yields `DataFrame::default()`, which has no columns.
///
/// # Errors
/// Path-validation, I/O and Polars errors are returned as `TickParseError`. The path is
/// validated before the file size is queried, so malformed paths report their dedicated
/// variant.
#[cfg(feature = "polars")]
pub fn parse_ticks_to_dataframe(path: impl AsRef<Path>) -> Result<DataFrame, TickParseError> {
    let path_ref = path.as_ref();
    // Validate/open first so malformed paths are not masked by the I/O error that
    // `estimate_rows` would otherwise hit.
    let reader = TickReader::from_path(path_ref)?;
    let estimated_rows = estimate_rows(path_ref)?;

    // Order-book depth stored per record.
    let price_levels = 5;

    let mut dates = Vec::with_capacity(estimated_rows);
    let mut markets = Vec::with_capacity(estimated_rows);
    let mut symbols = Vec::with_capacity(estimated_rows);
    let mut raw_qmt_timestamps = Vec::with_capacity(estimated_rows);
    let mut time_values = Vec::with_capacity(estimated_rows);
    let mut last_prices: Vec<Option<f64>> = Vec::with_capacity(estimated_rows);
    let mut amounts: Vec<Option<f64>> = Vec::with_capacity(estimated_rows);
    let mut volumes: Vec<Option<u64>> = Vec::with_capacity(estimated_rows);
    let mut market_phase_statuses = Vec::with_capacity(estimated_rows);
    let mut last_closes = Vec::with_capacity(estimated_rows);
    let mut qmt_status_1 = Vec::with_capacity(estimated_rows);
    let mut qmt_status_2 = Vec::with_capacity(estimated_rows);

    let mut ask_price_builder = ListPrimitiveChunkedBuilder::<Float64Type>::new(
        "askPrice".into(),
        estimated_rows,
        estimated_rows * price_levels,
        DataType::Float64,
    );
    let mut ask_vol_builder = ListPrimitiveChunkedBuilder::<UInt32Type>::new(
        "askVol".into(),
        estimated_rows,
        estimated_rows * price_levels,
        DataType::UInt32,
    );
    let mut bid_price_builder = ListPrimitiveChunkedBuilder::<Float64Type>::new(
        "bidPrice".into(),
        estimated_rows,
        estimated_rows * price_levels,
        DataType::Float64,
    );
    let mut bid_vol_builder = ListPrimitiveChunkedBuilder::<UInt32Type>::new(
        "bidVol".into(),
        estimated_rows,
        estimated_rows * price_levels,
        DataType::UInt32,
    );

    for result in reader {
        let tick = result?;
        // Decode before the string fields are moved into their column vectors.
        let decoded_time =
            compose_tick_datetime_ms(tick.market.as_deref(), &tick.date, tick.raw_qmt_timestamp);
        markets.push(tick.market);
        symbols.push(tick.symbol);
        dates.push(tick.date);
        raw_qmt_timestamps.push(tick.raw_qmt_timestamp);
        time_values.push(decoded_time);
        market_phase_statuses.push(tick.market_phase_status);
        last_closes.push(tick.last_close);
        last_prices.push(tick.last_price);
        amounts.push(tick.amount);
        volumes.push(tick.volume);
        qmt_status_1.push(tick.qmt_status_field_1_raw);
        qmt_status_2.push(tick.qmt_status_field_2_raw);

        ask_price_builder.append_iter(tick.ask_prices.iter().copied());
        ask_vol_builder.append_iter(tick.ask_vols.iter().copied());
        bid_price_builder.append_iter(tick.bid_prices.iter().copied());
        bid_vol_builder.append_iter(tick.bid_vols.iter().copied());
    }

    if dates.is_empty() {
        return Ok(DataFrame::default());
    }

    // All-null placeholder columns for fields the binary record does not carry.
    let num_rows = dates.len();
    let empty_f64: Series = Series::new(PlSmallStr::from("empty_f64"), vec![None::<f64>; num_rows]);
    let empty_i64: Series = Series::new(PlSmallStr::from("empty_i64"), vec![None::<i64>; num_rows]);

    // Column order must match `TICK_DATAFRAME_COLUMN_NAMES`.
    let df = df![
        "market" => markets,
        "symbol" => symbols,
        "date" => dates,
        "raw_qmt_timestamp" => raw_qmt_timestamps,
        "time" => time_values,
        "last_price" => last_prices,
        "open" => empty_f64.clone(),
        "high" => empty_f64.clone(),
        "low" => empty_f64.clone(),
        "last_close" => last_closes,
        "amount" => amounts,
        "volume" => volumes,
        "pvolume" => empty_i64.clone(),
        "tickvol" => empty_i64.clone(),
        "market_phase_status" => market_phase_statuses,
        "stockStatus" => empty_i64.clone(),
        "qmt_status_field_1_raw" => qmt_status_1,
        "qmt_status_field_2_raw" => qmt_status_2,
        "lastSettlementPrice" => empty_f64.clone(),
        "askPrice" => ask_price_builder.finish(),
        "bidPrice" => bid_price_builder.finish(),
        "askVol" => ask_vol_builder.finish(),
        "bidVol" => bid_vol_builder.finish(),
        "settlementPrice" => empty_f64.clone(),
        "transactionNum" => empty_f64.clone(),
        "pe" => empty_f64,
    ]?;

    // `time` holds epoch milliseconds: cast to a naive Datetime, then attach Shanghai time.
    let raw_tz = polars::prelude::TimeZone::opt_try_new(None::<PlSmallStr>)?;
    let china_tz = polars::prelude::TimeZone::opt_try_new(Some("Asia/Shanghai"))?;

    let df = df
        .lazy()
        .with_column(
            col("time")
                .cast(DataType::Datetime(TimeUnit::Milliseconds, raw_tz))
                .dt()
                .convert_time_zone(
                    china_tz.expect("opt_try_new(Some(name)) yields a time zone for a valid name"),
                )
                .alias("time"),
        )
        .collect()?;

    Ok(df)
}
365
/// Converts a raw non-`BJ` tick timestamp into milliseconds since local midnight.
///
/// Returns `None` when `raw` is below `QMT_TICK_TIME_OFFSET_MS`, or when the decoded value
/// would not fit within a single day.
#[cfg(any(test, feature = "polars"))]
fn decode_qmt_timestamp_ms(raw: u32) -> Option<u32> {
    const MS_PER_DAY: u32 = 86_400_000;
    match raw.checked_sub(QMT_TICK_TIME_OFFSET_MS) {
        Some(millis) if millis < MS_PER_DAY => Some(millis),
        _ => None,
    }
}
371
/// Decodes a raw tick timestamp into milliseconds since midnight, using the encoding of `market`.
///
/// For `BJ`, the raw value is reduced modulo one day and shifted back by
/// `BJ_TICK_TIME_OFFSET_MS`, wrapping within the day, so it always decodes. Every other
/// market, including an unknown (`None`) one, goes through `decode_qmt_timestamp_ms`.
#[cfg(any(test, feature = "polars"))]
fn decode_qmt_timestamp_ms_for_market(market: Option<&str>, raw: u32) -> Option<u32> {
    const MS_PER_DAY: u32 = 86_400_000;

    if market != Some("BJ") {
        return decode_qmt_timestamp_ms(raw);
    }

    let ms_of_day = raw % MS_PER_DAY;
    Some(if ms_of_day >= BJ_TICK_TIME_OFFSET_MS {
        ms_of_day - BJ_TICK_TIME_OFFSET_MS
    } else {
        // Wrap into the previous day; cannot overflow since ms_of_day < MS_PER_DAY.
        ms_of_day + MS_PER_DAY - BJ_TICK_TIME_OFFSET_MS
    })
}
379
/// Builds an epoch timestamp in milliseconds for a tick.
///
/// The trading date parsed from `date_str` is anchored at 00:00 in UTC+8, and the
/// market-specific decoded time of day from `raw` is added to it. Returns `None` if either
/// the date or the time of day cannot be decoded.
#[cfg(any(test, feature = "polars"))]
fn compose_tick_datetime_ms(market: Option<&str>, date_str: &str, raw: u32) -> Option<i64> {
    const UTC_PLUS_8_SECS: i32 = 8 * 60 * 60;

    let midnight = extract_trade_date(date_str)?.and_hms_opt(0, 0, 0)?;
    let millis_into_day = i64::from(decode_qmt_timestamp_ms_for_market(market, raw)?);
    let utc_plus_8 = FixedOffset::east_opt(UTC_PLUS_8_SECS)?;
    let day_start = utc_plus_8.from_local_datetime(&midnight).single()?;
    Some(day_start.timestamp_millis() + millis_into_day)
}
389
/// Extracts the trading date from `date_str`.
///
/// Accepts either a bare `YYYYMMDD` string or a `-`-separated string whose first
/// eight-digit part is the date (e.g. `000001-20250529-tick`). Returns `None` if there is no
/// such part or it is not a valid calendar date.
#[cfg(any(test, feature = "polars"))]
fn extract_trade_date(date_str: &str) -> Option<NaiveDate> {
    // A bare `YYYYMMDD` has no `-`, so `split` yields it as the sole candidate.
    let candidate = date_str
        .split('-')
        .find(|part| part.len() == 8 && part.chars().all(|c| c.is_ascii_digit()))?;
    NaiveDate::parse_from_str(candidate, "%Y%m%d").ok()
}
401
402fn validate_dat_path(path: &Path) -> Result<(), TickParseError> {
403 if path.as_os_str().is_empty() {
404 return Err(TickParseError::EmptyPath);
405 }
406 let ext = path
407 .extension()
408 .and_then(|s| s.to_str())
409 .unwrap_or_default()
410 .to_ascii_lowercase();
411 if ext != "dat" {
412 return Err(TickParseError::InvalidExtension(path.display().to_string()));
413 }
414 Ok(())
415}
416
417fn extract_tick_file_metadata(
418 path: &Path,
419) -> Result<(Option<String>, String, String), TickParseError> {
420 let filename = path
421 .file_name()
422 .and_then(|s| s.to_str())
423 .ok_or(TickParseError::InvalidFileName)?;
424 let stem = filename
425 .split('.')
426 .next()
427 .ok_or(TickParseError::InvalidFileName)?;
428
429 let market = path
430 .ancestors()
431 .filter_map(|p| p.file_name().and_then(|s| s.to_str()))
432 .find(|s| matches!(*s, "SH" | "SZ" | "BJ"))
433 .map(|s| s.to_string());
434
435 let (symbol, date) = if stem.len() == 8 && stem.chars().all(|c| c.is_ascii_digit()) {
436 let symbol = path
437 .parent()
438 .and_then(|p| p.file_name())
439 .and_then(|s| s.to_str())
440 .ok_or(TickParseError::InvalidFileName)?;
441 (symbol.to_string(), stem.to_string())
442 } else {
443 let mut parts = stem.split('-');
444 let symbol = parts.next().ok_or(TickParseError::InvalidFileName)?;
445 let date = parts.next().ok_or(TickParseError::InvalidFileName)?;
446 (symbol.to_string(), date.to_string())
447 };
448
449 if symbol.is_empty() || date.len() != 8 || !date.chars().all(|c| c.is_ascii_digit()) {
450 return Err(TickParseError::InvalidFileName);
451 }
452
453 Ok((market, symbol.to_string(), date.to_string()))
454}
455
456fn estimate_rows(path: &Path) -> Result<usize, TickParseError> {
457 let file_len = std::fs::metadata(path)?.len();
458 Ok((file_len as usize) / RECORD_SIZE + 1)
459}
460
461fn parse_single_record(
462 cursor: &mut Cursor<&[u8]>,
463 market: Option<&str>,
464 symbol: &str,
465 date_str: &str,
466) -> std::io::Result<TickData> {
467 let raw_qmt_timestamp = cursor.read_u32::<LittleEndian>()?;
468 let qmt_status_field_1_raw = cursor.read_u32::<LittleEndian>()?;
469 cursor.set_position(8);
470 let raw_last_price = cursor.read_u32::<LittleEndian>()?;
471 let qmt_status_field_2_raw = cursor.read_u32::<LittleEndian>()?;
472 let raw_amount = cursor.read_u32::<LittleEndian>()?;
473 cursor.set_position(24);
474 let raw_volume = cursor.read_u32::<LittleEndian>()?;
475 let market_phase_status = cursor.read_u32::<LittleEndian>()?;
476 cursor.set_position(60);
477 let last_close = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
478
479 let mut tick = TickData {
480 market: market.map(str::to_string),
481 symbol: symbol.to_string(),
482 date: date_str.to_string(),
483 raw_qmt_timestamp,
484 market_phase_status,
485 last_close,
486 qmt_status_field_1_raw,
487 qmt_status_field_2_raw,
488 last_price: None,
489 amount: None,
490 volume: None,
491 ask_prices: [None; 5],
492 ask_vols: [None; 5],
493 bid_prices: [None; 5],
494 bid_vols: [None; 5],
495 };
496
497 if market_phase_status == CALL_AUCTION_PHASE_CODE {
498 tick.last_price = Some(0.0);
499 tick.amount = Some(0.0);
500 tick.volume = Some(0);
501 tick.ask_vols = [Some(0); 5];
502 tick.bid_vols = [Some(0); 5];
503 cursor.set_position(64);
504 let ref_price = cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE;
505 tick.ask_prices[0] = Some(ref_price);
506 tick.bid_prices[0] = Some(ref_price);
507 cursor.set_position(84);
508 tick.ask_vols[0] = Some(cursor.read_u32::<LittleEndian>()?);
509 tick.ask_vols[1] = Some(cursor.read_u32::<LittleEndian>()?);
510 cursor.set_position(124);
511 tick.bid_vols[0] = Some(cursor.read_u32::<LittleEndian>()?);
512 } else {
513 tick.last_price = Some(raw_last_price as f64 / PRICE_SCALE);
514 tick.amount = Some(raw_amount as f64);
515 tick.volume = Some(raw_volume as u64);
516 for i in 0..5 {
517 cursor.set_position(64 + (i * 4) as u64);
518 tick.ask_prices[i] = Some(cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE);
519 cursor.set_position(84 + (i * 4) as u64);
520 tick.ask_vols[i] = Some(cursor.read_u32::<LittleEndian>()?);
521 cursor.set_position(104 + (i * 4) as u64);
522 tick.bid_prices[i] = Some(cursor.read_u32::<LittleEndian>()? as f64 / PRICE_SCALE);
523 cursor.set_position(124 + (i * 4) as u64);
524 tick.bid_vols[i] = Some(cursor.read_u32::<LittleEndian>()?);
525 }
526 }
527
528 Ok(tick)
529}
530
#[cfg(test)]
mod test {
    use super::*;
    use std::fs;
    use std::path::PathBuf;

    // Sample fixture shipped with the repository; most tests below require it.
    const DAT_FILE: &str = "data/000001-20250529-tick.dat";

    #[test]
    fn run_struct_demo() -> Result<(), TickParseError> {
        let file_to_parse = PathBuf::from(DAT_FILE);
        let all_ticks = parse_ticks_to_structs(file_to_parse)?;
        println!("成功解析 {} 条 tick 数据。\n", all_ticks.len());
        if let Some(first_tick) = all_ticks.first() {
            println!("--- 第一条 Tick 示例 ---\n{:#?}", first_tick);
        }
        if let Some(last_tick) = all_ticks.last() {
            println!("\n--- 最后一条 Tick 示例 ---\n{:#?}", last_tick);
        }
        Ok(())
    }

    #[test]
    #[cfg(feature = "polars")]
    fn run_polars_demo() -> Result<(), TickParseError> {
        let file_to_parse = PathBuf::from(DAT_FILE);
        let df = parse_ticks_to_dataframe(file_to_parse)?;
        println!("成功解析 DataFrame,尺寸: {:?}\n", df.shape());
        println!("--- DataFrame (前5行和后5行) ---\n{}", df);

        if df.height() > 0 {
            let result_df = df
                .clone()
                .lazy()
                .select([col("last_price").mean().alias("mean_price")])
                .collect()?;

            let mean_price: f64 = result_df.column("mean_price")?.get(0)?.try_extract()?;

            println!("\n--- Polars 分析示例 ---");
            println!("所有Tick的平均价格: {:.4}", mean_price);
        }
        Ok(())
    }

    #[test]
    #[cfg(feature = "polars")]
    fn test_tick_schema_names() -> Result<(), TickParseError> {
        assert_eq!(tick_api_field_names()[0], "lastPrice");
        assert_eq!(tick_api_field_names()[4], "openInt");
        assert_eq!(tick_api_field_names()[16], "timetag");

        let df = parse_ticks_to_dataframe(PathBuf::from(DAT_FILE))?;
        let names = df.get_column_names_str();
        assert_eq!(names.as_slice(), tick_dataframe_column_names());
        Ok(())
    }

    #[test]
    fn test_extract_tick_file_metadata() -> Result<(), TickParseError> {
        let (market, symbol, date) = extract_tick_file_metadata(Path::new(DAT_FILE))?;
        assert_eq!(market, None);
        assert_eq!(symbol, "000001");
        assert_eq!(date, "20250529");
        Ok(())
    }

    #[test]
    fn test_decode_qmt_timestamp() {
        assert_eq!(decode_qmt_timestamp_ms(429_610_528), Some(33_310_528));
        assert_eq!(decode_qmt_timestamp_ms(450_316_528), Some(54_016_528));
        assert_eq!(
            compose_tick_datetime_ms(None, "20250529", 429_610_528),
            Some(1_748_481_310_528),
        );
        assert_eq!(
            decode_qmt_timestamp_ms_for_market(Some("BJ"), 2_070_911_528),
            Some(33_311_528)
        );
    }

    #[test]
    #[cfg(feature = "polars")]
    fn test_tick_dataframe_time_column_populated() -> Result<(), TickParseError> {
        let df = parse_ticks_to_dataframe(PathBuf::from(DAT_FILE))?;
        assert_eq!(df.column("market")?.str()?.get(0), None);
        assert_eq!(df.column("symbol")?.str()?.get(0), Some("000001"));
        assert_eq!(df.column("date")?.str()?.get(0), Some("20250529"));
        let time_col = df.column("time")?;
        assert_eq!(time_col.null_count(), 0);
        assert!(matches!(time_col.dtype(), DataType::Datetime(_, _)));
        Ok(())
    }

    #[test]
    fn test_extract_tick_file_metadata_with_market() -> Result<(), TickParseError> {
        let path = Path::new("/mnt/data/trade/qmtdata/datadir/BJ/0/430017/20250617.dat");
        let (market, symbol, date) = extract_tick_file_metadata(path)?;
        assert_eq!(market.as_deref(), Some("BJ"));
        assert_eq!(symbol, "430017");
        assert_eq!(date, "20250617");
        Ok(())
    }

    #[test]
    fn test_tick_reader_accepts_uppercase_dat_extension() -> Result<(), TickParseError> {
        let src = PathBuf::from(DAT_FILE);
        // A per-process directory keeps concurrent test runs from clobbering each other's copy.
        let tmp_dir =
            std::env::temp_dir().join(format!("qmt-tick-uppercase-{}", std::process::id()));
        fs::create_dir_all(&tmp_dir)?;
        let tmp = tmp_dir.join("000001-20250529-tick.DAT");

        // Run the checks first and clean up before propagating any failure, so the
        // temporary copy is never left behind.
        let outcome = fs::copy(&src, &tmp)
            .map_err(TickParseError::from)
            .and_then(|_| TickReader::from_path(&tmp))
            .and_then(|reader| reader.take(1).collect::<Result<Vec<_>, _>>());
        let cleanup = fs::remove_dir_all(&tmp_dir);

        assert_eq!(outcome?.len(), 1);
        cleanup?;
        Ok(())
    }

    #[test]
    fn test_parse_real_bj_tick_sample_when_available() -> Result<(), TickParseError> {
        // Machine-specific sample; silently skipped where it is not present.
        let path = PathBuf::from("/mnt/data/trade/qmtdata/datadir/BJ/0/430017/20250617.dat");
        if !path.exists() {
            return Ok(());
        }

        let ticks = parse_ticks_to_structs(&path)?;
        assert!(!ticks.is_empty());
        let first = &ticks[0];
        assert_eq!(first.market.as_deref(), Some("BJ"));
        assert_eq!(first.symbol, "430017");
        assert_eq!(first.date, "20250617");
        assert_eq!(
            decode_qmt_timestamp_ms_for_market(first.market.as_deref(), first.raw_qmt_timestamp),
            Some(33_311_528),
        );
        Ok(())
    }
}