Skip to main content

fin_stream/tick/
mod.rs

1//! Tick normalization — raw exchange messages → canonical NormalizedTick.
2//!
3//! ## Responsibility
4//! Convert heterogeneous exchange tick formats (Binance, Coinbase, Alpaca,
5//! Polygon) into a single canonical representation. This stage must add
6//! <1μs overhead per tick on the hot path.
7//!
8//! ## Guarantees
9//! - Deterministic: same raw bytes always produce the same NormalizedTick
10//! - Non-allocating hot path: NormalizedTick is stack-allocated
11//! - Thread-safe: TickNormalizer is Send + Sync
12
13use crate::error::StreamError;
14use rust_decimal::Decimal;
15use std::str::FromStr;
16
17/// Supported exchanges.
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
19pub enum Exchange {
20    /// Binance spot/futures WebSocket feed.
21    Binance,
22    /// Coinbase Advanced Trade WebSocket feed.
23    Coinbase,
24    /// Alpaca Markets data stream.
25    Alpaca,
26    /// Polygon.io WebSocket feed.
27    Polygon,
28}
29
30impl std::fmt::Display for Exchange {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        match self {
33            Exchange::Binance => write!(f, "Binance"),
34            Exchange::Coinbase => write!(f, "Coinbase"),
35            Exchange::Alpaca => write!(f, "Alpaca"),
36            Exchange::Polygon => write!(f, "Polygon"),
37        }
38    }
39}
40
41impl FromStr for Exchange {
42    type Err = StreamError;
43    fn from_str(s: &str) -> Result<Self, Self::Err> {
44        match s.to_lowercase().as_str() {
45            "binance" => Ok(Exchange::Binance),
46            "coinbase" => Ok(Exchange::Coinbase),
47            "alpaca" => Ok(Exchange::Alpaca),
48            "polygon" => Ok(Exchange::Polygon),
49            _ => Err(StreamError::UnknownExchange(s.to_string())),
50        }
51    }
52}
53
54/// Raw tick — unprocessed bytes from an exchange WebSocket.
55#[derive(Debug, Clone)]
56pub struct RawTick {
57    /// Source exchange.
58    pub exchange: Exchange,
59    /// Instrument symbol as reported by the exchange.
60    pub symbol: String,
61    /// Raw JSON payload from the WebSocket frame.
62    pub payload: serde_json::Value,
63    /// System-clock timestamp (ms since Unix epoch) when the tick was received.
64    pub received_at_ms: u64,
65}
66
67impl RawTick {
68    /// Construct a new [`RawTick`], stamping `received_at_ms` from the system clock.
69    pub fn new(exchange: Exchange, symbol: impl Into<String>, payload: serde_json::Value) -> Self {
70        Self {
71            exchange,
72            symbol: symbol.into(),
73            payload,
74            received_at_ms: now_ms(),
75        }
76    }
77}
78
79/// Canonical normalized tick — exchange-agnostic.
80#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
81pub struct NormalizedTick {
82    /// Source exchange.
83    pub exchange: Exchange,
84    /// Instrument symbol in the canonical form used by this crate.
85    pub symbol: String,
86    /// Trade price (exact decimal, never `f64`).
87    pub price: Decimal,
88    /// Trade quantity (exact decimal).
89    pub quantity: Decimal,
90    /// Direction of the aggressing order, if available from the exchange.
91    pub side: Option<TradeSide>,
92    /// Exchange-assigned trade identifier, if available.
93    pub trade_id: Option<String>,
94    /// Exchange-side timestamp (ms since Unix epoch), if included in the feed.
95    pub exchange_ts_ms: Option<u64>,
96    /// Local system-clock timestamp when this tick was received.
97    pub received_at_ms: u64,
98}
99
100/// Direction of trade that generated the tick.
101#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
102pub enum TradeSide {
103    /// Buyer was the aggressor.
104    Buy,
105    /// Seller was the aggressor.
106    Sell,
107}
108
109/// Normalizes raw ticks from any supported exchange into [`NormalizedTick`] form.
110///
111/// `TickNormalizer` is stateless and cheap to clone; a single instance can be
112/// shared across threads via `Arc` or constructed per-task.
113pub struct TickNormalizer;
114
115impl TickNormalizer {
116    /// Create a new normalizer. This is a zero-cost constructor.
117    pub fn new() -> Self {
118        Self
119    }
120
121    /// Normalize a raw tick into canonical form.
122    pub fn normalize(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
123        match raw.exchange {
124            Exchange::Binance => self.normalize_binance(raw),
125            Exchange::Coinbase => self.normalize_coinbase(raw),
126            Exchange::Alpaca => self.normalize_alpaca(raw),
127            Exchange::Polygon => self.normalize_polygon(raw),
128        }
129    }
130
131    fn normalize_binance(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
132        let p = &raw.payload;
133        let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
134        let qty = parse_decimal_field(p, "q", &raw.exchange.to_string())?;
135        let side = p.get("m").and_then(|v| v.as_bool()).map(|maker| {
136            if maker {
137                TradeSide::Sell
138            } else {
139                TradeSide::Buy
140            }
141        });
142        let trade_id = p.get("t").and_then(|v| v.as_u64()).map(|id| id.to_string());
143        let exchange_ts = p.get("T").and_then(|v| v.as_u64());
144        Ok(NormalizedTick {
145            exchange: raw.exchange,
146            symbol: raw.symbol,
147            price,
148            quantity: qty,
149            side,
150            trade_id,
151            exchange_ts_ms: exchange_ts,
152            received_at_ms: raw.received_at_ms,
153        })
154    }
155
156    fn normalize_coinbase(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
157        let p = &raw.payload;
158        let price = parse_decimal_field(p, "price", &raw.exchange.to_string())?;
159        let qty = parse_decimal_field(p, "size", &raw.exchange.to_string())?;
160        let side = p.get("side").and_then(|v| v.as_str()).map(|s| {
161            if s == "buy" {
162                TradeSide::Buy
163            } else {
164                TradeSide::Sell
165            }
166        });
167        let trade_id = p
168            .get("trade_id")
169            .and_then(|v| v.as_str())
170            .map(str::to_string);
171        Ok(NormalizedTick {
172            exchange: raw.exchange,
173            symbol: raw.symbol,
174            price,
175            quantity: qty,
176            side,
177            trade_id,
178            exchange_ts_ms: None,
179            received_at_ms: raw.received_at_ms,
180        })
181    }
182
183    fn normalize_alpaca(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
184        let p = &raw.payload;
185        let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
186        let qty = parse_decimal_field(p, "s", &raw.exchange.to_string())?;
187        let trade_id = p.get("i").and_then(|v| v.as_u64()).map(|id| id.to_string());
188        Ok(NormalizedTick {
189            exchange: raw.exchange,
190            symbol: raw.symbol,
191            price,
192            quantity: qty,
193            side: None,
194            trade_id,
195            exchange_ts_ms: None,
196            received_at_ms: raw.received_at_ms,
197        })
198    }
199
200    fn normalize_polygon(&self, raw: RawTick) -> Result<NormalizedTick, StreamError> {
201        let p = &raw.payload;
202        let price = parse_decimal_field(p, "p", &raw.exchange.to_string())?;
203        let qty = parse_decimal_field(p, "s", &raw.exchange.to_string())?;
204        let trade_id = p.get("i").and_then(|v| v.as_str()).map(str::to_string);
205        let exchange_ts = p.get("t").and_then(|v| v.as_u64());
206        Ok(NormalizedTick {
207            exchange: raw.exchange,
208            symbol: raw.symbol,
209            price,
210            quantity: qty,
211            side: None,
212            trade_id,
213            exchange_ts_ms: exchange_ts,
214            received_at_ms: raw.received_at_ms,
215        })
216    }
217}
218
219impl Default for TickNormalizer {
220    fn default() -> Self {
221        Self::new()
222    }
223}
224
225fn parse_decimal_field(
226    v: &serde_json::Value,
227    field: &str,
228    exchange: &str,
229) -> Result<Decimal, StreamError> {
230    let raw = v.get(field).ok_or_else(|| StreamError::ParseError {
231        exchange: exchange.to_string(),
232        reason: format!("missing field '{}'", field),
233    })?;
234    let s = if let Some(s) = raw.as_str() {
235        s.to_string()
236    } else if let Some(n) = raw.as_f64() {
237        n.to_string()
238    } else {
239        return Err(StreamError::ParseError {
240            exchange: exchange.to_string(),
241            reason: format!("field '{}' is not a string or number", field),
242        });
243    };
244    Decimal::from_str(&s).map_err(|e| StreamError::ParseError {
245        exchange: exchange.to_string(),
246        reason: format!("field '{}' parse error: {}", field, e),
247    })
248}
249
250fn now_ms() -> u64 {
251    std::time::SystemTime::now()
252        .duration_since(std::time::UNIX_EPOCH)
253        .map(|d| d.as_millis() as u64)
254        .unwrap_or(0)
255}
256
257#[cfg(test)]
258mod tests {
259    use super::*;
260    use serde_json::json;
261
262    fn normalizer() -> TickNormalizer {
263        TickNormalizer::new()
264    }
265
266    fn binance_tick(symbol: &str) -> RawTick {
267        RawTick {
268            exchange: Exchange::Binance,
269            symbol: symbol.to_string(),
270            payload: json!({ "p": "50000.12", "q": "0.001", "m": false, "t": 12345, "T": 1700000000000u64 }),
271            received_at_ms: 1700000000001,
272        }
273    }
274
275    fn coinbase_tick(symbol: &str) -> RawTick {
276        RawTick {
277            exchange: Exchange::Coinbase,
278            symbol: symbol.to_string(),
279            payload: json!({ "price": "50001.00", "size": "0.5", "side": "buy", "trade_id": "abc123" }),
280            received_at_ms: 1700000000002,
281        }
282    }
283
284    fn alpaca_tick(symbol: &str) -> RawTick {
285        RawTick {
286            exchange: Exchange::Alpaca,
287            symbol: symbol.to_string(),
288            payload: json!({ "p": "180.50", "s": "10", "i": 99 }),
289            received_at_ms: 1700000000003,
290        }
291    }
292
293    fn polygon_tick(symbol: &str) -> RawTick {
294        RawTick {
295            exchange: Exchange::Polygon,
296            symbol: symbol.to_string(),
297            payload: json!({ "p": "180.51", "s": "5", "i": "XYZ-001", "t": 1700000000004u64 }),
298            received_at_ms: 1700000000005,
299        }
300    }
301
302    #[test]
303    fn test_exchange_from_str_valid() {
304        assert_eq!("binance".parse::<Exchange>().unwrap(), Exchange::Binance);
305        assert_eq!("Coinbase".parse::<Exchange>().unwrap(), Exchange::Coinbase);
306        assert_eq!("ALPACA".parse::<Exchange>().unwrap(), Exchange::Alpaca);
307        assert_eq!("polygon".parse::<Exchange>().unwrap(), Exchange::Polygon);
308    }
309
310    #[test]
311    fn test_exchange_from_str_unknown_returns_error() {
312        let result = "Kraken".parse::<Exchange>();
313        assert!(matches!(result, Err(StreamError::UnknownExchange(_))));
314    }
315
316    #[test]
317    fn test_exchange_display() {
318        assert_eq!(Exchange::Binance.to_string(), "Binance");
319        assert_eq!(Exchange::Coinbase.to_string(), "Coinbase");
320    }
321
322    #[test]
323    fn test_normalize_binance_tick_price_and_qty() {
324        let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
325        assert_eq!(tick.price, Decimal::from_str("50000.12").unwrap());
326        assert_eq!(tick.quantity, Decimal::from_str("0.001").unwrap());
327        assert_eq!(tick.exchange, Exchange::Binance);
328        assert_eq!(tick.symbol, "BTCUSDT");
329    }
330
331    #[test]
332    fn test_normalize_binance_side_maker_false_is_buy() {
333        let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
334        assert_eq!(tick.side, Some(TradeSide::Buy));
335    }
336
337    #[test]
338    fn test_normalize_binance_side_maker_true_is_sell() {
339        let raw = RawTick {
340            exchange: Exchange::Binance,
341            symbol: "BTCUSDT".into(),
342            payload: json!({ "p": "50000", "q": "1", "m": true }),
343            received_at_ms: 0,
344        };
345        let tick = normalizer().normalize(raw).unwrap();
346        assert_eq!(tick.side, Some(TradeSide::Sell));
347    }
348
349    #[test]
350    fn test_normalize_binance_trade_id_and_ts() {
351        let tick = normalizer().normalize(binance_tick("BTCUSDT")).unwrap();
352        assert_eq!(tick.trade_id, Some("12345".to_string()));
353        assert_eq!(tick.exchange_ts_ms, Some(1700000000000));
354    }
355
356    #[test]
357    fn test_normalize_coinbase_tick() {
358        let tick = normalizer().normalize(coinbase_tick("BTC-USD")).unwrap();
359        assert_eq!(tick.price, Decimal::from_str("50001.00").unwrap());
360        assert_eq!(tick.quantity, Decimal::from_str("0.5").unwrap());
361        assert_eq!(tick.side, Some(TradeSide::Buy));
362        assert_eq!(tick.trade_id, Some("abc123".to_string()));
363    }
364
365    #[test]
366    fn test_normalize_coinbase_sell_side() {
367        let raw = RawTick {
368            exchange: Exchange::Coinbase,
369            symbol: "BTC-USD".into(),
370            payload: json!({ "price": "50000", "size": "1", "side": "sell" }),
371            received_at_ms: 0,
372        };
373        let tick = normalizer().normalize(raw).unwrap();
374        assert_eq!(tick.side, Some(TradeSide::Sell));
375    }
376
377    #[test]
378    fn test_normalize_alpaca_tick() {
379        let tick = normalizer().normalize(alpaca_tick("AAPL")).unwrap();
380        assert_eq!(tick.price, Decimal::from_str("180.50").unwrap());
381        assert_eq!(tick.quantity, Decimal::from_str("10").unwrap());
382        assert_eq!(tick.trade_id, Some("99".to_string()));
383        assert_eq!(tick.side, None);
384    }
385
386    #[test]
387    fn test_normalize_polygon_tick() {
388        let tick = normalizer().normalize(polygon_tick("AAPL")).unwrap();
389        assert_eq!(tick.price, Decimal::from_str("180.51").unwrap());
390        assert_eq!(tick.exchange_ts_ms, Some(1700000000004));
391        assert_eq!(tick.trade_id, Some("XYZ-001".to_string()));
392    }
393
394    #[test]
395    fn test_normalize_missing_price_field_returns_parse_error() {
396        let raw = RawTick {
397            exchange: Exchange::Binance,
398            symbol: "BTCUSDT".into(),
399            payload: json!({ "q": "1" }),
400            received_at_ms: 0,
401        };
402        let result = normalizer().normalize(raw);
403        assert!(matches!(result, Err(StreamError::ParseError { .. })));
404    }
405
406    #[test]
407    fn test_normalize_invalid_decimal_returns_parse_error() {
408        let raw = RawTick {
409            exchange: Exchange::Coinbase,
410            symbol: "BTC-USD".into(),
411            payload: json!({ "price": "not-a-number", "size": "1" }),
412            received_at_ms: 0,
413        };
414        let result = normalizer().normalize(raw);
415        assert!(matches!(result, Err(StreamError::ParseError { .. })));
416    }
417
418    #[test]
419    fn test_raw_tick_new_sets_received_at() {
420        let raw = RawTick::new(Exchange::Binance, "BTCUSDT", json!({}));
421        assert!(raw.received_at_ms > 0);
422    }
423
424    #[test]
425    fn test_normalize_numeric_price_field() {
426        let raw = RawTick {
427            exchange: Exchange::Binance,
428            symbol: "BTCUSDT".into(),
429            payload: json!({ "p": 50000.0, "q": 1.0 }),
430            received_at_ms: 0,
431        };
432        let tick = normalizer().normalize(raw).unwrap();
433        assert!(tick.price > Decimal::ZERO);
434    }
435}