kona_engine/
query.rs

1//! Engine query interface for external communication.
2//!
3//! Provides a channel-based API for querying engine state and configuration
4//! from external actors. Uses oneshot channels for responses to maintain
5//! clean async communication patterns.
6
7use std::sync::Arc;
8
9use alloy_eips::BlockNumberOrTag;
10use alloy_provider::Provider;
11use alloy_transport::{RpcError, TransportErrorKind};
12use kona_genesis::RollupConfig;
13use kona_protocol::{L2BlockInfo, OutputRoot, Predeploys};
14use tokio::sync::oneshot::Sender;
15
16use crate::{EngineClient, EngineClientError, EngineState};
17
18/// Channel sender for submitting [`EngineQueries`] to the engine.
19pub type EngineQuerySender = tokio::sync::mpsc::Sender<EngineQueries>;
20
21/// Query types supported by the engine for external communication.
22///
23/// Each variant includes a oneshot sender for the response, enabling
24/// async request-response patterns. The engine processes these queries
25/// and sends responses back through the provided channels.
26#[derive(Debug)]
27pub enum EngineQueries {
28    /// Request the current rollup configuration.
29    Config(Sender<RollupConfig>),
30    /// Request the current [`EngineState`] snapshot.
31    State(Sender<EngineState>),
32    /// Request the L2 output root for a specific block.
33    ///
34    /// Returns a tuple of block info, output root, and engine state at the requested block.
35    OutputAtBlock {
36        /// The block number or tag to retrieve the output for.
37        block: BlockNumberOrTag,
38        /// Response channel for (block_info, output_root, engine_state).
39        sender: Sender<(L2BlockInfo, OutputRoot, EngineState)>,
40    },
41    /// Subscribe to engine state updates via a watch channel receiver.
42    StateReceiver(Sender<tokio::sync::watch::Receiver<EngineState>>),
43    /// Development API: Subscribe to task queue length updates.
44    QueueLengthReceiver(Sender<tokio::sync::watch::Receiver<usize>>),
45    /// Development API: Get the current number of pending tasks in the queue.
46    TaskQueueLength(Sender<usize>),
47}
48
49/// An error that can occur when querying the engine.
50#[derive(Debug, thiserror::Error)]
51pub enum EngineQueriesError {
52    /// The output channel was closed unexpectedly. Impossible to send query response.
53    #[error("Output channel closed unexpectedly. Impossible to send query response")]
54    OutputChannelClosed,
55    /// Failed to retrieve the L2 block by label.
56    #[error("Failed to retrieve L2 block by label: {0}")]
57    BlockRetrievalFailed(#[from] EngineClientError),
58    /// No block withdrawals root while Isthmus is active.
59    #[error("No block withdrawals root while Isthmus is active")]
60    NoWithdrawalsRoot,
61    /// No L2 block found for block number or tag.
62    #[error("No L2 block found for block number or tag: {0}")]
63    NoL2BlockFound(BlockNumberOrTag),
64    /// Impossible to retrieve L2 withdrawals root from state.
65    #[error("Impossible to retrieve L2 withdrawals root from state. {0}")]
66    FailedToRetrieveWithdrawalsRoot(#[from] RpcError<TransportErrorKind>),
67}
68
69impl EngineQueries {
70    /// Handles the engine query request.
71    pub async fn handle(
72        self,
73        state_recv: &tokio::sync::watch::Receiver<EngineState>,
74        queue_length_recv: &tokio::sync::watch::Receiver<usize>,
75        client: &Arc<EngineClient>,
76        rollup_config: &Arc<RollupConfig>,
77    ) -> Result<(), EngineQueriesError> {
78        let state = *state_recv.borrow();
79
80        match self {
81            Self::Config(sender) => sender
82                .send((**rollup_config).clone())
83                .map_err(|_| EngineQueriesError::OutputChannelClosed),
84            Self::State(sender) => {
85                sender.send(state).map_err(|_| EngineQueriesError::OutputChannelClosed)
86            }
87            Self::OutputAtBlock { block, sender } => {
88                let output_block = client.l2_block_by_label(block).await?;
89                let output_block = output_block.ok_or(EngineQueriesError::NoL2BlockFound(block))?;
90                // Cloning the l2 block below is cheaper than sending a network request to get the
91                // l2 block info. Querying the `L2BlockInfo` from the client ends up
92                // fetching the full l2 block again.
93                let consensus_block = output_block.clone().into_consensus();
94                let output_block_info =
95                    L2BlockInfo::from_block_and_genesis::<op_alloy_consensus::OpTxEnvelope>(
96                        &consensus_block.map_transactions(|tx| tx.inner.inner.into_inner()),
97                        &rollup_config.genesis,
98                    )
99                    .map_err(|_| EngineQueriesError::NoL2BlockFound(block))?;
100
101                let state_root = output_block.header.state_root;
102
103                let message_passer_storage_root =
104                    if rollup_config.is_isthmus_active(output_block.header.timestamp) {
105                        output_block
106                            .header
107                            .withdrawals_root
108                            .ok_or(EngineQueriesError::NoWithdrawalsRoot)?
109                    } else {
110                        // Fetch the storage root for the L2 head block.
111                        let l2_to_l1_message_passer = client
112                            .get_proof(Predeploys::L2_TO_L1_MESSAGE_PASSER, Default::default())
113                            .block_id(block.into())
114                            .await?;
115
116                        l2_to_l1_message_passer.storage_hash
117                    };
118
119                let output_response_v0 = OutputRoot::from_parts(
120                    state_root,
121                    message_passer_storage_root,
122                    output_block.header.hash,
123                );
124
125                sender
126                    .send((output_block_info, output_response_v0, state))
127                    .map_err(|_| EngineQueriesError::OutputChannelClosed)
128            }
129            Self::StateReceiver(subscription) => subscription
130                .send(state_recv.clone())
131                .map_err(|_| EngineQueriesError::OutputChannelClosed),
132            Self::QueueLengthReceiver(subscription) => subscription
133                .send(queue_length_recv.clone())
134                .map_err(|_| EngineQueriesError::OutputChannelClosed),
135            Self::TaskQueueLength(sender) => {
136                let queue_length = *queue_length_recv.borrow();
137                if sender.send(queue_length).is_err() {
138                    warn!(target: "engine", "Failed to send task queue length response");
139                }
140                Ok(())
141            }
142        }
143    }
144}