bitcoincore_zmq/subscribe/
stream.rs1use super::new_socket_internal;
2use crate::{
3 error::Result,
4 message::Message,
5 monitor::{event::SocketEvent, MonitorMessage, MonitorMessageError},
6};
7use core::{
8 fmt,
9 future::Future,
10 mem,
11 pin::{pin, Pin},
12 slice,
13 task::{Context as AsyncContext, Poll, Waker},
14 time::Duration,
15};
16use futures_util::{
17 future::{select, Either},
18 stream::{FusedStream, Stream, StreamExt},
19};
20use std::{
21 sync::{Arc, Mutex},
22 thread,
23};
24
25#[derive(Debug, Clone)]
27pub enum SocketMessage {
28 Message(Message),
29 Event(MonitorMessage),
30}
31
32#[deprecated(
35 since = "1.3.2",
36 note = "This struct is only used by deprecated functions."
37)]
38pub struct MultiMessageStream(pub subscribe_async_stream::MessageStream);
39
40#[allow(deprecated)]
41impl MultiMessageStream {
42 pub fn as_streams(&self) -> &[subscribe_async_stream::MessageStream] {
46 slice::from_ref(&self.0)
47 }
48
49 pub fn into_streams(self) -> Vec<subscribe_async_stream::MessageStream> {
53 vec![self.0]
54 }
55}
56
57#[allow(deprecated)]
58impl Stream for MultiMessageStream {
59 type Item = Result<Message>;
60
61 fn poll_next(mut self: Pin<&mut Self>, cx: &mut AsyncContext<'_>) -> Poll<Option<Self::Item>> {
62 self.0.poll_next_unpin(cx)
63 }
64}
65
66#[allow(deprecated)]
67impl FusedStream for MultiMessageStream {
68 fn is_terminated(&self) -> bool {
69 false
70 }
71}
72
73#[deprecated(
75 since = "1.3.2",
76 note = "Use subscribe_async. This function has no performance benefit over subscribe_single_async anymore."
77)]
78#[allow(deprecated)]
79pub fn subscribe_multi_async(endpoints: &[&str]) -> Result<MultiMessageStream> {
80 subscribe_async(endpoints).map(MultiMessageStream)
81}
82
83#[deprecated(
85 since = "1.3.2",
86 note = "Use subscribe_async. The name changed because there is no distinction made anymore between subscribing to 1 or more endpoints."
87)]
88pub fn subscribe_single_async(endpoint: &str) -> Result<subscribe_async_stream::MessageStream> {
89 subscribe_async(&[endpoint])
90}
91
92pub mod subscribe_async_stream {
93 use crate::{error::Result, message::Message, subscribe::message_from_multipart_zmq_message};
94 use async_zmq::Subscribe;
95 use core::{
96 pin::Pin,
97 task::{Context as AsyncContext, Poll},
98 };
99 use futures_util::{
100 stream::{FusedStream, StreamExt},
101 Stream,
102 };
103
104 pub struct MessageStream {
106 zmq_stream: Subscribe,
107 }
108
109 impl MessageStream {
110 pub(super) const fn new(zmq_stream: Subscribe) -> Self {
111 Self { zmq_stream }
112 }
113
114 pub const fn as_zmq_socket(&self) -> &Subscribe {
120 &self.zmq_stream
121 }
122 }
123
124 impl Stream for MessageStream {
125 type Item = Result<Message>;
126
127 fn poll_next(
128 mut self: Pin<&mut Self>,
129 cx: &mut AsyncContext<'_>,
130 ) -> Poll<Option<Self::Item>> {
131 self.zmq_stream.poll_next_unpin(cx).map(|opt| {
132 Some(match opt.unwrap() {
133 Ok(mp) => message_from_multipart_zmq_message(&mp),
134 Err(err) => Err(err.into()),
135 })
136 })
137 }
138 }
139
140 impl FusedStream for MessageStream {
141 fn is_terminated(&self) -> bool {
142 false
143 }
144 }
145}
146
147pub fn subscribe_async(endpoints: &[&str]) -> Result<subscribe_async_stream::MessageStream> {
149 let (_context, socket) = new_socket_internal(endpoints)?;
150
151 Ok(subscribe_async_stream::MessageStream::new(socket.into()))
152}
153
154pub mod subscribe_async_monitor_stream {
155 use super::{subscribe_async_stream, SocketMessage};
156 use crate::{error::Result, monitor::MonitorMessage};
157 use async_zmq::Subscribe;
158 use core::{
159 pin::Pin,
160 task::{Context as AsyncContext, Poll},
161 };
162 use futures_util::{
163 stream::{FusedStream, StreamExt},
164 Stream,
165 };
166 use zmq::Socket;
167
168 pub(super) enum Empty {}
169
170 impl Iterator for Empty {
171 type Item = Self;
172
173 fn next(&mut self) -> Option<Self::Item> {
174 None
175 }
176 }
177
178 impl From<Empty> for async_zmq::Message {
179 fn from(val: Empty) -> Self {
180 match val {}
181 }
182 }
183
184 pub(super) type RecvOnlyPair = async_zmq::Pair<Empty, Empty>;
187
188 pub struct MessageStream {
190 messages: subscribe_async_stream::MessageStream,
191 pub(super) monitor: RecvOnlyPair,
192 }
193
194 impl MessageStream {
195 pub(super) const fn new(
196 messages: subscribe_async_stream::MessageStream,
197 monitor: RecvOnlyPair,
198 ) -> Self {
199 Self { messages, monitor }
200 }
201
202 pub const fn as_zmq_socket(&self) -> &Subscribe {
208 self.messages.as_zmq_socket()
209 }
210
211 pub fn as_zmq_monitor_socket(&self) -> &Socket {
214 self.monitor.as_raw_socket()
215 }
216 }
217
218 impl Stream for MessageStream {
219 type Item = Result<SocketMessage>;
220
221 fn poll_next(
222 mut self: Pin<&mut Self>,
223 cx: &mut AsyncContext<'_>,
224 ) -> Poll<Option<Self::Item>> {
225 match self.monitor.poll_next_unpin(cx) {
226 Poll::Ready(msg) => {
227 return Poll::Ready(Some(Ok(SocketMessage::Event(
228 MonitorMessage::parse_from(&msg.unwrap()?)?,
229 ))));
230 }
231 Poll::Pending => {}
232 }
233
234 self.messages
235 .poll_next_unpin(cx)
236 .map(|opt| Some(opt.unwrap().map(SocketMessage::Message)))
237 }
238 }
239
240 impl FusedStream for MessageStream {
241 fn is_terminated(&self) -> bool {
242 false
243 }
244 }
245}
246
247pub fn subscribe_async_monitor(
250 endpoints: &[&str],
251) -> Result<subscribe_async_monitor_stream::MessageStream> {
252 let (context, socket) = new_socket_internal(endpoints)?;
253
254 socket.monitor("inproc://monitor", zmq::SocketEvent::ALL as i32)?;
255
256 let monitor = context.socket(zmq::PAIR)?;
257 monitor.connect("inproc://monitor")?;
258
259 Ok(subscribe_async_monitor_stream::MessageStream::new(
260 subscribe_async_stream::MessageStream::new(socket.into()),
261 monitor.into(),
262 ))
263}
264
265pub async fn subscribe_async_wait_handshake(
279 endpoints: &[&str],
280) -> Result<subscribe_async_monitor_stream::MessageStream> {
281 let mut stream = subscribe_async_monitor(endpoints)?;
282 let mut connecting = endpoints.len();
283
284 if connecting == 0 {
285 return Ok(stream);
286 }
287
288 loop {
289 let msg: &[zmq::Message] = &stream.monitor.next().await.unwrap()?;
290 let [event_message, _] = msg else {
291 return Err(MonitorMessageError::InvalidMutlipartLength(msg.len()).into());
292 };
293 match SocketEvent::parse_from(event_message)? {
294 SocketEvent::HandshakeSucceeded => {
295 connecting -= 1;
296 }
297 SocketEvent::Disconnected { .. } => {
298 connecting += 1;
299 }
300 _ => {
301 continue;
302 }
303 }
304 if connecting == 0 {
305 return Ok(stream);
306 }
307 }
308}
309
310pub async fn subscribe_async_wait_handshake_timeout(
313 endpoints: &[&str],
314 timeout: Duration,
315) -> core::result::Result<Result<subscribe_async_monitor_stream::MessageStream>, Timeout> {
316 let subscribe = subscribe_async_wait_handshake(endpoints);
317 let timeout = sleep(timeout);
318
319 match select(pin!(subscribe), timeout).await {
320 Either::Left((res, _)) => Ok(res),
321 Either::Right(_) => Err(Timeout(())),
322 }
323}
324
/// Error returned by `subscribe_async_wait_handshake_timeout` when the timer elapses before
/// the handshake wait completes.
///
/// The private unit field prevents construction outside this module.
#[derive(Debug)]
pub struct Timeout(());

