Skip to main content

marketdata_core/models/
streaming.rs

//! Streaming message types for WebSocket data.
//!
//! These types are used to parse WebSocket messages received from the streaming API.
//! The top-level [`StreamMessage`] enum uses serde's internally tagged enum
//! representation, keyed on the `event` field, to distinguish between event types.
6
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9
10use crate::models::common::{PriceLevel, TotalStats, TradeInfo};
11
12// ============================================================================
13// Top-level Message Types
14// ============================================================================
15
16/// Top-level WebSocket message
17///
18/// Uses serde's internally tagged enum to parse based on "event" field.
19///
20/// # Example
21/// ```rust
22/// use marketdata_core::models::streaming::StreamMessage;
23///
24/// let json = r#"{"event": "authenticated"}"#;
25/// let msg: StreamMessage = serde_json::from_str(json).unwrap();
26/// assert!(matches!(msg, StreamMessage::Authenticated));
27/// ```
28#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(tag = "event", rename_all = "lowercase")]
30pub enum StreamMessage {
31    /// Authentication successful
32    Authenticated,
33
34    /// Subscription confirmed
35    Subscribed {
36        /// Subscription confirmation payload.
37        #[serde(flatten)]
38        data: SubscribedData,
39    },
40
41    /// Snapshot after subscription (channel-specific)
42    Snapshot {
43        /// Subscription ID
44        id: String,
45        /// Channel name
46        channel: String,
47        /// Snapshot payload (channel-specific, parsed later)
48        #[serde(flatten)]
49        payload: SnapshotPayload,
50    },
51
52    /// Real-time data (channel-specific)
53    Data {
54        /// Subscription ID
55        id: String,
56        /// Channel name
57        channel: String,
58        /// Data payload (channel-specific, parsed later)
59        #[serde(flatten)]
60        payload: DataPayload,
61    },
62
63    /// Error event
64    Error {
65        /// Server-supplied error details.
66        #[serde(flatten)]
67        data: ErrorData,
68    },
69
70    /// Pong response (health check)
71    Pong {
72        /// Optional server-reported connection state (e.g. `"alive"`).
73        #[serde(default)]
74        state: Option<String>,
75    },
76}
77
78/// Subscribed event data
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct SubscribedData {
81    /// Subscription ID (used for unsubscribe)
82    pub id: String,
83    /// Channel name
84    #[serde(default)]
85    pub channel: Option<String>,
86    /// Symbol
87    #[serde(default)]
88    pub symbol: Option<String>,
89}
90
91/// Snapshot payload (channel-specific, parsed later)
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct SnapshotPayload {
94    /// Raw data to be parsed based on channel
95    pub data: Value,
96}
97
98/// Data payload (channel-specific, parsed later)
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct DataPayload {
101    /// Raw data to be parsed based on channel
102    pub data: Value,
103}
104
105/// Error event data
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct ErrorData {
108    /// Error code
109    #[serde(default)]
110    pub code: Option<i32>,
111    /// Error message
112    #[serde(default)]
113    pub message: Option<String>,
114}
115
116// ============================================================================
117// Channel-Specific Data Types
118// ============================================================================
119
120/// Trades data (snapshot and real-time share same structure)
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct TradesData {
123    /// Stock symbol
124    pub symbol: String,
125    /// Data type (e.g., "EQUITY", "ODDLOT")
126    #[serde(rename = "type", default)]
127    pub data_type: Option<String>,
128    /// Exchange code
129    #[serde(default)]
130    pub exchange: Option<String>,
131    /// Market
132    #[serde(default)]
133    pub market: Option<String>,
134    /// Trade list
135    #[serde(default)]
136    pub trades: Vec<StreamTrade>,
137    /// Total statistics
138    #[serde(default)]
139    pub total: Option<TotalStats>,
140    /// Unix microseconds
141    #[serde(default)]
142    pub time: Option<i64>,
143    /// Serial number
144    #[serde(default)]
145    pub serial: Option<i64>,
146}
147
148/// Single trade in streaming
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct StreamTrade {
151    /// Trade price
152    pub price: f64,
153    /// Trade size
154    pub size: i64,
155    /// Best bid at trade time
156    #[serde(default)]
157    pub bid: Option<f64>,
158    /// Best ask at trade time
159    #[serde(default)]
160    pub ask: Option<f64>,
161}
162
163/// Candles snapshot (special: entire day of 1-min candles)
164#[derive(Debug, Clone, Serialize, Deserialize)]
165pub struct CandlesSnapshot {
166    /// Stock symbol
167    pub symbol: String,
168    /// Trading date (YYYY-MM-DD)
169    pub date: String,
170    /// Timeframe (always "1" for snapshot)
171    #[serde(default)]
172    pub timeframe: Option<String>,
173    /// Array of 1-minute candles for the day
174    pub data: Vec<CandleHistoryItem>,
175}
176
177/// Single candle in snapshot
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct CandleHistoryItem {
180    /// ISO 8601 timestamp
181    pub date: String,
182    /// Open price
183    pub open: f64,
184    /// High price
185    pub high: f64,
186    /// Low price
187    pub low: f64,
188    /// Close price
189    pub close: f64,
190    /// Volume
191    pub volume: i64,
192    /// Average price
193    #[serde(default)]
194    pub average: Option<f64>,
195}
196
197/// Candles real-time data (single candle update)
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct CandleData {
200    /// Stock symbol
201    pub symbol: String,
202    /// ISO 8601 timestamp
203    pub date: String,
204    /// Open price
205    pub open: f64,
206    /// High price
207    pub high: f64,
208    /// Low price
209    pub low: f64,
210    /// Close price
211    pub close: f64,
212    /// Volume
213    pub volume: i64,
214    /// Average price
215    #[serde(default)]
216    pub average: Option<f64>,
217}
218
219/// Books data (order book depth)
220#[derive(Debug, Clone, Serialize, Deserialize)]
221pub struct BooksData {
222    /// Stock symbol
223    pub symbol: String,
224    /// Bid levels
225    #[serde(default)]
226    pub bids: Vec<PriceLevel>,
227    /// Ask levels
228    #[serde(default)]
229    pub asks: Vec<PriceLevel>,
230    /// Unix microseconds
231    #[serde(default)]
232    pub time: Option<i64>,
233    /// Serial number
234    #[serde(default)]
235    pub serial: Option<i64>,
236}
237
238/// Aggregates data (comprehensive quote-like)
239#[derive(Debug, Clone, Serialize, Deserialize)]
240pub struct AggregatesData {
241    /// Stock symbol
242    pub symbol: String,
243    /// Data type
244    #[serde(rename = "type", default)]
245    pub data_type: Option<String>,
246    /// Exchange code
247    #[serde(default)]
248    pub exchange: Option<String>,
249    /// Market
250    #[serde(default)]
251    pub market: Option<String>,
252    /// Trading date
253    #[serde(default)]
254    pub date: Option<String>,
255    // Price fields
256    /// Reference price
257    #[serde(rename = "referencePrice", default)]
258    pub reference_price: Option<f64>,
259    /// Previous close
260    #[serde(rename = "previousClose", default)]
261    pub previous_close: Option<f64>,
262    /// Open price
263    #[serde(rename = "openPrice", default)]
264    pub open_price: Option<f64>,
265    /// High price
266    #[serde(rename = "highPrice", default)]
267    pub high_price: Option<f64>,
268    /// Low price
269    #[serde(rename = "lowPrice", default)]
270    pub low_price: Option<f64>,
271    /// Close price
272    #[serde(rename = "closePrice", default)]
273    pub close_price: Option<f64>,
274    /// Average price
275    #[serde(rename = "avgPrice", default)]
276    pub avg_price: Option<f64>,
277    /// Last trade price
278    #[serde(rename = "lastPrice", default)]
279    pub last_price: Option<f64>,
280    /// Last trade size
281    #[serde(rename = "lastSize", default)]
282    pub last_size: Option<i64>,
283    // Order book
284    /// Bid levels
285    #[serde(default)]
286    pub bids: Vec<PriceLevel>,
287    /// Ask levels
288    #[serde(default)]
289    pub asks: Vec<PriceLevel>,
290    // Statistics
291    /// Total statistics
292    #[serde(default)]
293    pub total: Option<TotalStats>,
294    /// Last trade info
295    #[serde(rename = "lastTrade", default)]
296    pub last_trade: Option<TradeInfo>,
297    // Timestamps
298    /// Unix microseconds
299    #[serde(default)]
300    pub time: Option<i64>,
301    /// Serial number
302    #[serde(default)]
303    pub serial: Option<i64>,
304    /// Last updated timestamp
305    #[serde(rename = "lastUpdated", default)]
306    pub last_updated: Option<i64>,
307}
308
309/// Indices data
310#[derive(Debug, Clone, Serialize, Deserialize)]
311pub struct IndicesData {
312    /// Index symbol
313    pub symbol: String,
314    /// Data type
315    #[serde(rename = "type", default)]
316    pub data_type: Option<String>,
317    /// Exchange code
318    #[serde(default)]
319    pub exchange: Option<String>,
320    /// Index value
321    #[serde(default)]
322    pub index: Option<f64>,
323    /// Unix microseconds
324    #[serde(default)]
325    pub time: Option<i64>,
326    /// Serial number
327    #[serde(default)]
328    pub serial: Option<i64>,
329}
330
331// ============================================================================
332// Tests
333// ============================================================================
334
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes `raw` into `T`; a parse failure panics and so fails the calling test.
    fn decode<T: serde::de::DeserializeOwned>(raw: &str) -> T {
        serde_json::from_str(raw).unwrap()
    }

    // ---- Top-level events ----

    #[test]
    fn test_parse_authenticated_event() {
        let raw = r#"{"event": "authenticated"}"#;
        let msg: StreamMessage = decode(raw);
        assert!(matches!(msg, StreamMessage::Authenticated));
    }

    #[test]
    fn test_parse_subscribed_event() {
        let raw = r#"{
            "event": "subscribed",
            "id": "sub-123",
            "channel": "trades",
            "symbol": "2330"
        }"#;
        let msg: StreamMessage = decode(raw);
        match msg {
            StreamMessage::Subscribed { data } => {
                assert_eq!(data.id, "sub-123");
                assert_eq!(data.channel.as_deref(), Some("trades"));
                assert_eq!(data.symbol.as_deref(), Some("2330"));
            }
            _ => panic!("Expected Subscribed event"),
        }
    }

    #[test]
    fn test_parse_snapshot_event() {
        let raw = r#"{
            "event": "snapshot",
            "id": "sub-123",
            "channel": "trades",
            "data": {
                "symbol": "2330",
                "trades": [{"price": 583.0, "size": 100}],
                "time": 1704067200123456
            }
        }"#;
        let msg: StreamMessage = decode(raw);
        match msg {
            StreamMessage::Snapshot { id, channel, payload } => {
                assert_eq!(id, "sub-123");
                assert_eq!(channel, "trades");
                assert!(payload.data.is_object());
            }
            _ => panic!("Expected Snapshot event"),
        }
    }

    #[test]
    fn test_parse_data_event() {
        let raw = r#"{
            "event": "data",
            "id": "sub-123",
            "channel": "candles",
            "data": {
                "symbol": "2330",
                "date": "2026-01-30T09:00:00.000+08:00",
                "open": 580.0,
                "high": 585.0,
                "low": 578.0,
                "close": 583.0,
                "volume": 12345
            }
        }"#;
        let msg: StreamMessage = decode(raw);
        match msg {
            StreamMessage::Data { id, channel, payload } => {
                assert_eq!(id, "sub-123");
                assert_eq!(channel, "candles");
                assert!(payload.data.is_object());
            }
            _ => panic!("Expected Data event"),
        }
    }

    #[test]
    fn test_parse_error_event() {
        let raw = r#"{
            "event": "error",
            "code": 4001,
            "message": "Invalid symbol"
        }"#;
        let msg: StreamMessage = decode(raw);
        match msg {
            StreamMessage::Error { data } => {
                assert_eq!(data.code, Some(4001));
                assert_eq!(data.message.as_deref(), Some("Invalid symbol"));
            }
            _ => panic!("Expected Error event"),
        }
    }

    #[test]
    fn test_parse_pong_event() {
        let raw = r#"{"event": "pong", "state": "ok"}"#;
        let msg: StreamMessage = decode(raw);
        match msg {
            StreamMessage::Pong { state } => assert_eq!(state.as_deref(), Some("ok")),
            _ => panic!("Expected Pong event"),
        }
    }

    #[test]
    fn test_parse_pong_without_state() {
        let raw = r#"{"event": "pong"}"#;
        let msg: StreamMessage = decode(raw);
        match msg {
            StreamMessage::Pong { state } => assert!(state.is_none()),
            _ => panic!("Expected Pong event"),
        }
    }

    // ---- Channel-specific payloads ----

    #[test]
    fn test_parse_candles_snapshot() {
        let raw = r#"{
            "symbol": "2330",
            "date": "2026-01-30",
            "timeframe": "1",
            "data": [
                {"date": "2026-01-30T09:00:00.000+08:00", "open": 580.0, "high": 581.0, "low": 579.0, "close": 580.5, "volume": 1000},
                {"date": "2026-01-30T09:01:00.000+08:00", "open": 580.5, "high": 582.0, "low": 580.0, "close": 581.5, "volume": 1500}
            ]
        }"#;
        let snapshot: CandlesSnapshot = decode(raw);
        assert_eq!(snapshot.symbol, "2330");
        assert_eq!(snapshot.date, "2026-01-30");
        assert_eq!(snapshot.timeframe.as_deref(), Some("1"));
        assert_eq!(snapshot.data.len(), 2);
        assert_eq!(snapshot.data[0].open, 580.0);
        assert_eq!(snapshot.data[1].volume, 1500);
    }

    #[test]
    fn test_parse_trades_data() {
        let raw = r#"{
            "symbol": "2330",
            "type": "EQUITY",
            "trades": [{"price": 583.0, "size": 100, "bid": 582.0, "ask": 583.0}],
            "total": {"tradeVolume": 12345678, "tradeValue": 7201234567.0}
        }"#;
        let trades: TradesData = decode(raw);
        assert_eq!(trades.symbol, "2330");
        assert_eq!(trades.data_type.as_deref(), Some("EQUITY"));
        assert_eq!(trades.trades.len(), 1);
        assert_eq!(trades.trades[0].price, 583.0);
        assert_eq!(trades.trades[0].bid, Some(582.0));
        assert!(trades.total.is_some());
    }

    #[test]
    fn test_parse_candle_data() {
        let raw = r#"{
            "symbol": "2330",
            "date": "2026-01-30T09:15:00.000+08:00",
            "open": 580.0,
            "high": 585.0,
            "low": 578.0,
            "close": 583.0,
            "volume": 12345,
            "average": 581.5
        }"#;
        let candle: CandleData = decode(raw);
        assert_eq!(candle.symbol, "2330");
        assert_eq!(candle.open, 580.0);
        assert_eq!(candle.close, 583.0);
        assert_eq!(candle.average, Some(581.5));
    }

    #[test]
    fn test_parse_books_data() {
        let raw = r#"{
            "symbol": "2330",
            "bids": [{"price": 582.0, "size": 100}],
            "asks": [{"price": 583.0, "size": 50}],
            "time": 1704067200123456
        }"#;
        let books: BooksData = decode(raw);
        assert_eq!(books.symbol, "2330");
        assert_eq!(books.bids.len(), 1);
        assert_eq!(books.asks.len(), 1);
        assert_eq!(books.bids[0].price, 582.0);
        assert_eq!(books.asks[0].size, 50);
    }

    #[test]
    fn test_parse_indices_data() {
        let raw = r#"{
            "symbol": "IX0001",
            "type": "INDEX",
            "index": 17500.5,
            "time": 1704067200123456
        }"#;
        let indices: IndicesData = decode(raw);
        assert_eq!(indices.symbol, "IX0001");
        assert_eq!(indices.data_type.as_deref(), Some("INDEX"));
        assert_eq!(indices.index, Some(17500.5));
    }

    #[test]
    fn test_parse_aggregates_data() {
        let raw = r#"{
            "symbol": "2330",
            "type": "EQUITY",
            "openPrice": 580.0,
            "highPrice": 590.0,
            "lowPrice": 578.0,
            "closePrice": 585.0,
            "lastPrice": 585.0,
            "lastSize": 100,
            "bids": [{"price": 584.0, "size": 500}],
            "asks": [{"price": 585.0, "size": 300}]
        }"#;
        let agg: AggregatesData = decode(raw);
        assert_eq!(agg.symbol, "2330");
        assert_eq!(agg.open_price, Some(580.0));
        assert_eq!(agg.last_price, Some(585.0));
        assert_eq!(agg.bids.len(), 1);
        assert_eq!(agg.asks.len(), 1);
    }
}