Skip to main content

polyfill_rs/
stream.rs

1//! Async streaming functionality for Polymarket client
2//!
3//! This module provides high-performance streaming capabilities for
4//! real-time market data and order updates.
5
6use crate::errors::{PolyfillError, Result};
7use crate::types::*;
8use crate::ws_hot_path::{WsBookApplyStats, WsBookUpdateProcessor};
9use chrono::Utc;
10use futures::{SinkExt, Stream, StreamExt};
11use serde_json::Value;
12use std::collections::VecDeque;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use tokio::sync::mpsc;
16use tracing::{debug, error, info, warn};
17
18/// Trait for market data streams
19pub trait MarketStream: Stream<Item = Result<StreamMessage>> + Send + Sync {
20    /// Subscribe to market data for specific tokens
21    fn subscribe(&mut self, subscription: Subscription) -> Result<()>;
22
23    /// Unsubscribe from market data
24    fn unsubscribe(&mut self, token_ids: &[String]) -> Result<()>;
25
26    /// Check if the stream is connected
27    fn is_connected(&self) -> bool;
28
29    /// Get connection statistics
30    fn get_stats(&self) -> StreamStats;
31}
32
33/// WebSocket-based market stream implementation
34#[derive(Debug)]
35#[allow(dead_code)]
36pub struct WebSocketStream {
37    /// WebSocket connection
38    connection: Option<
39        tokio_tungstenite::WebSocketStream<
40            tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
41        >,
42    >,
43    /// URL for the WebSocket connection
44    url: String,
45    /// Authentication credentials
46    auth: Option<WssAuth>,
47    /// Current subscriptions
48    subscriptions: Vec<WssSubscription>,
49    /// Parsed messages awaiting delivery to the caller.
50    ///
51    /// This replaces an internal unbounded channel to avoid per-message
52    /// allocations in the buffering layer and to enforce a bounded backlog.
53    pending: VecDeque<StreamMessage>,
54    pending_capacity: usize,
55    /// Connection statistics
56    stats: StreamStats,
57    /// Reconnection configuration
58    reconnect_config: ReconnectConfig,
59}
60
61/// Stream statistics
62#[derive(Debug, Clone)]
63pub struct StreamStats {
64    pub messages_received: u64,
65    pub messages_sent: u64,
66    pub errors: u64,
67    pub dropped_messages: u64,
68    pub last_message_time: Option<chrono::DateTime<Utc>>,
69    pub connection_uptime: std::time::Duration,
70    pub reconnect_count: u32,
71}
72
/// Settings that control how reconnection attempts are retried.
#[derive(Debug, Clone)]
pub struct ReconnectConfig {
    /// Maximum number of connection attempts before giving up.
    pub max_retries: u32,
    /// Wait applied after the first failed attempt.
    pub base_delay: std::time::Duration,
    /// Upper bound for the wait between attempts.
    pub max_delay: std::time::Duration,
    /// Factor the wait is multiplied by after each failed attempt.
    pub backoff_multiplier: f64,
}
81
82impl Default for ReconnectConfig {
83    fn default() -> Self {
84        Self {
85            max_retries: 5,
86            base_delay: std::time::Duration::from_secs(1),
87            max_delay: std::time::Duration::from_secs(60),
88            backoff_multiplier: 2.0,
89        }
90    }
91}
92
93impl WebSocketStream {
94    /// Create a new WebSocket stream
95    pub fn new(url: &str) -> Self {
96        let pending_capacity = 1024;
97
98        Self {
99            connection: None,
100            url: url.to_string(),
101            auth: None,
102            subscriptions: Vec::new(),
103            pending: VecDeque::with_capacity(pending_capacity),
104            pending_capacity,
105            stats: StreamStats {
106                messages_received: 0,
107                messages_sent: 0,
108                errors: 0,
109                dropped_messages: 0,
110                last_message_time: None,
111                connection_uptime: std::time::Duration::ZERO,
112                reconnect_count: 0,
113            },
114            reconnect_config: ReconnectConfig::default(),
115        }
116    }
117
118    fn enqueue(&mut self, message: StreamMessage) {
119        if self.pending.len() >= self.pending_capacity {
120            let _ = self.pending.pop_front();
121            self.stats.dropped_messages += 1;
122        }
123        self.pending.push_back(message);
124    }
125
126    /// Set authentication credentials
127    pub fn with_auth(mut self, auth: WssAuth) -> Self {
128        self.auth = Some(auth);
129        self
130    }
131
132    /// Connect to the WebSocket
133    async fn connect(&mut self) -> Result<()> {
134        let (ws_stream, _) = tokio_tungstenite::connect_async(&self.url)
135            .await
136            .map_err(|e| {
137                PolyfillError::stream(
138                    format!("WebSocket connection failed: {}", e),
139                    crate::errors::StreamErrorKind::ConnectionFailed,
140                )
141            })?;
142
143        self.connection = Some(ws_stream);
144        info!("Connected to WebSocket stream at {}", self.url);
145        Ok(())
146    }
147
148    /// Send a message to the WebSocket
149    async fn send_message(&mut self, message: Value) -> Result<()> {
150        if let Some(connection) = &mut self.connection {
151            let text = serde_json::to_string(&message).map_err(|e| {
152                PolyfillError::parse(format!("Failed to serialize message: {}", e), None)
153            })?;
154
155            let ws_message = tokio_tungstenite::tungstenite::Message::Text(text);
156            connection.send(ws_message).await.map_err(|e| {
157                PolyfillError::stream(
158                    format!("Failed to send message: {}", e),
159                    crate::errors::StreamErrorKind::MessageCorrupted,
160                )
161            })?;
162
163            self.stats.messages_sent += 1;
164        }
165
166        Ok(())
167    }
168
169    /// Subscribe to market data using official Polymarket WebSocket API
170    pub async fn subscribe_async(&mut self, subscription: WssSubscription) -> Result<()> {
171        // Ensure connection
172        if self.connection.is_none() {
173            self.connect().await?;
174        }
175
176        // Send subscription message in the format expected by Polymarket
177        // The subscription struct will serialize correctly with proper field names
178        let message = serde_json::to_value(&subscription).map_err(|e| {
179            PolyfillError::parse(format!("Failed to serialize subscription: {}", e), None)
180        })?;
181
182        self.send_message(message).await?;
183        self.subscriptions.push(subscription.clone());
184
185        info!("Subscribed to {} channel", subscription.channel_type);
186        Ok(())
187    }
188
189    /// Subscribe to user channel (orders and trades)
190    pub async fn subscribe_user_channel(&mut self, markets: Vec<String>) -> Result<()> {
191        let auth = self
192            .auth
193            .as_ref()
194            .ok_or_else(|| PolyfillError::auth("No authentication provided for WebSocket"))?
195            .clone();
196
197        let subscription = WssSubscription {
198            channel_type: "user".to_string(),
199            operation: Some("subscribe".to_string()),
200            markets,
201            asset_ids: Vec::new(),
202            initial_dump: Some(true),
203            custom_feature_enabled: None,
204            auth: Some(auth),
205        };
206
207        self.subscribe_async(subscription).await
208    }
209
210    /// Subscribe to market channel (order book and trades)
211    /// Market subscriptions do not require authentication
212    pub async fn subscribe_market_channel(&mut self, asset_ids: Vec<String>) -> Result<()> {
213        let subscription = WssSubscription {
214            channel_type: "market".to_string(),
215            operation: Some("subscribe".to_string()),
216            markets: Vec::new(),
217            asset_ids,
218            initial_dump: Some(true),
219            custom_feature_enabled: None,
220            auth: None,
221        };
222
223        self.subscribe_async(subscription).await
224    }
225
226    /// Subscribe to market channel with custom features enabled
227    /// Custom features include: best_bid_ask, new_market, market_resolved events
228    pub async fn subscribe_market_channel_with_features(
229        &mut self,
230        asset_ids: Vec<String>,
231    ) -> Result<()> {
232        let subscription = WssSubscription {
233            channel_type: "market".to_string(),
234            operation: Some("subscribe".to_string()),
235            markets: Vec::new(),
236            asset_ids,
237            initial_dump: Some(true),
238            custom_feature_enabled: Some(true),
239            auth: None,
240        };
241
242        self.subscribe_async(subscription).await
243    }
244
245    /// Unsubscribe from market channel
246    pub async fn unsubscribe_market_channel(&mut self, asset_ids: Vec<String>) -> Result<()> {
247        let subscription = WssSubscription {
248            channel_type: "market".to_string(),
249            operation: Some("unsubscribe".to_string()),
250            markets: Vec::new(),
251            asset_ids,
252            initial_dump: None,
253            custom_feature_enabled: None,
254            auth: None,
255        };
256
257        self.subscribe_async(subscription).await
258    }
259
260    /// Unsubscribe from user channel
261    pub async fn unsubscribe_user_channel(&mut self, markets: Vec<String>) -> Result<()> {
262        let auth = self
263            .auth
264            .as_ref()
265            .ok_or_else(|| PolyfillError::auth("No authentication provided for WebSocket"))?
266            .clone();
267
268        let subscription = WssSubscription {
269            channel_type: "user".to_string(),
270            operation: Some("unsubscribe".to_string()),
271            markets,
272            asset_ids: Vec::new(),
273            initial_dump: None,
274            custom_feature_enabled: None,
275            auth: Some(auth),
276        };
277
278        self.subscribe_async(subscription).await
279    }
280
281    /// Handle incoming WebSocket messages
282    #[allow(dead_code)]
283    async fn handle_message(
284        &mut self,
285        message: tokio_tungstenite::tungstenite::Message,
286    ) -> Result<()> {
287        match message {
288            tokio_tungstenite::tungstenite::Message::Text(text) => {
289                debug!("Received WebSocket message: {}", text);
290
291                // Parse the message according to Polymarket's `event_type` format
292                let stream_messages = crate::decode::parse_stream_messages(&text)?;
293                for stream_message in stream_messages {
294                    self.enqueue(stream_message);
295                }
296
297                self.stats.messages_received += 1;
298                self.stats.last_message_time = Some(Utc::now());
299            },
300            tokio_tungstenite::tungstenite::Message::Close(_) => {
301                info!("WebSocket connection closed by server");
302                self.connection = None;
303            },
304            tokio_tungstenite::tungstenite::Message::Ping(data) => {
305                // Respond with pong
306                if let Some(connection) = &mut self.connection {
307                    let pong = tokio_tungstenite::tungstenite::Message::Pong(data);
308                    if let Err(e) = connection.send(pong).await {
309                        error!("Failed to send pong: {}", e);
310                    }
311                }
312            },
313            tokio_tungstenite::tungstenite::Message::Pong(_) => {
314                // Handle pong if needed
315                debug!("Received pong");
316            },
317            tokio_tungstenite::tungstenite::Message::Binary(_) => {
318                warn!("Received binary message (not supported)");
319            },
320            tokio_tungstenite::tungstenite::Message::Frame(_) => {
321                warn!("Received raw frame (not supported)");
322            },
323        }
324
325        Ok(())
326    }
327
328    /// Parse Polymarket WebSocket message(s) in `event_type` format.
329    #[allow(dead_code)]
330    fn parse_polymarket_messages(&self, text: &str) -> Result<Vec<StreamMessage>> {
331        crate::decode::parse_stream_messages(text)
332    }
333
334    /// Reconnect with exponential backoff
335    #[allow(dead_code)]
336    async fn reconnect(&mut self) -> Result<()> {
337        let mut delay = self.reconnect_config.base_delay;
338        let mut retries = 0;
339
340        while retries < self.reconnect_config.max_retries {
341            warn!("Attempting to reconnect (attempt {})", retries + 1);
342
343            match self.connect().await {
344                Ok(()) => {
345                    info!("Successfully reconnected");
346                    self.stats.reconnect_count += 1;
347
348                    // Resubscribe to all previous subscriptions
349                    let subscriptions = self.subscriptions.clone();
350                    for subscription in subscriptions {
351                        self.send_message(serde_json::to_value(subscription)?)
352                            .await?;
353                    }
354
355                    return Ok(());
356                },
357                Err(e) => {
358                    error!("Reconnection attempt {} failed: {}", retries + 1, e);
359                    retries += 1;
360
361                    if retries < self.reconnect_config.max_retries {
362                        tokio::time::sleep(delay).await;
363                        delay = std::cmp::min(
364                            delay.mul_f64(self.reconnect_config.backoff_multiplier),
365                            self.reconnect_config.max_delay,
366                        );
367                    }
368                },
369            }
370        }
371
372        Err(PolyfillError::stream(
373            format!(
374                "Failed to reconnect after {} attempts",
375                self.reconnect_config.max_retries
376            ),
377            crate::errors::StreamErrorKind::ConnectionFailed,
378        ))
379    }
380}
381
382/// WebSocket stream wrapper that applies `book` updates directly into an [`crate::book::OrderBookManager`].
383///
384/// This bypasses `StreamMessage` decoding (serde/DOM parsing) for the `book` hot path by using
385/// [`WsBookUpdateProcessor`]. Non-`book` WS payloads are ignored.
386///
387/// Note: the underlying WS transport may still allocate when producing `Message::Text(String)`.
388pub struct WebSocketBookApplier<'a> {
389    stream: WebSocketStream,
390    books: &'a crate::book::OrderBookManager,
391    processor: WsBookUpdateProcessor,
392}
393
394impl WebSocketStream {
395    /// Convert this stream into a book-applier stream.
396    ///
397    /// The caller is expected to "warm up" the [`crate::book::OrderBookManager`] by creating books for all
398    /// subscribed asset IDs ahead of time. Missing books are treated as an error.
399    pub fn into_book_applier<'a>(
400        mut self,
401        books: &'a crate::book::OrderBookManager,
402        processor: WsBookUpdateProcessor,
403    ) -> WebSocketBookApplier<'a> {
404        // Drop any pre-parsed messages to avoid mixing the two streaming modes.
405        self.pending.clear();
406        WebSocketBookApplier {
407            stream: self,
408            books,
409            processor,
410        }
411    }
412}
413
414impl<'a> WebSocketBookApplier<'a> {
415    /// Access the underlying WebSocket stream (e.g., for subscribe/unsubscribe calls).
416    pub fn stream_mut(&mut self) -> &mut WebSocketStream {
417        &mut self.stream
418    }
419
420    /// Current WebSocket connection stats.
421    pub fn stream_stats(&self) -> StreamStats {
422        self.stream.stats.clone()
423    }
424
425    /// Access the hot-path processor (e.g., to reuse it across connections).
426    pub fn processor_mut(&mut self) -> &mut WsBookUpdateProcessor {
427        &mut self.processor
428    }
429
430    /// Apply a single WS text payload (useful for custom transports and for testing).
431    pub fn apply_text_message(&mut self, text: String) -> Result<WsBookApplyStats> {
432        let stats = self.processor.process_text(text, self.books)?;
433        self.stream.stats.messages_received += 1;
434        self.stream.stats.last_message_time = Some(Utc::now());
435        Ok(stats)
436    }
437}
438
439impl<'a> Stream for WebSocketBookApplier<'a> {
440    type Item = Result<WsBookApplyStats>;
441
442    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
443        loop {
444            let Some(connection) = &mut self.stream.connection else {
445                return Poll::Ready(None);
446            };
447
448            match connection.poll_next_unpin(cx) {
449                Poll::Pending => return Poll::Pending,
450                Poll::Ready(Some(Ok(msg))) => match msg {
451                    tokio_tungstenite::tungstenite::Message::Text(text) => {
452                        match self.apply_text_message(text) {
453                            Ok(stats) => {
454                                if stats.book_messages == 0 {
455                                    continue;
456                                }
457                                return Poll::Ready(Some(Ok(stats)));
458                            },
459                            Err(e) => {
460                                self.stream.stats.errors += 1;
461                                return Poll::Ready(Some(Err(e)));
462                            },
463                        }
464                    },
465                    tokio_tungstenite::tungstenite::Message::Close(_) => {
466                        info!("WebSocket connection closed by server");
467                        self.stream.connection = None;
468                        return Poll::Ready(None);
469                    },
470                    tokio_tungstenite::tungstenite::Message::Ping(_) => {
471                        // Best-effort: tokio-tungstenite/tungstenite may handle pings internally.
472                        continue;
473                    },
474                    tokio_tungstenite::tungstenite::Message::Pong(_) => continue,
475                    tokio_tungstenite::tungstenite::Message::Binary(_) => continue,
476                    tokio_tungstenite::tungstenite::Message::Frame(_) => continue,
477                },
478                Poll::Ready(Some(Err(e))) => {
479                    error!("WebSocket error: {}", e);
480                    self.stream.stats.errors += 1;
481                    return Poll::Ready(Some(Err(e.into())));
482                },
483                Poll::Ready(None) => {
484                    info!("WebSocket stream ended");
485                    return Poll::Ready(None);
486                },
487            }
488        }
489    }
490}
491
492impl Stream for WebSocketStream {
493    type Item = Result<StreamMessage>;
494
495    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
496        loop {
497            if let Some(message) = self.pending.pop_front() {
498                return Poll::Ready(Some(Ok(message)));
499            }
500
501            let Some(connection) = &mut self.connection else {
502                return Poll::Ready(None);
503            };
504
505            match connection.poll_next_unpin(cx) {
506                Poll::Pending => return Poll::Pending,
507                Poll::Ready(Some(Ok(ws_message))) => match ws_message {
508                    tokio_tungstenite::tungstenite::Message::Text(text) => {
509                        match crate::decode::parse_stream_messages(&text) {
510                            Ok(messages) => {
511                                let mut iter = messages.into_iter();
512                                let Some(first) = iter.next() else {
513                                    continue;
514                                };
515
516                                for msg in iter {
517                                    self.enqueue(msg);
518                                }
519                                self.stats.messages_received += 1;
520                                self.stats.last_message_time = Some(Utc::now());
521                                return Poll::Ready(Some(Ok(first)));
522                            },
523                            Err(e) => {
524                                self.stats.errors += 1;
525                                return Poll::Ready(Some(Err(e)));
526                            },
527                        }
528                    },
529                    tokio_tungstenite::tungstenite::Message::Close(_) => {
530                        info!("WebSocket connection closed by server");
531                        self.connection = None;
532                        return Poll::Ready(None);
533                    },
534                    tokio_tungstenite::tungstenite::Message::Ping(_) => {
535                        // Best-effort: tokio-tungstenite/tungstenite may handle pings internally.
536                        continue;
537                    },
538                    tokio_tungstenite::tungstenite::Message::Pong(_) => continue,
539                    tokio_tungstenite::tungstenite::Message::Binary(_) => continue,
540                    tokio_tungstenite::tungstenite::Message::Frame(_) => continue,
541                },
542                Poll::Ready(Some(Err(e))) => {
543                    error!("WebSocket error: {}", e);
544                    self.stats.errors += 1;
545                    return Poll::Ready(Some(Err(e.into())));
546                },
547                Poll::Ready(None) => {
548                    info!("WebSocket stream ended");
549                    return Poll::Ready(None);
550                },
551            }
552        }
553    }
554}
555
556impl MarketStream for WebSocketStream {
557    fn subscribe(&mut self, _subscription: Subscription) -> Result<()> {
558        // This is for backward compatibility - use subscribe_async for new code
559        Ok(())
560    }
561
562    fn unsubscribe(&mut self, _token_ids: &[String]) -> Result<()> {
563        // This is for backward compatibility - use unsubscribe_async for new code
564        Ok(())
565    }
566
567    fn is_connected(&self) -> bool {
568        self.connection.is_some()
569    }
570
571    fn get_stats(&self) -> StreamStats {
572        self.stats.clone()
573    }
574}
575
576/// Mock stream for testing
577#[derive(Debug)]
578pub struct MockStream {
579    messages: Vec<Result<StreamMessage>>,
580    index: usize,
581    connected: bool,
582}
583
584impl Default for MockStream {
585    fn default() -> Self {
586        Self::new()
587    }
588}
589
590impl MockStream {
591    pub fn new() -> Self {
592        Self {
593            messages: Vec::new(),
594            index: 0,
595            connected: true,
596        }
597    }
598
599    pub fn add_message(&mut self, message: StreamMessage) {
600        self.messages.push(Ok(message));
601    }
602
603    pub fn add_error(&mut self, error: PolyfillError) {
604        self.messages.push(Err(error));
605    }
606
607    pub fn set_connected(&mut self, connected: bool) {
608        self.connected = connected;
609    }
610}
611
612impl Stream for MockStream {
613    type Item = Result<StreamMessage>;
614
615    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
616        if self.index >= self.messages.len() {
617            Poll::Ready(None)
618        } else {
619            let message = self.messages[self.index].clone();
620            self.index += 1;
621            Poll::Ready(Some(message))
622        }
623    }
624}
625
626impl MarketStream for MockStream {
627    fn subscribe(&mut self, _subscription: Subscription) -> Result<()> {
628        Ok(())
629    }
630
631    fn unsubscribe(&mut self, _token_ids: &[String]) -> Result<()> {
632        Ok(())
633    }
634
635    fn is_connected(&self) -> bool {
636        self.connected
637    }
638
639    fn get_stats(&self) -> StreamStats {
640        StreamStats {
641            messages_received: self.messages.len() as u64,
642            messages_sent: 0,
643            errors: self.messages.iter().filter(|m| m.is_err()).count() as u64,
644            dropped_messages: 0,
645            last_message_time: None,
646            connection_uptime: std::time::Duration::ZERO,
647            reconnect_count: 0,
648        }
649    }
650}
651
652/// Stream manager for handling multiple streams
653#[allow(dead_code)]
654pub struct StreamManager {
655    streams: Vec<Box<dyn MarketStream>>,
656    message_tx: mpsc::UnboundedSender<StreamMessage>,
657    message_rx: mpsc::UnboundedReceiver<StreamMessage>,
658}
659
660impl Default for StreamManager {
661    fn default() -> Self {
662        Self::new()
663    }
664}
665
666impl StreamManager {
667    pub fn new() -> Self {
668        let (message_tx, message_rx) = mpsc::unbounded_channel();
669
670        Self {
671            streams: Vec::new(),
672            message_tx,
673            message_rx,
674        }
675    }
676
677    pub fn add_stream(&mut self, stream: Box<dyn MarketStream>) {
678        self.streams.push(stream);
679    }
680
681    pub fn get_message_receiver(&mut self) -> mpsc::UnboundedReceiver<StreamMessage> {
682        // Note: UnboundedReceiver doesn't implement Clone
683        // In a real implementation, you'd want to use a different approach
684        // For now, we'll return a dummy receiver
685        let (_, rx) = mpsc::unbounded_channel();
686        rx
687    }
688
689    pub fn broadcast_message(&self, message: StreamMessage) -> Result<()> {
690        self.message_tx
691            .send(message)
692            .map_err(|e| PolyfillError::internal("Failed to broadcast message", e))
693    }
694}
695
#[cfg(test)]
mod tests {
    use super::*;
    use rust_decimal::Decimal;
    use std::str::FromStr;