impl fmt::Display for Timeout {
    /// Writes the fixed message `connection timed out`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("connection timed out")
    }
}

impl std::error::Error for Timeout {}
337
/// Returns a future that completes once `dur` has elapsed.
///
/// The wait is driven by a dedicated timer thread instead of an async runtime. The thread
/// waits on a condition variable with a timeout, so dropping the returned [`Sleep`] before it
/// completes cancels the wait and lets the thread exit right away instead of lingering for the
/// remainder of `dur`.
fn sleep(dur: Duration) -> Sleep {
    let shared = Arc::new(SleepShared {
        state: Mutex::new(SleepReadyState::Pending),
        cancel: std::sync::Condvar::new(),
    });

    let timer = Arc::clone(&shared);
    thread::spawn(move || {
        let guard = timer.state.lock().unwrap();
        // Returns early only when the future was dropped; spurious wake-ups are absorbed by the
        // predicate and the total wait is bounded by `dur`.
        let (mut guard, _) = timer
            .cancel
            .wait_timeout_while(guard, dur, |state| {
                !matches!(*state, SleepReadyState::Cancelled)
            })
            .unwrap();

        if matches!(*guard, SleepReadyState::Cancelled) {
            // Nobody is waiting for this timer anymore.
            return;
        }

        let previous = mem::replace(&mut *guard, SleepReadyState::Done);
        // Release the lock before waking so the woken task can poll without contention.
        drop(guard);
        if let SleepReadyState::PendingPolled(waker) = previous {
            waker.wake();
        }
    });

    Sleep(shared)
}

