// imbibe-indexer 0.0.1
//
// Provides indexing strategies for imbibe.
use core::num::NonZeroU64;

use bon::Builder;
use futures::StreamExt;
use imbibe_persistence::{pool::DbPool, store};
use tendermint_rpc::{SubscriptionClient, WebSocketClient, event::EventData, query::EventType};
use tokio::sync::oneshot;

use crate::error::Result;

/// Indexer that follows a node's live `NewBlock` event stream and persists each
/// received block together with its transaction results.
///
/// Instances are assembled through the builder generated by [`bon::Builder`] and
/// consumed by [`LiveIndexer::start`].
#[derive(Builder)]
pub struct LiveIndexer {
	/// Database pool; a connection is taken from it for every block that is saved.
	pool: DbPool,
	/// WebSocket RPC client used to subscribe to `NewBlock` events.
	client: WebSocketClient,
	/// Optional one-shot channel that is sent the height of the first live block.
	first_block_transmitter: Option<oneshot::Sender<NonZeroU64>>,
}

impl LiveIndexer {
	#[tracing::instrument(skip_all)]
	pub async fn start(mut self) -> Result<()> {
		let mut subscription = self.client.subscribe(EventType::NewBlock.into()).await?;

		let (first_block, first_block_id, result) = loop {
			if let Some(Ok(event)) = subscription.next().await {
				if let EventData::NewBlock {
					block: Some(block),
					block_id,
					result_finalize_block: Some(result),
				} = event.data
				{
					break (block, block_id, result);
				}
			}
		};

		let first_block_height = first_block.header.height;

		tracing::info!("received first live block {}", first_block_height);

		if let Some(transmitter) = self.first_block_transmitter.take() {
			let height = first_block_height.value().try_into().unwrap();

			if let Err(height) = transmitter.send(height) {
				tracing::error!("no receiver to receive first live block height {}", height);
			}
		}

		let (first_block, tx_results) = super::process_block(
			first_block.header,
			first_block_id.hash,
			first_block.data,
			result.tx_results,
		)?;

		store::save_block_with_txs(&mut self.pool.get().await?, &first_block, &tx_results).await?;

		while let Some(Ok(event)) = subscription.next().await {
			if let EventData::NewBlock {
				block: Some(block),
				block_id,
				result_finalize_block: Some(result),
			} = event.data
			{
				tracing::info!("received live block {}", block.header.height);

				let (block, tx_results) = super::process_block(
					block.header,
					block_id.hash,
					block.data,
					result.tx_results,
				)?;

				store::save_block_with_txs(&mut self.pool.get().await?, &block, &tx_results)
					.await?;
			}
		}

		tracing::info!("indexing finished");

		Ok(())
	}
}