use crate::{
ActivationSignal, ChainProvider, L1RetrievalProvider, OriginAdvancer, OriginProvider,
PipelineError, PipelineResult, ResetError, ResetSignal, Signal, SignalReceiver,
};
use alloc::{boxed::Box, sync::Arc};
use alloy_primitives::Address;
use async_trait::async_trait;
use kona_genesis::{RollupConfig, SystemConfig};
use kona_protocol::BlockInfo;
#[derive(Debug, Clone)]
pub struct IndexedTraversal<Provider: ChainProvider> {
pub block: Option<BlockInfo>,
pub data_source: Provider,
pub done: bool,
pub system_config: SystemConfig,
pub rollup_config: Arc<RollupConfig>,
}
#[async_trait]
impl<F: ChainProvider + Send> L1RetrievalProvider for IndexedTraversal<F> {
fn batcher_addr(&self) -> Address {
self.system_config.batcher_address
}
async fn next_l1_block(&mut self) -> PipelineResult<Option<BlockInfo>> {
if !self.done {
self.done = true;
Ok(self.block)
} else {
Err(PipelineError::Eof.temp())
}
}
}
impl<F: ChainProvider> IndexedTraversal<F> {
pub fn new(data_source: F, cfg: Arc<RollupConfig>) -> Self {
Self {
block: Some(BlockInfo::default()),
data_source,
done: false,
system_config: SystemConfig::default(),
rollup_config: cfg,
}
}
fn update_origin(&mut self, block: BlockInfo) {
self.done = false;
self.block = Some(block);
kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_ORIGIN, block.number as f64);
}
async fn provide_next_block(&mut self, block_info: BlockInfo) -> PipelineResult<()> {
if !self.done {
debug!(target: "traversal", "Not finished consuming block, ignoring provided block.");
return Ok(());
}
let Some(block) = self.block else {
return Err(PipelineError::MissingOrigin.temp());
};
if block.number + 1 != block_info.number {
return Ok(());
}
if block.hash != block_info.parent_hash {
return Err(
ResetError::NextL1BlockHashMismatch(block.hash, block_info.parent_hash).reset()
);
}
let receipts =
self.data_source.receipts_by_hash(block_info.hash).await.map_err(Into::into)?;
let addr = self.rollup_config.l1_system_config_address;
let active = self.rollup_config.is_ecotone_active(block_info.timestamp);
match self.system_config.update_with_receipts(&receipts[..], addr, active) {
Ok(true) => {
let next = block_info.number as f64;
kona_macros::set!(gauge, crate::Metrics::PIPELINE_LATEST_SYS_CONFIG_UPDATE, next);
info!(target: "traversal", "System config updated at block {next}.");
}
Ok(false) => { }
Err(err) => {
error!(target: "traversal", ?err, "Failed to update system config at block {}", block_info.number);
kona_macros::set!(
gauge,
crate::Metrics::PIPELINE_SYS_CONFIG_UPDATE_ERROR,
block_info.number as f64
);
return Err(PipelineError::SystemConfigUpdate(err).crit());
}
}
self.update_origin(block_info);
Ok(())
}
}
#[async_trait]
impl<F: ChainProvider + Send> OriginAdvancer for IndexedTraversal<F> {
async fn advance_origin(&mut self) -> PipelineResult<()> {
if !self.done {
debug!(target: "traversal", "Not finished consuming block, ignoring advance call.");
return Ok(());
}
return Err(PipelineError::Eof.temp());
}
}
impl<F: ChainProvider> OriginProvider for IndexedTraversal<F> {
fn origin(&self) -> Option<BlockInfo> {
self.block
}
}
#[async_trait]
impl<F: ChainProvider + Send> SignalReceiver for IndexedTraversal<F> {
async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
match signal {
Signal::Reset(ResetSignal { l1_origin, system_config, .. }) |
Signal::Activation(ActivationSignal { l1_origin, system_config, .. }) => {
self.update_origin(l1_origin);
self.system_config = system_config.expect("System config must be provided.");
}
Signal::ProvideBlock(block_info) => self.provide_next_block(block_info).await?,
_ => {}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{errors::PipelineErrorKind, test_utils::TestChainProvider};
use alloc::vec;
use alloy_consensus::Receipt;
use alloy_primitives::{B256, Bytes, Log, LogData, address, b256, hex};
use kona_genesis::{CONFIG_UPDATE_EVENT_VERSION_0, CONFIG_UPDATE_TOPIC};
const L1_SYS_CONFIG_ADDR: Address = address!("1337000000000000000000000000000000000000");
fn new_update_batcher_log() -> Log {
Log {
address: L1_SYS_CONFIG_ADDR,
data: LogData::new_unchecked(
vec![
CONFIG_UPDATE_TOPIC,
CONFIG_UPDATE_EVENT_VERSION_0,
B256::ZERO, ],
hex!("00000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000beef").into()
)
}
}
fn new_receipts() -> alloc::vec::Vec<Receipt> {
let mut receipt =
Receipt { status: alloy_consensus::Eip658Value::Eip658(true), ..Receipt::default() };
let bad = Log::new(
Address::from([2; 20]),
vec![CONFIG_UPDATE_TOPIC, B256::default()],
Bytes::default(),
)
.unwrap();
receipt.logs = vec![new_update_batcher_log(), bad, new_update_batcher_log()];
vec![receipt.clone(), Receipt::default(), receipt]
}
fn new_test_managed(
blocks: alloc::vec::Vec<BlockInfo>,
receipts: alloc::vec::Vec<Receipt>,
) -> IndexedTraversal<TestChainProvider> {
let mut provider = TestChainProvider::default();
let rollup_config = RollupConfig {
l1_system_config_address: L1_SYS_CONFIG_ADDR,
..RollupConfig::default()
};
for (i, block) in blocks.iter().enumerate() {
provider.insert_block(i as u64, *block);
}
for (i, receipt) in receipts.iter().enumerate() {
let hash = blocks.get(i).map(|b| b.hash).unwrap_or_default();
provider.insert_receipts(hash, vec![receipt.clone()]);
}
IndexedTraversal::new(provider, Arc::new(rollup_config))
}
fn new_populated_test_managed() -> IndexedTraversal<TestChainProvider> {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = new_receipts();
new_test_managed(blocks, receipts)
}
#[test]
fn test_managed_traversal_batcher_address() {
let mut traversal = new_populated_test_managed();
traversal.system_config.batcher_address = L1_SYS_CONFIG_ADDR;
assert_eq!(traversal.batcher_addr(), L1_SYS_CONFIG_ADDR);
}
#[tokio::test]
async fn test_managed_traversal_activation_signal() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = new_receipts();
let mut traversal = new_test_managed(blocks, receipts);
let cfg = SystemConfig::default();
traversal.done = true;
assert!(
traversal
.signal(Signal::Activation(ActivationSignal {
system_config: Some(cfg),
..Default::default()
}))
.await
.is_ok()
);
assert_eq!(traversal.origin(), Some(BlockInfo::default()));
assert_eq!(traversal.system_config, cfg);
assert!(!traversal.done);
}
#[tokio::test]
async fn test_managed_traversal_reset_signal() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = new_receipts();
let mut traversal = new_test_managed(blocks, receipts);
let cfg = SystemConfig::default();
traversal.done = true;
assert!(
traversal
.signal(Signal::Reset(ResetSignal {
system_config: Some(cfg),
..Default::default()
}))
.await
.is_ok()
);
assert_eq!(traversal.origin(), Some(BlockInfo::default()));
assert_eq!(traversal.system_config, cfg);
assert!(!traversal.done);
}
#[tokio::test]
async fn test_managed_traversal_next_l1_block() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = new_receipts();
let mut traversal = new_test_managed(blocks, receipts);
assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().await.unwrap_err(), PipelineError::Eof.temp());
}
#[tokio::test]
async fn test_managed_traversal_missing_receipts() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let mut traversal = new_test_managed(blocks, vec![]);
assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().await.unwrap_err(), PipelineError::Eof.temp());
let next_block = BlockInfo { number: 1, ..BlockInfo::default() };
let err = traversal.provide_next_block(next_block).await.unwrap_err();
matches!(err, PipelineErrorKind::Temporary(PipelineError::Provider(_)));
}
#[tokio::test]
async fn test_managed_traversal_reorgs() {
let hash = b256!("3333333333333333333333333333333333333333333333333333333333333333");
let block = BlockInfo { hash, number: 0, ..BlockInfo::default() };
let blocks = vec![block];
let receipts = new_receipts();
let mut traversal = new_test_managed(blocks, receipts);
traversal.block = Some(block);
assert_eq!(traversal.next_l1_block().await.unwrap(), Some(block));
let next_block = BlockInfo { number: 1, ..BlockInfo::default() };
let err = traversal.provide_next_block(next_block).await.unwrap_err();
assert_eq!(err, ResetError::NextL1BlockHashMismatch(hash, next_block.parent_hash).reset());
}
#[tokio::test]
async fn test_managed_traversal_missing_blocks() {
let mut traversal = new_test_managed(vec![], vec![]);
assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().await.unwrap_err(), PipelineError::Eof.temp());
let next_block = BlockInfo { number: 1, ..BlockInfo::default() };
let err = traversal.provide_next_block(next_block).await.unwrap_err();
matches!(err, PipelineErrorKind::Temporary(PipelineError::MissingOrigin));
}
#[tokio::test]
async fn test_managed_traversal_system_config_update_fails() {
let first = b256!("3333333333333333333333333333333333333333333333333333333333333333");
let second = b256!("4444444444444444444444444444444444444444444444444444444444444444");
let block1 = BlockInfo { hash: first, ..BlockInfo::default() };
let block2 = BlockInfo { number: 1, hash: second, ..BlockInfo::default() };
let blocks = vec![block1, block2];
let receipts = new_receipts();
let mut traversal = new_test_managed(blocks, receipts);
traversal.block = Some(block1);
assert_eq!(traversal.next_l1_block().await.unwrap(), Some(block1));
let err = traversal.provide_next_block(block2).await.unwrap_err();
matches!(err, PipelineErrorKind::Critical(PipelineError::SystemConfigUpdate(_)));
}
#[tokio::test]
async fn test_managed_traversal_system_config_updated() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = new_receipts();
let mut traversal = new_test_managed(blocks, receipts);
assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().await.unwrap_err(), PipelineError::Eof.temp());
let next_block = BlockInfo { number: 1, ..BlockInfo::default() };
assert!(traversal.provide_next_block(next_block).await.is_ok());
let expected = address!("000000000000000000000000000000000000bEEF");
assert_eq!(traversal.system_config.batcher_address, expected);
}
}