1use crate::error::StreamError;
14use rust_decimal::Decimal;
15use std::str::FromStr;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
19pub enum Exchange {
20 Binance,
22 Coinbase,
24 Alpaca,
26 Polygon,
28}
29
30impl std::fmt::Display for Exchange {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 match self {
33 Exchange::Binance => write!(f, "Binance"),
34 Exchange::Coinbase => write!(f, "Coinbase"),
35 Exchange::Alpaca => write!(f, "Alpaca"),
36 Exchange::Polygon => write!(f, "Polygon"),
37 }
38 }
39}
40
41impl FromStr for Exchange {
42 type Err = StreamError;
43 fn from_str(s: &str) -> Result<Self, Self::Err> {
44 match s.to_lowercase().as_str() {
45 "binance" => Ok(Exchange::Binance),
46 "coinbase" => Ok(Exchange::Coinbase),
47 "alpaca" => Ok(Exchange::Alpaca),
48 "polygon" => Ok(Exchange::Polygon),
49 _ => Err(StreamError::UnknownExchange(s.to_string())),
50 }
51 }
52}
53
54#[derive(Debug, Clone)]
56pub struct RawTick {
57 pub exchange: Exchange,
59 pub symbol: String,
61 pub payload: serde_json::Value,
63 pub received_at_ms: u64,
65}
66
67impl RawTick {
68 pub fn new(exchange: Exchange, symbol: impl Into<String>, payload: serde_json::Value) -> Self {
70 Self {
71 exchange,
72 symbol: symbol.into(),
73 payload,
74 received_at_ms: now_ms(),
75 }
76 }
77}
78
79#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
81pub struct NormalizedTick {
82 pub exchange: Exchange,
84 pub symbol: String,
86 pub price: Decimal,
88 pub quantity: Decimal,
90 pub side: Option<TradeSide>,
92 pub trade_id: Option<String>,
94 pub exchange_ts_ms: Option<u64>,
96 pub received_at_ms: u64,
98}
99
100#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
102pub enum TradeSide {
103 Buy,
105 Sell,
107}
108
109pub struct TickNormalizer;
114
115impl TickNormalizer {
116 pub fn new() -> Self {
118 Self
119 }
120
121 pub fn normalize(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
123 match raw.exchange {
124 Exchange::Binance => self.normalize_binance(raw),
125 Exchange::Coinbase => self.normalize_coinbase(raw),
126 Exchange::Alpaca => self.normalize_alpaca(raw),
127 Exchange::Polygon => self.normalize_polygon(raw),
128 }
129 }
130
131 fn normalize_binance(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
132 let p = &raw.payload;
133 let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
134 let qty = parse_decimal_field(p, "q", &raw.exchange.to_string())?;
135 let side = p.get("m").and_then(|v| v.as_bool()).map(|maker| {
136 if maker { TradeSide::Sell } else { TradeSide::Buy }
137 });
138 let trade_id = p.get("t").and_then(|v| v.as_u64()).map(|id| id.to_string());
139 let exchange_ts = p.get("T").and_then(|v| v.as_u64());
140 Ok(NormalizedTick {
141 exchange: raw.exchange,
142 symbol: raw.symbol,
143 price,
144 quantity: qty,
145 side,
146 trade_id,
147 exchange_ts_ms: exchange_ts,
148 received_at_ms: raw.received_at_ms,
149 })
150 }
151
152 fn normalize_coinbase(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
153 let p = &raw.payload;
154 let price = parse_decimal_field(p, "price", &raw.exchange.to_string())?;
155 let qty = parse_decimal_field(p, "size", &raw.exchange.to_string())?;
156 let side = p.get("side").and_then(|v| v.as_str()).map(|s| {
157 if s == "buy" { TradeSide::Buy } else { TradeSide::Sell }
158 });
159 let trade_id = p.get("trade_id").and_then(|v| v.as_str()).map(str::to_string);
160 Ok(NormalizedTick {
161 exchange: raw.exchange,
162 symbol: raw.symbol,
163 price,
164 quantity: qty,
165 side,
166 trade_id,
167 exchange_ts_ms: None,
168 received_at_ms: raw.received_at_ms,
169 })
170 }
171
172 fn normalize_alpaca(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
173 let p = &raw.payload;
174 let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
175 let qty = parse_decimal_field(p, "s", &raw.exchange.to_string())?;
176 let trade_id = p.get("i").and_then(|v| v.as_u64()).map(|id| id.to_string());
177 Ok(NormalizedTick {
178 exchange: raw.exchange,
179 symbol: raw.symbol,
180 price,
181 quantity: qty,
182 side: None,
183 trade_id,
184 exchange_ts_ms: None,
185 received_at_ms: raw.received_at_ms,
186 })
187 }
188
189 fn normalize_polygon(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
190 let p = &raw.payload;
191 let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
192 let qty = parse_decimal_field(p, "s", &raw.exchange.to_string())?;
193 let trade_id = p.get("i").and_then(|v| v.as_str()).map(str::to_string);
194 let exchange_ts = p.get("t").and_then(|v| v.as_u64());
195 Ok(NormalizedTick {
196 exchange: raw.exchange,
197 symbol: raw.symbol,
198 price,
199 quantity: qty,
200 side: None,
201 trade_id,
202 exchange_ts_ms: exchange_ts,
203 received_at_ms: raw.received_at_ms,
204 })
205 }
206}
207
208impl Default for TickNormalizer {
209 fn default() -> Self {
210 Self::new()
211 }
212}
213
214fn parse_decimal_field(v: &serde_json::Value, field: &str, exchange: &str) -> Result<Decimal, StreamError> {
215 let raw = v.get(field).ok_or_else(|| StreamError::ParseError {
216 exchange: exchange.to_string(),
217 reason: format!("missing field '{}'", field),
218 })?;
219 let s = if let Some(s) = raw.as_str() {
220 s.to_string()
221 } else if let Some(n) = raw.as_f64() {
222 n.to_string()
223 } else {
224 return Err(StreamError::ParseError {
225 exchange: exchange.to_string(),
226 reason: format!("field '{}' is not a string or number", field),
227 });
228 };
229 Decimal::from_str(&s).map_err(|e| StreamError::ParseError {
230 exchange: exchange.to_string(),
231 reason: format!("field '{}' parse error: {}", field, e),
232 })
233}
234
235fn now_ms() -> u64 {
236 std::time::SystemTime::now()
237 .duration_since(std::time::UNIX_EPOCH)
238 .map(|d| d.as_millis() as u64)
239 .unwrap_or(0)
240}
241
242#[cfg(test)]
243mod tests {
244 use super::*;
245 use serde_json::json;
246
247 fn normalizer() -> TickNormalizer { TickNormalizer::new() }
248
249 fn binance_tick(symbol: &str) -> RawTick {
250 RawTick {
251 exchange: Exchange::Binance,
252 symbol: symbol.to_string(),
253 payload: json!({ "p": "50000.12", "q": "0.001", "m": false, "t": 12345, "T": 1700000000000u64 }),
254 received_at_ms: 1700000000001,
255 }
256 }
257
258 fn coinbase_tick(symbol: &str) -> RawTick {
259 RawTick {
260 exchange: Exchange::Coinbase,
261 symbol: symbol.to_string(),
262 payload: json!({ "price": "50001.00", "size": "0.5", "side": "buy", "trade_id": "abc123" }),
263 received_at_ms: 1700000000002,
264 }
265 }
266
267 fn alpaca_tick(symbol: &str) -> RawTick {
268 RawTick {
269 exchange: Exchange::Alpaca,
270 symbol: symbol.to_string(),
271 payload: json!({ "p": "180.50", "s": "10", "i": 99 }),
272 received_at_ms: 1700000000003,
273 }
274 }
275
276 fn polygon_tick(symbol: &str) -> RawTick {
277 RawTick {
278 exchange: Exchange::Polygon,
279 symbol: symbol.to_string(),
280 payload: json!({ "p": "180.51", "s": "5", "i": "XYZ-001", "t": 1700000000004u64 }),
281 received_at_ms: 1700000000005,
282 }
283 }
284
285 #[test]
286 fn test_exchange_from_str_valid() {
287 assert_eq!("binance".parse::<Exchange>().unwrap(), Exchange::Binance);
288 assert_eq!("Coinbase".parse::<Exchange>().unwrap(), Exchange::Coinbase);
289 assert_eq!("ALPACA".parse::<Exchange>().unwrap(), Exchange::Alpaca);
290 assert_eq!("polygon".parse::<Exchange>().unwrap(), Exchange::Polygon);
291 }
292
293 #[test]
294 fn test_exchange_from_str_unknown_returns_error() {
295 let result = "Kraken".parse::<Exchange>();
296 assert!(matches!(result, Err(StreamError::UnknownExchange(_))));
297 }
298
299 #[test]
300 fn test_exchange_display() {
301 assert_eq!(Exchange::Binance.to_string(), "Binance");
302 assert_eq!(Exchange::Coinbase.to_string(), "Coinbase");
303 }
304
305 #[test]
306 fn test_normalize_binance_tick_price_and_qty() {
307 let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
308 assert_eq!(tick.price, Decimal::from_str("50000.12").unwrap());
309 assert_eq!(tick.quantity, Decimal::from_str("0.001").unwrap());
310 assert_eq!(tick.exchange, Exchange::Binance);
311 assert_eq!(tick.symbol, "BTCUSDT");
312 }
313
314 #[test]
315 fn test_normalize_binance_side_maker_false_is_buy() {
316 let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
317 assert_eq!(tick.side, Some(TradeSide::Buy));
318 }
319
320 #[test]
321 fn test_normalize_binance_side_maker_true_is_sell() {
322 let raw = RawTick {
323 exchange: Exchange::Binance,
324 symbol: "BTCUSDT".into(),
325 payload: json!({ "p": "50000", "q": "1", "m": true }),
326 received_at_ms: 0,
327 };
328 let tick = normalizer().normalize(raw).unwrap();
329 assert_eq!(tick.side, Some(TradeSide::Sell));
330 }
331
332 #[test]
333 fn test_normalize_binance_trade_id_and_ts() {
334 let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
335 assert_eq!(tick.trade_id, Some("12345".to_string()));
336 assert_eq!(tick.exchange_ts_ms, Some(1700000000000));
337 }
338
339 #[test]
340 fn test_normalize_coinbase_tick() {
341 let tick = normalizer().normalize(coinbase_tick("BTC-USD")).unwrap();
342 assert_eq!(tick.price, Decimal::from_str("50001.00").unwrap());
343 assert_eq!(tick.quantity, Decimal::from_str("0.5").unwrap());
344 assert_eq!(tick.side, Some(TradeSide::Buy));
345 assert_eq!(tick.trade_id, Some("abc123".to_string()));
346 }
347
348 #[test]
349 fn test_normalize_coinbase_sell_side() {
350 let raw = RawTick {
351 exchange: Exchange::Coinbase,
352 symbol: "BTC-USD".into(),
353 payload: json!({ "price": "50000", "size": "1", "side": "sell" }),
354 received_at_ms: 0,
355 };
356 let tick = normalizer().normalize(raw).unwrap();
357 assert_eq!(tick.side, Some(TradeSide::Sell));
358 }
359
360 #[test]
361 fn test_normalize_alpaca_tick() {
362 let tick = normalizer().normalize(alpaca_tick("AAPL")).unwrap();
363 assert_eq!(tick.price, Decimal::from_str("180.50").unwrap());
364 assert_eq!(tick.quantity, Decimal::from_str("10").unwrap());
365 assert_eq!(tick.trade_id, Some("99".to_string()));
366 assert_eq!(tick.side, None);
367 }
368
369 #[test]
370 fn test_normalize_polygon_tick() {
371 let tick = normalizer().normalize(polygon_tick("AAPL")).unwrap();
372 assert_eq!(tick.price, Decimal::from_str("180.51").unwrap());
373 assert_eq!(tick.exchange_ts_ms, Some(1700000000004));
374 assert_eq!(tick.trade_id, Some("XYZ-001".to_string()));
375 }
376
377 #[test]
378 fn test_normalize_missing_price_field_returns_parse_error() {
379 let raw = RawTick {
380 exchange: Exchange::Binance,
381 symbol: "BTCUSDT".into(),
382 payload: json!({ "q": "1" }),
383 received_at_ms: 0,
384 };
385 let result = normalizer().normalize(raw);
386 assert!(matches!(result, Err(StreamError::ParseError { .. })));
387 }
388
389 #[test]
390 fn test_normalize_invalid_decimal_returns_parse_error() {
391 let raw = RawTick {
392 exchange: Exchange::Coinbase,
393 symbol: "BTC-USD".into(),
394 payload: json!({ "price": "not-a-number", "size": "1" }),
395 received_at_ms: 0,
396 };
397 let result = normalizer().normalize(raw);
398 assert!(matches!(result, Err(StreamError::ParseError { .. })));
399 }
400
401 #[test]
402 fn test_raw_tick_new_sets_received_at() {
403 let raw = RawTick::new(Exchange::Binance, "BTCUSDT", json!({}));
404 assert!(raw.received_at_ms > 0);
405 }
406
407 #[test]
408 fn test_normalize_numeric_price_field() {
409 let raw = RawTick {
410 exchange: Exchange::Binance,
411 symbol: "BTCUSDT".into(),
412 payload: json!({ "p": 50000.0, "q": 1.0 }),
413 received_at_ms: 0,
414 };
415 let tick = normalizer().normalize(raw).unwrap();
416 assert!(tick.price > Decimal::ZERO);
417 }
418}