use tokio::sync::mpsc;
use zksync_state_keeper::io::{L1BatchParams, L2BlockParams};
use zksync_types::{L1BatchNumber, L2BlockNumber};
use super::{fetcher::FetchedTransaction, metrics::QUEUE_METRICS};
/// Sending half of the action queue: a newtype over the `mpsc::Sender<SyncAction>` created by
/// `ActionQueue::new`.
#[derive(Debug)]
pub struct ActionQueueSender(mpsc::Sender<SyncAction>);
impl ActionQueueSender {
    /// Validates `actions` and then sends them to the queue one at a time, in order.
    ///
    /// The whole sequence is validated before the first send, so an invalid sequence is rejected
    /// without enqueueing anything. Each send awaits until the channel accepts the action. After
    /// every successful send the `action_queue_size` gauge is set to the number of occupied
    /// channel slots (`max_capacity - capacity`).
    ///
    /// # Errors
    ///
    /// Returns an error if the sequence does not pass `check_action_sequence`, or if sending
    /// fails (reported as "node action processor stopped"). Actions sent before a failed send
    /// stay in the channel.
    pub async fn push_actions(&self, actions: Vec<SyncAction>) -> anyhow::Result<()> {
        Self::check_action_sequence(&actions)?;

        let channel = &self.0;
        for action in actions {
            if channel.send(action).await.is_err() {
                return Err(anyhow::anyhow!("node action processor stopped"));
            }
            let occupied_slots = channel.max_capacity() - channel.capacity();
            QUEUE_METRICS.action_queue_size.set(occupied_slots);
        }
        Ok(())
    }

    /// Checks that `actions` forms one complete, well-ordered sequence.
    ///
    /// Accepted shape:
    /// 1. exactly one `OpenBatch` or `L2Block` as the first action;
    /// 2. zero or more `Tx` actions;
    /// 3. exactly one `SealL2Block` or `SealBatch` as the last action.
    ///
    /// # Errors
    ///
    /// Returns an error naming the offending action kind ("Unexpected ...") or reporting an
    /// "Incomplete sequence" when no sealing action is present (this includes an empty slice).
    /// The debug representation of the whole slice is appended to the message.
    fn check_action_sequence(actions: &[SyncAction]) -> anyhow::Result<()> {
        // Progress through the sequence; transitions only ever go forward.
        enum Phase {
            // Nothing has been opened yet.
            Unopened,
            // An opening action has been seen; transactions and a seal are allowed.
            Collecting,
            // A sealing action has been seen; nothing else may follow.
            Sealed,
        }

        let mut phase = Phase::Unopened;
        for action in actions {
            match action {
                SyncAction::OpenBatch { .. } | SyncAction::L2Block { .. } => {
                    anyhow::ensure!(
                        matches!(phase, Phase::Unopened),
                        "Unexpected OpenBatch / L2Block: {actions:?}"
                    );
                    phase = Phase::Collecting;
                }
                SyncAction::Tx(_) => {
                    anyhow::ensure!(
                        matches!(phase, Phase::Collecting),
                        "Unexpected Tx: {actions:?}"
                    );
                }
                SyncAction::SealL2Block | SyncAction::SealBatch => {
                    anyhow::ensure!(
                        matches!(phase, Phase::Collecting),
                        "Unexpected SealL2Block / SealBatch: {actions:?}"
                    );
                    phase = Phase::Sealed;
                }
            }
        }

        anyhow::ensure!(
            matches!(phase, Phase::Sealed),
            "Incomplete sequence: {actions:?}"
        );
        Ok(())
    }
}
/// Receiving half of the action queue, with a one-element look-ahead buffer.
#[derive(Debug)]
pub struct ActionQueue {
// Channel end from which actions are taken.
receiver: mpsc::Receiver<SyncAction>,
// Action already taken out of `receiver` by a peek but not yet popped; `None` when nothing is
// buffered.
peeked: Option<SyncAction>,
}
impl ActionQueue {
    /// Creates a connected sender / queue pair backed by a bounded channel holding up to
    /// 32 768 actions.
    pub fn new() -> (ActionQueueSender, Self) {
        const ACTION_CAPACITY: usize = 32_768;

        let (channel_tx, channel_rx) = mpsc::channel(ACTION_CAPACITY);
        let queue = Self {
            receiver: channel_rx,
            peeked: None,
        };
        (ActionQueueSender(channel_tx), queue)
    }

