px_core/websocket/stream.rs
1//! Concrete stream newtypes for the multiplexed WS surface.
2//!
3//! These replace the `Pin<Box<dyn Stream<Item = ...>>>` aliases used in the
4//! 0.1 trait. The receiver channel is an implementation detail; the public
5//! surface is `Stream<Item = T>` and a direct `recv().await` shortcut for
6//! hot-path callers that want to avoid `Stream` combinators entirely.
7
8use std::sync::Mutex;
9
10use flume::{Receiver, Sender, TryRecvError};
11
12use crate::websocket::events::{SessionEvent, WsUpdate};
13
14/// Multiplexed per-market update stream. There is exactly one `UpdateStream`
15/// per dispatcher; calling `OrderBookWebSocket::updates()` twice returns
16/// `None` the second time.
17///
18/// Why take-once semantics: the underlying channel is MPMC, so cloning the
19/// receiver and handing one out per call would split messages between
20/// receivers in a way callers cannot detect — every test harness or debug
21/// sidecar that "just adds a second consumer" would silently lose half the
22/// stream. Take-once turns that footgun into a compile-checked None at the
23/// call site.
24///
25/// Backed by `flume`, an MPMC channel with lighter-weight wakers than
26/// `async-channel`'s; the per-message recv on the consumer side is a
27/// significant fraction of the WS pipeline p99, and flume's atomic-only
28/// fast path stays out of the parking_lot machinery on uncontended
29/// recv. The channel is bounded; when full the producer raises
30/// `SessionEvent::Lagged` + `SessionEvent::BookInvalidated` rather than
31/// dropping deltas silently.
32pub struct UpdateStream {
33 rx: Receiver<WsUpdate>,
34}
35
36impl UpdateStream {
37 #[inline]
38 pub(crate) fn new(rx: Receiver<WsUpdate>) -> Self {
39 Self { rx }
40 }
41
42 /// Await the next update. `None` once the producer has been dropped.
43 #[inline]
44 pub async fn next(&self) -> Option<WsUpdate> {
45 self.rx.recv_async().await.ok()
46 }
47
48 /// Non-blocking peek. Returns `Ok(Some)` if an update is ready,
49 /// `Ok(None)` if the channel is empty, `Err` if closed.
50 #[inline]
51 pub fn try_next(&self) -> Result<Option<WsUpdate>, TryRecvError> {
52 match self.rx.try_recv() {
53 Ok(v) => Ok(Some(v)),
54 Err(TryRecvError::Empty) => Ok(None),
55 Err(e @ TryRecvError::Disconnected) => Err(e),
56 }
57 }
58
59 #[inline]
60 pub fn len(&self) -> usize {
61 self.rx.len()
62 }
63
64 #[inline]
65 pub fn is_empty(&self) -> bool {
66 self.rx.is_empty()
67 }
68
69 #[inline]
70 pub fn is_closed(&self) -> bool {
71 self.rx.is_disconnected()
72 }
73}
74
75/// Connection-level events. Separate from `UpdateStream` so one reconnect is
76/// one event regardless of how many markets are subscribed.
77pub struct SessionStream {
78 rx: Receiver<SessionEvent>,
79}
80
81impl SessionStream {
82 #[inline]
83 pub(crate) fn new(rx: Receiver<SessionEvent>) -> Self {
84 Self { rx }
85 }
86
87 #[inline]
88 pub async fn next(&self) -> Option<SessionEvent> {
89 self.rx.recv_async().await.ok()
90 }
91
92 #[inline]
93 pub fn try_next(&self) -> Result<Option<SessionEvent>, TryRecvError> {
94 match self.rx.try_recv() {
95 Ok(v) => Ok(Some(v)),
96 Err(TryRecvError::Empty) => Ok(None),
97 Err(e @ TryRecvError::Disconnected) => Err(e),
98 }
99 }
100
101 #[inline]
102 pub fn is_closed(&self) -> bool {
103 self.rx.is_disconnected()
104 }
105}
106
107/// Producer handle held by exchange WS implementations. Owns the sender
108/// halves of both channels so the implementation can emit updates and
109/// session events directly.
110///
111/// The receiver halves are held in `Mutex<Option<...>>` and handed out
112/// exactly once via `take_updates()` / `take_session_events()`. This
113/// enforces the take-once contract on the consumer side.
114pub struct WsDispatcher {
115 updates_tx: Sender<WsUpdate>,
116 updates_rx: Mutex<Option<Receiver<WsUpdate>>>,
117 session_tx: Sender<SessionEvent>,
118 session_rx: Mutex<Option<Receiver<SessionEvent>>>,
119}
120
121/// Configuration for the dispatcher's bounded channels.
122#[derive(Debug, Clone, Copy)]
123pub struct WsDispatcherConfig {
124 /// Capacity of the per-subscriber update channel. On overflow the
125 /// dispatcher emits `SessionEvent::Lagged` + `BookInvalidated` and
126 /// drops the offending update. Default 4096.
127 pub updates_capacity: usize,
128 /// Capacity of the session-event channel. Default 256 — session events
129 /// are rare and losing one is a correctness hazard, so oversized.
130 pub session_capacity: usize,
131}
132
133impl Default for WsDispatcherConfig {
134 fn default() -> Self {
135 Self {
136 updates_capacity: 4096,
137 session_capacity: 256,
138 }
139 }
140}
141
142impl WsDispatcher {
143 /// Create a new dispatcher. The returned dispatcher owns the send halves
144 /// of both channels and the (one-shot) receive halves; consumers fetch
145 /// streams exactly once via `take_updates()` / `take_session_events()`.
146 pub fn new(config: WsDispatcherConfig) -> Self {
147 let (updates_tx, updates_rx) = flume::bounded(config.updates_capacity);
148 let (session_tx, session_rx) = flume::bounded(config.session_capacity);
149 Self {
150 updates_tx,
151 updates_rx: Mutex::new(Some(updates_rx)),
152 session_tx,
153 session_rx: Mutex::new(Some(session_rx)),
154 }
155 }
156
157 /// Take ownership of the consumer-side update stream. Returns `None` if
158 /// already taken — the receiver is single-consumer by contract; cloning
159 /// would silently split messages between holders.
160 #[inline]
161 pub fn take_updates(&self) -> Option<UpdateStream> {
162 self.updates_rx
163 .lock()
164 .ok()
165 .and_then(|mut g| g.take())
166 .map(UpdateStream::new)
167 }
168
169 /// Take ownership of the consumer-side session stream.
170 #[inline]
171 pub fn take_session_events(&self) -> Option<SessionStream> {
172 self.session_rx
173 .lock()
174 .ok()
175 .and_then(|mut g| g.take())
176 .map(SessionStream::new)
177 }
178
179 /// Emit an update. Returns `true` if delivered. On `Err(TrySendError::Full)`
180 /// the update is dropped and the caller is expected to follow up with a
181 /// `SessionEvent::Lagged` + one or more `BookInvalidated` events — this
182 /// is the correctness contract that distinguishes 0.2 from 0.1's
183 /// silent-skip broadcast behavior.
184 #[inline]
185 pub fn try_send_update(&self, update: WsUpdate) -> bool {
186 self.updates_tx.try_send(update).is_ok()
187 }
188
189 /// Emit a session event. Unlike updates, these are always delivered via
190 /// `send`; the session-event channel is sized generously and losing an
191 /// event (e.g. a missed `Reconnected`) is worse than a brief await.
192 #[inline]
193 pub async fn send_session(&self, event: SessionEvent) {
194 let _ = self.session_tx.send_async(event).await;
195 }
196
197 #[inline]
198 pub fn is_updates_full(&self) -> bool {
199 self.updates_tx.is_full()
200 }
201}