1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
//! Subscription- and subscription management-related functionality. use crate::client::sync::{ChannelRx, ChannelTx}; use crate::event::Event; use crate::query::Query; use crate::Result; use async_trait::async_trait; use futures::task::{Context, Poll}; use futures::Stream; use pin_project::pin_project; use std::pin::Pin; /// A client that exclusively provides [`Event`] subscription capabilities, /// without any other RPC method support. #[async_trait] pub trait SubscriptionClient { /// `/subscribe`: subscribe to receive events produced by the given query. async fn subscribe(&self, query: Query) -> Result<Subscription>; /// `/unsubscribe`: unsubscribe from events relating to the given query. /// /// This method is particularly useful when you want to terminate multiple /// [`Subscription`]s to the same [`Query`] simultaneously, or if you've /// joined multiple `Subscription`s together using [`select_all`] and you /// no longer have access to the individual `Subscription` instances to /// terminate them separately. /// /// [`select_all`]: https://docs.rs/futures/*/futures/stream/fn.select_all.html async fn unsubscribe(&self, query: Query) -> Result<()>; /// Subscription clients will usually have long-running underlying /// transports that will need to be closed at some point. fn close(self) -> Result<()>; } pub(crate) type SubscriptionTx = ChannelTx<Result<Event>>; pub(crate) type SubscriptionRx = ChannelRx<Result<Event>>; /// An interface that can be used to asynchronously receive [`Event`]s for a /// particular subscription. /// /// ## Examples /// /// ``` /// use tendermint_rpc::Subscription; /// use futures::StreamExt; /// /// /// Prints `count` events from the given subscription. /// async fn print_events(subs: &mut Subscription, count: usize) { /// let mut counter = 0_usize; /// while let Some(res) = subs.next().await { /// // Technically, a subscription produces `Result<Event, Error>` /// // instances. Errors can be produced by the remote endpoint at any /// // time and need to be handled here. /// let ev = res.unwrap(); /// println!("Got incoming event: {:?}", ev); /// counter += 1; /// if counter >= count { /// break /// } /// } /// } /// ``` #[pin_project] #[derive(Debug)] pub struct Subscription { // A unique identifier for this subscription. id: String, // The query for which events will be produced. query: Query, // Our internal result event receiver for this subscription. #[pin] rx: SubscriptionRx, } impl Stream for Subscription { type Item = Result<Event>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { self.project().rx.poll_next(cx) } } impl Subscription { pub(crate) fn new(id: String, query: Query, rx: SubscriptionRx) -> Self { Self { id, query, rx } } /// Return this subscription's ID for informational purposes. pub fn id(&self) -> &str { &self.id } pub fn query(&self) -> &Query { &self.query } }