Skip to main content

data_preprocess/parser/
tick_csv.rs

1use std::path::Path;
2
3use chrono::FixedOffset;
4
5use crate::error::Result;
6use crate::models::Tick;
7use crate::parser::parse_datetime_to_utc;
8
/// Parse an optional floating-point field.
///
/// Returns `None` when the field is absent, empty, whitespace-only, not a
/// valid float, or parses to a non-finite value.
///
/// `str::parse::<f64>` accepts spellings such as `NaN` and `inf`, and turns
/// overflowing literals like `1e400` into infinity. A non-finite price or
/// volume is not usable tick data, so it is reported as missing rather than
/// propagated into downstream arithmetic and comparisons.
fn parse_optional_f64(field: Option<&str>) -> Option<f64> {
    field
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .and_then(|s| s.parse::<f64>().ok())
        // Reject NaN / ±inf: they parse successfully but are not real values.
        .filter(|v| v.is_finite())
}
16
/// Parse an optional integer field.
///
/// Yields `None` for an absent, empty, or whitespace-only field, and also
/// when the trimmed text is not a valid `i32` (including out-of-range values).
fn parse_optional_i32(field: Option<&str>) -> Option<i32> {
    let text = field?.trim();
    if text.is_empty() {
        return None;
    }
    text.parse().ok()
}
24
25/// Parse a tab-delimited tick CSV into `Vec<Tick>`.
26///
27/// Malformed rows produce warnings instead of hard errors so partial
28/// files can still be imported.
29pub fn parse_tick_csv(
30    path: &Path,
31    exchange: &str,
32    symbol: &str,
33    source_offset: &FixedOffset,
34) -> Result<(Vec<Tick>, Vec<String>)> {
35    let mut reader = csv::ReaderBuilder::new()
36        .delimiter(b'\t')
37        .has_headers(true)
38        .flexible(true)
39        .from_path(path)?;
40
41    let mut ticks = Vec::new();
42    let mut warnings = Vec::new();
43
44    for (line_idx, record) in reader.records().enumerate() {
45        let record = match record {
46            Ok(r) => r,
47            Err(e) => {
48                warnings.push(format!("line {}: {}", line_idx + 2, e));
49                continue;
50            }
51        };
52
53        let date_str = record.get(0).unwrap_or("").trim();
54        let time_str = record.get(1).unwrap_or("").trim();
55        let bid = parse_optional_f64(record.get(2));
56        let ask = parse_optional_f64(record.get(3));
57        let last = parse_optional_f64(record.get(4));
58        let vol = parse_optional_f64(record.get(5));
59        let flags = parse_optional_i32(record.get(6));
60
61        let ts = match parse_datetime_to_utc(date_str, time_str, source_offset) {
62            Ok(t) => t,
63            Err(e) => {
64                warnings.push(format!("line {}: {}", line_idx + 2, e));
65                continue;
66            }
67        };
68
69        ticks.push(Tick {
70            exchange: exchange.to_string(),
71            symbol: symbol.to_string(),
72            ts,
73            bid,
74            ask,
75            last,
76            volume: vol,
77            flags,
78        });
79    }
80
81    Ok((ticks, warnings))
82}