1use std::sync::Arc;
8
9use alloy_eips::BlockNumberOrTag;
10use alloy_provider::Provider;
11use alloy_transport::{RpcError, TransportErrorKind};
12use kona_genesis::RollupConfig;
13use kona_protocol::{L2BlockInfo, OutputRoot, Predeploys};
14use tokio::sync::oneshot::Sender;
15
16use crate::{EngineClient, EngineClientError, EngineState};
17
18pub type EngineQuerySender = tokio::sync::mpsc::Sender<EngineQueries>;
20
21#[derive(Debug)]
27pub enum EngineQueries {
28 Config(Sender<RollupConfig>),
30 State(Sender<EngineState>),
32 OutputAtBlock {
36 block: BlockNumberOrTag,
38 sender: Sender<(L2BlockInfo, OutputRoot, EngineState)>,
40 },
41 StateReceiver(Sender<tokio::sync::watch::Receiver<EngineState>>),
43 QueueLengthReceiver(Sender<tokio::sync::watch::Receiver<usize>>),
45 TaskQueueLength(Sender<usize>),
47}
48
49#[derive(Debug, thiserror::Error)]
51pub enum EngineQueriesError {
52 #[error("Output channel closed unexpectedly. Impossible to send query response")]
54 OutputChannelClosed,
55 #[error("Failed to retrieve L2 block by label: {0}")]
57 BlockRetrievalFailed(#[from] EngineClientError),
58 #[error("No block withdrawals root while Isthmus is active")]
60 NoWithdrawalsRoot,
61 #[error("No L2 block found for block number or tag: {0}")]
63 NoL2BlockFound(BlockNumberOrTag),
64 #[error("Impossible to retrieve L2 withdrawals root from state. {0}")]
66 FailedToRetrieveWithdrawalsRoot(#[from] RpcError<TransportErrorKind>),
67}
68
69impl EngineQueries {
70 pub async fn handle(
72 self,
73 state_recv: &tokio::sync::watch::Receiver<EngineState>,
74 queue_length_recv: &tokio::sync::watch::Receiver<usize>,
75 client: &Arc<EngineClient>,
76 rollup_config: &Arc<RollupConfig>,
77 ) -> Result<(), EngineQueriesError> {
78 let state = *state_recv.borrow();
79
80 match self {
81 Self::Config(sender) => sender
82 .send((**rollup_config).clone())
83 .map_err(|_| EngineQueriesError::OutputChannelClosed),
84 Self::State(sender) => {
85 sender.send(state).map_err(|_| EngineQueriesError::OutputChannelClosed)
86 }
87 Self::OutputAtBlock { block, sender } => {
88 let output_block = client.l2_block_by_label(block).await?;
89 let output_block = output_block.ok_or(EngineQueriesError::NoL2BlockFound(block))?;
90 let consensus_block = output_block.clone().into_consensus();
94 let output_block_info =
95 L2BlockInfo::from_block_and_genesis::<op_alloy_consensus::OpTxEnvelope>(
96 &consensus_block.map_transactions(|tx| tx.inner.inner.into_inner()),
97 &rollup_config.genesis,
98 )
99 .map_err(|_| EngineQueriesError::NoL2BlockFound(block))?;
100
101 let state_root = output_block.header.state_root;
102
103 let message_passer_storage_root =
104 if rollup_config.is_isthmus_active(output_block.header.timestamp) {
105 output_block
106 .header
107 .withdrawals_root
108 .ok_or(EngineQueriesError::NoWithdrawalsRoot)?
109 } else {
110 let l2_to_l1_message_passer = client
112 .get_proof(Predeploys::L2_TO_L1_MESSAGE_PASSER, Default::default())
113 .block_id(block.into())
114 .await?;
115
116 l2_to_l1_message_passer.storage_hash
117 };
118
119 let output_response_v0 = OutputRoot::from_parts(
120 state_root,
121 message_passer_storage_root,
122 output_block.header.hash,
123 );
124
125 sender
126 .send((output_block_info, output_response_v0, state))
127 .map_err(|_| EngineQueriesError::OutputChannelClosed)
128 }
129 Self::StateReceiver(subscription) => subscription
130 .send(state_recv.clone())
131 .map_err(|_| EngineQueriesError::OutputChannelClosed),
132 Self::QueueLengthReceiver(subscription) => subscription
133 .send(queue_length_recv.clone())
134 .map_err(|_| EngineQueriesError::OutputChannelClosed),
135 Self::TaskQueueLength(sender) => {
136 let queue_length = *queue_length_recv.borrow();
137 if sender.send(queue_length).is_err() {
138 warn!(target: "engine", "Failed to send task queue length response");
139 }
140 Ok(())
141 }
142 }
143 }
144}