px_core/websocket/stream.rs
//! Concrete stream newtypes for the multiplexed WS surface.
//!
//! These replace the `Pin<Box<dyn Stream<Item = ...>>>` aliases used in the
//! 0.1 trait. The receiver channel is an implementation detail; the public
//! surface is `Stream<Item = T>` and a direct `next().await` shortcut for
//! hot-path callers that want to avoid `Stream` combinators entirely.
7
8use std::sync::Mutex;
9
10use async_channel::{Receiver, TryRecvError};
11
12use crate::websocket::events::{SessionEvent, WsUpdate};
13
14/// Multiplexed per-market update stream. There is exactly one `UpdateStream`
15/// per dispatcher; calling `OrderBookWebSocket::updates()` twice returns
16/// `None` the second time.
17///
18/// Why take-once semantics: the underlying channel is MPMC, so cloning the
19/// receiver and handing one out per call would split messages between
20/// receivers in a way callers cannot detect — every test harness or debug
21/// sidecar that "just adds a second consumer" would silently lose half the
22/// stream. Take-once turns that footgun into a compile-checked None at the
23/// call site.
24///
25/// The channel is bounded; when full the producer raises
26/// `SessionEvent::Lagged` + `SessionEvent::BookInvalidated` rather than
27/// dropping deltas silently.
28pub struct UpdateStream {
29 rx: Receiver<WsUpdate>,
30}
31
32impl UpdateStream {
33 #[inline]
34 pub(crate) fn new(rx: Receiver<WsUpdate>) -> Self {
35 Self { rx }
36 }
37
38 /// Await the next update. `None` once the producer has been dropped.
39 #[inline]
40 pub async fn next(&self) -> Option<WsUpdate> {
41 self.rx.recv().await.ok()
42 }
43
44 /// Non-blocking peek. Returns `Ok(Some)` if an update is ready,
45 /// `Ok(None)` if the channel is empty, `Err` if closed.
46 #[inline]
47 pub fn try_next(&self) -> Result<Option<WsUpdate>, TryRecvError> {
48 match self.rx.try_recv() {
49 Ok(v) => Ok(Some(v)),
50 Err(TryRecvError::Empty) => Ok(None),
51 Err(e @ TryRecvError::Closed) => Err(e),
52 }
53 }
54
55 #[inline]
56 pub fn len(&self) -> usize {
57 self.rx.len()
58 }
59
60 #[inline]
61 pub fn is_empty(&self) -> bool {
62 self.rx.is_empty()
63 }
64
65 #[inline]
66 pub fn is_closed(&self) -> bool {
67 self.rx.is_closed()
68 }
69}
70
71/// Connection-level events. Separate from `UpdateStream` so one reconnect is
72/// one event regardless of how many markets are subscribed.
73pub struct SessionStream {
74 rx: Receiver<SessionEvent>,
75}
76
77impl SessionStream {
78 #[inline]
79 pub(crate) fn new(rx: Receiver<SessionEvent>) -> Self {
80 Self { rx }
81 }
82
83 #[inline]
84 pub async fn next(&self) -> Option<SessionEvent> {
85 self.rx.recv().await.ok()
86 }
87
88 #[inline]
89 pub fn try_next(&self) -> Result<Option<SessionEvent>, TryRecvError> {
90 match self.rx.try_recv() {
91 Ok(v) => Ok(Some(v)),
92 Err(TryRecvError::Empty) => Ok(None),
93 Err(e @ TryRecvError::Closed) => Err(e),
94 }
95 }
96
97 #[inline]
98 pub fn is_closed(&self) -> bool {
99 self.rx.is_closed()
100 }
101}
102
103/// Producer handle held by exchange WS implementations. Owns the sender
104/// halves of both channels so the implementation can emit updates and
105/// session events directly.
106///
107/// The receiver halves are held in `Mutex<Option<...>>` and handed out
108/// exactly once via `take_updates()` / `take_session_events()`. This
109/// enforces the take-once contract on the consumer side.
110pub struct WsDispatcher {
111 updates_tx: async_channel::Sender<WsUpdate>,
112 updates_rx: Mutex<Option<Receiver<WsUpdate>>>,
113 session_tx: async_channel::Sender<SessionEvent>,
114 session_rx: Mutex<Option<Receiver<SessionEvent>>>,
115}
116
/// Configuration for the dispatcher's bounded channels.
///
/// Both capacities are passed to `async_channel::bounded`, which does not
/// accept a capacity of zero — supply values of at least 1.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WsDispatcherConfig {
    /// Capacity of the update channel. When the channel is full,
    /// `WsDispatcher::try_send_update` drops the offending update and
    /// returns `false`; the producer is then expected to emit
    /// `SessionEvent::Lagged` + `BookInvalidated` itself. Default 4096.
    pub updates_capacity: usize,
    /// Capacity of the session-event channel. Default 256 — session events
    /// are rare and losing one is a correctness hazard, so oversized.
    pub session_capacity: usize,
}

