use crate::{
ActivationSignal, ChainProvider, L1RetrievalProvider, OriginAdvancer, OriginProvider,
PipelineError, PipelineResult, ResetError, ResetSignal, Signal, SignalReceiver,
};
use alloc::{boxed::Box, sync::Arc};
use alloy_primitives::Address;
use async_trait::async_trait;
use kona_genesis::{RollupConfig, SystemConfig};
use kona_protocol::BlockInfo;
#[derive(Debug, Clone)]
pub struct PollingTraversal<Provider: ChainProvider> {
pub block: Option<BlockInfo>,
pub data_source: Provider,
pub done: bool,
pub system_config: SystemConfig,
pub rollup_config: Arc<RollupConfig>,
}
#[async_trait]
impl<F: ChainProvider + Send> L1RetrievalProvider for PollingTraversal<F> {
fn batcher_addr(&self) -> Address {
self.system_config.batcher_address
}
async fn next_l1_block(&mut self) -> PipelineResult<Option<BlockInfo>> {
if !self.done {
self.done = true;
Ok(self.block)
} else {
Err(PipelineError::Eof.temp())
}
}
}
impl<F: ChainProvider> PollingTraversal<F> {
pub fn new(data_source: F, cfg: Arc<RollupConfig>) -> Self {
Self {
block: Some(BlockInfo::default()),
data_source,
done: false,
system_config: SystemConfig::default(),
rollup_config: cfg,
}
}
fn update_origin(&mut self, block: BlockInfo) {
self.done = false;
self.block = Some(block);
kona_macros::set!(gauge, crate::metrics::Metrics::PIPELINE_ORIGIN, block.number as f64);
}
}
#[async_trait]
impl<F: ChainProvider + Send> OriginAdvancer for PollingTraversal<F> {
async fn advance_origin(&mut self) -> PipelineResult<()> {
#[cfg(feature = "metrics")]
let start_time = std::time::Instant::now();
let block = match self.block {
Some(block) => block,
None => {
warn!(target: "l1_traversal", "Missing current block, can't advance origin with no reference.");
return Err(PipelineError::Eof.temp());
}
};
let next_l1_origin =
self.data_source.block_info_by_number(block.number + 1).await.map_err(Into::into)?;
if block.hash != next_l1_origin.parent_hash {
return Err(ResetError::ReorgDetected(block.hash, next_l1_origin.parent_hash).into());
}
let receipts =
self.data_source.receipts_by_hash(next_l1_origin.hash).await.map_err(Into::into)?;
let addr = self.rollup_config.l1_system_config_address;
let active = self.rollup_config.is_ecotone_active(next_l1_origin.timestamp);
match self.system_config.update_with_receipts(&receipts[..], addr, active) {
Ok(true) => {
let next = next_l1_origin.number as f64;
kona_macros::set!(gauge, crate::Metrics::PIPELINE_LATEST_SYS_CONFIG_UPDATE, next);
info!(target: "l1_traversal", "System config updated at block {next}.");
}
Ok(false) => { }
Err(err) => {
error!(target: "l1_traversal", ?err, "Failed to update system config at block {}", next_l1_origin.number);
kona_macros::set!(
gauge,
crate::Metrics::PIPELINE_SYS_CONFIG_UPDATE_ERROR,
next_l1_origin.number as f64
);
return Err(PipelineError::SystemConfigUpdate(err).crit());
}
}
let prev_block_holocene = self.rollup_config.is_holocene_active(block.timestamp);
let next_block_holocene = self.rollup_config.is_holocene_active(next_l1_origin.timestamp);
self.update_origin(next_l1_origin);
#[cfg(feature = "metrics")]
{
let duration = start_time.elapsed();
kona_macros::record!(
histogram,
crate::metrics::Metrics::PIPELINE_ORIGIN_ADVANCE,
duration.as_secs_f64()
);
}
if !prev_block_holocene && next_block_holocene {
return Err(ResetError::HoloceneActivation.reset());
}
Ok(())
}
}
impl<F: ChainProvider> OriginProvider for PollingTraversal<F> {
fn origin(&self) -> Option<BlockInfo> {
self.block
}
}
#[async_trait]
impl<F: ChainProvider + Send> SignalReceiver for PollingTraversal<F> {
async fn signal(&mut self, signal: Signal) -> PipelineResult<()> {
match signal {
Signal::Reset(ResetSignal { l1_origin, system_config, .. }) |
Signal::Activation(ActivationSignal { l1_origin, system_config, .. }) => {
self.update_origin(l1_origin);
self.system_config = system_config.expect("System config must be provided.");
}
Signal::ProvideBlock(_) => {
warn!(target: "traversal", "ProvideBlock signal not supported in PollingTraversal stage.");
return Err(PipelineError::UnsupportedSignal.temp());
}
_ => {}
}
Ok(())
}
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::{errors::PipelineErrorKind, test_utils::TraversalTestHelper};
use alloc::vec;
use alloy_primitives::{address, b256};
#[test]
fn test_l1_traversal_batcher_address() {
let mut traversal = TraversalTestHelper::new_populated();
traversal.system_config.batcher_address = TraversalTestHelper::L1_SYS_CONFIG_ADDR;
assert_eq!(traversal.batcher_addr(), TraversalTestHelper::L1_SYS_CONFIG_ADDR);
}
#[tokio::test]
async fn test_l1_traversal_flush_channel() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = TraversalTestHelper::new_receipts();
let mut traversal = TraversalTestHelper::new_from_blocks(blocks, receipts);
assert!(traversal.advance_origin().await.is_ok());
traversal.done = true;
assert!(traversal.signal(Signal::FlushChannel).await.is_ok());
assert_eq!(traversal.origin(), Some(BlockInfo::default()));
assert!(traversal.done);
}
#[tokio::test]
async fn test_l1_traversal_activation_signal() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = TraversalTestHelper::new_receipts();
let mut traversal = TraversalTestHelper::new_from_blocks(blocks, receipts);
assert!(traversal.advance_origin().await.is_ok());
let cfg = SystemConfig::default();
traversal.done = true;
assert!(
traversal
.signal(
ActivationSignal { system_config: Some(cfg), ..Default::default() }.signal()
)
.await
.is_ok()
);
assert_eq!(traversal.origin(), Some(BlockInfo::default()));
assert_eq!(traversal.system_config, cfg);
assert!(!traversal.done);
}
#[tokio::test]
async fn test_l1_traversal_reset_signal() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = TraversalTestHelper::new_receipts();
let mut traversal = TraversalTestHelper::new_from_blocks(blocks, receipts);
assert!(traversal.advance_origin().await.is_ok());
let cfg = SystemConfig::default();
traversal.done = true;
assert!(
traversal
.signal(ResetSignal { system_config: Some(cfg), ..Default::default() }.signal())
.await
.is_ok()
);
assert_eq!(traversal.origin(), Some(BlockInfo::default()));
assert_eq!(traversal.system_config, cfg);
assert!(!traversal.done);
}
#[tokio::test]
async fn test_l1_traversal() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = TraversalTestHelper::new_receipts();
let mut traversal = TraversalTestHelper::new_from_blocks(blocks, receipts);
assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().await.unwrap_err(), PipelineError::Eof.temp());
assert!(traversal.advance_origin().await.is_ok());
}
#[tokio::test]
async fn test_l1_traversal_missing_receipts() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let mut traversal = TraversalTestHelper::new_from_blocks(blocks, vec![]);
assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().await.unwrap_err(), PipelineError::Eof.temp());
matches!(
traversal.advance_origin().await.unwrap_err(),
PipelineErrorKind::Temporary(PipelineError::Provider(_))
);
}
#[tokio::test]
async fn test_l1_traversal_reorgs() {
let hash = b256!("3333333333333333333333333333333333333333333333333333333333333333");
let block = BlockInfo { hash, ..BlockInfo::default() };
let blocks = vec![block, block];
let receipts = TraversalTestHelper::new_receipts();
let mut traversal = TraversalTestHelper::new_from_blocks(blocks, receipts);
assert!(traversal.advance_origin().await.is_ok());
let err = traversal.advance_origin().await.unwrap_err();
assert_eq!(err, ResetError::ReorgDetected(block.hash, block.parent_hash).into());
}
#[tokio::test]
async fn test_l1_traversal_missing_blocks() {
let mut traversal = TraversalTestHelper::new_from_blocks(vec![], vec![]);
assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().await.unwrap_err(), PipelineError::Eof.temp());
matches!(
traversal.advance_origin().await.unwrap_err(),
PipelineErrorKind::Temporary(PipelineError::Provider(_))
);
}
#[tokio::test]
async fn test_l1_traversal_system_config_update_fails() {
let first = b256!("3333333333333333333333333333333333333333333333333333333333333333");
let second = b256!("4444444444444444444444444444444444444444444444444444444444444444");
let block1 = BlockInfo { hash: first, ..BlockInfo::default() };
let block2 = BlockInfo { hash: second, ..BlockInfo::default() };
let blocks = vec![block1, block2];
let receipts = TraversalTestHelper::new_receipts();
let mut traversal = TraversalTestHelper::new_from_blocks(blocks, receipts);
assert!(traversal.advance_origin().await.is_ok());
let err = traversal.advance_origin().await.unwrap_err();
matches!(err, PipelineErrorKind::Critical(PipelineError::SystemConfigUpdate(_)));
}
#[tokio::test]
async fn test_l1_traversal_system_config_updated() {
let blocks = vec![BlockInfo::default(), BlockInfo::default()];
let receipts = TraversalTestHelper::new_receipts();
let mut traversal = TraversalTestHelper::new_from_blocks(blocks, receipts);
assert_eq!(traversal.next_l1_block().await.unwrap(), Some(BlockInfo::default()));
assert_eq!(traversal.next_l1_block().await.unwrap_err(), PipelineError::Eof.temp());
assert!(traversal.advance_origin().await.is_ok());
let expected = address!("000000000000000000000000000000000000bEEF");
assert_eq!(traversal.system_config.batcher_address, expected);
}
}