    /// Removes and returns the next action without waiting.
    ///
    /// A previously peeked action is returned first; otherwise the channel is polled once.
    /// Returns `None` if no action is immediately available. The `action_queue_size` gauge is
    /// decremented by one whenever an action is returned.
    pub(super) fn pop_action(&mut self) -> Option<SyncAction> {
        let action = match self.peeked.take() {
            Some(buffered) => buffered,
            None => self.receiver.try_recv().ok()?,
        };
        QUEUE_METRICS.action_queue_size.dec_by(1);
        Some(action)
    }

    /// Removes and returns the next action, waiting up to `max_wait` for one to arrive.
    ///
    /// Returns `None` if the wait times out or the channel yields `None`. The
    /// `action_queue_size` gauge is decremented by one whenever an action is returned.
    pub(super) async fn recv_action(
        &mut self,
        max_wait: tokio::time::Duration,
    ) -> Option<SyncAction> {
        // Fast path: something is buffered or already sitting in the channel.
        if let ready @ Some(_) = self.pop_action() {
            return ready;
        }

        match tokio::time::timeout(max_wait, self.receiver.recv()).await {
            Ok(Some(action)) => {
                QUEUE_METRICS.action_queue_size.dec_by(1);
                Some(action)
            }
            Ok(None) | Err(_) => None,
        }
    }

    /// Returns a clone of the next action without removing it from the queue, without waiting.
    ///
    /// If nothing is buffered yet, the channel is polled once and the result is stored in
    /// `peeked` so that later peeks / pops observe the same action.
    ///
    /// NOTE(review): peeking takes the action out of the channel without touching the
    /// `action_queue_size` gauge, while `push_actions` sets the gauge from channel occupancy and
    /// `pop_action` decrements it. The gauge may therefore be off by one while an action is
    /// buffered here. Confirm whether this is intended.
    pub(super) fn peek_action(&mut self) -> Option<SyncAction> {
        if self.peeked.is_none() {
            self.peeked = self.receiver.try_recv().ok();
        }
        self.peeked.clone()
    }

    /// Returns a clone of the next action without removing it from the queue, waiting up to
    /// `max_wait` for one to arrive.
    ///
    /// Returns `None` if the wait times out or the channel yields `None`; otherwise the received
    /// action is stored in `peeked` and a clone is returned.
    pub(super) async fn peek_action_async(
        &mut self,
        max_wait: tokio::time::Duration,
    ) -> Option<SyncAction> {
        if self.peeked.is_none() {
            let outcome = tokio::time::timeout(max_wait, self.receiver.recv()).await;
            match outcome {
                Ok(next) => self.peeked = next,
                // Timed out: leave the buffer empty.
                Err(_) => return None,
            }
        }
        self.peeked.clone()
    }
}
/// A single item passed through the action queue.
///
/// `ActionQueueSender::check_action_sequence` only accepts sequences shaped as: exactly one
/// `OpenBatch` or `L2Block`, then zero or more `Tx`, then exactly one `SealL2Block` or
/// `SealBatch`.
#[derive(Debug, Clone)]
pub enum SyncAction {
/// One of the two actions that may start a sequence.
/// NOTE(review): presumably opens a new L1 batch together with its first L2 block — confirm
/// against the consumer of the queue.
OpenBatch {
params: L1BatchParams,
number: L1BatchNumber,
first_l2_block_number: L2BlockNumber,
},
/// The other action that may start a sequence.
/// NOTE(review): presumably opens a new L2 block inside the current batch — confirm.
L2Block {
params: L2BlockParams,
number: L2BlockNumber,
},
/// A fetched transaction (boxed); only valid between an opening and a sealing action.
Tx(Box<FetchedTransaction>),
/// One of the two actions that may end a sequence.
SealL2Block,
/// The other action that may end a sequence; the validator treats it like `SealL2Block`.
/// NOTE(review): presumably also seals the enclosing L1 batch — confirm.
SealBatch,
}
impl From<FetchedTransaction> for SyncAction {
fn from(tx: FetchedTransaction) -> Self {
Self::Tx(Box::new(tx))
}
}
#[cfg(test)]
mod tests {
    use zksync_types::{fee_model::BatchFeeInput, l2::L2Tx, Address, ProtocolVersionId, H256};

    use super::*;

    // --- Builders for each kind of action, filled with placeholder values. ---

