kona_engine/
client.rs

1//! An Engine API Client.
2
3use crate::Metrics;
4use alloy_eips::eip1898::BlockNumberOrTag;
5use alloy_network::Network;
6use alloy_primitives::{B256, BlockHash, Bytes};
7use alloy_provider::{Provider, RootProvider};
8use alloy_rpc_client::RpcClient;
9use alloy_rpc_types_engine::{
10    ClientVersionV1, ExecutionPayloadBodiesV1, ExecutionPayloadEnvelopeV2, ExecutionPayloadInputV2,
11    ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, JwtSecret, PayloadId, PayloadStatus,
12};
13use alloy_rpc_types_eth::Block;
14use alloy_transport::{RpcError, TransportErrorKind, TransportResult};
15use alloy_transport_http::{
16    AuthLayer, AuthService, Http, HyperClient,
17    hyper_util::{
18        client::legacy::{Client, connect::HttpConnector},
19        rt::TokioExecutor,
20    },
21};
22use derive_more::Deref;
23use http_body_util::Full;
24use kona_genesis::RollupConfig;
25use kona_protocol::{FromBlockError, L2BlockInfo};
26use op_alloy_network::Optimism;
27use op_alloy_provider::ext::engine::OpEngineApi;
28use op_alloy_rpc_types::Transaction;
29use op_alloy_rpc_types_engine::{
30    OpExecutionPayloadEnvelopeV3, OpExecutionPayloadEnvelopeV4, OpExecutionPayloadV4,
31    OpPayloadAttributes, ProtocolVersion,
32};
33use std::{sync::Arc, time::Instant};
34use thiserror::Error;
35use tower::ServiceBuilder;
36use url::Url;
37
38/// An error that occurred in the [`EngineClient`].
39#[derive(Error, Debug)]
40pub enum EngineClientError {
41    /// An RPC error occurred
42    #[error("An RPC error occurred: {0}")]
43    RpcError(#[from] RpcError<TransportErrorKind>),
44
45    /// An error occurred while decoding the payload
46    #[error("An error occurred while decoding the payload: {0}")]
47    BlockInfoDecodeError(#[from] FromBlockError),
48}
49/// A Hyper HTTP client with a JWT authentication layer.
50type HyperAuthClient<B = Full<Bytes>> = HyperClient<B, AuthService<Client<HttpConnector, B>>>;
51
52/// An Engine API client that provides authenticated HTTP communication with an execution layer.
53///
54/// The [`EngineClient`] handles JWT authentication and manages connections to both L1 and L2
55/// execution layers. It automatically selects the appropriate Engine API version based on the
56/// rollup configuration and block timestamps.
57///
58/// # Examples
59///
60/// ```rust,no_run
61/// use alloy_rpc_types_engine::JwtSecret;
62/// use kona_engine::EngineClient;
63/// use kona_genesis::RollupConfig;
64/// use std::sync::Arc;
65/// use url::Url;
66///
67/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
68/// let engine_url = Url::parse("http://localhost:8551")?;
69/// let l1_url = Url::parse("http://localhost:8545")?;
70/// let config = Arc::new(RollupConfig::default());
71/// let jwt = JwtSecret::from_hex("0xabcd")?;
72///
73/// let client = EngineClient::new_http(engine_url, l1_url, config, jwt);
74/// # Ok(())
75/// # }
76/// ```
77#[derive(Debug, Deref, Clone)]
78pub struct EngineClient {
79    /// The L2 engine provider for Engine API calls.
80    #[deref]
81    engine: RootProvider<Optimism>,
82    /// The L1 chain provider for reading L1 data.
83    l1_provider: RootProvider,
84    /// The [`RollupConfig`] for determining Engine API versions based on hardfork activations.
85    cfg: Arc<RollupConfig>,
86}
87
88impl EngineClient {
89    /// Creates a new RPC client for the given address and JWT secret.
90    fn rpc_client<T: Network>(addr: Url, jwt: JwtSecret) -> RootProvider<T> {
91        let hyper_client = Client::builder(TokioExecutor::new()).build_http::<Full<Bytes>>();
92        let auth_layer = AuthLayer::new(jwt);
93        let service = ServiceBuilder::new().layer(auth_layer).service(hyper_client);
94        let layer_transport = HyperClient::with_service(service);
95
96        let http_hyper = Http::with_client(layer_transport, addr);
97        let rpc_client = RpcClient::new(http_hyper, false);
98        RootProvider::<T>::new(rpc_client)
99    }
100
101    /// Creates a new [`EngineClient`] with authenticated HTTP connections.
102    ///
103    /// Sets up JWT-authenticated connections to the Engine API endpoint,
104    /// along with an unauthenticated connection to the L1 chain.
105    ///
106    /// # Arguments
107    ///
108    /// * `engine` - L2 Engine API endpoint URL (typically port 8551)
109    /// * `l1_rpc` - L1 chain RPC endpoint URL
110    /// * `cfg` - Rollup configuration for version selection
111    /// * `jwt` - JWT secret for authentication
112    pub fn new_http(engine: Url, l1_rpc: Url, cfg: Arc<RollupConfig>, jwt: JwtSecret) -> Self {
113        let engine = Self::rpc_client::<Optimism>(engine, jwt);
114        let l1_provider = RootProvider::new_http(l1_rpc);
115
116        Self { engine, l1_provider, cfg }
117    }
118
119    /// Returns a reference to the inner L2 [`RootProvider`].
120    pub const fn l2_engine(&self) -> &RootProvider<Optimism> {
121        &self.engine
122    }
123
124    /// Returns a reference to the inner L1 [`RootProvider`].
125    pub const fn l1_provider(&self) -> &RootProvider {
126        &self.l1_provider
127    }
128
129    /// Returns a reference to the inner [`RollupConfig`].
130    pub fn cfg(&self) -> &RollupConfig {
131        self.cfg.as_ref()
132    }
133
134    /// Fetches the [`Block<T>`] for the given [`BlockNumberOrTag`].
135    pub async fn l2_block_by_label(
136        &self,
137        numtag: BlockNumberOrTag,
138    ) -> Result<Option<Block<Transaction>>, EngineClientError> {
139        Ok(<RootProvider<Optimism>>::get_block_by_number(&self.engine, numtag).full().await?)
140    }
141
142    /// Fetches the [L2BlockInfo] by [BlockNumberOrTag].
143    pub async fn l2_block_info_by_label(
144        &self,
145        numtag: BlockNumberOrTag,
146    ) -> Result<Option<L2BlockInfo>, EngineClientError> {
147        let block =
148            <RootProvider<Optimism>>::get_block_by_number(&self.engine, numtag).full().await?;
149        let Some(block) = block else {
150            return Ok(None);
151        };
152        Ok(Some(L2BlockInfo::from_block_and_genesis(&block.into_consensus(), &self.cfg.genesis)?))
153    }
154}
155
156#[async_trait::async_trait]
157impl OpEngineApi<Optimism, Http<HyperAuthClient>> for EngineClient {
158    async fn new_payload_v2(
159        &self,
160        payload: ExecutionPayloadInputV2,
161    ) -> TransportResult<PayloadStatus> {
162        let call = <RootProvider<Optimism> as OpEngineApi<
163            Optimism,
164            Http<HyperAuthClient>,
165        >>::new_payload_v2(&self.engine, payload);
166
167        record_call_time(call, Metrics::NEW_PAYLOAD_METHOD).await
168    }
169
170    async fn new_payload_v3(
171        &self,
172        payload: ExecutionPayloadV3,
173        parent_beacon_block_root: B256,
174    ) -> TransportResult<PayloadStatus> {
175        let call = <RootProvider<Optimism> as OpEngineApi<
176            Optimism,
177            Http<HyperAuthClient>,
178        >>::new_payload_v3(&self.engine, payload, parent_beacon_block_root);
179
180        record_call_time(call, Metrics::NEW_PAYLOAD_METHOD).await
181    }
182
183    async fn new_payload_v4(
184        &self,
185        payload: OpExecutionPayloadV4,
186        parent_beacon_block_root: B256,
187    ) -> TransportResult<PayloadStatus> {
188        let call = <RootProvider<Optimism> as OpEngineApi<
189            Optimism,
190            Http<HyperAuthClient>,
191        >>::new_payload_v4(&self.engine, payload, parent_beacon_block_root);
192
193        record_call_time(call, Metrics::NEW_PAYLOAD_METHOD).await
194    }
195
196    async fn fork_choice_updated_v2(
197        &self,
198        fork_choice_state: ForkchoiceState,
199        payload_attributes: Option<OpPayloadAttributes>,
200    ) -> TransportResult<ForkchoiceUpdated> {
201        let call = <RootProvider<Optimism> as OpEngineApi<
202            Optimism,
203            Http<HyperAuthClient>,
204        >>::fork_choice_updated_v2(&self.engine, fork_choice_state, payload_attributes);
205
206        record_call_time(call, Metrics::FORKCHOICE_UPDATE_METHOD).await
207    }
208
209    async fn fork_choice_updated_v3(
210        &self,
211        fork_choice_state: ForkchoiceState,
212        payload_attributes: Option<OpPayloadAttributes>,
213    ) -> TransportResult<ForkchoiceUpdated> {
214        let call = <RootProvider<Optimism> as OpEngineApi<
215            Optimism,
216            Http<HyperAuthClient>,
217        >>::fork_choice_updated_v3(&self.engine, fork_choice_state, payload_attributes);
218
219        record_call_time(call, Metrics::FORKCHOICE_UPDATE_METHOD).await
220    }
221
222    async fn get_payload_v2(
223        &self,
224        payload_id: PayloadId,
225    ) -> TransportResult<ExecutionPayloadEnvelopeV2> {
226        let call = <RootProvider<Optimism> as OpEngineApi<
227            Optimism,
228            Http<HyperAuthClient>,
229        >>::get_payload_v2(&self.engine, payload_id);
230
231        record_call_time(call, Metrics::GET_PAYLOAD_METHOD).await
232    }
233
234    async fn get_payload_v3(
235        &self,
236        payload_id: PayloadId,
237    ) -> TransportResult<OpExecutionPayloadEnvelopeV3> {
238        let call = <RootProvider<Optimism> as OpEngineApi<
239            Optimism,
240            Http<HyperAuthClient>,
241        >>::get_payload_v3(&self.engine, payload_id);
242
243        record_call_time(call, Metrics::GET_PAYLOAD_METHOD).await
244    }
245
246    async fn get_payload_v4(
247        &self,
248        payload_id: PayloadId,
249    ) -> TransportResult<OpExecutionPayloadEnvelopeV4> {
250        let call = <RootProvider<Optimism> as OpEngineApi<
251            Optimism,
252            Http<HyperAuthClient>,
253        >>::get_payload_v4(&self.engine, payload_id);
254
255        record_call_time(call, Metrics::GET_PAYLOAD_METHOD).await
256    }
257
258    async fn get_payload_bodies_by_hash_v1(
259        &self,
260        block_hashes: Vec<BlockHash>,
261    ) -> TransportResult<ExecutionPayloadBodiesV1> {
262        <RootProvider<Optimism> as OpEngineApi<
263            Optimism,
264            Http<HyperAuthClient>,
265        >>::get_payload_bodies_by_hash_v1(&self.engine, block_hashes).await
266    }
267
268    async fn get_payload_bodies_by_range_v1(
269        &self,
270        start: u64,
271        count: u64,
272    ) -> TransportResult<ExecutionPayloadBodiesV1> {
273        <RootProvider<Optimism> as OpEngineApi<
274            Optimism,
275            Http<HyperAuthClient>,
276        >>::get_payload_bodies_by_range_v1(&self.engine, start, count).await
277    }
278
279    async fn get_client_version_v1(
280        &self,
281        client_version: ClientVersionV1,
282    ) -> TransportResult<Vec<ClientVersionV1>> {
283        <RootProvider<Optimism> as OpEngineApi<
284            Optimism,
285            Http<HyperAuthClient>,
286        >>::get_client_version_v1(&self.engine, client_version).await
287    }
288
289    async fn signal_superchain_v1(
290        &self,
291        recommended: ProtocolVersion,
292        required: ProtocolVersion,
293    ) -> TransportResult<ProtocolVersion> {
294        <RootProvider<Optimism> as OpEngineApi<
295            Optimism,
296            Http<HyperAuthClient>,
297        >>::signal_superchain_v1(&self.engine, recommended, required).await
298    }
299
300    async fn exchange_capabilities(
301        &self,
302        capabilities: Vec<String>,
303    ) -> TransportResult<Vec<String>> {
304        <RootProvider<Optimism> as OpEngineApi<
305            Optimism,
306            Http<HyperAuthClient>,
307        >>::exchange_capabilities(&self.engine, capabilities).await
308    }
309}
310
311/// Wrapper to record the time taken for a call to the engine API and log the result as a metric.
312async fn record_call_time<T>(
313    f: impl Future<Output = TransportResult<T>>,
314    metric_label: &'static str,
315) -> TransportResult<T> {
316    // Await on the future and track its duration.
317    let start = Instant::now();
318    let result = f.await?;
319    let duration = start.elapsed();
320
321    // Record the call duration.
322    kona_macros::record!(
323        histogram,
324        Metrics::ENGINE_METHOD_REQUEST_DURATION,
325        "method",
326        metric_label,
327        duration.as_secs_f64()
328    );
329    Ok(result)
330}