1use crate::error::StreamError;
14use rust_decimal::Decimal;
15use std::str::FromStr;
16
/// Market-data sources whose ticks this module knows how to normalize.
///
/// The `FromStr` impl accepts the variant name in any letter case, and the `Display` impl
/// prints the capitalised variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
pub enum Exchange {
    /// Binance; the normalizer reads the payload keys `p`, `q`, `m`, `t` and `T`.
    Binance,
    /// Coinbase; the normalizer reads the payload keys `price`, `size`, `side` and `trade_id`.
    Coinbase,
    /// Alpaca; the normalizer reads the payload keys `p`, `s` and `i`.
    Alpaca,
    /// Polygon; the normalizer reads the payload keys `p`, `s`, `i` and `t`.
    Polygon,
}
29
30impl std::fmt::Display for Exchange {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 match self {
33 Exchange::Binance => write!(f, "Binance"),
34 Exchange::Coinbase => write!(f, "Coinbase"),
35 Exchange::Alpaca => write!(f, "Alpaca"),
36 Exchange::Polygon => write!(f, "Polygon"),
37 }
38 }
39}
40
41impl FromStr for Exchange {
42 type Err = StreamError;
43 fn from_str(s: &str) -> Result<Self, Self::Err> {
44 match s.to_lowercase().as_str() {
45 "binance" => Ok(Exchange::Binance),
46 "coinbase" => Ok(Exchange::Coinbase),
47 "alpaca" => Ok(Exchange::Alpaca),
48 "polygon" => Ok(Exchange::Polygon),
49 _ => Err(StreamError::UnknownExchange(s.to_string())),
50 }
51 }
52}
53
/// A tick as received from an exchange, before any exchange-specific parsing.
#[derive(Debug, Clone)]
pub struct RawTick {
    /// Exchange the tick came from; selects which normalization routine is applied.
    pub exchange: Exchange,
    /// Instrument symbol; it is passed through to the normalized tick unchanged.
    pub symbol: String,
    /// Unparsed JSON payload; the keys read from it depend on `exchange`.
    pub payload: serde_json::Value,
    /// Local receive time in milliseconds since the Unix epoch (`RawTick::new` stamps it from
    /// the system clock).
    pub received_at_ms: u64,
}
66
67impl RawTick {
68 pub fn new(exchange: Exchange, symbol: impl Into<String>, payload: serde_json::Value) -> Self {
70 Self {
71 exchange,
72 symbol: symbol.into(),
73 payload,
74 received_at_ms: now_ms(),
75 }
76 }
77}
78
/// Exchange-independent view of a single trade tick, as produced by `TickNormalizer`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct NormalizedTick {
    /// Exchange the tick originated from (copied from the raw tick).
    pub exchange: Exchange,
    /// Instrument symbol (copied from the raw tick).
    pub symbol: String,
    /// Trade price parsed from the payload as an exact decimal.
    pub price: Decimal,
    /// Trade quantity parsed from the payload as an exact decimal.
    pub quantity: Decimal,
    /// Trade side, or `None` when the normalizer does not derive one for this tick.
    pub side: Option<TradeSide>,
    /// Exchange-assigned trade identifier rendered as a string, when one could be read.
    pub trade_id: Option<String>,
    /// Timestamp reported by the exchange, when the normalizer extracts one.
    /// NOTE(review): presumably milliseconds since the Unix epoch, going by the field name.
    pub exchange_ts_ms: Option<u64>,
    /// Local receive time in milliseconds since the Unix epoch (copied from the raw tick).
    pub received_at_ms: u64,
}
99
/// Direction of a trade.
///
/// NOTE(review): presumably the aggressor (taker) side; the Binance mapping turns a `true`
/// `m` flag into `Sell`. Confirm against the feed documentation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum TradeSide {
    /// Buy side.
    Buy,
    /// Sell side.
    Sell,
}
108
/// Stateless converter from exchange-specific raw ticks to normalized ticks.
///
/// The type carries no data, so it is free to copy and every instance behaves identically.
#[derive(Debug, Clone, Copy)]
pub struct TickNormalizer;
114
115impl TickNormalizer {
116 pub fn new() -> Self {
118 Self
119 }
120
121 pub fn normalize(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
123 match raw.exchange {
124 Exchange::Binance => self.normalize_binance(raw),
125 Exchange::Coinbase => self.normalize_coinbase(raw),
126 Exchange::Alpaca => self.normalize_alpaca(raw),
127 Exchange::Polygon => self.normalize_polygon(raw),
128 }
129 }
130
131 fn normalize_binance(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
132 let p = &raw.payload;
133 let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
134 let qty = parse_decimal_field(p, "q", &raw.exchange.to_string())?;
135 let side = p.get("m").and_then(|v| v.as_bool()).map(|maker| {
136 if maker {
137 TradeSide::Sell
138 } else {
139 TradeSide::Buy
140 }
141 });
142 let trade_id = p.get("t").and_then(|v| v.as_u64()).map(|id| id.to_string());
143 let exchange_ts = p.get("T").and_then(|v| v.as_u64());
144 Ok(NormalizedTick {
145 exchange: raw.exchange,
146 symbol: raw.symbol,
147 price,
148 quantity: qty,
149 side,
150 trade_id,
151 exchange_ts_ms: exchange_ts,
152 received_at_ms: raw.received_at_ms,
153 })
154 }
155
156 fn normalize_coinbase(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
157 let p = &raw.payload;
158 let price = parse_decimal_field(p, "price", &raw.exchange.to_string())?;
159 let qty = parse_decimal_field(p, "size", &raw.exchange.to_string())?;
160 let side = p.get("side").and_then(|v| v.as_str()).map(|s| {
161 if s == "buy" {
162 TradeSide::Buy
163 } else {
164 TradeSide::Sell
165 }
166 });
167 let trade_id = p
168 .get("trade_id")
169 .and_then(|v| v.as_str())
170 .map(str::to_string);
171 Ok(NormalizedTick {
172 exchange: raw.exchange,
173 symbol: raw.symbol,
174 price,
175 quantity: qty,
176 side,
177 trade_id,
178 exchange_ts_ms: None,
179 received_at_ms: raw.received_at_ms,
180 })
181 }
182
183 fn normalize_alpaca(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
184 let p = &raw.payload;
185 let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
186 let qty = parse_decimal_field(p, "s", &raw.exchange.to_string())?;
187 let trade_id = p.get("i").and_then(|v| v.as_u64()).map(|id| id.to_string());
188 Ok(NormalizedTick {
189 exchange: raw.exchange,
190 symbol: raw.symbol,
191 price,
192 quantity: qty,
193 side: None,
194 trade_id,
195 exchange_ts_ms: None,
196 received_at_ms: raw.received_at_ms,
197 })
198 }
199
200 fn normalize_polygon(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
201 let p = &raw.payload;
202 let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
203 let qty = parse_decimal_field(p, "s", &raw.exchange.to_string())?;
204 let trade_id = p.get("i").and_then(|v| v.as_str()).map(str::to_string);
205 let exchange_ts = p.get("t").and_then(|v| v.as_u64());
206 Ok(NormalizedTick {
207 exchange: raw.exchange,
208 symbol: raw.symbol,
209 price,
210 quantity: qty,
211 side: None,
212 trade_id,
213 exchange_ts_ms: exchange_ts,
214 received_at_ms: raw.received_at_ms,
215 })
216 }
217}
218
219impl Default for TickNormalizer {
220 fn default() -> Self {
221 Self::new()
222 }
223}
224
225fn parse_decimal_field(
226 v: &serde_json::Value,
227 field: &str,
228 exchange: &str,
229) -> Result<Decimal, StreamError> {
230 let raw = v.get(field).ok_or_else(|| StreamError::ParseError {
231 exchange: exchange.to_string(),
232 reason: format!("missing field '{}'", field),
233 })?;
234 let s = if let Some(s) = raw.as_str() {
235 s.to_string()
236 } else if let Some(n) = raw.as_f64() {
237 n.to_string()
238 } else {
239 return Err(StreamError::ParseError {
240 exchange: exchange.to_string(),
241 reason: format!("field '{}' is not a string or number", field),
242 });
243 };
244 Decimal::from_str(&s).map_err(|e| StreamError::ParseError {
245 exchange: exchange.to_string(),
246 reason: format!("field '{}' parse error: {}", field, e),
247 })
248}
249
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
256
#[cfg(test)]
mod tests {
    //! Unit tests for exchange parsing/printing and for per-exchange tick normalization.

