//! Subscription- and subscription management-related functionality.

use crate::client::sync::{ChannelRx, ChannelTx};
use crate::event::Event;
use crate::query::Query;
use crate::Result;
use async_trait::async_trait;
use futures::task::{Context, Poll};
use futures::Stream;
use pin_project::pin_project;
use std::pin::Pin;

/// A client that exclusively provides [`Event`] subscription capabilities,
/// without any other RPC method support.
///
/// All methods return the crate-level [`Result`]; the concrete failure
/// conditions depend on the implementation.
#[async_trait]
pub trait SubscriptionClient {
    /// `/subscribe`: subscribe to receive events produced by the given query.
    ///
    /// On success, returns a [`Subscription`], which is a stream of
    /// `Result<Event>` items.
    async fn subscribe(&self, query: Query) -> Result<Subscription>;

    /// `/unsubscribe`: unsubscribe from events relating to the given query.
    ///
    /// This method is particularly useful when you want to terminate multiple
    /// [`Subscription`]s to the same [`Query`] simultaneously, or if you've
    /// joined multiple `Subscription`s together using [`select_all`] and you
    /// no longer have access to the individual `Subscription` instances to
    /// terminate them separately.
    ///
    /// [`select_all`]: https://docs.rs/futures/*/futures/stream/fn.select_all.html
    async fn unsubscribe(&self, query: Query) -> Result<()>;

    /// Closes this client.
    ///
    /// Subscription clients will usually have long-running underlying
    /// transports that will need to be closed at some point. This method
    /// takes `self` by value, so the client cannot be used after calling it.
    fn close(self) -> Result<()>;
}

/// A [`ChannelTx`] carrying `Result<Event>` items; presumably the sending
/// counterpart of [`SubscriptionRx`] (confirm against `client::sync`).
pub(crate) type SubscriptionTx = ChannelTx<Result<Event>>;
/// A [`ChannelRx`] carrying `Result<Event>` items; this is the receiver type
/// held by each [`Subscription`].
pub(crate) type SubscriptionRx = ChannelRx<Result<Event>>;

/// An interface that can be used to asynchronously receive [`Event`]s for a
/// particular subscription.
///
/// `Subscription` implements [`Stream`] with `Result<Event>` as its item
/// type, so each received item must be checked for an error before the event
/// can be used.
///
/// ## Examples
///
/// ```
/// use tendermint_rpc::Subscription;
/// use futures::StreamExt;
///
/// /// Prints `count` events from the given subscription.
/// async fn print_events(subs: &mut Subscription, count: usize) {
///     let mut counter = 0_usize;
///     while let Some(res) = subs.next().await {
///         // Technically, a subscription produces `Result<Event, Error>`
///         // instances. Errors can be produced by the remote endpoint at any
///         // time and need to be handled here.
///         let ev = res.unwrap();
///         println!("Got incoming event: {:?}", ev);
///         counter += 1;
///         if counter >= count {
///             break
///         }
///     }
/// }
/// ```
#[pin_project]
#[derive(Debug)]
pub struct Subscription {
    // A unique identifier for this subscription; readable through `id()`.
    id: String,
    // The query for which events will be produced; readable through
    // `query()`.
    query: Query,
    // Our internal result event receiver for this subscription. It is marked
    // `#[pin]` so that the generated `project()` method yields a pinned
    // reference to it, which the `Stream` implementation polls.
    #[pin]
    rx: SubscriptionRx,
}

impl Stream for Subscription {
    type Item = Result<Event>;

    /// Polls for the next item by delegating to the internal receiver.
    ///
    /// Whatever the receiver's `poll_next` returns is passed back to the
    /// caller unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The projection generated by `#[pin_project]` exposes the `#[pin]`
        // field `rx` as a pinned reference, which is what polling requires.
        let this = self.project();
        this.rx.poll_next(cx)
    }
}

impl Subscription {
    pub(crate) fn new(id: String, query: Query, rx: SubscriptionRx) -> Self {
        Self { id, query, rx }
    }

    /// Return this subscription's ID for informational purposes.
    pub fn id(&self) -> &str {
        &self.id
    }

    pub fn query(&self) -> &Query {
        &self.query
    }
}