    fn open_batch() -> SyncAction {
        let first_l2_block = L2BlockParams {
            timestamp: 1,
            virtual_blocks: 1,
        };
        let params = L1BatchParams {
            protocol_version: ProtocolVersionId::latest(),
            validation_computational_gas_limit: u32::MAX,
            operator_address: Address::default(),
            fee_input: BatchFeeInput::default(),
            first_l2_block,
        };
        SyncAction::OpenBatch {
            params,
            number: L1BatchNumber(1),
            first_l2_block_number: L2BlockNumber(1),
        }
    }

    fn l2_block() -> SyncAction {
        let params = L2BlockParams {
            timestamp: 1,
            virtual_blocks: 1,
        };
        SyncAction::L2Block {
            params,
            number: L2BlockNumber(1),
        }
    }

    fn tx() -> SyncAction {
        let mut l2_tx = L2Tx::new(
            Default::default(),
            Default::default(),
            0.into(),
            Default::default(),
            Default::default(),
            Default::default(),
            Default::default(),
            Default::default(),
        );
        l2_tx.set_input(H256::default().0.to_vec(), H256::default());
        let fetched = FetchedTransaction::new(l2_tx.into());
        fetched.into()
    }

    fn seal_l2_block() -> SyncAction {
        SyncAction::SealL2Block
    }

    fn seal_batch() -> SyncAction {
        SyncAction::SealBatch
    }

    /// Every well-formed sequence must be accepted by the validator.
    #[test]
    fn correct_sequence() {
        let valid_sequences = vec![
            vec![open_batch(), seal_l2_block()],
            vec![open_batch(), seal_batch()],
            vec![open_batch(), tx(), seal_l2_block()],
            vec![open_batch(), tx(), tx(), tx(), seal_l2_block()],
            vec![open_batch(), tx(), seal_batch()],
            vec![l2_block(), seal_l2_block()],
            vec![l2_block(), seal_batch()],
            vec![l2_block(), tx(), seal_l2_block()],
            vec![l2_block(), tx(), seal_batch()],
        ];

        for (idx, sequence) in valid_sequences.iter().enumerate() {
            if ActionQueueSender::check_action_sequence(sequence).is_err() {
                panic!("Valid sequence #{idx} failed");
            }
        }
    }

    /// Every malformed sequence must be rejected with the expected error message fragment.
    #[test]
    fn incorrect_sequence() {
        let invalid_sequences = vec![
            // Never sealed.
            (vec![open_batch()], "Incomplete sequence"),
            (vec![open_batch(), tx()], "Incomplete sequence"),
            (vec![l2_block()], "Incomplete sequence"),
            (vec![l2_block(), tx()], "Incomplete sequence"),
            // Transaction outside of an open, unsealed block.
            (vec![tx()], "Unexpected Tx"),
            (vec![open_batch(), seal_l2_block(), tx()], "Unexpected Tx"),
            // More than one opening action.
            (
                vec![l2_block(), l2_block()],
                "Unexpected OpenBatch / L2Block",
            ),
            (
                vec![l2_block(), open_batch()],
                "Unexpected OpenBatch / L2Block",
            ),
            (
                vec![open_batch(), l2_block()],
                "Unexpected OpenBatch / L2Block",
            ),
            // Sealing without opening, or sealing more than once.
            (vec![seal_l2_block()], "Unexpected SealL2Block"),
            (
                vec![l2_block(), seal_l2_block(), seal_l2_block()],
                "Unexpected SealL2Block",
            ),
            (
                vec![open_batch(), seal_l2_block(), seal_batch(), seal_batch()],
                "Unexpected SealL2Block / SealBatch",
            ),
            (
                vec![l2_block(), seal_l2_block(), seal_batch(), seal_batch()],
                "Unexpected SealL2Block / SealBatch",
            ),
            (vec![seal_batch()], "Unexpected SealL2Block / SealBatch"),
        ];

        for (idx, (sequence, expected_err)) in invalid_sequences.into_iter().enumerate() {
            match ActionQueueSender::check_action_sequence(&sequence) {
                Ok(()) => {
                    panic!("Invalid sequence passed the test. Sequence #{idx}, expected error: {expected_err}")
                }
                Err(err) => {
                    let message = err.to_string();
                    assert!(
                        message.contains(expected_err),
                        "Sequence #{idx} failed. Expected error: {expected_err}, got: {err}"
                    );
                }
            }
        }
    }
}