    use super::*;
    use serde_json::json;

    /// Fresh normalizer for each test (the type is stateless).
    fn normalizer() -> TickNormalizer {
        TickNormalizer::new()
    }

    /// Fully populated Binance trade payload.
    fn binance_tick(symbol: &str) -> RawTick {
        RawTick {
            exchange: Exchange::Binance,
            symbol: symbol.to_string(),
            payload: json!({ "p": "50000.12", "q": "0.001", "m": false, "t": 12345, "T": 1700000000000u64 }),
            received_at_ms: 1700000000001,
        }
    }

    /// Fully populated Coinbase trade payload.
    fn coinbase_tick(symbol: &str) -> RawTick {
        RawTick {
            exchange: Exchange::Coinbase,
            symbol: symbol.to_string(),
            payload: json!({ "price": "50001.00", "size": "0.5", "side": "buy", "trade_id": "abc123" }),
            received_at_ms: 1700000000002,
        }
    }

    /// Alpaca trade payload with an integer trade id.
    fn alpaca_tick(symbol: &str) -> RawTick {
        RawTick {
            exchange: Exchange::Alpaca,
            symbol: symbol.to_string(),
            payload: json!({ "p": "180.50", "s": "10", "i": 99 }),
            received_at_ms: 1700000000003,
        }
    }

