layer_climb_core/querier/
stream.rs

1use std::time::Duration;
2
3use crate::prelude::*;
4use futures::Stream;
5use tracing::instrument;
6
7#[derive(Debug, Clone)]
8pub struct BlockEvents {
9    pub height: u64,
10    pub events: Vec<tendermint::abci::Event>,
11}
12
13impl QueryClient {
14    #[instrument]
15    pub async fn stream_block_events(
16        // take by value to avoid lifetime issues
17        // typically this means the caller is cloning the QueryClient
18        self,
19        sleep_duration: Option<Duration>,
20    ) -> Result<impl Stream<Item = Result<BlockEvents>>> {
21        let start_height = self.block_height().await?;
22
23        Ok(futures::stream::unfold(
24            (self, start_height),
25            move |(client, block_height)| async move {
26                match client
27                    .wait_until_block_height(block_height, sleep_duration)
28                    .await
29                {
30                    Ok(_) => match client.fetch_block_events(block_height).await {
31                        Err(err) => Some((Err(err), (client, block_height))),
32                        Ok(events) => Some((
33                            Ok(BlockEvents {
34                                height: block_height,
35                                events,
36                            }),
37                            (client, block_height + 1),
38                        )),
39                    },
40                    Err(err) => Some((Err(err), (client, block_height))),
41                }
42            },
43        ))
44    }
45}