impl WsDispatcherConfig {
    /// Default capacity of the update channel.
    pub const DEFAULT_UPDATES_CAPACITY: usize = 4096;
    /// Default capacity of the session-event channel.
    pub const DEFAULT_SESSION_CAPACITY: usize = 256;
}

impl Default for WsDispatcherConfig {
    /// Returns the default capacities: 4096 updates, 256 session events.
    fn default() -> Self {
        Self {
            updates_capacity: Self::DEFAULT_UPDATES_CAPACITY,
            session_capacity: Self::DEFAULT_SESSION_CAPACITY,
        }
    }
}
137
138impl WsDispatcher {
139 /// Create a new dispatcher. The returned dispatcher owns the send halves
140 /// of both channels and the (one-shot) receive halves; consumers fetch
141 /// streams exactly once via `take_updates()` / `take_session_events()`.
142 pub fn new(config: WsDispatcherConfig) -> Self {
143 let (updates_tx, updates_rx) = async_channel::bounded(config.updates_capacity);
144 let (session_tx, session_rx) = async_channel::bounded(config.session_capacity);
145 Self {
146 updates_tx,
147 updates_rx: Mutex::new(Some(updates_rx)),
148 session_tx,
149 session_rx: Mutex::new(Some(session_rx)),
150 }
151 }
152
153 /// Take ownership of the consumer-side update stream. Returns `None` if
154 /// already taken — the receiver is single-consumer by contract; cloning
155 /// would silently split messages between holders.
156 #[inline]
157 pub fn take_updates(&self) -> Option<UpdateStream> {
158 self.updates_rx
159 .lock()
160 .ok()
161 .and_then(|mut g| g.take())
162 .map(UpdateStream::new)
163 }
164
165 /// Take ownership of the consumer-side session stream.
166 #[inline]
167 pub fn take_session_events(&self) -> Option<SessionStream> {
168 self.session_rx
169 .lock()
170 .ok()
171 .and_then(|mut g| g.take())
172 .map(SessionStream::new)
173 }
174
175 /// Emit an update. Returns `true` if delivered. On `Err(TrySendError::Full)`
176 /// the update is dropped and the caller is expected to follow up with a
177 /// `SessionEvent::Lagged` + one or more `BookInvalidated` events — this
178 /// is the correctness contract that distinguishes 0.2 from 0.1's
179 /// silent-skip broadcast behavior.
180 #[inline]
181 pub fn try_send_update(&self, update: WsUpdate) -> bool {
182 self.updates_tx.try_send(update).is_ok()
183 }
184
185 /// Emit a session event. Unlike updates, these are always delivered via
186 /// `send`; the session-event channel is sized generously and losing an
187 /// event (e.g. a missed `Reconnected`) is worse than a brief await.
188 #[inline]
189 pub async fn send_session(&self, event: SessionEvent) {
190 let _ = self.session_tx.send(event).await;
191 }
192
193 #[inline]
194 pub fn is_updates_full(&self) -> bool {
195 self.updates_tx.is_full()
196 }
197}