Skip to main content

fin_stream/tick/
mod.rs

1//! Tick normalization — raw exchange messages → canonical NormalizedTick.
2//!
3//! ## Responsibility
4//! Convert heterogeneous exchange tick formats (Binance, Coinbase, Alpaca,
5//! Polygon) into a single canonical representation. This stage must add
6//! <1μs overhead per tick on the hot path.
7//!
8//! ## Guarantees
9//! - Deterministic: same raw bytes always produce the same NormalizedTick
10//! - Non-allocating hot path: NormalizedTick is stack-allocated
11//! - Thread-safe: TickNormalizer is Send + Sync
12
13use crate::error::StreamError;
14use rust_decimal::Decimal;
15use std::str::FromStr;
16
17/// Supported exchanges.
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
19pub enum Exchange {
20    Binance,
21    Coinbase,
22    Alpaca,
23    Polygon,
24}
25
26impl std::fmt::Display for Exchange {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        match self {
29            Exchange::Binance => write!(f, "Binance"),
30            Exchange::Coinbase => write!(f, "Coinbase"),
31            Exchange::Alpaca => write!(f, "Alpaca"),
32            Exchange::Polygon => write!(f, "Polygon"),
33        }
34    }
35}
36
37impl FromStr for Exchange {
38    type Err = StreamError;
39    fn from_str(s: &str) -> Result<Self, Self::Err> {
40        match s.to_lowercase().as_str() {
41            "binance" => Ok(Exchange::Binance),
42            "coinbase" => Ok(Exchange::Coinbase),
43            "alpaca" => Ok(Exchange::Alpaca),
44            "polygon" => Ok(Exchange::Polygon),
45            _ => Err(StreamError::UnknownExchange(s.to_string())),
46        }
47    }
48}
49
50/// Raw tick — unprocessed bytes from an exchange WebSocket.
51#[derive(Debug, Clone)]
52pub struct RawTick {
53    pub exchange: Exchange,
54    pub symbol: String,
55    pub payload: serde_json::Value,
56    pub received_at_ms: u64,
57}
58
59impl RawTick {
60    pub fn new(exchange: Exchange, symbol: impl Into<String>, payload: serde_json::Value) -> Self {
61        Self {
62            exchange,
63            symbol: symbol.into(),
64            payload,
65            received_at_ms: now_ms(),
66        }
67    }
68}
69
70/// Canonical normalized tick — exchange-agnostic.
71#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
72pub struct NormalizedTick {
73    pub exchange: Exchange,
74    pub symbol: String,
75    pub price: Decimal,
76    pub quantity: Decimal,
77    pub side: Option<TradeSide>,
78    pub trade_id: Option<String>,
79    pub exchange_ts_ms: Option<u64>,
80    pub received_at_ms: u64,
81}
82
83/// Direction of trade that generated the tick.
84#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
85pub enum TradeSide {
86    Buy,
87    Sell,
88}
89
90/// Normalizes raw ticks from any supported exchange.
91pub struct TickNormalizer;
92
93impl TickNormalizer {
94    pub fn new() -> Self {
95        Self
96    }
97
98    /// Normalize a raw tick into canonical form.
99    pub fn normalize(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
100        match raw.exchange {
101            Exchange::Binance => self.normalize_binance(raw),
102            Exchange::Coinbase => self.normalize_coinbase(raw),
103            Exchange::Alpaca => self.normalize_alpaca(raw),
104            Exchange::Polygon => self.normalize_polygon(raw),
105        }
106    }
107
108    fn normalize_binance(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
109        let p = &raw.payload;
110        let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
111        let qty = parse_decimal_field(p, "q", &raw.exchange.to_string())?;
112        let side = p.get("m").and_then(|v| v.as_bool()).map(|maker| {
113            if maker { TradeSide::Sell } else { TradeSide::Buy }
114        });
115        let trade_id = p.get("t").and_then(|v| v.as_u64()).map(|id| id.to_string());
116        let exchange_ts = p.get("T").and_then(|v| v.as_u64());
117        Ok(NormalizedTick {
118            exchange: raw.exchange,
119            symbol: raw.symbol,
120            price,
121            quantity: qty,
122            side,
123            trade_id,
124            exchange_ts_ms: exchange_ts,
125            received_at_ms: raw.received_at_ms,
126        })
127    }
128
129    fn normalize_coinbase(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
130        let p = &raw.payload;
131        let price = parse_decimal_field(p, "price", &raw.exchange.to_string())?;
132        let qty = parse_decimal_field(p, "size", &raw.exchange.to_string())?;
133        let side = p.get("side").and_then(|v| v.as_str()).map(|s| {
134            if s == "buy" { TradeSide::Buy } else { TradeSide::Sell }
135        });
136        let trade_id = p.get("trade_id").and_then(|v| v.as_str()).map(str::to_string);
137        Ok(NormalizedTick {
138            exchange: raw.exchange,
139            symbol: raw.symbol,
140            price,
141            quantity: qty,
142            side,
143            trade_id,
144            exchange_ts_ms: None,
145            received_at_ms: raw.received_at_ms,
146        })
147    }
148
149    fn normalize_alpaca(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
150        let p = &raw.payload;
151        let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
152        let qty = parse_decimal_field(p, "s", &raw.exchange.to_string())?;
153        let trade_id = p.get("i").and_then(|v| v.as_u64()).map(|id| id.to_string());
154        Ok(NormalizedTick {
155            exchange: raw.exchange,
156            symbol: raw.symbol,
157            price,
158            quantity: qty,
159            side: None,
160            trade_id,
161            exchange_ts_ms: None,
162            received_at_ms: raw.received_at_ms,
163        })
164    }
165
166    fn normalize_polygon(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
167        let p = &raw.payload;
168        let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
169        let qty = parse_decimal_field(p, "s", &raw.exchange.to_string())?;
170        let trade_id = p.get("i").and_then(|v| v.as_str()).map(str::to_string);
171        let exchange_ts = p.get("t").and_then(|v| v.as_u64());
172        Ok(NormalizedTick {
173            exchange: raw.exchange,
174            symbol: raw.symbol,
175            price,
176            quantity: qty,
177            side: None,
178            trade_id,
179            exchange_ts_ms: exchange_ts,
180            received_at_ms: raw.received_at_ms,
181        })
182    }
183}
184
185impl Default for TickNormalizer {
186    fn default() -> Self {
187        Self::new()
188    }
189}
190
191fn parse_decimal_field(v: &serde_json::Value, field: &str, exchange: &str) -> Result<Decimal, StreamError> {
192    let raw = v.get(field).ok_or_else(|| StreamError::ParseError {
193        exchange: exchange.to_string(),
194        reason: format!("missing field '{}'", field),
195    })?;
196    let s = if let Some(s) = raw.as_str() {
197        s.to_string()
198    } else if let Some(n) = raw.as_f64() {
199        n.to_string()
200    } else {
201        return Err(StreamError::ParseError {
202            exchange: exchange.to_string(),
203            reason: format!("field '{}' is not a string or number", field),
204        });
205    };
206    Decimal::from_str(&s).map_err(|e| StreamError::ParseError {
207        exchange: exchange.to_string(),
208        reason: format!("field '{}' parse error: {}", field, e),
209    })
210}
211
212fn now_ms() -> u64 {
213    std::time::SystemTime::now()
214        .duration_since(std::time::UNIX_EPOCH)
215        .map(|d| d.as_millis() as u64)
216        .unwrap_or(0)
217}
218
219#[cfg(test)]
220mod tests {
221    use super::*;
222    use serde_json::json;
223
224    fn normalizer() -> TickNormalizer { TickNormalizer::new() }
225
226    fn binance_tick(symbol: &str) -> RawTick {
227        RawTick {
228            exchange: Exchange::Binance,
229            symbol: symbol.to_string(),
230            payload: json!({ "p": "50000.12", "q": "0.001", "m": false, "t": 12345, "T": 1700000000000u64 }),
231            received_at_ms: 1700000000001,
232        }
233    }
234
235    fn coinbase_tick(symbol: &str) -> RawTick {
236        RawTick {
237            exchange: Exchange::Coinbase,
238            symbol: symbol.to_string(),
239            payload: json!({ "price": "50001.00", "size": "0.5", "side": "buy", "trade_id": "abc123" }),
240            received_at_ms: 1700000000002,
241        }
242    }
243
244    fn alpaca_tick(symbol: &str) -> RawTick {
245        RawTick {
246            exchange: Exchange::Alpaca,
247            symbol: symbol.to_string(),
248            payload: json!({ "p": "180.50", "s": "10", "i": 99 }),
249            received_at_ms: 1700000000003,
250        }
251    }
252
253    fn polygon_tick(symbol: &str) -> RawTick {
254        RawTick {
255            exchange: Exchange::Polygon,
256            symbol: symbol.to_string(),
257            payload: json!({ "p": "180.51", "s": "5", "i": "XYZ-001", "t": 1700000000004u64 }),
258            received_at_ms: 1700000000005,
259        }
260    }
261
262    #[test]
263    fn test_exchange_from_str_valid() {
264        assert_eq!("binance".parse::<Exchange>().unwrap(), Exchange::Binance);
265        assert_eq!("Coinbase".parse::<Exchange>().unwrap(), Exchange::Coinbase);
266        assert_eq!("ALPACA".parse::<Exchange>().unwrap(), Exchange::Alpaca);
267        assert_eq!("polygon".parse::<Exchange>().unwrap(), Exchange::Polygon);
268    }
269
270    #[test]
271    fn test_exchange_from_str_unknown_returns_error() {
272        let result = "Kraken".parse::<Exchange>();
273        assert!(matches!(result, Err(StreamError::UnknownExchange(_))));
274    }
275
276    #[test]
277    fn test_exchange_display() {
278        assert_eq!(Exchange::Binance.to_string(), "Binance");
279        assert_eq!(Exchange::Coinbase.to_string(), "Coinbase");
280    }
281
282    #[test]
283    fn test_normalize_binance_tick_price_and_qty() {
284        let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
285        assert_eq!(tick.price, Decimal::from_str("50000.12").unwrap());
286        assert_eq!(tick.quantity, Decimal::from_str("0.001").unwrap());
287        assert_eq!(tick.exchange, Exchange::Binance);
288        assert_eq!(tick.symbol, "BTCUSDT");
289    }
290
291    #[test]
292    fn test_normalize_binance_side_maker_false_is_buy() {
293        let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
294        assert_eq!(tick.side, Some(TradeSide::Buy));
295    }
296
297    #[test]
298    fn test_normalize_binance_side_maker_true_is_sell() {
299        let raw = RawTick {
300            exchange: Exchange::Binance,
301            symbol: "BTCUSDT".into(),
302            payload: json!({ "p": "50000", "q": "1", "m": true }),
303            received_at_ms: 0,
304        };
305        let tick = normalizer().normalize(raw).unwrap();
306        assert_eq!(tick.side, Some(TradeSide::Sell));
307    }
308
309    #[test]
310    fn test_normalize_binance_trade_id_and_ts() {
311        let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
312        assert_eq!(tick.trade_id, Some("12345".to_string()));
313        assert_eq!(tick.exchange_ts_ms, Some(1700000000000));
314    }
315
316    #[test]
317    fn test_normalize_coinbase_tick() {
318        let tick = normalizer().normalize(coinbase_tick("BTC-USD")).unwrap();
319        assert_eq!(tick.price, Decimal::from_str("50001.00").unwrap());
320        assert_eq!(tick.quantity, Decimal::from_str("0.5").unwrap());
321        assert_eq!(tick.side, Some(TradeSide::Buy));
322        assert_eq!(tick.trade_id, Some("abc123".to_string()));
323    }
324
325    #[test]
326    fn test_normalize_coinbase_sell_side() {
327        let raw = RawTick {
328            exchange: Exchange::Coinbase,
329            symbol: "BTC-USD".into(),
330            payload: json!({ "price": "50000", "size": "1", "side": "sell" }),
331            received_at_ms: 0,
332        };
333        let tick = normalizer().normalize(raw).unwrap();
334        assert_eq!(tick.side, Some(TradeSide::Sell));
335    }
336
337    #[test]
338    fn test_normalize_alpaca_tick() {
339        let tick = normalizer().normalize(alpaca_tick("AAPL")).unwrap();
340        assert_eq!(tick.price, Decimal::from_str("180.50").unwrap());
341        assert_eq!(tick.quantity, Decimal::from_str("10").unwrap());
342        assert_eq!(tick.trade_id, Some("99".to_string()));
343        assert_eq!(tick.side, None);
344    }
345
346    #[test]
347    fn test_normalize_polygon_tick() {
348        let tick = normalizer().normalize(polygon_tick("AAPL")).unwrap();
349        assert_eq!(tick.price, Decimal::from_str("180.51").unwrap());
350        assert_eq!(tick.exchange_ts_ms, Some(1700000000004));
351        assert_eq!(tick.trade_id, Some("XYZ-001".to_string()));
352    }
353
354    #[test]
355    fn test_normalize_missing_price_field_returns_parse_error() {
356        let raw = RawTick {
357            exchange: Exchange::Binance,
358            symbol: "BTCUSDT".into(),
359            payload: json!({ "q": "1" }),
360            received_at_ms: 0,
361        };
362        let result = normalizer().normalize(raw);
363        assert!(matches!(result, Err(StreamError::ParseError { .. })));
364    }
365
366    #[test]
367    fn test_normalize_invalid_decimal_returns_parse_error() {
368        let raw = RawTick {
369            exchange: Exchange::Coinbase,
370            symbol: "BTC-USD".into(),
371            payload: json!({ "price": "not-a-number", "size": "1" }),
372            received_at_ms: 0,
373        };
374        let result = normalizer().normalize(raw);
375        assert!(matches!(result, Err(StreamError::ParseError { .. })));
376    }
377
378    #[test]
379    fn test_raw_tick_new_sets_received_at() {
380        let raw = RawTick::new(Exchange::Binance, "BTCUSDT", json!({}));
381        assert!(raw.received_at_ms > 0);
382    }
383
384    #[test]
385    fn test_normalize_numeric_price_field() {
386        let raw = RawTick {
387            exchange: Exchange::Binance,
388            symbol: "BTCUSDT".into(),
389            payload: json!({ "p": 50000.0, "q": 1.0 }),
390            received_at_ms: 0,
391        };
392        let tick = normalizer().normalize(raw).unwrap();
393        assert!(tick.price > Decimal::ZERO);
394    }
395}