layer_climb_core/querier/
stream.rs1use std::time::Duration;
2
3use crate::prelude::*;
4use futures::Stream;
5use tracing::instrument;
6
7#[derive(Debug, Clone)]
8pub struct BlockEvents {
9 pub height: u64,
10 pub events: Vec<tendermint::abci::Event>,
11}
12
13impl QueryClient {
14 #[instrument]
15 pub async fn stream_block_events(
16 self,
19 sleep_duration: Option<Duration>,
20 ) -> Result<impl Stream<Item = Result<BlockEvents>>> {
21 let start_height = self.block_height().await?;
22
23 Ok(futures::stream::unfold(
24 (self, start_height),
25 move |(client, block_height)| async move {
26 match client
27 .wait_until_block_height(block_height, sleep_duration)
28 .await
29 {
30 Ok(_) => match client.fetch_block_events(block_height).await {
31 Err(err) => Some((Err(err), (client, block_height))),
32 Ok(events) => Some((
33 Ok(BlockEvents {
34 height: block_height,
35 events,
36 }),
37 (client, block_height + 1),
38 )),
39 },
40 Err(err) => Some((Err(err), (client, block_height))),
41 }
42 },
43 ))
44 }
45}