    /// Empty `book` message for asset "1" in market "0xabc".
    fn empty_book_message() -> StreamMessage {
        StreamMessage::Book(BookUpdate {
            asset_id: "1".to_string(),
            market: "0xabc".to_string(),
            timestamp: 1_234_567_890,
            bids: vec![],
            asks: vec![],
            hash: None,
        })
    }

    #[test]
    fn test_mock_stream() {
        let mut stream = MockStream::new();

        // Script two messages: a book snapshot followed by a price change.
        stream.add_message(empty_book_message());
        let price_change = PriceChange {
            market: "0xabc".to_string(),
            timestamp: 1_234_567_891,
            price_changes: vec![],
        };
        stream.add_message(StreamMessage::PriceChange(price_change));

        assert!(stream.is_connected());
        assert_eq!(stream.get_stats().messages_received, 2);
    }

    #[test]
    fn test_stream_manager() {
        let mut manager = StreamManager::new();
        manager.add_stream(Box::new(MockStream::new()));

        // Broadcasting must succeed while the channel is open.
        let outcome = manager.broadcast_message(empty_book_message());
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_websocket_book_applier_apply_text_message_updates_book() {
        let decimal = |raw: &str| Decimal::from_str(raw).unwrap();

        // Warm up the book for the asset before applying updates to it.
        let books = crate::book::OrderBookManager::new(64);
        let _ = books.get_or_create_book("12345").unwrap();

        let mut applier = WebSocketStream::new("wss://example.com/ws")
            .into_book_applier(&books, WsBookUpdateProcessor::new(1024));

        let msg = r#"{"event_type":"book","asset_id":"12345","timestamp":1,"bids":[{"price":"0.75","size":"10"}],"asks":[{"price":"0.76","size":"5"}]}"#.to_string();
        let stats = applier.apply_text_message(msg).unwrap();
        assert_eq!(stats.book_messages, 1);
        assert_eq!(stats.book_levels_applied, 2);

        let snapshot = books.get_book("12345").unwrap();
        assert_eq!(snapshot.bids.len(), 1);
        assert_eq!(snapshot.asks.len(), 1);

        let best_bid = &snapshot.bids[0];
        assert_eq!(best_bid.price, decimal("0.75"));
        assert_eq!(best_bid.size, decimal("10"));

        let best_ask = &snapshot.asks[0];
        assert_eq!(best_ask.price, decimal("0.76"));
        assert_eq!(best_ask.size, decimal("5"));
    }
}
765}