Skip to main content

px_core/websocket/
stream.rs

1//! Concrete stream newtypes for the multiplexed WS surface.
2//!
3//! These replace the `Pin<Box<dyn Stream<Item = ...>>>` aliases used in the
4//! 0.1 trait. The receiver channel is an implementation detail; the public
5//! surface is `Stream<Item = T>` and a direct `recv().await` shortcut for
6//! hot-path callers that want to avoid `Stream` combinators entirely.
7
8use std::sync::Mutex;
9
10use flume::{Receiver, Sender, TryRecvError};
11
12use crate::websocket::events::{SessionEvent, WsUpdate};
13
14/// Multiplexed per-market update stream. There is exactly one `UpdateStream`
15/// per dispatcher; calling `OrderBookWebSocket::updates()` twice returns
16/// `None` the second time.
17///
18/// Why take-once semantics: the underlying channel is MPMC, so cloning the
19/// receiver and handing one out per call would split messages between
20/// receivers in a way callers cannot detect — every test harness or debug
21/// sidecar that "just adds a second consumer" would silently lose half the
22/// stream. Take-once turns that footgun into a compile-checked None at the
23/// call site.
24///
25/// Backed by `flume`, an MPMC channel with lighter-weight wakers than
26/// `async-channel`'s; the per-message recv on the consumer side is a
27/// significant fraction of the WS pipeline p99, and flume's atomic-only
28/// fast path stays out of the parking_lot machinery on uncontended
29/// recv. The channel is bounded; when full the producer raises
30/// `SessionEvent::Lagged` + `SessionEvent::BookInvalidated` rather than
31/// dropping deltas silently.
32pub struct UpdateStream {
33    rx: Receiver<WsUpdate>,
34}
35
36impl UpdateStream {
37    #[inline]
38    pub(crate) fn new(rx: Receiver<WsUpdate>) -> Self {
39        Self { rx }
40    }
41
42    /// Await the next update. `None` once the producer has been dropped.
43    #[inline]
44    pub async fn next(&self) -> Option<WsUpdate> {
45        self.rx.recv_async().await.ok()
46    }
47
48    /// Non-blocking peek. Returns `Ok(Some)` if an update is ready,
49    /// `Ok(None)` if the channel is empty, `Err` if closed.
50    #[inline]
51    pub fn try_next(&self) -> Result<Option<WsUpdate>, TryRecvError> {
52        match self.rx.try_recv() {
53            Ok(v) => Ok(Some(v)),
54            Err(TryRecvError::Empty) => Ok(None),
55            Err(e @ TryRecvError::Disconnected) => Err(e),
56        }
57    }
58
59    #[inline]
60    pub fn len(&self) -> usize {
61        self.rx.len()
62    }
63
64    #[inline]
65    pub fn is_empty(&self) -> bool {
66        self.rx.is_empty()
67    }
68
69    #[inline]
70    pub fn is_closed(&self) -> bool {
71        self.rx.is_disconnected()
72    }
73}
74
75/// Connection-level events. Separate from `UpdateStream` so one reconnect is
76/// one event regardless of how many markets are subscribed.
77pub struct SessionStream {
78    rx: Receiver<SessionEvent>,
79}
80
81impl SessionStream {
82    #[inline]
83    pub(crate) fn new(rx: Receiver<SessionEvent>) -> Self {
84        Self { rx }
85    }
86
87    #[inline]
88    pub async fn next(&self) -> Option<SessionEvent> {
89        self.rx.recv_async().await.ok()
90    }
91
92    #[inline]
93    pub fn try_next(&self) -> Result<Option<SessionEvent>, TryRecvError> {
94        match self.rx.try_recv() {
95            Ok(v) => Ok(Some(v)),
96            Err(TryRecvError::Empty) => Ok(None),
97            Err(e @ TryRecvError::Disconnected) => Err(e),
98        }
99    }
100
101    #[inline]
102    pub fn is_closed(&self) -> bool {
103        self.rx.is_disconnected()
104    }
105}
106
107/// Producer handle held by exchange WS implementations. Owns the sender
108/// halves of both channels so the implementation can emit updates and
109/// session events directly.
110///
111/// The receiver halves are held in `Mutex<Option<...>>` and handed out
112/// exactly once via `take_updates()` / `take_session_events()`. This
113/// enforces the take-once contract on the consumer side.
114pub struct WsDispatcher {
115    updates_tx: Sender<WsUpdate>,
116    updates_rx: Mutex<Option<Receiver<WsUpdate>>>,
117    session_tx: Sender<SessionEvent>,
118    session_rx: Mutex<Option<Receiver<SessionEvent>>>,
119}
120
121/// Configuration for the dispatcher's bounded channels.
122#[derive(Debug, Clone, Copy)]
123pub struct WsDispatcherConfig {
124    /// Capacity of the per-subscriber update channel. On overflow the
125    /// dispatcher emits `SessionEvent::Lagged` + `BookInvalidated` and
126    /// drops the offending update. Default 4096.
127    pub updates_capacity: usize,
128    /// Capacity of the session-event channel. Default 256 — session events
129    /// are rare and losing one is a correctness hazard, so oversized.
130    pub session_capacity: usize,
131}
132
133impl Default for WsDispatcherConfig {
134    fn default() -> Self {
135        Self {
136            updates_capacity: 4096,
137            session_capacity: 256,
138        }
139    }
140}
141
142impl WsDispatcher {
143    /// Create a new dispatcher. The returned dispatcher owns the send halves
144    /// of both channels and the (one-shot) receive halves; consumers fetch
145    /// streams exactly once via `take_updates()` / `take_session_events()`.
146    pub fn new(config: WsDispatcherConfig) -> Self {
147        let (updates_tx, updates_rx) = flume::bounded(config.updates_capacity);
148        let (session_tx, session_rx) = flume::bounded(config.session_capacity);
149        Self {
150            updates_tx,
151            updates_rx: Mutex::new(Some(updates_rx)),
152            session_tx,
153            session_rx: Mutex::new(Some(session_rx)),
154        }
155    }
156
157    /// Take ownership of the consumer-side update stream. Returns `None` if
158    /// already taken — the receiver is single-consumer by contract; cloning
159    /// would silently split messages between holders.
160    #[inline]
161    pub fn take_updates(&self) -> Option<UpdateStream> {
162        self.updates_rx
163            .lock()
164            .ok()
165            .and_then(|mut g| g.take())
166            .map(UpdateStream::new)
167    }
168
169    /// Take ownership of the consumer-side session stream.
170    #[inline]
171    pub fn take_session_events(&self) -> Option<SessionStream> {
172        self.session_rx
173            .lock()
174            .ok()
175            .and_then(|mut g| g.take())
176            .map(SessionStream::new)
177    }
178
179    /// Emit an update. Returns `true` if delivered. On `Err(TrySendError::Full)`
180    /// the update is dropped and the caller is expected to follow up with a
181    /// `SessionEvent::Lagged` + one or more `BookInvalidated` events — this
182    /// is the correctness contract that distinguishes 0.2 from 0.1's
183    /// silent-skip broadcast behavior.
184    #[inline]
185    pub fn try_send_update(&self, update: WsUpdate) -> bool {
186        self.updates_tx.try_send(update).is_ok()
187    }
188
189    /// Emit a session event. Unlike updates, these are always delivered via
190    /// `send`; the session-event channel is sized generously and losing an
191    /// event (e.g. a missed `Reconnected`) is worse than a brief await.
192    #[inline]
193    pub async fn send_session(&self, event: SessionEvent) {
194        let _ = self.session_tx.send_async(event).await;
195    }
196
197    #[inline]
198    pub fn is_updates_full(&self) -> bool {
199        self.updates_tx.is_full()
200    }
201}