use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use thiserror::Error;
use tokio::sync::{mpsc::UnboundedReceiver, watch};
use tycho_client::feed::{synchronizer::Snapshot, BlockHeader, FeedMessage};
use tycho_common::{
models::{
blockchain::{Block, BlockAggregatedChanges, TxInput},
protocol::{ComponentBalance, ProtocolComponent, ProtocolComponentStateDelta},
Chain,
},
traits::TxDeltaIndexer,
Bytes,
};
use crate::{
evm::decoder::{StreamDecodeError, TychoStreamDecoder},
protocol::models::Update,
};
pub struct PendingUpdate {
pub label: String,
pub update: Update,
}
#[derive(Debug, Error)]
pub enum PendingError {
#[error("parent block {needed} not yet confirmed (current: {current})")]
ParentNotYetConfirmed { needed: u64, current: u64 },
#[error("decoder error: {0}")]
Decoder(#[from] StreamDecodeError),
#[error("indexer error for extractor '{extractor}': {message}")]
Indexer { extractor: String, message: String },
}
pub struct PendingBlockProcessor {
indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
decoder: Arc<TychoStreamDecoder<BlockHeader>>,
chain: Chain,
current_confirmed_block: u64,
confirmed_block_tx: watch::Sender<u64>,
block_rx: UnboundedReceiver<FeedMessage<BlockHeader>>,
}
impl PendingBlockProcessor {
pub(crate) fn new(
indexers: HashMap<String, Box<dyn TxDeltaIndexer>>,
decoder: Arc<TychoStreamDecoder<BlockHeader>>,
chain: Chain,
block_rx: UnboundedReceiver<FeedMessage<BlockHeader>>,
) -> Self {
let (confirmed_block_tx, _) = watch::channel(0u64);
Self { indexers, decoder, chain, current_confirmed_block: 0, confirmed_block_tx, block_rx }
}
pub fn subscribe_confirmed_block(&self) -> watch::Receiver<u64> {
self.confirmed_block_tx.subscribe()
}
pub fn current_confirmed_block(&self) -> u64 {
self.current_confirmed_block
}
pub fn advance(&mut self, msg: &FeedMessage<BlockHeader>) -> Result<(), PendingError> {
self.advance_inner(msg)
}
pub async fn generate_pending_update(
&mut self,
txs: &[TxInput],
target_header: BlockHeader,
label: String,
) -> Result<PendingUpdate, PendingError> {
while let Ok(msg) = self.block_rx.try_recv() {
self.advance_inner(&msg)?;
}
let parent = target_header.number.saturating_sub(1);
if self.current_confirmed_block < parent {
return Err(PendingError::ParentNotYetConfirmed {
needed: parent,
current: self.current_confirmed_block,
});
}
let mut pending_deltas: HashMap<String, BlockAggregatedChanges> = HashMap::new();
for (extractor, indexer) in &mut self.indexers {
let changes = indexer.generate_deltas(txs);
pending_deltas.insert(extractor.clone(), changes);
}
let update = self
.decoder
.apply_deltas_ephemeral(&pending_deltas, target_header)
.await?;
Ok(PendingUpdate { label, update })
}
fn advance_inner(&mut self, msg: &FeedMessage<BlockHeader>) -> Result<(), PendingError> {
let msg_block = msg
.state_msgs
.values()
.map(|s| s.header.number)
.max()
.unwrap_or(0);
for (extractor, state_msg) in &msg.state_msgs {
let Some(indexer) = self.indexers.get_mut(extractor) else {
continue;
};
if !state_msg.snapshots.states.is_empty() {
let block_changes = snapshot_to_block_changes(
extractor,
&state_msg.snapshots,
&state_msg.header,
self.chain,
);
indexer
.apply_block(&block_changes)
.map_err(|e| PendingError::Indexer {
extractor: extractor.clone(),
message: format!("{e:#}"),
})?;
}
if let Some(deltas) = &state_msg.deltas {
indexer
.apply_block(deltas)
.map_err(|e| PendingError::Indexer {
extractor: extractor.clone(),
message: format!("{e:#}"),
})?;
}
}
if msg_block > self.current_confirmed_block {
self.current_confirmed_block = msg_block;
let _ = self.confirmed_block_tx.send(msg_block);
}
Ok(())
}
}
fn snapshot_to_block_changes(
extractor: &str,
snapshot: &Snapshot,
header: &BlockHeader,
chain: Chain,
) -> BlockAggregatedChanges {
let ts = chrono::DateTime::from_timestamp(header.timestamp as i64, 0)
.unwrap_or_default()
.naive_utc();
let block = Block {
number: header.number,
chain,
hash: header.hash.clone(),
parent_hash: header.parent_hash.clone(),
ts,
};
let mut new_protocol_components: HashMap<String, ProtocolComponent> = HashMap::new();
let mut state_deltas: HashMap<String, ProtocolComponentStateDelta> = HashMap::new();
let mut component_balances: HashMap<String, HashMap<Bytes, ComponentBalance>> = HashMap::new();
for (id, comp_with_state) in &snapshot.states {
new_protocol_components.insert(id.clone(), comp_with_state.component.clone());
state_deltas.insert(
id.clone(),
ProtocolComponentStateDelta {
component_id: id.clone(),
updated_attributes: comp_with_state.state.attributes.clone(),
deleted_attributes: HashSet::new(),
created_attributes: HashSet::new(),
},
);
let token_balances: HashMap<Bytes, ComponentBalance> = comp_with_state
.state
.balances
.iter()
.map(|(token, balance)| {
(
token.clone(),
ComponentBalance {
token: token.clone(),
balance: balance.clone(),
balance_float: 0.0,
modify_tx: Bytes::default(),
component_id: id.clone(),
},
)
})
.collect();
component_balances.insert(id.clone(), token_balances);
}
BlockAggregatedChanges {
extractor: extractor.to_string(),
chain,
block,
finalized_block_height: header.number,
new_protocol_components,
state_deltas,
component_balances,
..Default::default()
}
}