hive_rs/api/
blockchain.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use async_stream::try_stream;
5use futures::Stream;
6use serde_json::json;
7
8use crate::client::ClientInner;
9use crate::error::{HiveError, Result};
10use crate::types::{AppliedOperation, BlockHeader, DynamicGlobalProperties, SignedBlock};
11
/// Selects which block number is treated as the "current" one.
///
/// The default is [`BlockchainMode::Irreversible`].
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum BlockchainMode {
    /// Use the `last_irreversible_block_num` reported by the node's dynamic
    /// global properties.
    #[default]
    Irreversible,
    /// Use the `head_block_number` reported by the node's dynamic global
    /// properties.
    Latest,
}
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
20pub struct BlockchainStreamOptions {
21 pub from: Option<u32>,
22 pub to: Option<u32>,
23 pub mode: BlockchainMode,
24}
25
26#[derive(Debug, Clone)]
27pub struct Blockchain {
28 client: Arc<ClientInner>,
29}
30
31impl Blockchain {
32 pub(crate) fn new(client: Arc<ClientInner>) -> Self {
33 Self { client }
34 }
35
36 pub async fn get_current_block_num(&self, mode: BlockchainMode) -> Result<u32> {
37 let props: DynamicGlobalProperties = self
38 .client
39 .call("condenser_api", "get_dynamic_global_properties", json!([]))
40 .await?;
41
42 Ok(match mode {
43 BlockchainMode::Irreversible => props.last_irreversible_block_num,
44 BlockchainMode::Latest => props.head_block_number,
45 })
46 }
47
48 pub async fn get_current_block_header(&self, mode: BlockchainMode) -> Result<BlockHeader> {
49 let block_num = self.get_current_block_num(mode).await?;
50 let header: Option<BlockHeader> = self
51 .client
52 .call("condenser_api", "get_block_header", json!([block_num]))
53 .await?;
54
55 header.ok_or_else(|| {
56 HiveError::Serialization(format!("block header {block_num} not returned by node"))
57 })
58 }
59
60 pub async fn get_current_block(&self, mode: BlockchainMode) -> Result<SignedBlock> {
61 let block_num = self.get_current_block_num(mode).await?;
62 let block: Option<SignedBlock> = self
63 .client
64 .call("condenser_api", "get_block", json!([block_num]))
65 .await?;
66
67 block.ok_or_else(|| {
68 HiveError::Serialization(format!("block {block_num} not returned by node"))
69 })
70 }
71
72 pub fn get_block_numbers(
73 &self,
74 options: BlockchainStreamOptions,
75 ) -> impl Stream<Item = Result<u32>> + '_ {
76 try_stream! {
77 let interval = Duration::from_secs(3);
78 let mut current = self.get_current_block_num(options.mode).await?;
79 if let Some(from) = options.from {
80 if from > current {
81 Err(HiveError::Other(format!(
82 "from cannot be larger than current block num ({current})"
83 )))?;
84 }
85 }
86
87 let mut seen = options.from.unwrap_or(current);
88 loop {
89 while current > seen {
90 let next = seen;
91 seen = seen.saturating_add(1);
92 yield next;
93
94 if let Some(to) = options.to {
95 if seen > to {
96 return;
97 }
98 }
99 }
100
101 tokio::time::sleep(interval).await;
102 current = self.get_current_block_num(options.mode).await?;
103 }
104 }
105 }
106
107 pub fn get_blocks(
108 &self,
109 options: BlockchainStreamOptions,
110 ) -> impl Stream<Item = Result<SignedBlock>> + '_ {
111 try_stream! {
112 let numbers = self.get_block_numbers(options);
113 futures::pin_mut!(numbers);
114
115 while let Some(number_result) = futures::StreamExt::next(&mut numbers).await {
116 let number = number_result?;
117 let block: Option<SignedBlock> = self
118 .client
119 .call("condenser_api", "get_block", json!([number]))
120 .await?;
121 if let Some(block) = block {
122 yield block;
123 }
124 }
125 }
126 }
127
128 pub fn get_operations(
129 &self,
130 options: BlockchainStreamOptions,
131 ) -> impl Stream<Item = Result<AppliedOperation>> + '_ {
132 try_stream! {
133 let numbers = self.get_block_numbers(options);
134 futures::pin_mut!(numbers);
135
136 while let Some(number_result) = futures::StreamExt::next(&mut numbers).await {
137 let number = number_result?;
138 let operations: Vec<AppliedOperation> = self
139 .client
140 .call("condenser_api", "get_ops_in_block", json!([number, false]))
141 .await?;
142 for op in operations {
143 yield op;
144 }
145 }
146 }
147 }
148}
149
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use serde_json::json;
    use wiremock::matchers::method;
    use wiremock::{Mock, MockServer, ResponseTemplate};

    use crate::api::{Blockchain, BlockchainMode};
    use crate::client::{ClientInner, ClientOptions};
    use crate::transport::{BackoffStrategy, FailoverTransport};

    /// Checks that each mode selects its own field of the dynamic global
    /// properties served by a mock node.
    #[tokio::test]
    async fn current_block_num_uses_requested_mode() {
        let server = MockServer::start().await;

        // Canned JSON-RPC reply: head block 100, last irreversible block 95.
        let properties_reply = json!({
            "id": 0,
            "jsonrpc": "2.0",
            "result": {
                "head_block_number": 100,
                "head_block_id": "0000006400112233445566778899aabbccddeeff00112233445566778899aabb",
                "time": "2024-01-01T00:00:00",
                "last_irreversible_block_num": 95
            }
        });
        let reply_template = ResponseTemplate::new(200).set_body_json(properties_reply);

        // Every POST to the mock server receives the canned reply.
        Mock::given(method("POST"))
            .respond_with(reply_template)
            .mount(&server)
            .await;

        let failover = FailoverTransport::new(
            &[server.uri()],
            Duration::from_secs(2),
            1,
            BackoffStrategy::default(),
        )
        .expect("transport should initialize");
        let transport = Arc::new(failover);

        let blockchain = Blockchain::new(Arc::new(ClientInner::new(
            transport,
            ClientOptions::default(),
        )));

        let irreversible_num = blockchain
            .get_current_block_num(BlockchainMode::Irreversible)
            .await
            .expect("request should succeed");
        let head_num = blockchain
            .get_current_block_num(BlockchainMode::Latest)
            .await
            .expect("request should succeed");

        assert_eq!(irreversible_num, 95);
        assert_eq!(head_num, 100);
    }
}