1use crate::error::StreamError;
14use rust_decimal::Decimal;
15use std::str::FromStr;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
19pub enum Exchange {
20 Binance,
21 Coinbase,
22 Alpaca,
23 Polygon,
24}
25
26impl std::fmt::Display for Exchange {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 match self {
29 Exchange::Binance => write!(f, "Binance"),
30 Exchange::Coinbase => write!(f, "Coinbase"),
31 Exchange::Alpaca => write!(f, "Alpaca"),
32 Exchange::Polygon => write!(f, "Polygon"),
33 }
34 }
35}
36
37impl FromStr for Exchange {
38 type Err = StreamError;
39 fn from_str(s: &str) -> Result<Self, Self::Err> {
40 match s.to_lowercase().as_str() {
41 "binance" => Ok(Exchange::Binance),
42 "coinbase" => Ok(Exchange::Coinbase),
43 "alpaca" => Ok(Exchange::Alpaca),
44 "polygon" => Ok(Exchange::Polygon),
45 _ => Err(StreamError::UnknownExchange(s.to_string())),
46 }
47 }
48}
49
50#[derive(Debug, Clone)]
52pub struct RawTick {
53 pub exchange: Exchange,
54 pub symbol: String,
55 pub payload: serde_json::Value,
56 pub received_at_ms: u64,
57}
58
59impl RawTick {
60 pub fn new(exchange: Exchange, symbol: impl Into<String>, payload: serde_json::Value) -> Self {
61 Self {
62 exchange,
63 symbol: symbol.into(),
64 payload,
65 received_at_ms: now_ms(),
66 }
67 }
68}
69
70#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
72pub struct NormalizedTick {
73 pub exchange: Exchange,
74 pub symbol: String,
75 pub price: Decimal,
76 pub quantity: Decimal,
77 pub side: Option<TradeSide>,
78 pub trade_id: Option<String>,
79 pub exchange_ts_ms: Option<u64>,
80 pub received_at_ms: u64,
81}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
85pub enum TradeSide {
86 Buy,
87 Sell,
88}
89
90pub struct TickNormalizer;
92
93impl TickNormalizer {
94 pub fn new() -> Self {
95 Self
96 }
97
98 pub fn normalize(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
100 match raw.exchange {
101 Exchange::Binance => self.normalize_binance(raw),
102 Exchange::Coinbase => self.normalize_coinbase(raw),
103 Exchange::Alpaca => self.normalize_alpaca(raw),
104 Exchange::Polygon => self.normalize_polygon(raw),
105 }
106 }
107
108 fn normalize_binance(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
109 let p = &raw.payload;
110 let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
111 let qty = parse_decimal_field(p, "q", &raw.exchange.to_string())?;
112 let side = p.get("m").and_then(|v| v.as_bool()).map(|maker| {
113 if maker { TradeSide::Sell } else { TradeSide::Buy }
114 });
115 let trade_id = p.get("t").and_then(|v| v.as_u64()).map(|id| id.to_string());
116 let exchange_ts = p.get("T").and_then(|v| v.as_u64());
117 Ok(NormalizedTick {
118 exchange: raw.exchange,
119 symbol: raw.symbol,
120 price,
121 quantity: qty,
122 side,
123 trade_id,
124 exchange_ts_ms: exchange_ts,
125 received_at_ms: raw.received_at_ms,
126 })
127 }
128
129 fn normalize_coinbase(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
130 let p = &raw.payload;
131 let price = parse_decimal_field(p, "price", &raw.exchange.to_string())?;
132 let qty = parse_decimal_field(p, "size", &raw.exchange.to_string())?;
133 let side = p.get("side").and_then(|v| v.as_str()).map(|s| {
134 if s == "buy" { TradeSide::Buy } else { TradeSide::Sell }
135 });
136 let trade_id = p.get("trade_id").and_then(|v| v.as_str()).map(str::to_string);
137 Ok(NormalizedTick {
138 exchange: raw.exchange,
139 symbol: raw.symbol,
140 price,
141 quantity: qty,
142 side,
143 trade_id,
144 exchange_ts_ms: None,
145 received_at_ms: raw.received_at_ms,
146 })
147 }
148
149 fn normalize_alpaca(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
150 let p = &raw.payload;
151 let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
152 let qty = parse_decimal_field(p, "s", &raw.exchange.to_string())?;
153 let trade_id = p.get("i").and_then(|v| v.as_u64()).map(|id| id.to_string());
154 Ok(NormalizedTick {
155 exchange: raw.exchange,
156 symbol: raw.symbol,
157 price,
158 quantity: qty,
159 side: None,
160 trade_id,
161 exchange_ts_ms: None,
162 received_at_ms: raw.received_at_ms,
163 })
164 }
165
166 fn normalize_polygon(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
167 let p = &raw.payload;
168 let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
169 let qty = parse_decimal_field(p, "s", &raw.exchange.to_string())?;
170 let trade_id = p.get("i").and_then(|v| v.as_str()).map(str::to_string);
171 let exchange_ts = p.get("t").and_then(|v| v.as_u64());
172 Ok(NormalizedTick {
173 exchange: raw.exchange,
174 symbol: raw.symbol,
175 price,
176 quantity: qty,
177 side: None,
178 trade_id,
179 exchange_ts_ms: exchange_ts,
180 received_at_ms: raw.received_at_ms,
181 })
182 }
183}
184
185impl Default for TickNormalizer {
186 fn default() -> Self {
187 Self::new()
188 }
189}
190
191fn parse_decimal_field(v: &serde_json::Value, field: &str, exchange: &str) -> Result<Decimal, StreamError> {
192 let raw = v.get(field).ok_or_else(|| StreamError::ParseError {
193 exchange: exchange.to_string(),
194 reason: format!("missing field '{}'", field),
195 })?;
196 let s = if let Some(s) = raw.as_str() {
197 s.to_string()
198 } else if let Some(n) = raw.as_f64() {
199 n.to_string()
200 } else {
201 return Err(StreamError::ParseError {
202 exchange: exchange.to_string(),
203 reason: format!("field '{}' is not a string or number", field),
204 });
205 };
206 Decimal::from_str(&s).map_err(|e| StreamError::ParseError {
207 exchange: exchange.to_string(),
208 reason: format!("field '{}' parse error: {}", field, e),
209 })
210}
211
212fn now_ms() -> u64 {
213 std::time::SystemTime::now()
214 .duration_since(std::time::UNIX_EPOCH)
215 .map(|d| d.as_millis() as u64)
216 .unwrap_or(0)
217}
218
219#[cfg(test)]
220mod tests {
221 use super::*;
222 use serde_json::json;
223
224 fn normalizer() -> TickNormalizer { TickNormalizer::new() }
225
226 fn binance_tick(symbol: &str) -> RawTick {
227 RawTick {
228 exchange: Exchange::Binance,
229 symbol: symbol.to_string(),
230 payload: json!({ "p": "50000.12", "q": "0.001", "m": false, "t": 12345, "T": 1700000000000u64 }),
231 received_at_ms: 1700000000001,
232 }
233 }
234
235 fn coinbase_tick(symbol: &str) -> RawTick {
236 RawTick {
237 exchange: Exchange::Coinbase,
238 symbol: symbol.to_string(),
239 payload: json!({ "price": "50001.00", "size": "0.5", "side": "buy", "trade_id": "abc123" }),
240 received_at_ms: 1700000000002,
241 }
242 }
243
244 fn alpaca_tick(symbol: &str) -> RawTick {
245 RawTick {
246 exchange: Exchange::Alpaca,
247 symbol: symbol.to_string(),
248 payload: json!({ "p": "180.50", "s": "10", "i": 99 }),
249 received_at_ms: 1700000000003,
250 }
251 }
252
253 fn polygon_tick(symbol: &str) -> RawTick {
254 RawTick {
255 exchange: Exchange::Polygon,
256 symbol: symbol.to_string(),
257 payload: json!({ "p": "180.51", "s": "5", "i": "XYZ-001", "t": 1700000000004u64 }),
258 received_at_ms: 1700000000005,
259 }
260 }
261
262 #[test]
263 fn test_exchange_from_str_valid() {
264 assert_eq!("binance".parse::<Exchange>().unwrap(), Exchange::Binance);
265 assert_eq!("Coinbase".parse::<Exchange>().unwrap(), Exchange::Coinbase);
266 assert_eq!("ALPACA".parse::<Exchange>().unwrap(), Exchange::Alpaca);
267 assert_eq!("polygon".parse::<Exchange>().unwrap(), Exchange::Polygon);
268 }
269
270 #[test]
271 fn test_exchange_from_str_unknown_returns_error() {
272 let result = "Kraken".parse::<Exchange>();
273 assert!(matches!(result, Err(StreamError::UnknownExchange(_))));
274 }
275
276 #[test]
277 fn test_exchange_display() {
278 assert_eq!(Exchange::Binance.to_string(), "Binance");
279 assert_eq!(Exchange::Coinbase.to_string(), "Coinbase");
280 }
281
282 #[test]
283 fn test_normalize_binance_tick_price_and_qty() {
284 let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
285 assert_eq!(tick.price, Decimal::from_str("50000.12").unwrap());
286 assert_eq!(tick.quantity, Decimal::from_str("0.001").unwrap());
287 assert_eq!(tick.exchange, Exchange::Binance);
288 assert_eq!(tick.symbol, "BTCUSDT");
289 }
290
291 #[test]
292 fn test_normalize_binance_side_maker_false_is_buy() {
293 let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
294 assert_eq!(tick.side, Some(TradeSide::Buy));
295 }
296
297 #[test]
298 fn test_normalize_binance_side_maker_true_is_sell() {
299 let raw = RawTick {
300 exchange: Exchange::Binance,
301 symbol: "BTCUSDT".into(),
302 payload: json!({ "p": "50000", "q": "1", "m": true }),
303 received_at_ms: 0,
304 };
305 let tick = normalizer().normalize(raw).unwrap();
306 assert_eq!(tick.side, Some(TradeSide::Sell));
307 }
308
309 #[test]
310 fn test_normalize_binance_trade_id_and_ts() {
311 let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
312 assert_eq!(tick.trade_id, Some("12345".to_string()));
313 assert_eq!(tick.exchange_ts_ms, Some(1700000000000));
314 }
315
316 #[test]
317 fn test_normalize_coinbase_tick() {
318 let tick = normalizer().normalize(coinbase_tick("BTC-USD")).unwrap();
319 assert_eq!(tick.price, Decimal::from_str("50001.00").unwrap());
320 assert_eq!(tick.quantity, Decimal::from_str("0.5").unwrap());
321 assert_eq!(tick.side, Some(TradeSide::Buy));
322 assert_eq!(tick.trade_id, Some("abc123".to_string()));
323 }
324
325 #[test]
326 fn test_normalize_coinbase_sell_side() {
327 let raw = RawTick {
328 exchange: Exchange::Coinbase,
329 symbol: "BTC-USD".into(),
330 payload: json!({ "price": "50000", "size": "1", "side": "sell" }),
331 received_at_ms: 0,
332 };
333 let tick = normalizer().normalize(raw).unwrap();
334 assert_eq!(tick.side, Some(TradeSide::Sell));
335 }
336
337 #[test]
338 fn test_normalize_alpaca_tick() {
339 let tick = normalizer().normalize(alpaca_tick("AAPL")).unwrap();
340 assert_eq!(tick.price, Decimal::from_str("180.50").unwrap());
341 assert_eq!(tick.quantity, Decimal::from_str("10").unwrap());
342 assert_eq!(tick.trade_id, Some("99".to_string()));
343 assert_eq!(tick.side, None);
344 }
345
346 #[test]
347 fn test_normalize_polygon_tick() {
348 let tick = normalizer().normalize(polygon_tick("AAPL")).unwrap();
349 assert_eq!(tick.price, Decimal::from_str("180.51").unwrap());
350 assert_eq!(tick.exchange_ts_ms, Some(1700000000004));
351 assert_eq!(tick.trade_id, Some("XYZ-001".to_string()));
352 }
353
354 #[test]
355 fn test_normalize_missing_price_field_returns_parse_error() {
356 let raw = RawTick {
357 exchange: Exchange::Binance,
358 symbol: "BTCUSDT".into(),
359 payload: json!({ "q": "1" }),
360 received_at_ms: 0,
361 };
362 let result = normalizer().normalize(raw);
363 assert!(matches!(result, Err(StreamError::ParseError { .. })));
364 }
365
366 #[test]
367 fn test_normalize_invalid_decimal_returns_parse_error() {
368 let raw = RawTick {
369 exchange: Exchange::Coinbase,
370 symbol: "BTC-USD".into(),
371 payload: json!({ "price": "not-a-number", "size": "1" }),
372 received_at_ms: 0,
373 };
374 let result = normalizer().normalize(raw);
375 assert!(matches!(result, Err(StreamError::ParseError { .. })));
376 }
377
378 #[test]
379 fn test_raw_tick_new_sets_received_at() {
380 let raw = RawTick::new(Exchange::Binance, "BTCUSDT", json!({}));
381 assert!(raw.received_at_ms > 0);
382 }
383
384 #[test]
385 fn test_normalize_numeric_price_field() {
386 let raw = RawTick {
387 exchange: Exchange::Binance,
388 symbol: "BTCUSDT".into(),
389 payload: json!({ "p": 50000.0, "q": 1.0 }),
390 received_at_ms: 0,
391 };
392 let tick = normalizer().normalize(raw).unwrap();
393 assert!(tick.price > Decimal::ZERO);
394 }
395}