Skip to main content

data_preprocess/
convert.rs

1//! Bidirectional conversion between Polars DataFrames and domain types (Tick, Bar).
2
3use chrono::NaiveDateTime;
4use polars::prelude::*;
5
6use crate::error::Result;
7use crate::models::{Bar, Tick, Timeframe};
8
9/// Convert a slice of Ticks into a Polars DataFrame.
10pub fn ticks_to_dataframe(ticks: &[Tick]) -> Result<DataFrame> {
11    let exchanges: Vec<&str> = ticks.iter().map(|t| t.exchange.as_str()).collect();
12    let symbols: Vec<&str> = ticks.iter().map(|t| t.symbol.as_str()).collect();
13    let timestamps: Vec<i64> = ticks
14        .iter()
15        .map(|t| t.ts.and_utc().timestamp_micros())
16        .collect();
17    let bids: Vec<Option<f64>> = ticks.iter().map(|t| t.bid).collect();
18    let asks: Vec<Option<f64>> = ticks.iter().map(|t| t.ask).collect();
19    let lasts: Vec<Option<f64>> = ticks.iter().map(|t| t.last).collect();
20    let volumes: Vec<Option<f64>> = ticks.iter().map(|t| t.volume).collect();
21    let flags: Vec<Option<i32>> = ticks.iter().map(|t| t.flags).collect();
22
23    let df = DataFrame::new(vec![
24        Column::new("exchange".into(), &exchanges),
25        Column::new("symbol".into(), &symbols),
26        Column::new("ts".into(), &timestamps)
27            .cast(&DataType::Datetime(TimeUnit::Microseconds, None))
28            .map_err(|e| crate::error::DataError::Polars(e))?,
29        Column::new("bid".into(), &bids),
30        Column::new("ask".into(), &asks),
31        Column::new("last".into(), &lasts),
32        Column::new("volume".into(), &volumes),
33        Column::new("flags".into(), &flags),
34    ])?;
35
36    Ok(df)
37}
38
39/// Convert a Polars DataFrame back into Vec<Tick>.
40pub fn dataframe_to_ticks(df: &DataFrame) -> Result<Vec<Tick>> {
41    let exchanges = df.column("exchange")?.str()?;
42    let symbols = df.column("symbol")?.str()?;
43    let ts_col = df.column("ts")?.datetime()?;
44    let bids = df.column("bid")?.f64()?;
45    let asks = df.column("ask")?.f64()?;
46    let lasts = df.column("last")?.f64()?;
47    let volumes = df.column("volume")?.f64()?;
48    let flags = df.column("flags")?.i32()?;
49
50    let mut ticks = Vec::with_capacity(df.height());
51    for i in 0..df.height() {
52        let ts_micros = ts_col.get(i).ok_or_else(|| {
53            crate::error::DataError::InvalidTimestamp("null timestamp in dataframe".into())
54        })?;
55        let ts = micros_to_ndt(ts_micros);
56
57        ticks.push(Tick {
58            exchange: exchanges.get(i).unwrap_or("").to_string(),
59            symbol: symbols.get(i).unwrap_or("").to_string(),
60            ts,
61            bid: bids.get(i),
62            ask: asks.get(i),
63            last: lasts.get(i),
64            volume: volumes.get(i),
65            flags: flags.get(i),
66        });
67    }
68    Ok(ticks)
69}
70
71/// Convert a slice of Bars into a Polars DataFrame.
72pub fn bars_to_dataframe(bars: &[Bar]) -> Result<DataFrame> {
73    let exchanges: Vec<&str> = bars.iter().map(|b| b.exchange.as_str()).collect();
74    let symbols: Vec<&str> = bars.iter().map(|b| b.symbol.as_str()).collect();
75    let timeframes: Vec<&str> = bars.iter().map(|b| b.timeframe.as_str()).collect();
76    let timestamps: Vec<i64> = bars
77        .iter()
78        .map(|b| b.ts.and_utc().timestamp_micros())
79        .collect();
80    let opens: Vec<f64> = bars.iter().map(|b| b.open).collect();
81    let highs: Vec<f64> = bars.iter().map(|b| b.high).collect();
82    let lows: Vec<f64> = bars.iter().map(|b| b.low).collect();
83    let closes: Vec<f64> = bars.iter().map(|b| b.close).collect();
84    let tick_vols: Vec<i64> = bars.iter().map(|b| b.tick_vol).collect();
85    let volumes: Vec<i64> = bars.iter().map(|b| b.volume).collect();
86    let spreads: Vec<i32> = bars.iter().map(|b| b.spread).collect();
87
88    let df = DataFrame::new(vec![
89        Column::new("exchange".into(), &exchanges),
90        Column::new("symbol".into(), &symbols),
91        Column::new("timeframe".into(), &timeframes),
92        Column::new("ts".into(), &timestamps)
93            .cast(&DataType::Datetime(TimeUnit::Microseconds, None))
94            .map_err(|e| crate::error::DataError::Polars(e))?,
95        Column::new("open".into(), &opens),
96        Column::new("high".into(), &highs),
97        Column::new("low".into(), &lows),
98        Column::new("close".into(), &closes),
99        Column::new("tick_vol".into(), &tick_vols),
100        Column::new("volume".into(), &volumes),
101        Column::new("spread".into(), &spreads),
102    ])?;
103
104    Ok(df)
105}
106
107/// Convert a Polars DataFrame back into Vec<Bar>.
108pub fn dataframe_to_bars(df: &DataFrame) -> Result<Vec<Bar>> {
109    let exchanges = df.column("exchange")?.str()?;
110    let symbols = df.column("symbol")?.str()?;
111    let timeframes = df.column("timeframe")?.str()?;
112    let ts_col = df.column("ts")?.datetime()?;
113    let opens = df.column("open")?.f64()?;
114    let highs = df.column("high")?.f64()?;
115    let lows = df.column("low")?.f64()?;
116    let closes = df.column("close")?.f64()?;
117    let tick_vols = df.column("tick_vol")?.i64()?;
118    let volumes = df.column("volume")?.i64()?;
119    let spreads = df.column("spread")?.i32()?;
120
121    let mut bars = Vec::with_capacity(df.height());
122    for i in 0..df.height() {
123        let ts_micros = ts_col.get(i).ok_or_else(|| {
124            crate::error::DataError::InvalidTimestamp("null timestamp in dataframe".into())
125        })?;
126        let ts = micros_to_ndt(ts_micros);
127        let tf_str = timeframes.get(i).unwrap_or("1m");
128        let timeframe = Timeframe::parse(tf_str).unwrap_or(Timeframe::M1);
129
130        bars.push(Bar {
131            exchange: exchanges.get(i).unwrap_or("").to_string(),
132            symbol: symbols.get(i).unwrap_or("").to_string(),
133            timeframe,
134            ts,
135            open: opens.get(i).unwrap_or(0.0),
136            high: highs.get(i).unwrap_or(0.0),
137            low: lows.get(i).unwrap_or(0.0),
138            close: closes.get(i).unwrap_or(0.0),
139            tick_vol: tick_vols.get(i).unwrap_or(0),
140            volume: volumes.get(i).unwrap_or(0),
141            spread: spreads.get(i).unwrap_or(0),
142        });
143    }
144    Ok(bars)
145}
146
147/// Convert microsecond epoch to NaiveDateTime.
148fn micros_to_ndt(micros: i64) -> NaiveDateTime {
149    let secs = micros / 1_000_000;
150    let nsecs = ((micros % 1_000_000) * 1_000) as u32;
151    chrono::DateTime::from_timestamp(secs, nsecs)
152        .map(|dt| dt.naive_utc())
153        .unwrap_or_default()
154}
155
156/// Extract the date portion from a NaiveDateTime as a formatted string.
157pub fn ndt_to_date_string(ndt: &NaiveDateTime) -> String {
158    ndt.format("%Y-%m-%d").to_string()
159}