data_preprocess/
convert.rs1use chrono::NaiveDateTime;
4use polars::prelude::*;
5
6use crate::error::Result;
7use crate::models::{Bar, Tick, Timeframe};
8
9pub fn ticks_to_dataframe(ticks: &[Tick]) -> Result<DataFrame> {
11 let exchanges: Vec<&str> = ticks.iter().map(|t| t.exchange.as_str()).collect();
12 let symbols: Vec<&str> = ticks.iter().map(|t| t.symbol.as_str()).collect();
13 let timestamps: Vec<i64> = ticks
14 .iter()
15 .map(|t| t.ts.and_utc().timestamp_micros())
16 .collect();
17 let bids: Vec<Option<f64>> = ticks.iter().map(|t| t.bid).collect();
18 let asks: Vec<Option<f64>> = ticks.iter().map(|t| t.ask).collect();
19 let lasts: Vec<Option<f64>> = ticks.iter().map(|t| t.last).collect();
20 let volumes: Vec<Option<f64>> = ticks.iter().map(|t| t.volume).collect();
21 let flags: Vec<Option<i32>> = ticks.iter().map(|t| t.flags).collect();
22
23 let df = DataFrame::new(vec![
24 Column::new("exchange".into(), &exchanges),
25 Column::new("symbol".into(), &symbols),
26 Column::new("ts".into(), ×tamps)
27 .cast(&DataType::Datetime(TimeUnit::Microseconds, None))
28 .map_err(|e| crate::error::DataError::Polars(e))?,
29 Column::new("bid".into(), &bids),
30 Column::new("ask".into(), &asks),
31 Column::new("last".into(), &lasts),
32 Column::new("volume".into(), &volumes),
33 Column::new("flags".into(), &flags),
34 ])?;
35
36 Ok(df)
37}
38
39pub fn dataframe_to_ticks(df: &DataFrame) -> Result<Vec<Tick>> {
41 let exchanges = df.column("exchange")?.str()?;
42 let symbols = df.column("symbol")?.str()?;
43 let ts_col = df.column("ts")?.datetime()?;
44 let bids = df.column("bid")?.f64()?;
45 let asks = df.column("ask")?.f64()?;
46 let lasts = df.column("last")?.f64()?;
47 let volumes = df.column("volume")?.f64()?;
48 let flags = df.column("flags")?.i32()?;
49
50 let mut ticks = Vec::with_capacity(df.height());
51 for i in 0..df.height() {
52 let ts_micros = ts_col.get(i).ok_or_else(|| {
53 crate::error::DataError::InvalidTimestamp("null timestamp in dataframe".into())
54 })?;
55 let ts = micros_to_ndt(ts_micros);
56
57 ticks.push(Tick {
58 exchange: exchanges.get(i).unwrap_or("").to_string(),
59 symbol: symbols.get(i).unwrap_or("").to_string(),
60 ts,
61 bid: bids.get(i),
62 ask: asks.get(i),
63 last: lasts.get(i),
64 volume: volumes.get(i),
65 flags: flags.get(i),
66 });
67 }
68 Ok(ticks)
69}
70
71pub fn bars_to_dataframe(bars: &[Bar]) -> Result<DataFrame> {
73 let exchanges: Vec<&str> = bars.iter().map(|b| b.exchange.as_str()).collect();
74 let symbols: Vec<&str> = bars.iter().map(|b| b.symbol.as_str()).collect();
75 let timeframes: Vec<&str> = bars.iter().map(|b| b.timeframe.as_str()).collect();
76 let timestamps: Vec<i64> = bars
77 .iter()
78 .map(|b| b.ts.and_utc().timestamp_micros())
79 .collect();
80 let opens: Vec<f64> = bars.iter().map(|b| b.open).collect();
81 let highs: Vec<f64> = bars.iter().map(|b| b.high).collect();
82 let lows: Vec<f64> = bars.iter().map(|b| b.low).collect();
83 let closes: Vec<f64> = bars.iter().map(|b| b.close).collect();
84 let tick_vols: Vec<i64> = bars.iter().map(|b| b.tick_vol).collect();
85 let volumes: Vec<i64> = bars.iter().map(|b| b.volume).collect();
86 let spreads: Vec<i32> = bars.iter().map(|b| b.spread).collect();
87
88 let df = DataFrame::new(vec![
89 Column::new("exchange".into(), &exchanges),
90 Column::new("symbol".into(), &symbols),
91 Column::new("timeframe".into(), &timeframes),
92 Column::new("ts".into(), ×tamps)
93 .cast(&DataType::Datetime(TimeUnit::Microseconds, None))
94 .map_err(|e| crate::error::DataError::Polars(e))?,
95 Column::new("open".into(), &opens),
96 Column::new("high".into(), &highs),
97 Column::new("low".into(), &lows),
98 Column::new("close".into(), &closes),
99 Column::new("tick_vol".into(), &tick_vols),
100 Column::new("volume".into(), &volumes),
101 Column::new("spread".into(), &spreads),
102 ])?;
103
104 Ok(df)
105}
106
107pub fn dataframe_to_bars(df: &DataFrame) -> Result<Vec<Bar>> {
109 let exchanges = df.column("exchange")?.str()?;
110 let symbols = df.column("symbol")?.str()?;
111 let timeframes = df.column("timeframe")?.str()?;
112 let ts_col = df.column("ts")?.datetime()?;
113 let opens = df.column("open")?.f64()?;
114 let highs = df.column("high")?.f64()?;
115 let lows = df.column("low")?.f64()?;
116 let closes = df.column("close")?.f64()?;
117 let tick_vols = df.column("tick_vol")?.i64()?;
118 let volumes = df.column("volume")?.i64()?;
119 let spreads = df.column("spread")?.i32()?;
120
121 let mut bars = Vec::with_capacity(df.height());
122 for i in 0..df.height() {
123 let ts_micros = ts_col.get(i).ok_or_else(|| {
124 crate::error::DataError::InvalidTimestamp("null timestamp in dataframe".into())
125 })?;
126 let ts = micros_to_ndt(ts_micros);
127 let tf_str = timeframes.get(i).unwrap_or("1m");
128 let timeframe = Timeframe::parse(tf_str).unwrap_or(Timeframe::M1);
129
130 bars.push(Bar {
131 exchange: exchanges.get(i).unwrap_or("").to_string(),
132 symbol: symbols.get(i).unwrap_or("").to_string(),
133 timeframe,
134 ts,
135 open: opens.get(i).unwrap_or(0.0),
136 high: highs.get(i).unwrap_or(0.0),
137 low: lows.get(i).unwrap_or(0.0),
138 close: closes.get(i).unwrap_or(0.0),
139 tick_vol: tick_vols.get(i).unwrap_or(0),
140 volume: volumes.get(i).unwrap_or(0),
141 spread: spreads.get(i).unwrap_or(0),
142 });
143 }
144 Ok(bars)
145}
146
147fn micros_to_ndt(micros: i64) -> NaiveDateTime {
149 let secs = micros / 1_000_000;
150 let nsecs = ((micros % 1_000_000) * 1_000) as u32;
151 chrono::DateTime::from_timestamp(secs, nsecs)
152 .map(|dt| dt.naive_utc())
153 .unwrap_or_default()
154}
155
156pub fn ndt_to_date_string(ndt: &NaiveDateTime) -> String {
158 ndt.format("%Y-%m-%d").to_string()
159}