Skip to main content

celestia_client/
header.rs

1use std::pin::Pin;
2use std::sync::Arc;
3
4use async_stream::try_stream;
5use celestia_rpc::HeaderClient;
6use futures_util::{Stream, StreamExt};
7
8use crate::Result;
9use crate::client::ClientInner;
10use crate::types::hash::Hash;
11use crate::types::{ExtendedHeader, SyncState};
12
13/// Header API for quering bridge nodes.
14pub struct HeaderApi {
15    inner: Arc<ClientInner>,
16}
17
18impl HeaderApi {
19    pub(crate) fn new(inner: Arc<ClientInner>) -> HeaderApi {
20        HeaderApi { inner }
21    }
22
23    /// Returns the latest header synchronized by the node.
24    pub async fn head(&self) -> Result<ExtendedHeader> {
25        let header = self.inner.rpc.header_local_head().await?;
26        header.validate()?;
27        Ok(header)
28    }
29
30    /// Returns the latest header announced in the network.
31    pub async fn network_head(&self) -> Result<ExtendedHeader> {
32        let header = self.inner.rpc.header_network_head().await?;
33        header.validate()?;
34        Ok(header)
35    }
36
37    /// Returns the header of the given hash from the node's header store.
38    pub async fn get_by_hash(&self, hash: Hash) -> Result<ExtendedHeader> {
39        let header = self.inner.rpc.header_get_by_hash(hash).await?;
40        header.validate()?;
41        Ok(header)
42    }
43
44    /// Returns the header at the given height, if it is
45    /// currently available.
46    pub async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader> {
47        let header = self.inner.rpc.header_get_by_height(height).await?;
48        header.validate()?;
49        Ok(header)
50    }
51
52    /// Returns the given range headers from the node and verifies that they
53    /// form a confirmed and continuous chain starting from the `from` header.
54    ///
55    /// The range is exclusive from both sides.
56    pub async fn get_range_by_height(
57        &self,
58        from: &ExtendedHeader,
59        to: u64,
60    ) -> Result<Vec<ExtendedHeader>> {
61        from.validate()?;
62
63        let headers = self
64            .inner
65            .rpc
66            .header_get_range_by_height(from.clone(), to)
67            .await?;
68
69        for header in &headers {
70            header.validate()?;
71        }
72
73        from.verify_adjacent_range(&headers)?;
74
75        Ok(headers)
76    }
77
78    /// Blocks until the header at the given height has been synced by the node.
79    pub async fn wait_for_height(&self, height: u64) -> Result<ExtendedHeader> {
80        let header = self.inner.rpc.header_wait_for_height(height).await?;
81        header.validate()?;
82        Ok(header)
83    }
84
85    /// Returns the current state of the node's Syncer.
86    pub async fn sync_state(&self) -> Result<SyncState> {
87        Ok(self.inner.rpc.header_sync_state().await?)
88    }
89
90    /// Blocks until the node's Syncer is synced to network head.
91    pub async fn sync_wait(&self) -> Result<()> {
92        Ok(self.inner.rpc.header_sync_wait().await?)
93    }
94
95    /// Subscribe to recent headers from the network.
96    ///
97    /// Headers will be validated and verified with the one that was
98    /// previously received.
99    ///
100    /// # Example
101    ///
102    /// ```no_run
103    /// # use futures_util::StreamExt;
104    /// # use celestia_client::{Client, Result};
105    /// # async fn docs() -> Result<()> {
106    /// let client = Client::builder()
107    ///     .rpc_url("ws://localhost:26658")
108    ///     .build()
109    ///     .await?;
110    ///
111    /// let mut headers_rx = client.header().subscribe();
112    ///
113    /// while let Some(header) = headers_rx.next().await {
114    ///     dbg!(header);
115    /// }
116    /// # Ok(())
117    /// # }
118    /// ```
119    pub fn subscribe(
120        &self,
121    ) -> Pin<Box<dyn Stream<Item = Result<ExtendedHeader>> + Send + 'static>> {
122        let inner = self.inner.clone();
123
124        // we need to re-stream it to map error and satisfy 'static
125        try_stream! {
126            let mut subscription = inner.rpc.header_subscribe();
127            while let Some(item) = subscription.next().await {
128                yield item?;
129            }
130        }
131        .boxed()
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    use crate::test_utils::ensure_serializable_deserializable;

    /// Compile-only check that the values passed to and returned from
    /// [`HeaderApi`] satisfy the bounds of
    /// `ensure_serializable_deserializable`.
    // The lints are silenced on purpose: every `unimplemented!()` diverges,
    // so the rest of the body is unreachable and the function is never called.
    #[allow(
        dead_code,
        unused_variables,
        unreachable_code,
        clippy::diverging_sub_expression
    )]
    async fn enforce_serde_bounds() {
        // intentionally no-run, compile only test
        let api = HeaderApi::new(unimplemented!());

        let _: () = api.sync_wait().await.unwrap();

        let local_head = api.head().await.unwrap();
        ensure_serializable_deserializable(local_head);

        let network_head = api.network_head().await.unwrap();
        ensure_serializable_deserializable(network_head);

        // The argument type is left to inference so it follows the API signature.
        let hash = ensure_serializable_deserializable(unimplemented!());
        let by_hash = api.get_by_hash(hash).await.unwrap();
        ensure_serializable_deserializable(by_hash);

        let by_height = api.get_by_height(0).await.unwrap();
        ensure_serializable_deserializable(by_height);

        let header = ensure_serializable_deserializable(unimplemented!());
        let range = api.get_range_by_height(&header, 0).await.unwrap();
        ensure_serializable_deserializable(range);

        let awaited = api.wait_for_height(0).await.unwrap();
        ensure_serializable_deserializable(awaited);

        let state = api.sync_state().await.unwrap();
        ensure_serializable_deserializable(state);

        let first_header = api.subscribe().next().await.unwrap().unwrap();
        ensure_serializable_deserializable(first_header);
    }
}