cometbft_rpc/client/subscription.rs
1//! Subscription- and subscription management-related functionality.
2
3use core::pin::Pin;
4
5use async_trait::async_trait;
6use futures::{
7 task::{Context, Poll},
8 Stream,
9};
10use pin_project::pin_project;
11
12use crate::{
13 client::sync::{ChannelRx, ChannelTx},
14 event::Event,
15 prelude::*,
16 query::Query,
17 Error,
18};
19
20/// A client that exclusively provides [`Event`] subscription capabilities,
21/// without any other RPC method support.
22#[async_trait]
23pub trait SubscriptionClient {
24 /// `/subscribe`: subscribe to receive events produced by the given query.
25 async fn subscribe(&self, query: Query) -> Result<Subscription, Error>;
26
27 /// `/unsubscribe`: unsubscribe from events relating to the given query.
28 ///
29 /// This method is particularly useful when you want to terminate multiple
30 /// [`Subscription`]s to the same [`Query`] simultaneously, or if you've
31 /// joined multiple `Subscription`s together using [`select_all`] and you
32 /// no longer have access to the individual `Subscription` instances to
33 /// terminate them separately.
34 ///
35 /// [`select_all`]: https://docs.rs/futures/*/futures/stream/fn.select_all.html
36 async fn unsubscribe(&self, query: Query) -> Result<(), Error>;
37
38 /// Subscription clients will usually have long-running underlying
39 /// transports that will need to be closed at some point.
40 fn close(self) -> Result<(), Error>;
41}
42
43pub(crate) type SubscriptionTx = ChannelTx<Result<Event, Error>>;
44pub(crate) type SubscriptionRx = ChannelRx<Result<Event, Error>>;
45
46/// An interface that can be used to asynchronously receive [`Event`]s for a
47/// particular subscription.
48///
49/// ## Examples
50///
51/// ```
52/// use cometbft_rpc::Subscription;
53/// use futures::StreamExt;
54///
55/// /// Prints `count` events from the given subscription.
56/// async fn print_events(subs: &mut Subscription, count: usize) {
57/// let mut counter = 0_usize;
58/// while let Some(res) = subs.next().await {
59/// // Technically, a subscription produces `Result<Event, Error>`
60/// // instances. Errors can be produced by the remote endpoint at any
61/// // time and need to be handled here.
62/// let ev = res.unwrap();
63/// println!("Got incoming event: {:?}", ev);
64/// counter += 1;
65/// if counter >= count {
66/// break
67/// }
68/// }
69/// }
70/// ```
71#[pin_project]
72#[derive(Debug)]
73pub struct Subscription {
74 // A unique identifier for this subscription.
75 id: String,
76 // The query for which events will be produced.
77 query: Query,
78 // Our internal result event receiver for this subscription.
79 #[pin]
80 rx: SubscriptionRx,
81}
82
83impl Stream for Subscription {
84 type Item = Result<Event, Error>;
85
86 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87 self.project().rx.poll_next(cx)
88 }
89}
90
91impl Subscription {
92 pub(crate) fn new(id: String, query: Query, rx: SubscriptionRx) -> Self {
93 Self { id, query, rx }
94 }
95
96 /// Return this subscription's ID for informational purposes.
97 pub fn id(&self) -> &str {
98 &self.id
99 }
100
101 pub fn query(&self) -> &Query {
102 &self.query
103 }
104}