/// Progress of a [`Sleep`] timer, shared between the future and its timer thread.
enum SleepReadyState {
    /// The timer is running and the future has not been polled yet.
    Pending,
    /// The timer is running; the stored waker is woken once it elapses.
    PendingPolled(Waker),
    /// The timer has elapsed.
    Done,
    /// The future was dropped before the timer elapsed.
    Cancelled,
}

/// State shared between a [`Sleep`] future and its timer thread.
struct SleepShared {
    /// Current progress of the timer.
    state: Mutex<SleepReadyState>,
    /// Notified when the future is dropped so the timer thread can stop waiting.
    cancel: std::sync::Condvar,
}

/// Future returned by [`sleep`]; resolves to `()` when the timer has elapsed.
struct Sleep(Arc<SleepShared>);

impl Future for Sleep {
    type Output = ();

    /// Returns `Ready` once the timer thread has marked the timer as done; otherwise makes sure
    /// the current task's waker is registered and returns `Pending`.
    fn poll(self: Pin<&mut Self>, cx: &mut AsyncContext<'_>) -> Poll<Self::Output> {
        let mut state = self.0.state.lock().unwrap();

        let register_waker = match &*state {
            SleepReadyState::Done => return Poll::Ready(()),
            // Skip the clone when the stored waker already targets the current task.
            SleepReadyState::PendingPolled(waker) => !waker.will_wake(cx.waker()),
            SleepReadyState::Pending | SleepReadyState::Cancelled => true,
        };
        if register_waker {
            *state = SleepReadyState::PendingPolled(cx.waker().clone());
        }

        Poll::Pending
    }
}

impl Drop for Sleep {
    /// Cancels a timer that is still running so its thread does not outlive the future.
    fn drop(&mut self) {
        // Never panic in `drop`: a poisoned lock still allows flipping the state.
        let mut state = self
            .0
            .state
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if !matches!(*state, SleepReadyState::Done) {
            *state = SleepReadyState::Cancelled;
        }
        drop(state);
        self.0.cancel.notify_one();
    }
}