use anyhow::Result;
use bytes::Bytes;
use tycho_types::models::{ConsensusConfig, GenesisInfo};
use crate::mempool::{
GetAnchorResult, MempoolAdapter, MempoolAdapterStdImpl, MempoolAnchorId, StateUpdateContext,
};
use crate::tracing_targets;
use crate::types::processed_upto::BlockSeqno;
#[async_trait::async_trait]
impl MempoolAdapter for MempoolAdapterStdImpl {
    /// Accepts a state update that originates from a masterchain block.
    ///
    /// Under the keeper lock the update is checked against an expected genesis
    /// change and pushed into the state update queue. When the queue hands a
    /// context back, that context is processed right away, its
    /// `top_processed_to_anchor_id` is forwarded to `top_known_anchor`, and it is
    /// stored as the last state update.
    ///
    /// # Panics
    ///
    /// Panics if `new_cx.mc_block_id` is not a masterchain block id.
    ///
    /// # Errors
    ///
    /// Propagates errors from the genesis change check, from the queue push and
    /// from processing the state update.
    async fn handle_mc_state_update(&self, new_cx: Box<StateUpdateContext>) -> Result<()> {
        tracing::debug!(
            target: tracing_targets::MEMPOOL_ADAPTER,
            tka = %new_cx.top_processed_to_anchor_id,
            "Received state update from mc block",
        );

        assert!(new_cx.mc_block_id.is_masterchain(), "expect only MC data");

        let mut keeper_guard = self.keeper.lock().await;

        keeper_guard.check_expect_genesis_change(&self.cache, &new_cx)?;

        // NOTE(review): a `None` here presumably means the update stays queued
        // until a later call releases it - confirm against the queue implementation.
        let ready = keeper_guard.state_update_queue.push(new_cx)?;
        let Some(ctx) = ready else {
            return Ok(());
        };

        self.process_state_update(&mut keeper_guard, &ctx).await?;
        self.top_known_anchor
            .set_max_raw(ctx.top_processed_to_anchor_id);
        keeper_guard.last_state_update = Some(*ctx);

        Ok(())
    }

    /// Applies every state update the queue returns for the signed masterchain
    /// block with the given seqno.
    ///
    /// Each returned context is handled in iteration order, exactly like in
    /// [`Self::handle_mc_state_update`]: processed, forwarded to
    /// `top_known_anchor`, then stored as the last state update.
    ///
    /// # Errors
    ///
    /// Propagates errors from the queue and from processing a state update;
    /// the first failure stops the iteration.
    async fn handle_signed_mc_block(&self, mc_block_seqno: BlockSeqno) -> Result<()> {
        let mut keeper_guard = self.keeper.lock().await;

        let signed_updates = keeper_guard.state_update_queue.signed(mc_block_seqno)?;
        for ctx in signed_updates {
            self.process_state_update(&mut keeper_guard, &ctx).await?;
            self.top_known_anchor
                .set_max_raw(ctx.top_processed_to_anchor_id);
            keeper_guard.last_state_update = Some(*ctx);
        }

        Ok(())
    }

    /// Looks up an anchor in the cache by its id.
    ///
    /// Returns [`GetAnchorResult::Exist`] when the cache yields an anchor and
    /// [`GetAnchorResult::NotExist`] otherwise. This implementation never
    /// returns an error itself.
    async fn get_anchor_by_id(&self, anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
        tracing::debug!(
            target: tracing_targets::MEMPOOL_ADAPTER,
            %anchor_id,
            "get_anchor_by_id"
        );

        let found = self.cache.get_anchor_by_id(anchor_id).await;
        Ok(found.map_or(GetAnchorResult::NotExist, GetAnchorResult::Exist))
    }

    /// Asks the cache for the anchor that follows `prev_anchor_id`.
    ///
    /// Returns [`GetAnchorResult::Exist`] when the cache yields an anchor and
    /// [`GetAnchorResult::NotExist`] otherwise.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the cache lookup.
    async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
        tracing::debug!(
            target: tracing_targets::MEMPOOL_ADAPTER,
            %prev_anchor_id,
            "get_next_anchor"
        );

        let found = self.cache.get_next_anchor(prev_anchor_id).await?;
        Ok(found.map_or(GetAnchorResult::NotExist, GetAnchorResult::Exist))
    }

    /// Delegates to the cache's `clear` with `before_anchor_id`.
    ///
    /// Always returns `Ok(())`.
    fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
        self.cache.clear(before_anchor_id);
        Ok(())
    }

    /// Pushes an external message into the input buffer.
    fn accept_external(&self, message: Bytes) {
        self.input_buffer.push(message);
    }

    /// Stores configuration in the keeper's config builder.
    ///
    /// The consensus config is set only when one is supplied; the genesis info
    /// is always set afterwards.
    ///
    /// # Errors
    ///
    /// Propagates the error from setting the consensus config; in that case the
    /// genesis info is not updated.
    async fn update_delayed_config(
        &self,
        consensus_config: Option<&ConsensusConfig>,
        genesis_info: &GenesisInfo,
    ) -> Result<()> {
        let mut keeper_guard = self.keeper.lock().await;
        let builder = &mut keeper_guard.config_builder;

        if let Some(config) = consensus_config {
            builder.set_consensus_config(config)?;
        }
        builder.set_genesis(*genesis_info);

        Ok(())
    }
}