use std::collections::HashSet;
use std::time::Duration;
use futures::future::try_join_all;
use tokio::time;
use crate::client::{TonClientError, TonClientInterface, TonConnection};
use crate::tl::{BlockId, BlockIdExt, BlocksHeader, BlocksShards};
/// One element produced by [`BlockStream::next`].
#[derive(Clone, Debug)]
pub struct BlockStreamItem {
    /// Extended id of the masterchain block this item was built from.
    pub master_shard: BlockIdExt,
    /// Shard blocks collected for this masterchain block: the shards it
    /// references plus their `prev_blocks` ancestors that were not already
    /// referenced by the preceding masterchain block. The order is unspecified
    /// because the list is collected from a `HashSet`.
    pub shards: Vec<BlockIdExt>,
}
/// Sequential reader of masterchain blocks and the shard blocks they cover.
///
/// Each call to `next` yields the block with sequence number `next_seqno` and
/// then advances the counter by one.
pub struct BlockStream<C>
where
    C: TonClientInterface + Clone,
{
    /// Client used for masterchain polling and as a fallback for header lookups.
    client: C,
    /// Sequence number of the masterchain block the next call will return.
    next_seqno: i32,
    /// Ids of the shard blocks referenced by the previously processed
    /// masterchain block; empty until the first call to `next` seeds it.
    prev_block_set: HashSet<BlockId>,
}
impl<C: TonClientInterface + Clone> BlockStream<C> {
pub fn new(client: &C, from_seqno: i32) -> BlockStream<C> {
BlockStream {
client: client.clone(),
next_seqno: from_seqno,
prev_block_set: Default::default(),
}
}
pub async fn next(&mut self) -> Result<BlockStreamItem, TonClientError> {
if self.prev_block_set.is_empty() {
let (prev_block_shards, _) =
get_master_block_shards(&self.client, self.next_seqno - 1).await?;
for shard in prev_block_shards.shards {
self.prev_block_set.insert(shard.to_block_id());
}
};
let connection = loop {
let (conn, masterchain_info) = self.client.get_masterchain_info().await?;
if masterchain_info.last.seqno < self.next_seqno {
time::sleep(Duration::from_millis(100)).await;
} else {
break conn;
}
};
let (block_shards, master_block) =
get_master_block_shards(&connection, self.next_seqno).await?;
let mut result_shards: HashSet<BlockIdExt> = Default::default();
let mut unprocessed_shards: Vec<BlockIdExt> = Default::default();
unprocessed_shards.extend(block_shards.shards.clone());
while !unprocessed_shards.is_empty() {
let mut shards_to_process: HashSet<BlockIdExt> = Default::default();
for s in unprocessed_shards.into_iter() {
if self.prev_block_set.contains(&s.to_block_id()) {
continue;
}
if result_shards.contains(&s) {
continue;
}
result_shards.insert(s.clone());
shards_to_process.insert(s);
}
unprocessed_shards = Default::default();
let headers = self
.get_block_headers(&connection, &shards_to_process)
.await?;
for h in headers {
if let Some(prev_blocks) = h.prev_blocks {
unprocessed_shards.extend(prev_blocks)
}
}
}
self.next_seqno += 1;
let new_prev_seq_shards = block_shards.shards;
self.prev_block_set = new_prev_seq_shards
.into_iter()
.map(|shard| shard.to_block_id())
.collect();
Ok(BlockStreamItem {
shards: result_shards.into_iter().collect(),
master_shard: master_block,
})
}
async fn get_block_headers(
&self,
conn: &TonConnection,
shards: &HashSet<BlockIdExt>,
) -> Result<Vec<BlocksHeader>, TonClientError> {
let futures: Vec<_> = shards
.iter()
.map(|id| self.retrying_get_block_header(conn, id))
.collect();
let r = try_join_all(futures).await?;
Ok(r)
}
async fn retrying_get_block_header(
&self,
conn: &TonConnection,
block_id: &BlockIdExt,
) -> Result<BlocksHeader, TonClientError> {
let r = conn.get_block_header(block_id).await;
match r {
Ok(bh) => Ok(bh),
Err(_) => self.client.get_block_header(block_id).await,
}
}
}
/// Resolves masterchain block `seqno` and lists the shard blocks it references.
///
/// Returns the shard listing together with the extended id of the masterchain
/// block itself.
///
/// # Errors
///
/// Propagates the [`TonClientError`] of either the block lookup or the shard
/// query.
async fn get_master_block_shards<C>(
    conn: &C,
    seqno: i32,
) -> Result<(BlocksShards, BlockIdExt), TonClientError>
where
    C: TonClientInterface,
{
    // Workchain -1 with shard id `i64::MIN` addresses the masterchain.
    let lookup_key = BlockId {
        workchain: -1,
        shard: i64::MIN,
        seqno,
    };
    // NOTE(review): mode `1` presumably means "look up by seqno" and the two
    // trailing zeros are unused arguments; confirm against
    // `TonClientInterface::lookup_block`.
    let master_block_ext = conn.lookup_block(1, &lookup_key, 0, 0).await?;
    let shards = conn.get_block_shards(&master_block_ext).await?;
    Ok((shards, master_block_ext))
}