Skip to main content

fin_stream/tick/
mod.rs

1//! Tick normalization — raw exchange messages → canonical NormalizedTick.
2//!
3//! ## Responsibility
4//! Convert heterogeneous exchange tick formats (Binance, Coinbase, Alpaca,
5//! Polygon) into a single canonical representation. This stage must add
6//! <1μs overhead per tick on the hot path.
7//!
8//! ## Guarantees
9//! - Deterministic: same raw bytes always produce the same NormalizedTick
10//! - Non-allocating hot path: NormalizedTick is stack-allocated
11//! - Thread-safe: TickNormalizer is Send + Sync
12
13use crate::error::StreamError;
14use rust_decimal::Decimal;
15use std::str::FromStr;
16
17/// Supported exchanges.
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
19pub enum Exchange {
20    /// Binance spot/futures WebSocket feed.
21    Binance,
22    /// Coinbase Advanced Trade WebSocket feed.
23    Coinbase,
24    /// Alpaca Markets data stream.
25    Alpaca,
26    /// Polygon.io WebSocket feed.
27    Polygon,
28}
29
30impl std::fmt::Display for Exchange {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        match self {
33            Exchange::Binance => write!(f, "Binance"),
34            Exchange::Coinbase => write!(f, "Coinbase"),
35            Exchange::Alpaca => write!(f, "Alpaca"),
36            Exchange::Polygon => write!(f, "Polygon"),
37        }
38    }
39}
40
41impl FromStr for Exchange {
42    type Err = StreamError;
43    fn from_str(s: &str) -> Result<Self, Self::Err> {
44        match s.to_lowercase().as_str() {
45            "binance" => Ok(Exchange::Binance),
46            "coinbase" => Ok(Exchange::Coinbase),
47            "alpaca" => Ok(Exchange::Alpaca),
48            "polygon" => Ok(Exchange::Polygon),
49            _ => Err(StreamError::UnknownExchange(s.to_string())),
50        }
51    }
52}
53
54/// Raw tick — unprocessed bytes from an exchange WebSocket.
55#[derive(Debug, Clone)]
56pub struct RawTick {
57    /// Source exchange.
58    pub exchange: Exchange,
59    /// Instrument symbol as reported by the exchange.
60    pub symbol: String,
61    /// Raw JSON payload from the WebSocket frame.
62    pub payload: serde_json::Value,
63    /// System-clock timestamp (ms since Unix epoch) when the tick was received.
64    pub received_at_ms: u64,
65}
66
67impl RawTick {
68    /// Construct a new [`RawTick`], stamping `received_at_ms` from the system clock.
69    pub fn new(exchange: Exchange, symbol: impl Into<String>, payload: serde_json::Value) -> Self {
70        Self {
71            exchange,
72            symbol: symbol.into(),
73            payload,
74            received_at_ms: now_ms(),
75        }
76    }
77}
78
79/// Canonical normalized tick — exchange-agnostic.
80#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
81pub struct NormalizedTick {
82    /// Source exchange.
83    pub exchange: Exchange,
84    /// Instrument symbol in the canonical form used by this crate.
85    pub symbol: String,
86    /// Trade price (exact decimal, never `f64`).
87    pub price: Decimal,
88    /// Trade quantity (exact decimal).
89    pub quantity: Decimal,
90    /// Direction of the aggressing order, if available from the exchange.
91    pub side: Option<TradeSide>,
92    /// Exchange-assigned trade identifier, if available.
93    pub trade_id: Option<String>,
94    /// Exchange-side timestamp (ms since Unix epoch), if included in the feed.
95    pub exchange_ts_ms: Option<u64>,
96    /// Local system-clock timestamp when this tick was received.
97    pub received_at_ms: u64,
98}
99
100/// Direction of trade that generated the tick.
101#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
102pub enum TradeSide {
103    /// Buyer was the aggressor.
104    Buy,
105    /// Seller was the aggressor.
106    Sell,
107}
108
109/// Normalizes raw ticks from any supported exchange into [`NormalizedTick`] form.
110///
111/// `TickNormalizer` is stateless and cheap to clone; a single instance can be
112/// shared across threads via `Arc` or constructed per-task.
113pub struct TickNormalizer;
114
115impl TickNormalizer {
116    /// Create a new normalizer. This is a zero-cost constructor.
117    pub fn new() -> Self {
118        Self
119    }
120
121    /// Normalize a raw tick into canonical form.
122    pub fn normalize(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
123        match raw.exchange {
124            Exchange::Binance => self.normalize_binance(raw),
125            Exchange::Coinbase => self.normalize_coinbase(raw),
126            Exchange::Alpaca => self.normalize_alpaca(raw),
127            Exchange::Polygon => self.normalize_polygon(raw),
128        }
129    }
130
131    fn normalize_binance(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
132        let p = &raw.payload;
133        let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
134        let qty = parse_decimal_field(p, "q", &raw.exchange.to_string())?;
135        let side = p.get("m").and_then(|v| v.as_bool()).map(|maker| {
136            if maker { TradeSide::Sell } else { TradeSide::Buy }
137        });
138        let trade_id = p.get("t").and_then(|v| v.as_u64()).map(|id| id.to_string());
139        let exchange_ts = p.get("T").and_then(|v| v.as_u64());
140        Ok(NormalizedTick {
141            exchange: raw.exchange,
142            symbol: raw.symbol,
143            price,
144            quantity: qty,
145            side,
146            trade_id,
147            exchange_ts_ms: exchange_ts,
148            received_at_ms: raw.received_at_ms,
149        })
150    }
151
152    fn normalize_coinbase(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
153        let p = &raw.payload;
154        let price = parse_decimal_field(p, "price", &raw.exchange.to_string())?;
155        let qty = parse_decimal_field(p, "size", &raw.exchange.to_string())?;
156        let side = p.get("side").and_then(|v| v.as_str()).map(|s| {
157            if s == "buy" { TradeSide::Buy } else { TradeSide::Sell }
158        });
159        let trade_id = p.get("trade_id").and_then(|v| v.as_str()).map(str::to_string);
160        Ok(NormalizedTick {
161            exchange: raw.exchange,
162            symbol: raw.symbol,
163            price,
164            quantity: qty,
165            side,
166            trade_id,
167            exchange_ts_ms: None,
168            received_at_ms: raw.received_at_ms,
169        })
170    }
171
172    fn normalize_alpaca(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
173        let p = &raw.payload;
174        let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
175        let qty = parse_decimal_field(p, "s", &raw.exchange.to_string())?;
176        let trade_id = p.get("i").and_then(|v| v.as_u64()).map(|id| id.to_string());
177        Ok(NormalizedTick {
178            exchange: raw.exchange,
179            symbol: raw.symbol,
180            price,
181            quantity: qty,
182            side: None,
183            trade_id,
184            exchange_ts_ms: None,
185            received_at_ms: raw.received_at_ms,
186        })
187    }
188
189    fn normalize_polygon(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
190        let p = &raw.payload;
191        let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
192        let qty = parse_decimal_field(p, "s", &raw.exchange.to_string())?;
193        let trade_id = p.get("i").and_then(|v| v.as_str()).map(str::to_string);
194        let exchange_ts = p.get("t").and_then(|v| v.as_u64());
195        Ok(NormalizedTick {
196            exchange: raw.exchange,
197            symbol: raw.symbol,
198            price,
199            quantity: qty,
200            side: None,
201            trade_id,
202            exchange_ts_ms: exchange_ts,
203            received_at_ms: raw.received_at_ms,
204        })
205    }
206}
207
208impl Default for TickNormalizer {
209    fn default() -> Self {
210        Self::new()
211    }
212}
213
214fn parse_decimal_field(v: &serde_json::Value, field: &str, exchange: &str) -> Result<Decimal, StreamError> {
215    let raw = v.get(field).ok_or_else(|| StreamError::ParseError {
216        exchange: exchange.to_string(),
217        reason: format!("missing field '{}'", field),
218    })?;
219    let s = if let Some(s) = raw.as_str() {
220        s.to_string()
221    } else if let Some(n) = raw.as_f64() {
222        n.to_string()
223    } else {
224        return Err(StreamError::ParseError {
225            exchange: exchange.to_string(),
226            reason: format!("field '{}' is not a string or number", field),
227        });
228    };
229    Decimal::from_str(&s).map_err(|e| StreamError::ParseError {
230        exchange: exchange.to_string(),
231        reason: format!("field '{}' parse error: {}", field, e),
232    })
233}
234
235fn now_ms() -> u64 {
236    std::time::SystemTime::now()
237        .duration_since(std::time::UNIX_EPOCH)
238        .map(|d| d.as_millis() as u64)
239        .unwrap_or(0)
240}
241
242#[cfg(test)]
243mod tests {
244    use super::*;
245    use serde_json::json;
246
247    fn normalizer() -> TickNormalizer { TickNormalizer::new() }
248
249    fn binance_tick(symbol: &str) -> RawTick {
250        RawTick {
251            exchange: Exchange::Binance,
252            symbol: symbol.to_string(),
253            payload: json!({ "p": "50000.12", "q": "0.001", "m": false, "t": 12345, "T": 1700000000000u64 }),
254            received_at_ms: 1700000000001,
255        }
256    }
257
258    fn coinbase_tick(symbol: &str) -> RawTick {
259        RawTick {
260            exchange: Exchange::Coinbase,
261            symbol: symbol.to_string(),
262            payload: json!({ "price": "50001.00", "size": "0.5", "side": "buy", "trade_id": "abc123" }),
263            received_at_ms: 1700000000002,
264        }
265    }
266
267    fn alpaca_tick(symbol: &str) -> RawTick {
268        RawTick {
269            exchange: Exchange::Alpaca,
270            symbol: symbol.to_string(),
271            payload: json!({ "p": "180.50", "s": "10", "i": 99 }),
272            received_at_ms: 1700000000003,
273        }
274    }
275
276    fn polygon_tick(symbol: &str) -> RawTick {
277        RawTick {
278            exchange: Exchange::Polygon,
279            symbol: symbol.to_string(),
280            payload: json!({ "p": "180.51", "s": "5", "i": "XYZ-001", "t": 1700000000004u64 }),
281            received_at_ms: 1700000000005,
282        }
283    }
284
285    #[test]
286    fn test_exchange_from_str_valid() {
287        assert_eq!("binance".parse::<Exchange>().unwrap(), Exchange::Binance);
288        assert_eq!("Coinbase".parse::<Exchange>().unwrap(), Exchange::Coinbase);
289        assert_eq!("ALPACA".parse::<Exchange>().unwrap(), Exchange::Alpaca);
290        assert_eq!("polygon".parse::<Exchange>().unwrap(), Exchange::Polygon);
291    }
292
293    #[test]
294    fn test_exchange_from_str_unknown_returns_error() {
295        let result = "Kraken".parse::<Exchange>();
296        assert!(matches!(result, Err(StreamError::UnknownExchange(_))));
297    }
298
299    #[test]
300    fn test_exchange_display() {
301        assert_eq!(Exchange::Binance.to_string(), "Binance");
302        assert_eq!(Exchange::Coinbase.to_string(), "Coinbase");
303    }
304
305    #[test]
306    fn test_normalize_binance_tick_price_and_qty() {
307        let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
308        assert_eq!(tick.price, Decimal::from_str("50000.12").unwrap());
309        assert_eq!(tick.quantity, Decimal::from_str("0.001").unwrap());
310        assert_eq!(tick.exchange, Exchange::Binance);
311        assert_eq!(tick.symbol, "BTCUSDT");
312    }
313
314    #[test]
315    fn test_normalize_binance_side_maker_false_is_buy() {
316        let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
317        assert_eq!(tick.side, Some(TradeSide::Buy));
318    }
319
320    #[test]
321    fn test_normalize_binance_side_maker_true_is_sell() {
322        let raw = RawTick {
323            exchange: Exchange::Binance,
324            symbol: "BTCUSDT".into(),
325            payload: json!({ "p": "50000", "q": "1", "m": true }),
326            received_at_ms: 0,
327        };
328        let tick = normalizer().normalize(raw).unwrap();
329        assert_eq!(tick.side, Some(TradeSide::Sell));
330    }
331
332    #[test]
333    fn test_normalize_binance_trade_id_and_ts() {
334        let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
335        assert_eq!(tick.trade_id, Some("12345".to_string()));
336        assert_eq!(tick.exchange_ts_ms, Some(1700000000000));
337    }
338
339    #[test]
340    fn test_normalize_coinbase_tick() {
341        let tick = normalizer().normalize(coinbase_tick("BTC-USD")).unwrap();
342        assert_eq!(tick.price, Decimal::from_str("50001.00").unwrap());
343        assert_eq!(tick.quantity, Decimal::from_str("0.5").unwrap());
344        assert_eq!(tick.side, Some(TradeSide::Buy));
345        assert_eq!(tick.trade_id, Some("abc123".to_string()));
346    }
347
348    #[test]
349    fn test_normalize_coinbase_sell_side() {
350        let raw = RawTick {
351            exchange: Exchange::Coinbase,
352            symbol: "BTC-USD".into(),
353            payload: json!({ "price": "50000", "size": "1", "side": "sell" }),
354            received_at_ms: 0,
355        };
356        let tick = normalizer().normalize(raw).unwrap();
357        assert_eq!(tick.side, Some(TradeSide::Sell));
358    }
359
360    #[test]
361    fn test_normalize_alpaca_tick() {
362        let tick = normalizer().normalize(alpaca_tick("AAPL")).unwrap();
363        assert_eq!(tick.price, Decimal::from_str("180.50").unwrap());
364        assert_eq!(tick.quantity, Decimal::from_str("10").unwrap());
365        assert_eq!(tick.trade_id, Some("99".to_string()));
366        assert_eq!(tick.side, None);
367    }
368
369    #[test]
370    fn test_normalize_polygon_tick() {
371        let tick = normalizer().normalize(polygon_tick("AAPL")).unwrap();
372        assert_eq!(tick.price, Decimal::from_str("180.51").unwrap());
373        assert_eq!(tick.exchange_ts_ms, Some(1700000000004));
374        assert_eq!(tick.trade_id, Some("XYZ-001".to_string()));
375    }
376
377    #[test]
378    fn test_normalize_missing_price_field_returns_parse_error() {
379        let raw = RawTick {
380            exchange: Exchange::Binance,
381            symbol: "BTCUSDT".into(),
382            payload: json!({ "q": "1" }),
383            received_at_ms: 0,
384        };
385        let result = normalizer().normalize(raw);
386        assert!(matches!(result, Err(StreamError::ParseError { .. })));
387    }
388
389    #[test]
390    fn test_normalize_invalid_decimal_returns_parse_error() {
391        let raw = RawTick {
392            exchange: Exchange::Coinbase,
393            symbol: "BTC-USD".into(),
394            payload: json!({ "price": "not-a-number", "size": "1" }),
395            received_at_ms: 0,
396        };
397        let result = normalizer().normalize(raw);
398        assert!(matches!(result, Err(StreamError::ParseError { .. })));
399    }
400
401    #[test]
402    fn test_raw_tick_new_sets_received_at() {
403        let raw = RawTick::new(Exchange::Binance, "BTCUSDT", json!({}));
404        assert!(raw.received_at_ms > 0);
405    }
406
407    #[test]
408    fn test_normalize_numeric_price_field() {
409        let raw = RawTick {
410            exchange: Exchange::Binance,
411            symbol: "BTCUSDT".into(),
412            payload: json!({ "p": 50000.0, "q": 1.0 }),
413            received_at_ms: 0,
414        };
415        let tick = normalizer().normalize(raw).unwrap();
416        assert!(tick.price > Decimal::ZERO);
417    }
418}