    /// Polygon trade payload with a string trade id and a timestamp.
    fn polygon_tick(symbol: &str) -> RawTick {
        RawTick {
            exchange: Exchange::Polygon,
            symbol: symbol.to_string(),
            payload: json!({ "p": "180.51", "s": "5", "i": "XYZ-001", "t": 1700000000004u64 }),
            received_at_ms: 1700000000005,
        }
    }

    #[test]
    fn test_exchange_from_str_valid() {
        assert_eq!("binance".parse::<Exchange>().unwrap(), Exchange::Binance);
        assert_eq!("Coinbase".parse::<Exchange>().unwrap(), Exchange::Coinbase);
        assert_eq!("ALPACA".parse::<Exchange>().unwrap(), Exchange::Alpaca);
        assert_eq!("polygon".parse::<Exchange>().unwrap(), Exchange::Polygon);
    }

    #[test]
    fn test_exchange_from_str_unknown_returns_error() {
        let result = "Kraken".parse::<Exchange>();
        // The error must carry the caller's original spelling, not a lowercased copy.
        assert!(matches!(
            result,
            Err(StreamError::UnknownExchange(ref name)) if name == "Kraken"
        ));
    }

    #[test]
    fn test_exchange_display() {
        assert_eq!(Exchange::Binance.to_string(), "Binance");
        assert_eq!(Exchange::Coinbase.to_string(), "Coinbase");
        assert_eq!(Exchange::Alpaca.to_string(), "Alpaca");
        assert_eq!(Exchange::Polygon.to_string(), "Polygon");
    }

    #[test]
    fn test_exchange_display_round_trips_through_from_str() {
        for exchange in [
            Exchange::Binance,
            Exchange::Coinbase,
            Exchange::Alpaca,
            Exchange::Polygon,
        ] {
            assert_eq!(exchange.to_string().parse::<Exchange>().unwrap(), exchange);
        }
    }

    #[test]
    fn test_normalize_binance_tick_price_and_qty() {
        let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
        assert_eq!(tick.price, Decimal::from_str("50000.12").unwrap());
        assert_eq!(tick.quantity, Decimal::from_str("0.001").unwrap());
        assert_eq!(tick.exchange, Exchange::Binance);
        assert_eq!(tick.symbol, "BTCUSDT");
        // The local receive time is passed through untouched.
        assert_eq!(tick.received_at_ms, 1700000000001);
    }

    #[test]
    fn test_normalize_binance_side_maker_false_is_buy() {
        let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
        assert_eq!(tick.side, Some(TradeSide::Buy));
    }

    #[test]
    fn test_normalize_binance_side_maker_true_is_sell() {
        let raw = RawTick {
            exchange: Exchange::Binance,
            symbol: "BTCUSDT".into(),
            payload: json!({ "p": "50000", "q": "1", "m": true }),
            received_at_ms: 0,
        };
        let tick = normalizer().normalize(raw).unwrap();
        assert_eq!(tick.side, Some(TradeSide::Sell));
    }

    #[test]
    fn test_normalize_binance_trade_id_and_ts() {
        let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
        assert_eq!(tick.trade_id, Some("12345".to_string()));
        assert_eq!(tick.exchange_ts_ms, Some(1700000000000));
    }

    #[test]
    fn test_normalize_binance_optional_fields_absent() {
        // Only price and quantity are mandatory; everything else degrades to `None`.
        let raw = RawTick {
            exchange: Exchange::Binance,
            symbol: "BTCUSDT".into(),
            payload: json!({ "p": "1", "q": "1" }),
            received_at_ms: 0,
        };
        let tick = normalizer().normalize(raw).unwrap();
        assert_eq!(tick.side, None);
        assert_eq!(tick.trade_id, None);
        assert_eq!(tick.exchange_ts_ms, None);
    }

