1use std::future::Future;
2use std::marker::{Send, Sync};
3use std::pin::Pin;
4
5use async_stream::try_stream;
6use celestia_types::hash::Hash;
7use celestia_types::{ExtendedHeader, SyncState};
8use futures_util::{Stream, StreamExt};
9use jsonrpsee::core::client::{ClientT, Error, SubscriptionClientT};
10
11use jsonrpsee::proc_macros::rpc;
12
13use crate::custom_client_error;
14
15mod rpc {
16 use jsonrpsee::core::{RpcResult, SubscriptionResult};
17
18 use super::*;
19
20 #[rpc(client, server, namespace = "header", namespace_separator = ".")]
22 pub trait Header {
23 #[method(name = "GetByHash")]
25 async fn header_get_by_hash(&self, hash: Hash) -> RpcResult<ExtendedHeader>;
26
27 #[method(name = "GetByHeight")]
29 async fn header_get_by_height(&self, height: u64) -> RpcResult<ExtendedHeader>;
30
31 #[method(name = "GetRangeByHeight")]
33 async fn header_get_range_by_height(
34 &self,
35 from: ExtendedHeader,
36 to: u64,
37 ) -> RpcResult<Vec<ExtendedHeader>>;
38
39 #[method(name = "LocalHead")]
41 async fn header_local_head(&self) -> RpcResult<ExtendedHeader>;
42
43 #[method(name = "NetworkHead")]
45 async fn header_network_head(&self) -> RpcResult<ExtendedHeader>;
46
47 #[method(name = "SyncState")]
49 async fn header_sync_state(&self) -> RpcResult<SyncState>;
50
51 #[method(name = "SyncWait")]
53 async fn header_sync_wait(&self) -> RpcResult<()>;
54
55 #[method(name = "WaitForHeight")]
57 async fn header_wait_for_height(&self, height: u64) -> RpcResult<ExtendedHeader>;
58 }
59
60 #[rpc(client, server, namespace = "header", namespace_separator = ".")]
62 pub trait HeaderSubscription {
63 #[subscription(name = "Subscribe", unsubscribe = "Unsubscribe", item = ExtendedHeader)]
65 async fn header_subscribe(&self) -> SubscriptionResult;
66 }
67}
68
69pub trait HeaderClient: ClientT {
71 fn header_get_by_hash<'a, 'fut>(
73 &'a self,
74 hash: Hash,
75 ) -> impl Future<Output = Result<ExtendedHeader, Error>> + Send + 'fut
76 where
77 'a: 'fut,
78 Self: Sized + Sync + 'fut,
79 {
80 rpc::HeaderClient::header_get_by_hash(self, hash)
81 }
82
83 fn header_get_by_height<'a, 'fut>(
85 &'a self,
86 height: u64,
87 ) -> impl Future<Output = Result<ExtendedHeader, Error>> + Send + 'fut
88 where
89 'a: 'fut,
90 Self: Sized + Sync + 'fut,
91 {
92 rpc::HeaderClient::header_get_by_height(self, height)
93 }
94
95 fn header_get_range_by_height<'a, 'b, 'fut>(
97 &'a self,
98 from: ExtendedHeader,
99 to: u64,
100 ) -> impl Future<Output = Result<Vec<ExtendedHeader>, Error>> + Send + 'fut
101 where
102 'a: 'fut,
103 'b: 'fut,
104 Self: Sized + Sync + 'fut,
105 {
106 rpc::HeaderClient::header_get_range_by_height(self, from, to)
107 }
108
109 fn header_local_head<'a, 'fut>(
111 &'a self,
112 ) -> impl Future<Output = Result<ExtendedHeader, Error>> + Send + 'fut
113 where
114 'a: 'fut,
115 Self: Sized + Sync + 'fut,
116 {
117 rpc::HeaderClient::header_local_head(self)
118 }
119
120 fn header_network_head<'a, 'fut>(
122 &'a self,
123 ) -> impl Future<Output = Result<ExtendedHeader, Error>> + Send + 'fut
124 where
125 'a: 'fut,
126 Self: Sized + Sync + 'fut,
127 {
128 rpc::HeaderClient::header_network_head(self)
129 }
130
131 fn header_subscribe<'a>(
138 &'a self,
139 ) -> Pin<Box<dyn Stream<Item = Result<ExtendedHeader, Error>> + Send + 'a>>
140 where
141 Self: SubscriptionClientT + Sized + Sync,
142 {
143 try_stream! {
144 let mut head = rpc::HeaderClient::header_local_head(self).await?;
145 head.validate().map_err(custom_client_error)?;
146
147 let mut real_subscription = match rpc::HeaderSubscriptionClient::header_subscribe(self).await {
148 Ok(subscription) => Ok(Some(subscription)),
149 Err(Error::HttpNotImplemented) => Ok(None),
150 Err(e) => Err(e)
151 }?;
152
153 loop {
154 let header = match &mut real_subscription {
155 Some(subscription) => subscription
156 .next()
157 .await
158 .ok_or_else(|| custom_client_error("unexpected end of stream"))??,
159 None => rpc::HeaderClient::header_wait_for_height(self, head.height() + 1).await?,
160 };
161
162 header.validate().map_err(custom_client_error)?;
163 head.verify_adjacent(&header).map_err(custom_client_error)?;
164
165 head = header.clone();
166 yield header;
167 }
168 }
169 .boxed()
170 }
171
172 fn header_sync_state<'a, 'fut>(
174 &'a self,
175 ) -> impl Future<Output = Result<SyncState, Error>> + Send + 'fut
176 where
177 'a: 'fut,
178 Self: Sized + Sync + 'fut,
179 {
180 rpc::HeaderClient::header_sync_state(self)
181 }
182
183 fn header_sync_wait<'a, 'fut>(&'a self) -> impl Future<Output = Result<(), Error>> + Send + 'fut
185 where
186 'a: 'fut,
187 Self: Sized + Sync + 'fut,
188 {
189 rpc::HeaderClient::header_sync_wait(self)
190 }
191
192 fn header_wait_for_height<'a, 'fut>(
194 &'a self,
195 height: u64,
196 ) -> impl Future<Output = Result<ExtendedHeader, Error>> + Send + 'fut
197 where
198 'a: 'fut,
199 Self: Sized + Sync + 'fut,
200 {
201 rpc::HeaderClient::header_wait_for_height(self, height)
202 }
203}
204
205impl<T> HeaderClient for T where T: ClientT {}
206
207pub trait HeaderServer: rpc::HeaderServer + rpc::HeaderSubscriptionServer {}
209
210impl<T> HeaderServer for T where T: rpc::HeaderServer + rpc::HeaderSubscriptionServer {}
211
212pub use rpc::HeaderServer as HeaderRpcServer;
213pub use rpc::HeaderSubscriptionServer as HeaderSubscriptionRpcServer;