1use std::sync::Arc;
2
3use alloy_eips::BlockNumberOrTag;
4use alloy_provider::Provider;
5use alloy_transport::{RpcError, TransportErrorKind};
6use kona_genesis::RollupConfig;
7use kona_protocol::{L2BlockInfo, OutputRoot, Predeploys};
8use tokio::sync::oneshot::Sender;
9
10use crate::{EngineClient, EngineClientError, EngineState};
11
12pub type EngineQuerySender = tokio::sync::mpsc::Sender<EngineQueries>;
14
15#[derive(Debug)]
17pub enum EngineQueries {
18 Config(Sender<RollupConfig>),
20 State(Sender<EngineState>),
22 OutputAtBlock {
25 block: BlockNumberOrTag,
27 sender: Sender<(L2BlockInfo, OutputRoot, EngineState)>,
29 },
30 StateReceiver(Sender<tokio::sync::watch::Receiver<EngineState>>),
32}
33
34#[derive(Debug, thiserror::Error)]
36pub enum EngineQueriesError {
37 #[error("Output channel closed unexpectedly. Impossible to send query response")]
39 OutputChannelClosed,
40 #[error("Failed to retrieve L2 block by label: {0}")]
42 BlockRetrievalFailed(#[from] EngineClientError),
43 #[error("No block withdrawals root while Isthmus is active")]
45 NoWithdrawalsRoot,
46 #[error("No L2 block found for block number or tag: {0}")]
48 NoL2BlockFound(BlockNumberOrTag),
49 #[error("Impossible to retrieve L2 withdrawals root from state. {0}")]
51 FailedToRetrieveWithdrawalsRoot(#[from] RpcError<TransportErrorKind>),
52}
53
54impl EngineQueries {
55 pub async fn handle(
57 self,
58 state_recv: &tokio::sync::watch::Receiver<EngineState>,
59 client: &Arc<EngineClient>,
60 rollup_config: &Arc<RollupConfig>,
61 ) -> Result<(), EngineQueriesError> {
62 let state = *state_recv.borrow();
63 match self {
64 Self::Config(sender) => sender
65 .send((**rollup_config).clone())
66 .map_err(|_| EngineQueriesError::OutputChannelClosed),
67 Self::State(sender) => {
68 sender.send(state).map_err(|_| EngineQueriesError::OutputChannelClosed)
69 }
70 Self::OutputAtBlock { block, sender } => {
71 let output_block = client.l2_block_by_label(block).await?;
72 let output_block = output_block.ok_or(EngineQueriesError::NoL2BlockFound(block))?;
73 let consensus_block = output_block.clone().into_consensus();
77 let output_block_info =
78 L2BlockInfo::from_block_and_genesis::<op_alloy_consensus::OpTxEnvelope>(
79 &consensus_block.map_transactions(|tx| tx.inner.inner.into_inner()),
80 &rollup_config.genesis,
81 )
82 .map_err(|_| EngineQueriesError::NoL2BlockFound(block))?;
83
84 let state_root = output_block.header.state_root;
85
86 let message_passer_storage_root =
87 if rollup_config.is_isthmus_active(output_block.header.timestamp) {
88 output_block
89 .header
90 .withdrawals_root
91 .ok_or(EngineQueriesError::NoWithdrawalsRoot)?
92 } else {
93 let l2_to_l1_message_passer = client
95 .get_proof(Predeploys::L2_TO_L1_MESSAGE_PASSER, Default::default())
96 .block_id(block.into())
97 .await?;
98
99 l2_to_l1_message_passer.storage_hash
100 };
101
102 let output_response_v0 = OutputRoot::from_parts(
103 state_root,
104 message_passer_storage_root,
105 output_block.header.hash,
106 );
107
108 sender
109 .send((output_block_info, output_response_v0, state))
110 .map_err(|_| EngineQueriesError::OutputChannelClosed)
111 }
112 Self::StateReceiver(subscription) => subscription
113 .send(state_recv.clone())
114 .map_err(|_| EngineQueriesError::OutputChannelClosed),
115 }
116 }
117}