    #[test]
    fn test_normalize_coinbase_tick() {
        let tick = normalizer().normalize(coinbase_tick("BTC-USD")).unwrap();
        assert_eq!(tick.price, Decimal::from_str("50001.00").unwrap());
        assert_eq!(tick.quantity, Decimal::from_str("0.5").unwrap());
        assert_eq!(tick.side, Some(TradeSide::Buy));
        assert_eq!(tick.trade_id, Some("abc123".to_string()));
        assert_eq!(tick.symbol, "BTC-USD");
        assert_eq!(tick.received_at_ms, 1700000000002);
    }

    #[test]
    fn test_normalize_coinbase_sell_side() {
        let raw = RawTick {
            exchange: Exchange::Coinbase,
            symbol: "BTC-USD".into(),
            payload: json!({ "price": "50000", "size": "1", "side": "sell" }),
            received_at_ms: 0,
        };
        let tick = normalizer().normalize(raw).unwrap();
        assert_eq!(tick.side, Some(TradeSide::Sell));
    }

    #[test]
    fn test_normalize_alpaca_tick() {
        let tick = normalizer().normalize(alpaca_tick("AAPL")).unwrap();
        assert_eq!(tick.price, Decimal::from_str("180.50").unwrap());
        assert_eq!(tick.quantity, Decimal::from_str("10").unwrap());
        assert_eq!(tick.trade_id, Some("99".to_string()));
        assert_eq!(tick.side, None);
    }

    #[test]
    fn test_normalize_polygon_tick() {
        let tick = normalizer().normalize(polygon_tick("AAPL")).unwrap();
        assert_eq!(tick.price, Decimal::from_str("180.51").unwrap());
        assert_eq!(tick.quantity, Decimal::from_str("5").unwrap());
        assert_eq!(tick.exchange_ts_ms, Some(1700000000004));
        assert_eq!(tick.trade_id, Some("XYZ-001".to_string()));
        assert_eq!(tick.side, None);
    }

    #[test]
    fn test_normalize_missing_price_field_returns_parse_error() {
        let raw = RawTick {
            exchange: Exchange::Binance,
            symbol: "BTCUSDT".into(),
            payload: json!({ "q": "1" }),
            received_at_ms: 0,
        };
        let result = normalizer().normalize(raw);
        assert!(matches!(result, Err(StreamError::ParseError { .. })));
    }

    #[test]
    fn test_normalize_missing_quantity_field_returns_parse_error() {
        let raw = RawTick {
            exchange: Exchange::Binance,
            symbol: "BTCUSDT".into(),
            payload: json!({ "p": "1" }),
            received_at_ms: 0,
        };
        let result = normalizer().normalize(raw);
        assert!(matches!(result, Err(StreamError::ParseError { .. })));
    }

    #[test]
    fn test_normalize_invalid_decimal_returns_parse_error() {
        let raw = RawTick {
            exchange: Exchange::Coinbase,
            symbol: "BTC-USD".into(),
            payload: json!({ "price": "not-a-number", "size": "1" }),
            received_at_ms: 0,
        };
        let result = normalizer().normalize(raw);
        assert!(matches!(result, Err(StreamError::ParseError { .. })));
    }

    #[test]
    fn test_normalize_non_scalar_price_returns_parse_error() {
        // A boolean is neither a string nor a number and must be rejected, not coerced.
        let raw = RawTick {
            exchange: Exchange::Binance,
            symbol: "BTCUSDT".into(),
            payload: json!({ "p": true, "q": "1" }),
            received_at_ms: 0,
        };
        let result = normalizer().normalize(raw);
        assert!(matches!(result, Err(StreamError::ParseError { .. })));
    }

    #[test]
    fn test_raw_tick_new_sets_received_at() {
        let raw = RawTick::new(Exchange::Binance, "BTCUSDT", json!({}));
        assert!(raw.received_at_ms > 0);
        assert_eq!(raw.exchange, Exchange::Binance);
        assert_eq!(raw.symbol, "BTCUSDT");
    }

    #[test]
    fn test_normalize_numeric_price_field() {
        let raw = RawTick {
            exchange: Exchange::Binance,
            symbol: "BTCUSDT".into(),
            payload: json!({ "p": 50000.0, "q": 1.0 }),
            received_at_ms: 0,
        };
        let tick = normalizer().normalize(raw).unwrap();
        // Pin the exact values rather than merely checking the sign.
        assert_eq!(tick.price, Decimal::from_str("50000").unwrap());
        assert_eq!(tick.quantity, Decimal::from_str("1").unwrap());
    }
}