Skip to main content

celestia_rpc/
header.rs

1use std::future::Future;
2use std::marker::{Send, Sync};
3use std::pin::Pin;
4
5use async_stream::try_stream;
6use celestia_types::hash::Hash;
7use celestia_types::{ExtendedHeader, SyncState};
8use futures_util::{Stream, StreamExt};
9use jsonrpsee::core::client::{ClientT, Error, SubscriptionClientT};
10
11use jsonrpsee::proc_macros::rpc;
12
13use crate::custom_client_error;
14
15mod rpc {
16    use jsonrpsee::core::{RpcResult, SubscriptionResult};
17
18    use super::*;
19
20    /// Header RPC methods.
21    #[rpc(client, server, namespace = "header", namespace_separator = ".")]
22    pub trait Header {
23        /// See [`crate::HeaderClient::header_get_by_hash`].
24        #[method(name = "GetByHash")]
25        async fn header_get_by_hash(&self, hash: Hash) -> RpcResult<ExtendedHeader>;
26
27        /// See [`crate::HeaderClient::header_get_by_height`].
28        #[method(name = "GetByHeight")]
29        async fn header_get_by_height(&self, height: u64) -> RpcResult<ExtendedHeader>;
30
31        /// See [`crate::HeaderClient::header_get_range_by_height`].
32        #[method(name = "GetRangeByHeight")]
33        async fn header_get_range_by_height(
34            &self,
35            from: ExtendedHeader,
36            to: u64,
37        ) -> RpcResult<Vec<ExtendedHeader>>;
38
39        /// See [`crate::HeaderClient::header_local_head`].
40        #[method(name = "LocalHead")]
41        async fn header_local_head(&self) -> RpcResult<ExtendedHeader>;
42
43        /// See [`crate::HeaderClient::header_network_head`].
44        #[method(name = "NetworkHead")]
45        async fn header_network_head(&self) -> RpcResult<ExtendedHeader>;
46
47        /// See [`crate::HeaderClient::header_sync_state`].
48        #[method(name = "SyncState")]
49        async fn header_sync_state(&self) -> RpcResult<SyncState>;
50
51        /// See [`crate::HeaderClient::header_sync_wait`].
52        #[method(name = "SyncWait")]
53        async fn header_sync_wait(&self) -> RpcResult<()>;
54
55        /// See [`crate::HeaderClient::header_wait_for_height`].
56        #[method(name = "WaitForHeight")]
57        async fn header_wait_for_height(&self, height: u64) -> RpcResult<ExtendedHeader>;
58    }
59
60    /// Header subscription RPC methods.
61    #[rpc(client, server, namespace = "header", namespace_separator = ".")]
62    pub trait HeaderSubscription {
63        /// See [`crate::HeaderClient::header_subscribe`].
64        #[subscription(name = "Subscribe", unsubscribe = "Unsubscribe", item = ExtendedHeader)]
65        async fn header_subscribe(&self) -> SubscriptionResult;
66    }
67}
68
69/// Client implementation for the `Header` RPC API.
70pub trait HeaderClient: ClientT {
71    /// GetByHash returns the header of the given hash from the node's header store.
72    fn header_get_by_hash<'a, 'fut>(
73        &'a self,
74        hash: Hash,
75    ) -> impl Future<Output = Result<ExtendedHeader, Error>> + Send + 'fut
76    where
77        'a: 'fut,
78        Self: Sized + Sync + 'fut,
79    {
80        rpc::HeaderClient::header_get_by_hash(self, hash)
81    }
82
83    /// GetByHeight returns the ExtendedHeader at the given height if it is currently available.
84    fn header_get_by_height<'a, 'fut>(
85        &'a self,
86        height: u64,
87    ) -> impl Future<Output = Result<ExtendedHeader, Error>> + Send + 'fut
88    where
89        'a: 'fut,
90        Self: Sized + Sync + 'fut,
91    {
92        rpc::HeaderClient::header_get_by_height(self, height)
93    }
94
95    /// GetRangeByHeight returns the given range (from:to) of ExtendedHeaders from the node's header store and verifies that the returned headers are adjacent to each other.
96    fn header_get_range_by_height<'a, 'b, 'fut>(
97        &'a self,
98        from: ExtendedHeader,
99        to: u64,
100    ) -> impl Future<Output = Result<Vec<ExtendedHeader>, Error>> + Send + 'fut
101    where
102        'a: 'fut,
103        'b: 'fut,
104        Self: Sized + Sync + 'fut,
105    {
106        rpc::HeaderClient::header_get_range_by_height(self, from, to)
107    }
108
109    /// LocalHead returns the ExtendedHeader of the chain head.
110    fn header_local_head<'a, 'fut>(
111        &'a self,
112    ) -> impl Future<Output = Result<ExtendedHeader, Error>> + Send + 'fut
113    where
114        'a: 'fut,
115        Self: Sized + Sync + 'fut,
116    {
117        rpc::HeaderClient::header_local_head(self)
118    }
119
120    /// NetworkHead provides the Syncer's view of the current network head.
121    fn header_network_head<'a, 'fut>(
122        &'a self,
123    ) -> impl Future<Output = Result<ExtendedHeader, Error>> + Send + 'fut
124    where
125        'a: 'fut,
126        Self: Sized + Sync + 'fut,
127    {
128        rpc::HeaderClient::header_network_head(self)
129    }
130
131    /// Subscribe to recent ExtendedHeaders from the network.
132    ///
133    /// # Notes
134    ///
135    /// If client returns [`Error::HttpNotImplemented`], the subscription will fallback to
136    /// using [`HeaderClient::header_wait_for_height`] for streaming the headers.
137    fn header_subscribe<'a>(
138        &'a self,
139    ) -> Pin<Box<dyn Stream<Item = Result<ExtendedHeader, Error>> + Send + 'a>>
140    where
141        Self: SubscriptionClientT + Sized + Sync,
142    {
143        try_stream! {
144            let mut head = rpc::HeaderClient::header_local_head(self).await?;
145            head.validate().map_err(custom_client_error)?;
146
147            let mut real_subscription = match rpc::HeaderSubscriptionClient::header_subscribe(self).await {
148                Ok(subscription) => Ok(Some(subscription)),
149                Err(Error::HttpNotImplemented) => Ok(None),
150                Err(e) => Err(e)
151            }?;
152
153            loop {
154                let header = match &mut real_subscription {
155                    Some(subscription) => subscription
156                        .next()
157                        .await
158                        .ok_or_else(|| custom_client_error("unexpected end of stream"))??,
159                    None => rpc::HeaderClient::header_wait_for_height(self, head.height() + 1).await?,
160                };
161
162                header.validate().map_err(custom_client_error)?;
163                head.verify_adjacent(&header).map_err(custom_client_error)?;
164
165                head = header.clone();
166                yield header;
167            }
168        }
169        .boxed()
170    }
171
172    /// SyncState returns the current state of the header Syncer.
173    fn header_sync_state<'a, 'fut>(
174        &'a self,
175    ) -> impl Future<Output = Result<SyncState, Error>> + Send + 'fut
176    where
177        'a: 'fut,
178        Self: Sized + Sync + 'fut,
179    {
180        rpc::HeaderClient::header_sync_state(self)
181    }
182
183    /// SyncWait blocks until the header Syncer is synced to network head.
184    fn header_sync_wait<'a, 'fut>(&'a self) -> impl Future<Output = Result<(), Error>> + Send + 'fut
185    where
186        'a: 'fut,
187        Self: Sized + Sync + 'fut,
188    {
189        rpc::HeaderClient::header_sync_wait(self)
190    }
191
192    /// WaitForHeight blocks until the header at the given height has been processed by the store or context deadline is exceeded.
193    fn header_wait_for_height<'a, 'fut>(
194        &'a self,
195        height: u64,
196    ) -> impl Future<Output = Result<ExtendedHeader, Error>> + Send + 'fut
197    where
198        'a: 'fut,
199        Self: Sized + Sync + 'fut,
200    {
201        rpc::HeaderClient::header_wait_for_height(self, height)
202    }
203}
204
205impl<T> HeaderClient for T where T: ClientT {}
206
207/// Server trait for Header RPC endpoints.
208pub trait HeaderServer: rpc::HeaderServer + rpc::HeaderSubscriptionServer {}
209
210impl<T> HeaderServer for T where T: rpc::HeaderServer + rpc::HeaderSubscriptionServer {}
211
212pub use rpc::HeaderServer as HeaderRpcServer;
213pub use rpc::HeaderSubscriptionServer as HeaderSubscriptionRpcServer;