tendermint_rpc/client/
subscription.rs

1//! Subscription- and subscription management-related functionality.
2
3use core::pin::Pin;
4
5use async_trait::async_trait;
6use futures::{
7    task::{Context, Poll},
8    Stream,
9};
10use pin_project::pin_project;
11
12use crate::{
13    client::sync::{ChannelRx, ChannelTx},
14    event::Event,
15    prelude::*,
16    query::Query,
17    Error,
18};
19
20/// A client that exclusively provides [`Event`] subscription capabilities,
21/// without any other RPC method support.
22#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
23#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
24pub trait SubscriptionClient {
25    /// `/subscribe`: subscribe to receive events produced by the given query.
26    async fn subscribe(&self, query: Query) -> Result<Subscription, Error>;
27
28    /// `/unsubscribe`: unsubscribe from events relating to the given query.
29    ///
30    /// This method is particularly useful when you want to terminate multiple
31    /// [`Subscription`]s to the same [`Query`] simultaneously, or if you've
32    /// joined multiple `Subscription`s together using [`select_all`] and you
33    /// no longer have access to the individual `Subscription` instances to
34    /// terminate them separately.
35    ///
36    /// [`select_all`]: https://docs.rs/futures/*/futures/stream/fn.select_all.html
37    async fn unsubscribe(&self, query: Query) -> Result<(), Error>;
38
39    /// Subscription clients will usually have long-running underlying
40    /// transports that will need to be closed at some point.
41    fn close(self) -> Result<(), Error>;
42}
43
44pub(crate) type SubscriptionTx = ChannelTx<Result<Event, Error>>;
45pub(crate) type SubscriptionRx = ChannelRx<Result<Event, Error>>;
46
47/// An interface that can be used to asynchronously receive [`Event`]s for a
48/// particular subscription.
49///
50/// ## Examples
51///
52/// ```
53/// use tendermint_rpc::Subscription;
54/// use futures::StreamExt;
55///
56/// /// Prints `count` events from the given subscription.
57/// async fn print_events(subs: &mut Subscription, count: usize) {
58///     let mut counter = 0_usize;
59///     while let Some(res) = subs.next().await {
60///         // Technically, a subscription produces `Result<Event, Error>`
61///         // instances. Errors can be produced by the remote endpoint at any
62///         // time and need to be handled here.
63///         let ev = res.unwrap();
64///         println!("Got incoming event: {:?}", ev);
65///         counter += 1;
66///         if counter >= count {
67///             break
68///         }
69///     }
70/// }
71/// ```
72#[pin_project]
73#[derive(Debug)]
74pub struct Subscription {
75    // A unique identifier for this subscription.
76    id: String,
77    // The query for which events will be produced.
78    query: Query,
79    // Our internal result event receiver for this subscription.
80    #[pin]
81    rx: SubscriptionRx,
82}
83
84impl Stream for Subscription {
85    type Item = Result<Event, Error>;
86
87    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
88        self.project().rx.poll_next(cx)
89    }
90}
91
92impl Subscription {
93    pub fn new(id: String, query: Query, rx: SubscriptionRx) -> Self {
94        Self { id, query, rx }
95    }
96
97    /// Return this subscription's ID for informational purposes.
98    pub fn id(&self) -> &str {
99        &self.id
100    }
101
102    pub fn query(&self) -> &Query {
103        &self.query
104    }
105}