1use crate::errors::{PolyfillError, Result};
7use crate::types::*;
8use crate::ws_hot_path::{WsBookApplyStats, WsBookUpdateProcessor};
9use chrono::Utc;
10use futures::{SinkExt, Stream, StreamExt};
11use serde_json::Value;
12use std::collections::VecDeque;
13use std::pin::Pin;
14use std::task::{Context, Poll};
15use tokio::sync::mpsc;
16use tracing::{debug, error, info, warn};
17
18pub trait MarketStream: Stream<Item = Result<StreamMessage>> + Send + Sync {
20 fn subscribe(&mut self, subscription: Subscription) -> Result<()>;
22
23 fn unsubscribe(&mut self, token_ids: &[String]) -> Result<()>;
25
26 fn is_connected(&self) -> bool;
28
29 fn get_stats(&self) -> StreamStats;
31}
32
33#[derive(Debug)]
35#[allow(dead_code)]
36pub struct WebSocketStream {
37 connection: Option<
39 tokio_tungstenite::WebSocketStream<
40 tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
41 >,
42 >,
43 url: String,
45 auth: Option<WssAuth>,
47 subscriptions: Vec<WssSubscription>,
49 pending: VecDeque<StreamMessage>,
54 pending_capacity: usize,
55 stats: StreamStats,
57 reconnect_config: ReconnectConfig,
59}
60
61#[derive(Debug, Clone)]
63pub struct StreamStats {
64 pub messages_received: u64,
65 pub messages_sent: u64,
66 pub errors: u64,
67 pub dropped_messages: u64,
68 pub last_message_time: Option<chrono::DateTime<Utc>>,
69 pub connection_uptime: std::time::Duration,
70 pub reconnect_count: u32,
71}
72
73#[derive(Debug, Clone)]
75pub struct ReconnectConfig {
76 pub max_retries: u32,
77 pub base_delay: std::time::Duration,
78 pub max_delay: std::time::Duration,
79 pub backoff_multiplier: f64,
80}
81
82impl Default for ReconnectConfig {
83 fn default() -> Self {
84 Self {
85 max_retries: 5,
86 base_delay: std::time::Duration::from_secs(1),
87 max_delay: std::time::Duration::from_secs(60),
88 backoff_multiplier: 2.0,
89 }
90 }
91}
92
93impl WebSocketStream {
94 pub fn new(url: &str) -> Self {
96 let pending_capacity = 1024;
97
98 Self {
99 connection: None,
100 url: url.to_string(),
101 auth: None,
102 subscriptions: Vec::new(),
103 pending: VecDeque::with_capacity(pending_capacity),
104 pending_capacity,
105 stats: StreamStats {
106 messages_received: 0,
107 messages_sent: 0,
108 errors: 0,
109 dropped_messages: 0,
110 last_message_time: None,
111 connection_uptime: std::time::Duration::ZERO,
112 reconnect_count: 0,
113 },
114 reconnect_config: ReconnectConfig::default(),
115 }
116 }
117
118 fn enqueue(&mut self, message: StreamMessage) {
119 if self.pending.len() >= self.pending_capacity {
120 let _ = self.pending.pop_front();
121 self.stats.dropped_messages += 1;
122 }
123 self.pending.push_back(message);
124 }
125
126 pub fn with_auth(mut self, auth: WssAuth) -> Self {
128 self.auth = Some(auth);
129 self
130 }
131
132 async fn connect(&mut self) -> Result<()> {
134 let (ws_stream, _) = tokio_tungstenite::connect_async(&self.url)
135 .await
136 .map_err(|e| {
137 PolyfillError::stream(
138 format!("WebSocket connection failed: {}", e),
139 crate::errors::StreamErrorKind::ConnectionFailed,
140 )
141 })?;
142
143 self.connection = Some(ws_stream);
144 info!("Connected to WebSocket stream at {}", self.url);
145 Ok(())
146 }
147
148 async fn send_message(&mut self, message: Value) -> Result<()> {
150 if let Some(connection) = &mut self.connection {
151 let text = serde_json::to_string(&message).map_err(|e| {
152 PolyfillError::parse(format!("Failed to serialize message: {}", e), None)
153 })?;
154
155 let ws_message = tokio_tungstenite::tungstenite::Message::Text(text);
156 connection.send(ws_message).await.map_err(|e| {
157 PolyfillError::stream(
158 format!("Failed to send message: {}", e),
159 crate::errors::StreamErrorKind::MessageCorrupted,
160 )
161 })?;
162
163 self.stats.messages_sent += 1;
164 }
165
166 Ok(())
167 }
168
169 pub async fn subscribe_async(&mut self, subscription: WssSubscription) -> Result<()> {
171 if self.connection.is_none() {
173 self.connect().await?;
174 }
175
176 let message = serde_json::to_value(&subscription).map_err(|e| {
179 PolyfillError::parse(format!("Failed to serialize subscription: {}", e), None)
180 })?;
181
182 self.send_message(message).await?;
183 self.subscriptions.push(subscription.clone());
184
185 info!("Subscribed to {} channel", subscription.channel_type);
186 Ok(())
187 }
188
189 pub async fn subscribe_user_channel(&mut self, markets: Vec<String>) -> Result<()> {
191 let auth = self
192 .auth
193 .as_ref()
194 .ok_or_else(|| PolyfillError::auth("No authentication provided for WebSocket"))?
195 .clone();
196
197 let subscription = WssSubscription {
198 channel_type: "user".to_string(),
199 operation: Some("subscribe".to_string()),
200 markets,
201 asset_ids: Vec::new(),
202 initial_dump: Some(true),
203 custom_feature_enabled: None,
204 auth: Some(auth),
205 };
206
207 self.subscribe_async(subscription).await
208 }
209
210 pub async fn subscribe_market_channel(&mut self, asset_ids: Vec<String>) -> Result<()> {
213 let subscription = WssSubscription {
214 channel_type: "market".to_string(),
215 operation: Some("subscribe".to_string()),
216 markets: Vec::new(),
217 asset_ids,
218 initial_dump: Some(true),
219 custom_feature_enabled: None,
220 auth: None,
221 };
222
223 self.subscribe_async(subscription).await
224 }
225
226 pub async fn subscribe_market_channel_with_features(
229 &mut self,
230 asset_ids: Vec<String>,
231 ) -> Result<()> {
232 let subscription = WssSubscription {
233 channel_type: "market".to_string(),
234 operation: Some("subscribe".to_string()),
235 markets: Vec::new(),
236 asset_ids,
237 initial_dump: Some(true),
238 custom_feature_enabled: Some(true),
239 auth: None,
240 };
241
242 self.subscribe_async(subscription).await
243 }
244
245 pub async fn unsubscribe_market_channel(&mut self, asset_ids: Vec<String>) -> Result<()> {
247 let subscription = WssSubscription {
248 channel_type: "market".to_string(),
249 operation: Some("unsubscribe".to_string()),
250 markets: Vec::new(),
251 asset_ids,
252 initial_dump: None,
253 custom_feature_enabled: None,
254 auth: None,
255 };
256
257 self.subscribe_async(subscription).await
258 }
259
260 pub async fn unsubscribe_user_channel(&mut self, markets: Vec<String>) -> Result<()> {
262 let auth = self
263 .auth
264 .as_ref()
265 .ok_or_else(|| PolyfillError::auth("No authentication provided for WebSocket"))?
266 .clone();
267
268 let subscription = WssSubscription {
269 channel_type: "user".to_string(),
270 operation: Some("unsubscribe".to_string()),
271 markets,
272 asset_ids: Vec::new(),
273 initial_dump: None,
274 custom_feature_enabled: None,
275 auth: Some(auth),
276 };
277
278 self.subscribe_async(subscription).await
279 }
280
281 #[allow(dead_code)]
283 async fn handle_message(
284 &mut self,
285 message: tokio_tungstenite::tungstenite::Message,
286 ) -> Result<()> {
287 match message {
288 tokio_tungstenite::tungstenite::Message::Text(text) => {
289 debug!("Received WebSocket message: {}", text);
290
291 let stream_messages = crate::decode::parse_stream_messages(&text)?;
293 for stream_message in stream_messages {
294 self.enqueue(stream_message);
295 }
296
297 self.stats.messages_received += 1;
298 self.stats.last_message_time = Some(Utc::now());
299 },
300 tokio_tungstenite::tungstenite::Message::Close(_) => {
301 info!("WebSocket connection closed by server");
302 self.connection = None;
303 },
304 tokio_tungstenite::tungstenite::Message::Ping(data) => {
305 if let Some(connection) = &mut self.connection {
307 let pong = tokio_tungstenite::tungstenite::Message::Pong(data);
308 if let Err(e) = connection.send(pong).await {
309 error!("Failed to send pong: {}", e);
310 }
311 }
312 },
313 tokio_tungstenite::tungstenite::Message::Pong(_) => {
314 debug!("Received pong");
316 },
317 tokio_tungstenite::tungstenite::Message::Binary(_) => {
318 warn!("Received binary message (not supported)");
319 },
320 tokio_tungstenite::tungstenite::Message::Frame(_) => {
321 warn!("Received raw frame (not supported)");
322 },
323 }
324
325 Ok(())
326 }
327
328 #[allow(dead_code)]
330 fn parse_polymarket_messages(&self, text: &str) -> Result<Vec<StreamMessage>> {
331 crate::decode::parse_stream_messages(text)
332 }
333
334 #[allow(dead_code)]
336 async fn reconnect(&mut self) -> Result<()> {
337 let mut delay = self.reconnect_config.base_delay;
338 let mut retries = 0;
339
340 while retries < self.reconnect_config.max_retries {
341 warn!("Attempting to reconnect (attempt {})", retries + 1);
342
343 match self.connect().await {
344 Ok(()) => {
345 info!("Successfully reconnected");
346 self.stats.reconnect_count += 1;
347
348 let subscriptions = self.subscriptions.clone();
350 for subscription in subscriptions {
351 self.send_message(serde_json::to_value(subscription)?)
352 .await?;
353 }
354
355 return Ok(());
356 },
357 Err(e) => {
358 error!("Reconnection attempt {} failed: {}", retries + 1, e);
359 retries += 1;
360
361 if retries < self.reconnect_config.max_retries {
362 tokio::time::sleep(delay).await;
363 delay = std::cmp::min(
364 delay.mul_f64(self.reconnect_config.backoff_multiplier),
365 self.reconnect_config.max_delay,
366 );
367 }
368 },
369 }
370 }
371
372 Err(PolyfillError::stream(
373 format!(
374 "Failed to reconnect after {} attempts",
375 self.reconnect_config.max_retries
376 ),
377 crate::errors::StreamErrorKind::ConnectionFailed,
378 ))
379 }
380}
381
382pub struct WebSocketBookApplier<'a> {
389 stream: WebSocketStream,
390 books: &'a crate::book::OrderBookManager,
391 processor: WsBookUpdateProcessor,
392}
393
394impl WebSocketStream {
395 pub fn into_book_applier<'a>(
400 mut self,
401 books: &'a crate::book::OrderBookManager,
402 processor: WsBookUpdateProcessor,
403 ) -> WebSocketBookApplier<'a> {
404 self.pending.clear();
406 WebSocketBookApplier {
407 stream: self,
408 books,
409 processor,
410 }
411 }
412}
413
414impl<'a> WebSocketBookApplier<'a> {
415 pub fn stream_mut(&mut self) -> &mut WebSocketStream {
417 &mut self.stream
418 }
419
420 pub fn stream_stats(&self) -> StreamStats {
422 self.stream.stats.clone()
423 }
424
425 pub fn processor_mut(&mut self) -> &mut WsBookUpdateProcessor {
427 &mut self.processor
428 }
429
430 pub fn apply_text_message(&mut self, text: String) -> Result<WsBookApplyStats> {
432 let stats = self.processor.process_text(text, self.books)?;
433 self.stream.stats.messages_received += 1;
434 self.stream.stats.last_message_time = Some(Utc::now());
435 Ok(stats)
436 }
437}
438
439impl<'a> Stream for WebSocketBookApplier<'a> {
440 type Item = Result<WsBookApplyStats>;
441
442 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
443 loop {
444 let Some(connection) = &mut self.stream.connection else {
445 return Poll::Ready(None);
446 };
447
448 match connection.poll_next_unpin(cx) {
449 Poll::Pending => return Poll::Pending,
450 Poll::Ready(Some(Ok(msg))) => match msg {
451 tokio_tungstenite::tungstenite::Message::Text(text) => {
452 match self.apply_text_message(text) {
453 Ok(stats) => {
454 if stats.book_messages == 0 {
455 continue;
456 }
457 return Poll::Ready(Some(Ok(stats)));
458 },
459 Err(e) => {
460 self.stream.stats.errors += 1;
461 return Poll::Ready(Some(Err(e)));
462 },
463 }
464 },
465 tokio_tungstenite::tungstenite::Message::Close(_) => {
466 info!("WebSocket connection closed by server");
467 self.stream.connection = None;
468 return Poll::Ready(None);
469 },
470 tokio_tungstenite::tungstenite::Message::Ping(_) => {
471 continue;
473 },
474 tokio_tungstenite::tungstenite::Message::Pong(_) => continue,
475 tokio_tungstenite::tungstenite::Message::Binary(_) => continue,
476 tokio_tungstenite::tungstenite::Message::Frame(_) => continue,
477 },
478 Poll::Ready(Some(Err(e))) => {
479 error!("WebSocket error: {}", e);
480 self.stream.stats.errors += 1;
481 return Poll::Ready(Some(Err(e.into())));
482 },
483 Poll::Ready(None) => {
484 info!("WebSocket stream ended");
485 return Poll::Ready(None);
486 },
487 }
488 }
489 }
490}
491
492impl Stream for WebSocketStream {
493 type Item = Result<StreamMessage>;
494
495 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
496 loop {
497 if let Some(message) = self.pending.pop_front() {
498 return Poll::Ready(Some(Ok(message)));
499 }
500
501 let Some(connection) = &mut self.connection else {
502 return Poll::Ready(None);
503 };
504
505 match connection.poll_next_unpin(cx) {
506 Poll::Pending => return Poll::Pending,
507 Poll::Ready(Some(Ok(ws_message))) => match ws_message {
508 tokio_tungstenite::tungstenite::Message::Text(text) => {
509 match crate::decode::parse_stream_messages(&text) {
510 Ok(messages) => {
511 let mut iter = messages.into_iter();
512 let Some(first) = iter.next() else {
513 continue;
514 };
515
516 for msg in iter {
517 self.enqueue(msg);
518 }
519 self.stats.messages_received += 1;
520 self.stats.last_message_time = Some(Utc::now());
521 return Poll::Ready(Some(Ok(first)));
522 },
523 Err(e) => {
524 self.stats.errors += 1;
525 return Poll::Ready(Some(Err(e)));
526 },
527 }
528 },
529 tokio_tungstenite::tungstenite::Message::Close(_) => {
530 info!("WebSocket connection closed by server");
531 self.connection = None;
532 return Poll::Ready(None);
533 },
534 tokio_tungstenite::tungstenite::Message::Ping(_) => {
535 continue;
537 },
538 tokio_tungstenite::tungstenite::Message::Pong(_) => continue,
539 tokio_tungstenite::tungstenite::Message::Binary(_) => continue,
540 tokio_tungstenite::tungstenite::Message::Frame(_) => continue,
541 },
542 Poll::Ready(Some(Err(e))) => {
543 error!("WebSocket error: {}", e);
544 self.stats.errors += 1;
545 return Poll::Ready(Some(Err(e.into())));
546 },
547 Poll::Ready(None) => {
548 info!("WebSocket stream ended");
549 return Poll::Ready(None);
550 },
551 }
552 }
553 }
554}
555
556impl MarketStream for WebSocketStream {
557 fn subscribe(&mut self, _subscription: Subscription) -> Result<()> {
558 Ok(())
560 }
561
562 fn unsubscribe(&mut self, _token_ids: &[String]) -> Result<()> {
563 Ok(())
565 }
566
567 fn is_connected(&self) -> bool {
568 self.connection.is_some()
569 }
570
571 fn get_stats(&self) -> StreamStats {
572 self.stats.clone()
573 }
574}
575
576#[derive(Debug)]
578pub struct MockStream {
579 messages: Vec<Result<StreamMessage>>,
580 index: usize,
581 connected: bool,
582}
583
584impl Default for MockStream {
585 fn default() -> Self {
586 Self::new()
587 }
588}
589
590impl MockStream {
591 pub fn new() -> Self {
592 Self {
593 messages: Vec::new(),
594 index: 0,
595 connected: true,
596 }
597 }
598
599 pub fn add_message(&mut self, message: StreamMessage) {
600 self.messages.push(Ok(message));
601 }
602
603 pub fn add_error(&mut self, error: PolyfillError) {
604 self.messages.push(Err(error));
605 }
606
607 pub fn set_connected(&mut self, connected: bool) {
608 self.connected = connected;
609 }
610}
611
612impl Stream for MockStream {
613 type Item = Result<StreamMessage>;
614
615 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
616 if self.index >= self.messages.len() {
617 Poll::Ready(None)
618 } else {
619 let message = self.messages[self.index].clone();
620 self.index += 1;
621 Poll::Ready(Some(message))
622 }
623 }
624}
625
626impl MarketStream for MockStream {
627 fn subscribe(&mut self, _subscription: Subscription) -> Result<()> {
628 Ok(())
629 }
630
631 fn unsubscribe(&mut self, _token_ids: &[String]) -> Result<()> {
632 Ok(())
633 }
634
635 fn is_connected(&self) -> bool {
636 self.connected
637 }
638
639 fn get_stats(&self) -> StreamStats {
640 StreamStats {
641 messages_received: self.messages.len() as u64,
642 messages_sent: 0,
643 errors: self.messages.iter().filter(|m| m.is_err()).count() as u64,
644 dropped_messages: 0,
645 last_message_time: None,
646 connection_uptime: std::time::Duration::ZERO,
647 reconnect_count: 0,
648 }
649 }
650}
651
652#[allow(dead_code)]
654pub struct StreamManager {
655 streams: Vec<Box<dyn MarketStream>>,
656 message_tx: mpsc::UnboundedSender<StreamMessage>,
657 message_rx: mpsc::UnboundedReceiver<StreamMessage>,
658}
659
660impl Default for StreamManager {
661 fn default() -> Self {
662 Self::new()
663 }
664}
665
666impl StreamManager {
667 pub fn new() -> Self {
668 let (message_tx, message_rx) = mpsc::unbounded_channel();
669
670 Self {
671 streams: Vec::new(),
672 message_tx,
673 message_rx,
674 }
675 }
676
677 pub fn add_stream(&mut self, stream: Box<dyn MarketStream>) {
678 self.streams.push(stream);
679 }
680
681 pub fn get_message_receiver(&mut self) -> mpsc::UnboundedReceiver<StreamMessage> {
682 let (_, rx) = mpsc::unbounded_channel();
686 rx
687 }
688
689 pub fn broadcast_message(&self, message: StreamMessage) -> Result<()> {
690 self.message_tx
691 .send(message)
692 .map_err(|e| PolyfillError::internal("Failed to broadcast message", e))
693 }
694}
695
696#[cfg(test)]
697mod tests {
698 use super::*;
699 use rust_decimal::Decimal;
700 use std::str::FromStr;
701
702 #[test]
703 fn test_mock_stream() {
704 let mut stream = MockStream::new();
705
706 stream.add_message(StreamMessage::Book(BookUpdate {
708 asset_id: "1".to_string(),
709 market: "0xabc".to_string(),
710 timestamp: 1_234_567_890,
711 bids: vec![],
712 asks: vec![],
713 hash: None,
714 }));
715 stream.add_message(StreamMessage::PriceChange(PriceChange {
716 market: "0xabc".to_string(),
717 timestamp: 1_234_567_891,
718 price_changes: vec![],
719 }));
720
721 assert!(stream.is_connected());
722 assert_eq!(stream.get_stats().messages_received, 2);
723 }
724
725 #[test]
726 fn test_stream_manager() {
727 let mut manager = StreamManager::new();
728 let mock_stream = Box::new(MockStream::new());
729 manager.add_stream(mock_stream);
730
731 let message = StreamMessage::Book(BookUpdate {
733 asset_id: "1".to_string(),
734 market: "0xabc".to_string(),
735 timestamp: 1_234_567_890,
736 bids: vec![],
737 asks: vec![],
738 hash: None,
739 });
740 assert!(manager.broadcast_message(message).is_ok());
741 }
742
743 #[test]
744 fn test_websocket_book_applier_apply_text_message_updates_book() {
745 let books = crate::book::OrderBookManager::new(64);
746 let _ = books.get_or_create_book("12345").unwrap();
747
748 let processor = WsBookUpdateProcessor::new(1024);
749 let stream = WebSocketStream::new("wss://example.com/ws");
750 let mut applier = stream.into_book_applier(&books, processor);
751
752 let msg = r#"{"event_type":"book","asset_id":"12345","timestamp":1,"bids":[{"price":"0.75","size":"10"}],"asks":[{"price":"0.76","size":"5"}]}"#.to_string();
753 let stats = applier.apply_text_message(msg).unwrap();
754 assert_eq!(stats.book_messages, 1);
755 assert_eq!(stats.book_levels_applied, 2);
756
757 let snapshot = books.get_book("12345").unwrap();
758 assert_eq!(snapshot.bids.len(), 1);
759 assert_eq!(snapshot.asks.len(), 1);
760 assert_eq!(snapshot.bids[0].price, Decimal::from_str("0.75").unwrap());
761 assert_eq!(snapshot.bids[0].size, Decimal::from_str("10").unwrap());
762 assert_eq!(snapshot.asks[0].price, Decimal::from_str("0.76").unwrap());
763 assert_eq!(snapshot.asks[0].size, Decimal::from_str("5").unwrap());
764 }
765}