celestia_client/
header.rs1use std::pin::Pin;
2use std::sync::Arc;
3
4use async_stream::try_stream;
5use celestia_rpc::HeaderClient;
6use futures_util::{Stream, StreamExt};
7
8use crate::Result;
9use crate::client::ClientInner;
10use crate::types::hash::Hash;
11use crate::types::{ExtendedHeader, SyncState};
12
13pub struct HeaderApi {
15 inner: Arc<ClientInner>,
16}
17
18impl HeaderApi {
19 pub(crate) fn new(inner: Arc<ClientInner>) -> HeaderApi {
20 HeaderApi { inner }
21 }
22
23 pub async fn head(&self) -> Result<ExtendedHeader> {
25 let header = self.inner.rpc.header_local_head().await?;
26 header.validate()?;
27 Ok(header)
28 }
29
30 pub async fn network_head(&self) -> Result<ExtendedHeader> {
32 let header = self.inner.rpc.header_network_head().await?;
33 header.validate()?;
34 Ok(header)
35 }
36
37 pub async fn get_by_hash(&self, hash: Hash) -> Result<ExtendedHeader> {
39 let header = self.inner.rpc.header_get_by_hash(hash).await?;
40 header.validate()?;
41 Ok(header)
42 }
43
44 pub async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader> {
47 let header = self.inner.rpc.header_get_by_height(height).await?;
48 header.validate()?;
49 Ok(header)
50 }
51
52 pub async fn get_range_by_height(
57 &self,
58 from: &ExtendedHeader,
59 to: u64,
60 ) -> Result<Vec<ExtendedHeader>> {
61 from.validate()?;
62
63 let headers = self
64 .inner
65 .rpc
66 .header_get_range_by_height(from.clone(), to)
67 .await?;
68
69 for header in &headers {
70 header.validate()?;
71 }
72
73 from.verify_adjacent_range(&headers)?;
74
75 Ok(headers)
76 }
77
78 pub async fn wait_for_height(&self, height: u64) -> Result<ExtendedHeader> {
80 let header = self.inner.rpc.header_wait_for_height(height).await?;
81 header.validate()?;
82 Ok(header)
83 }
84
85 pub async fn sync_state(&self) -> Result<SyncState> {
87 Ok(self.inner.rpc.header_sync_state().await?)
88 }
89
90 pub async fn sync_wait(&self) -> Result<()> {
92 Ok(self.inner.rpc.header_sync_wait().await?)
93 }
94
95 pub fn subscribe(
120 &self,
121 ) -> Pin<Box<dyn Stream<Item = Result<ExtendedHeader>> + Send + 'static>> {
122 let inner = self.inner.clone();
123
124 try_stream! {
126 let mut subscription = inner.rpc.header_subscribe();
127 while let Some(item) = subscription.next().await {
128 yield item?;
129 }
130 }
131 .boxed()
132 }
133}
134
135#[cfg(test)]
136mod tests {
137 use super::*;
138
139 use crate::test_utils::ensure_serializable_deserializable;
140
141 #[allow(dead_code)]
142 #[allow(unused_variables)]
143 #[allow(unreachable_code)]
144 #[allow(clippy::diverging_sub_expression)]
145 async fn enforce_serde_bounds() {
146 let api = HeaderApi::new(unimplemented!());
148
149 let _: () = api.sync_wait().await.unwrap();
150
151 ensure_serializable_deserializable(api.head().await.unwrap());
152
153 ensure_serializable_deserializable(api.network_head().await.unwrap());
154
155 let hash = ensure_serializable_deserializable(unimplemented!());
156 ensure_serializable_deserializable(api.get_by_hash(hash).await.unwrap());
157
158 ensure_serializable_deserializable(api.get_by_height(0).await.unwrap());
159
160 let header = ensure_serializable_deserializable(unimplemented!());
161 ensure_serializable_deserializable(api.get_range_by_height(&header, 0).await.unwrap());
162
163 ensure_serializable_deserializable(api.wait_for_height(0).await.unwrap());
164
165 ensure_serializable_deserializable(api.sync_state().await.unwrap());
166
167 ensure_serializable_deserializable(api.subscribe().next().await.unwrap().unwrap());
168 }
169}