1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
use std::pin::Pin;
use std::sync::Arc;
use async_stream::try_stream;
use celestia_rpc::HeaderClient;
use futures_util::{Stream, StreamExt};
use crate::client::Context;
use crate::types::hash::Hash;
use crate::types::{ExtendedHeader, SyncState};
use crate::Result;
/// Header API for quering bridge nodes.
pub struct HeaderApi {
ctx: Arc<Context>,
}
impl HeaderApi {
pub(crate) fn new(ctx: Arc<Context>) -> HeaderApi {
HeaderApi { ctx }
}
/// Returns the latest header synchronized by the node.
pub async fn head(&self) -> Result<ExtendedHeader> {
let header = self.ctx.rpc.header_local_head().await?;
header.validate()?;
Ok(header)
}
/// Returns the latest header announced in the network.
pub async fn network_head(&self) -> Result<ExtendedHeader> {
let header = self.ctx.rpc.header_network_head().await?;
header.validate()?;
Ok(header)
}
/// Returns the header of the given hash from the node's header store.
pub async fn get_by_hash(&self, hash: Hash) -> Result<ExtendedHeader> {
let header = self.ctx.rpc.header_get_by_hash(hash).await?;
header.validate()?;
Ok(header)
}
/// Returns the header at the given height, if it is
/// currently available.
pub async fn get_by_height(&self, height: u64) -> Result<ExtendedHeader> {
let header = self.ctx.rpc.header_get_by_height(height).await?;
header.validate()?;
Ok(header)
}
/// Returns the given range headers from the node and verifies that they
/// form a confirmed and continuous chain starting from the `from` header.
///
/// The range is exclusive from both sides.
pub async fn get_range_by_height(
&self,
from: &ExtendedHeader,
to: u64,
) -> Result<Vec<ExtendedHeader>> {
from.validate()?;
let headers = self.ctx.rpc.header_get_range_by_height(from, to).await?;
for header in &headers {
header.validate()?;
}
from.verify_adjacent_range(&headers)?;
Ok(headers)
}
/// Blocks until the header at the given height has been synced by the node.
pub async fn wait_for_height(&self, height: u64) -> Result<ExtendedHeader> {
let header = self.ctx.rpc.header_wait_for_height(height).await?;
header.validate()?;
Ok(header)
}
/// Returns the current state of the node's Syncer.
pub async fn sync_state(&self) -> Result<SyncState> {
Ok(self.ctx.rpc.header_sync_state().await?)
}
/// Blocks until the node's Syncer is synced to network head.
pub async fn sync_wait(&self) -> Result<()> {
Ok(self.ctx.rpc.header_sync_wait().await?)
}
/// Subscribe to recent headers from the network.
///
/// Headers will be validated and verified with the one that was
/// previously received.
///
/// # Example
///
/// ```no_run
/// # use futures_util::StreamExt;
/// # use celestia_client::{Client, Result};
/// # async fn docs() -> Result<()> {
/// let client = Client::builder()
/// .rpc_url("ws://localhost:26658")
/// .build()
/// .await?;
///
/// let mut headers_rx = client.header().subscribe().await;
///
/// while let Some(header) = headers_rx.next().await {
/// dbg!(header);
/// }
/// # Ok(())
/// # }
/// ```
pub async fn subscribe(
&self,
) -> Pin<Box<dyn Stream<Item = Result<ExtendedHeader>> + Send + 'static>> {
let ctx = self.ctx.clone();
try_stream! {
let mut prev_header: Option<ExtendedHeader> = None;
let mut subscription = ctx.rpc.header_subscribe().await?;
while let Some(item) = subscription.next().await {
let header = item?;
header.validate()?;
if let Some(ref prev_header) = prev_header {
prev_header.verify_adjacent(&header)?;
}
prev_header = Some(header.clone());
yield header;
}
}
.boxed()
}
}