Skip to main content

px_core/websocket/
stream.rs

//! Concrete stream newtypes for the multiplexed WS surface.
//!
//! These replace the `Pin<Box<dyn Stream<Item = ...>>>` aliases used in the
//! 0.1 trait. The receiver channel is an implementation detail; the public
//! surface is `Stream<Item = T>` and a direct `next().await` shortcut for
//! hot-path callers that want to avoid `Stream` combinators entirely.
7
8use std::sync::Mutex;
9
10use async_channel::{Receiver, TryRecvError};
11
12use crate::websocket::events::{SessionEvent, WsUpdate};
13
14/// Multiplexed per-market update stream. There is exactly one `UpdateStream`
15/// per dispatcher; calling `OrderBookWebSocket::updates()` twice returns
16/// `None` the second time.
17///
18/// Why take-once semantics: the underlying channel is MPMC, so cloning the
19/// receiver and handing one out per call would split messages between
20/// receivers in a way callers cannot detect — every test harness or debug
21/// sidecar that "just adds a second consumer" would silently lose half the
22/// stream. Take-once turns that footgun into a compile-checked None at the
23/// call site.
24///
25/// The channel is bounded; when full the producer raises
26/// `SessionEvent::Lagged` + `SessionEvent::BookInvalidated` rather than
27/// dropping deltas silently.
28pub struct UpdateStream {
29    rx: Receiver<WsUpdate>,
30}
31
32impl UpdateStream {
33    #[inline]
34    pub(crate) fn new(rx: Receiver<WsUpdate>) -> Self {
35        Self { rx }
36    }
37
38    /// Await the next update. `None` once the producer has been dropped.
39    #[inline]
40    pub async fn next(&self) -> Option<WsUpdate> {
41        self.rx.recv().await.ok()
42    }
43
44    /// Non-blocking peek. Returns `Ok(Some)` if an update is ready,
45    /// `Ok(None)` if the channel is empty, `Err` if closed.
46    #[inline]
47    pub fn try_next(&self) -> Result<Option<WsUpdate>, TryRecvError> {
48        match self.rx.try_recv() {
49            Ok(v) => Ok(Some(v)),
50            Err(TryRecvError::Empty) => Ok(None),
51            Err(e @ TryRecvError::Closed) => Err(e),
52        }
53    }
54
55    #[inline]
56    pub fn len(&self) -> usize {
57        self.rx.len()
58    }
59
60    #[inline]
61    pub fn is_empty(&self) -> bool {
62        self.rx.is_empty()
63    }
64
65    #[inline]
66    pub fn is_closed(&self) -> bool {
67        self.rx.is_closed()
68    }
69}
70
71/// Connection-level events. Separate from `UpdateStream` so one reconnect is
72/// one event regardless of how many markets are subscribed.
73pub struct SessionStream {
74    rx: Receiver<SessionEvent>,
75}
76
77impl SessionStream {
78    #[inline]
79    pub(crate) fn new(rx: Receiver<SessionEvent>) -> Self {
80        Self { rx }
81    }
82
83    #[inline]
84    pub async fn next(&self) -> Option<SessionEvent> {
85        self.rx.recv().await.ok()
86    }
87
88    #[inline]
89    pub fn try_next(&self) -> Result<Option<SessionEvent>, TryRecvError> {
90        match self.rx.try_recv() {
91            Ok(v) => Ok(Some(v)),
92            Err(TryRecvError::Empty) => Ok(None),
93            Err(e @ TryRecvError::Closed) => Err(e),
94        }
95    }
96
97    #[inline]
98    pub fn is_closed(&self) -> bool {
99        self.rx.is_closed()
100    }
101}
102
103/// Producer handle held by exchange WS implementations. Owns the sender
104/// halves of both channels so the implementation can emit updates and
105/// session events directly.
106///
107/// The receiver halves are held in `Mutex<Option<...>>` and handed out
108/// exactly once via `take_updates()` / `take_session_events()`. This
109/// enforces the take-once contract on the consumer side.
110pub struct WsDispatcher {
111    updates_tx: async_channel::Sender<WsUpdate>,
112    updates_rx: Mutex<Option<Receiver<WsUpdate>>>,
113    session_tx: async_channel::Sender<SessionEvent>,
114    session_rx: Mutex<Option<Receiver<SessionEvent>>>,
115}
116
/// Configuration for the dispatcher's bounded channels.
///
/// Both capacities should be at least 1: `async_channel::bounded` does not
/// support zero-capacity channels.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WsDispatcherConfig {
    /// Capacity of the per-subscriber update channel. On overflow the
    /// dispatcher emits `SessionEvent::Lagged` + `BookInvalidated` and
    /// drops the offending update.
    /// Defaults to [`Self::DEFAULT_UPDATES_CAPACITY`].
    pub updates_capacity: usize,
    /// Capacity of the session-event channel. Session events are rare and
    /// losing one is a correctness hazard, so the default is deliberately
    /// oversized. Defaults to [`Self::DEFAULT_SESSION_CAPACITY`].
    pub session_capacity: usize,
}

