bitcoincore_zmq/subscribe/
stream.rs

1use super::new_socket_internal;
2use crate::{
3    error::Result,
4    message::Message,
5    monitor::{event::SocketEvent, MonitorMessage, MonitorMessageError},
6};
7use core::{
8    fmt,
9    future::Future,
10    mem,
11    pin::{pin, Pin},
12    slice,
13    task::{Context as AsyncContext, Poll, Waker},
14    time::Duration,
15};
16use futures_util::{
17    future::{select, Either},
18    stream::{FusedStream, Stream, StreamExt},
19};
20use std::{
21    sync::{Arc, Mutex},
22    thread,
23};
24
25/// A [`Message`] or a [`MonitorMessage`].
26#[derive(Debug, Clone)]
27pub enum SocketMessage {
28    Message(Message),
29    Event(MonitorMessage),
30}
31
32/// Stream that asynchronously produces [`Message`]s using multiple ZMQ subscribers. The ZMQ
33/// sockets are polled in a round-robin fashion.
34#[deprecated(
35    since = "1.3.2",
36    note = "This struct is only used by deprecated functions."
37)]
38pub struct MultiMessageStream(pub subscribe_async_stream::MessageStream);
39
40#[allow(deprecated)]
41impl MultiMessageStream {
42    /// Returns a reference to the separate `MessageStream`s this [`MultiMessageStream`] is made
43    /// of. This is useful to set socket options or use other functions provided by [`zmq`] or
44    /// [`async_zmq`]. (See `MessageStream::as_zmq_socket`)
45    pub fn as_streams(&self) -> &[subscribe_async_stream::MessageStream] {
46        slice::from_ref(&self.0)
47    }
48
49    /// Returns the separate `MessageStream`s this [`MultiMessageStream`] is made of. This is
50    /// useful to set socket options or use other functions provided by [`zmq`] or [`async_zmq`].
51    /// (See `MessageStream::as_zmq_socket`)
52    pub fn into_streams(self) -> Vec<subscribe_async_stream::MessageStream> {
53        vec![self.0]
54    }
55}
56
57#[allow(deprecated)]
58impl Stream for MultiMessageStream {
59    type Item = Result<Message>;
60
61    fn poll_next(mut self: Pin<&mut Self>, cx: &mut AsyncContext<'_>) -> Poll<Option<Self::Item>> {
62        self.0.poll_next_unpin(cx)
63    }
64}
65
66#[allow(deprecated)]
67impl FusedStream for MultiMessageStream {
68    fn is_terminated(&self) -> bool {
69        false
70    }
71}
72
73/// Subscribes to multiple ZMQ endpoints and returns a [`MultiMessageStream`].
74#[deprecated(
75    since = "1.3.2",
76    note = "Use subscribe_async. This function has no performance benefit over subscribe_single_async anymore."
77)]
78#[allow(deprecated)]
79pub fn subscribe_multi_async(endpoints: &[&str]) -> Result<MultiMessageStream> {
80    subscribe_async(endpoints).map(MultiMessageStream)
81}
82
83/// Subscribes to a single ZMQ endpoint and returns a `MessageStream`.
84#[deprecated(
85    since = "1.3.2",
86    note = "Use subscribe_async. The name changed because there is no distinction made anymore between subscribing to 1 or more endpoints."
87)]
88pub fn subscribe_single_async(endpoint: &str) -> Result<subscribe_async_stream::MessageStream> {
89    subscribe_async(&[endpoint])
90}
91
92pub mod subscribe_async_stream {
93    use crate::{error::Result, message::Message, subscribe::message_from_multipart_zmq_message};
94    use async_zmq::Subscribe;
95    use core::{
96        pin::Pin,
97        task::{Context as AsyncContext, Poll},
98    };
99    use futures_util::{
100        stream::{FusedStream, StreamExt},
101        Stream,
102    };
103
104    /// Stream returned by [`subscribe_async`][super::subscribe_async].
105    pub struct MessageStream {
106        zmq_stream: Subscribe,
107    }
108
109    impl MessageStream {
110        pub(super) const fn new(zmq_stream: Subscribe) -> Self {
111            Self { zmq_stream }
112        }
113
114        /// Returns a reference to the ZMQ socket used by this stream. To get the [`zmq::Socket`], use
115        /// [`as_raw_socket`] on the result. This is useful to set socket options or use other
116        /// functions provided by [`zmq`] or [`async_zmq`].
117        ///
118        /// [`as_raw_socket`]: Subscribe::as_raw_socket
119        pub const fn as_zmq_socket(&self) -> &Subscribe {
120            &self.zmq_stream
121        }
122    }
123
124    impl Stream for MessageStream {
125        type Item = Result<Message>;
126
127        fn poll_next(
128            mut self: Pin<&mut Self>,
129            cx: &mut AsyncContext<'_>,
130        ) -> Poll<Option<Self::Item>> {
131            self.zmq_stream.poll_next_unpin(cx).map(|opt| {
132                Some(match opt.unwrap() {
133                    Ok(mp) => message_from_multipart_zmq_message(&mp),
134                    Err(err) => Err(err.into()),
135                })
136            })
137        }
138    }
139
140    impl FusedStream for MessageStream {
141        fn is_terminated(&self) -> bool {
142            false
143        }
144    }
145}
146
147/// Subscribes to multiple ZMQ endpoints and returns a stream that produces [`Message`]s.
148pub fn subscribe_async(endpoints: &[&str]) -> Result<subscribe_async_stream::MessageStream> {
149    let (_context, socket) = new_socket_internal(endpoints)?;
150
151    Ok(subscribe_async_stream::MessageStream::new(socket.into()))
152}
153
154pub mod subscribe_async_monitor_stream {
155    use super::{subscribe_async_stream, SocketMessage};
156    use crate::{error::Result, monitor::MonitorMessage};
157    use async_zmq::Subscribe;
158    use core::{
159        pin::Pin,
160        task::{Context as AsyncContext, Poll},
161    };
162    use futures_util::{
163        stream::{FusedStream, StreamExt},
164        Stream,
165    };
166    use zmq::Socket;
167
168    pub(super) enum Empty {}
169
170    impl Iterator for Empty {
171        type Item = Self;
172
173        fn next(&mut self) -> Option<Self::Item> {
174            None
175        }
176    }
177
178    impl From<Empty> for async_zmq::Message {
179        fn from(val: Empty) -> Self {
180            match val {}
181        }
182    }
183
184    // The generic type params don't matter as this will only be used for receiving
185    // Better to use an empty type to not waste precious bytes
186    pub(super) type RecvOnlyPair = async_zmq::Pair<Empty, Empty>;
187
188    /// Stream returned by [`subscribe_async_monitor`][super::subscribe_async_monitor].
189    pub struct MessageStream {
190        messages: subscribe_async_stream::MessageStream,
191        pub(super) monitor: RecvOnlyPair,
192    }
193
194    impl MessageStream {
195        pub(super) const fn new(
196            messages: subscribe_async_stream::MessageStream,
197            monitor: RecvOnlyPair,
198        ) -> Self {
199            Self { messages, monitor }
200        }
201
202        /// Returns a reference to the ZMQ socket used by this stream. To get the [`zmq::Socket`], use
203        /// [`as_raw_socket`] on the result. This is useful to set socket options or use other
204        /// functions provided by [`zmq`] or [`async_zmq`].
205        ///
206        /// [`as_raw_socket`]: Subscribe::as_raw_socket
207        pub const fn as_zmq_socket(&self) -> &Subscribe {
208            self.messages.as_zmq_socket()
209        }
210
211        /// Returns a reference to the ZMQ monitor socket used by this stream. This is useful to
212        /// set socket options or use other functions provided by [`zmq`].
213        pub fn as_zmq_monitor_socket(&self) -> &Socket {
214            self.monitor.as_raw_socket()
215        }
216    }
217
218    impl Stream for MessageStream {
219        type Item = Result<SocketMessage>;
220
221        fn poll_next(
222            mut self: Pin<&mut Self>,
223            cx: &mut AsyncContext<'_>,
224        ) -> Poll<Option<Self::Item>> {
225            match self.monitor.poll_next_unpin(cx) {
226                Poll::Ready(msg) => {
227                    return Poll::Ready(Some(Ok(SocketMessage::Event(
228                        MonitorMessage::parse_from(&msg.unwrap()?)?,
229                    ))));
230                }
231                Poll::Pending => {}
232            }
233
234            self.messages
235                .poll_next_unpin(cx)
236                .map(|opt| Some(opt.unwrap().map(SocketMessage::Message)))
237        }
238    }
239
240    impl FusedStream for MessageStream {
241        fn is_terminated(&self) -> bool {
242            false
243        }
244    }
245}
246
247/// Subscribes to multiple ZMQ endpoints and returns a stream that yields [`Message`]s and events
248/// (see [`MonitorMessage`]).
249pub fn subscribe_async_monitor(
250    endpoints: &[&str],
251) -> Result<subscribe_async_monitor_stream::MessageStream> {
252    let (context, socket) = new_socket_internal(endpoints)?;
253
254    socket.monitor("inproc://monitor", zmq::SocketEvent::ALL as i32)?;
255
256    let monitor = context.socket(zmq::PAIR)?;
257    monitor.connect("inproc://monitor")?;
258
259    Ok(subscribe_async_monitor_stream::MessageStream::new(
260        subscribe_async_stream::MessageStream::new(socket.into()),
261        monitor.into(),
262    ))
263}
264
265// TODO have some way to extract connecting to which endpoints failed, now just a (unit) error is returned (by tokio::time::timeout)
266
267/// Subscribes to multiple ZMQ endpoints and returns a stream that yields [`Message`]s and events
268/// (see [`MonitorMessage`]). This method will wait until a connection has been established to all
269/// endpoints.
270///
271/// See `examples/subscribe_async_timeout.rs` for a usage example.
272///
273/// **NOTE:** This method will wait indefinitely until a connection has been established, but this is
274/// often undesirable. This method should therefore be used in combination with your async
275/// runtime's timeout function. Currently, with the state of async Rust in December of 2023, it is
276/// not yet possible do this without creating an extra thread per timeout or depending on specific
277/// runtimes.
278pub async fn subscribe_async_wait_handshake(
279    endpoints: &[&str],
280) -> Result<subscribe_async_monitor_stream::MessageStream> {
281    let mut stream = subscribe_async_monitor(endpoints)?;
282    let mut connecting = endpoints.len();
283
284    if connecting == 0 {
285        return Ok(stream);
286    }
287
288    loop {
289        let msg: &[zmq::Message] = &stream.monitor.next().await.unwrap()?;
290        let [event_message, _] = msg else {
291            return Err(MonitorMessageError::InvalidMutlipartLength(msg.len()).into());
292        };
293        match SocketEvent::parse_from(event_message)? {
294            SocketEvent::HandshakeSucceeded => {
295                connecting -= 1;
296            }
297            SocketEvent::Disconnected { .. } => {
298                connecting += 1;
299            }
300            _ => {
301                continue;
302            }
303        }
304        if connecting == 0 {
305            return Ok(stream);
306        }
307    }
308}
309
310/// See [`subscribe_async_wait_handshake`]. This method implements the inefficient, but runtime
311/// independent approach.
312pub async fn subscribe_async_wait_handshake_timeout(
313    endpoints: &[&str],
314    timeout: Duration,
315) -> core::result::Result<Result<subscribe_async_monitor_stream::MessageStream>, Timeout> {
316    let subscribe = subscribe_async_wait_handshake(endpoints);
317    let timeout = sleep(timeout);
318
319    match select(pin!(subscribe), timeout).await {
320        Either::Left((res, _)) => Ok(res),
321        Either::Right(_) => Err(Timeout(())),
322    }
323}
324
325/// Error returned by [`subscribe_async_wait_handshake_timeout`] when the connection times out.
326/// Contains no information, but does have a Error, Debug and Display impl.
327#[derive(Debug)]
328pub struct Timeout(());
329
330impl fmt::Display for Timeout {
331    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332        write!(f, "connection timed out")
333    }
334}
335
336impl std::error::Error for Timeout {}
337
338fn sleep(dur: Duration) -> Sleep {
339    let state = Arc::new(Mutex::new(SleepReadyState::Pending));
340    {
341        let state = state.clone();
342        thread::spawn(move || {
343            thread::sleep(dur);
344            let state = {
345                let mut g = state.lock().unwrap();
346                mem::replace(&mut *g, SleepReadyState::Done)
347            };
348            if let SleepReadyState::PendingPolled(waker) = state {
349                waker.wake();
350            }
351        });
352    }
353
354    Sleep(state)
355}
356
357enum SleepReadyState {
358    Pending,
359    PendingPolled(Waker),
360    Done,
361}
362
363struct Sleep(Arc<Mutex<SleepReadyState>>);
364
365impl Future for Sleep {
366    type Output = ();
367
368    fn poll(self: Pin<&mut Self>, cx: &mut AsyncContext<'_>) -> Poll<Self::Output> {
369        let mut g = self.0.lock().unwrap();
370        if matches!(*g, SleepReadyState::Done) {
371            Poll::Ready(())
372        } else {
373            *g = SleepReadyState::PendingPolled(cx.waker().clone());
374            Poll::Pending
375        }
376    }
377}