Skip to main content

hive_rs/api/
blockchain.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use async_stream::try_stream;
5use futures::Stream;
6use serde_json::json;
7
8use crate::client::ClientInner;
9use crate::error::{HiveError, Result};
10use crate::types::{AppliedOperation, BlockHeader, DynamicGlobalProperties, SignedBlock};
11
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
13pub enum BlockchainMode {
14    #[default]
15    Irreversible,
16    Latest,
17}
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
20pub struct BlockchainStreamOptions {
21    pub from: Option<u32>,
22    pub to: Option<u32>,
23    pub mode: BlockchainMode,
24}
25
26#[derive(Debug, Clone)]
27pub struct Blockchain {
28    client: Arc<ClientInner>,
29}
30
31impl Blockchain {
32    pub(crate) fn new(client: Arc<ClientInner>) -> Self {
33        Self { client }
34    }
35
36    pub async fn get_current_block_num(&self, mode: BlockchainMode) -> Result<u32> {
37        let props: DynamicGlobalProperties = self
38            .client
39            .call("condenser_api", "get_dynamic_global_properties", json!([]))
40            .await?;
41
42        Ok(match mode {
43            BlockchainMode::Irreversible => props.last_irreversible_block_num,
44            BlockchainMode::Latest => props.head_block_number,
45        })
46    }
47
48    pub async fn get_current_block_header(&self, mode: BlockchainMode) -> Result<BlockHeader> {
49        let block_num = self.get_current_block_num(mode).await?;
50        let header: Option<BlockHeader> = self
51            .client
52            .call("condenser_api", "get_block_header", json!([block_num]))
53            .await?;
54
55        header.ok_or_else(|| {
56            HiveError::Serialization(format!("block header {block_num} not returned by node"))
57        })
58    }
59
60    pub async fn get_current_block(&self, mode: BlockchainMode) -> Result<SignedBlock> {
61        let block_num = self.get_current_block_num(mode).await?;
62        let block: Option<SignedBlock> = self
63            .client
64            .call("condenser_api", "get_block", json!([block_num]))
65            .await?;
66
67        block.ok_or_else(|| {
68            HiveError::Serialization(format!("block {block_num} not returned by node"))
69        })
70    }
71
72    pub fn get_block_numbers(
73        &self,
74        options: BlockchainStreamOptions,
75    ) -> impl Stream<Item = Result<u32>> + '_ {
76        try_stream! {
77            let interval = Duration::from_secs(3);
78            let mut current = self.get_current_block_num(options.mode).await?;
79            if let Some(from) = options.from {
80                if from > current {
81                    Err(HiveError::Other(format!(
82                        "from cannot be larger than current block num ({current})"
83                    )))?;
84                }
85            }
86
87            let mut seen = options.from.unwrap_or(current);
88            loop {
89                while current > seen {
90                    let next = seen;
91                    seen = seen.saturating_add(1);
92                    yield next;
93
94                    if let Some(to) = options.to {
95                        if seen > to {
96                            return;
97                        }
98                    }
99                }
100
101                tokio::time::sleep(interval).await;
102                current = self.get_current_block_num(options.mode).await?;
103            }
104        }
105    }
106
107    pub fn get_blocks(
108        &self,
109        options: BlockchainStreamOptions,
110    ) -> impl Stream<Item = Result<SignedBlock>> + '_ {
111        try_stream! {
112            let numbers = self.get_block_numbers(options);
113            futures::pin_mut!(numbers);
114
115            while let Some(number_result) = futures::StreamExt::next(&mut numbers).await {
116                let number = number_result?;
117                let block: Option<SignedBlock> = self
118                    .client
119                    .call("condenser_api", "get_block", json!([number]))
120                    .await?;
121                if let Some(block) = block {
122                    yield block;
123                }
124            }
125        }
126    }
127
128    pub fn get_operations(
129        &self,
130        options: BlockchainStreamOptions,
131    ) -> impl Stream<Item = Result<AppliedOperation>> + '_ {
132        try_stream! {
133            let numbers = self.get_block_numbers(options);
134            futures::pin_mut!(numbers);
135
136            while let Some(number_result) = futures::StreamExt::next(&mut numbers).await {
137                let number = number_result?;
138                let operations: Vec<AppliedOperation> = self
139                    .client
140                    .call("condenser_api", "get_ops_in_block", json!([number, false]))
141                    .await?;
142                for op in operations {
143                    yield op;
144                }
145            }
146        }
147    }
148}
149
150#[cfg(test)]
151mod tests {
152    use std::sync::Arc;
153    use std::time::Duration;
154
155    use serde_json::json;
156    use wiremock::matchers::method;
157    use wiremock::{Mock, MockServer, ResponseTemplate};
158
159    use crate::api::{Blockchain, BlockchainMode};
160    use crate::client::{ClientInner, ClientOptions};
161    use crate::transport::{BackoffStrategy, FailoverTransport};
162
163    #[tokio::test]
164    async fn current_block_num_uses_requested_mode() {
165        let server = MockServer::start().await;
166
167        Mock::given(method("POST"))
168            .respond_with(ResponseTemplate::new(200).set_body_json(json!({
169                "id": 0,
170                "jsonrpc": "2.0",
171                "result": {
172                    "head_block_number": 100,
173                    "head_block_id": "0000006400112233445566778899aabbccddeeff00112233445566778899aabb",
174                    "time": "2024-01-01T00:00:00",
175                    "last_irreversible_block_num": 95
176                }
177            })))
178            .mount(&server)
179            .await;
180
181        let transport = Arc::new(
182            FailoverTransport::new(
183                &[server.uri()],
184                Duration::from_secs(2),
185                1,
186                BackoffStrategy::default(),
187            )
188            .expect("transport should initialize"),
189        );
190
191        let inner = Arc::new(ClientInner::new(transport, ClientOptions::default()));
192        let blockchain = Blockchain::new(inner);
193
194        let irreversible = blockchain
195            .get_current_block_num(BlockchainMode::Irreversible)
196            .await
197            .expect("request should succeed");
198        let latest = blockchain
199            .get_current_block_num(BlockchainMode::Latest)
200            .await
201            .expect("request should succeed");
202
203        assert_eq!(irreversible, 95);
204        assert_eq!(latest, 100);
205    }
206}