impl WsDispatcherConfig {
    /// Default capacity of the update channel.
    pub const DEFAULT_UPDATES_CAPACITY: usize = 4096;
    /// Default capacity of the session-event channel.
    pub const DEFAULT_SESSION_CAPACITY: usize = 256;
}

impl Default for WsDispatcherConfig {
    /// Build a configuration from the documented default capacities.
    fn default() -> Self {
        Self {
            updates_capacity: Self::DEFAULT_UPDATES_CAPACITY,
            session_capacity: Self::DEFAULT_SESSION_CAPACITY,
        }
    }
}
137
138impl WsDispatcher {
139    /// Create a new dispatcher. The returned dispatcher owns the send halves
140    /// of both channels and the (one-shot) receive halves; consumers fetch
141    /// streams exactly once via `take_updates()` / `take_session_events()`.
142    pub fn new(config: WsDispatcherConfig) -> Self {
143        let (updates_tx, updates_rx) = async_channel::bounded(config.updates_capacity);
144        let (session_tx, session_rx) = async_channel::bounded(config.session_capacity);
145        Self {
146            updates_tx,
147            updates_rx: Mutex::new(Some(updates_rx)),
148            session_tx,
149            session_rx: Mutex::new(Some(session_rx)),
150        }
151    }
152
153    /// Take ownership of the consumer-side update stream. Returns `None` if
154    /// already taken — the receiver is single-consumer by contract; cloning
155    /// would silently split messages between holders.
156    #[inline]
157    pub fn take_updates(&self) -> Option<UpdateStream> {
158        self.updates_rx
159            .lock()
160            .ok()
161            .and_then(|mut g| g.take())
162            .map(UpdateStream::new)
163    }
164
165    /// Take ownership of the consumer-side session stream.
166    #[inline]
167    pub fn take_session_events(&self) -> Option<SessionStream> {
168        self.session_rx
169            .lock()
170            .ok()
171            .and_then(|mut g| g.take())
172            .map(SessionStream::new)
173    }
174
175    /// Emit an update. Returns `true` if delivered. On `Err(TrySendError::Full)`
176    /// the update is dropped and the caller is expected to follow up with a
177    /// `SessionEvent::Lagged` + one or more `BookInvalidated` events — this
178    /// is the correctness contract that distinguishes 0.2 from 0.1's
179    /// silent-skip broadcast behavior.
180    #[inline]
181    pub fn try_send_update(&self, update: WsUpdate) -> bool {
182        self.updates_tx.try_send(update).is_ok()
183    }
184
185    /// Emit a session event. Unlike updates, these are always delivered via
186    /// `send`; the session-event channel is sized generously and losing an
187    /// event (e.g. a missed `Reconnected`) is worse than a brief await.
188    #[inline]
189    pub async fn send_session(&self, event: SessionEvent) {
190        let _ = self.session_tx.send(event).await;
191    }
192
193    #[inline]
194    pub fn is_updates_full(&self) -> bool {
195        self.updates_tx.is_full()
196    }
197}