kona_engine/
client.rs

1//! An Engine API Client.
2
3use crate::Metrics;
4use alloy_eips::eip1898::BlockNumberOrTag;
5use alloy_network::{AnyNetwork, Network};
6use alloy_primitives::{B256, BlockHash, Bytes};
7use alloy_provider::{Provider, RootProvider};
8use alloy_rpc_client::RpcClient;
9use alloy_rpc_types_engine::{
10    ClientVersionV1, ExecutionPayloadBodiesV1, ExecutionPayloadEnvelopeV2, ExecutionPayloadInputV2,
11    ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, JwtSecret, PayloadId, PayloadStatus,
12};
13use alloy_rpc_types_eth::Block;
14use alloy_transport::{RpcError, TransportErrorKind, TransportResult};
15use alloy_transport_http::{
16    AuthLayer, AuthService, Http, HyperClient,
17    hyper_util::{
18        client::legacy::{Client, connect::HttpConnector},
19        rt::TokioExecutor,
20    },
21};
22use derive_more::Deref;
23use http_body_util::Full;
24use kona_genesis::RollupConfig;
25use kona_protocol::{FromBlockError, L2BlockInfo};
26use op_alloy_network::Optimism;
27use op_alloy_provider::ext::engine::OpEngineApi;
28use op_alloy_rpc_types::Transaction;
29use op_alloy_rpc_types_engine::{
30    OpExecutionPayloadEnvelopeV3, OpExecutionPayloadEnvelopeV4, OpExecutionPayloadV4,
31    OpPayloadAttributes, ProtocolVersion,
32};
33use std::{sync::Arc, time::Instant};
34use thiserror::Error;
35use tower::ServiceBuilder;
36use url::Url;
37
38/// An error that occurred in the [`EngineClient`].
39#[derive(Error, Debug)]
40pub enum EngineClientError {
41    /// An RPC error occurred
42    #[error("An RPC error occurred: {0}")]
43    RpcError(#[from] RpcError<TransportErrorKind>),
44
45    /// An error occurred while decoding the payload
46    #[error("An error occurred while decoding the payload: {0}")]
47    BlockInfoDecodeError(#[from] FromBlockError),
48}
49/// A Hyper HTTP client with a JWT authentication layer.
50type HyperAuthClient<B = Full<Bytes>> = HyperClient<B, AuthService<Client<HttpConnector, B>>>;
51
52/// An external engine api client
53#[derive(Debug, Deref, Clone)]
54pub struct EngineClient {
55    /// The L2 engine provider.
56    #[deref]
57    engine: RootProvider<AnyNetwork>,
58    /// The L2 chain provider.
59    l2_provider: RootProvider<Optimism>,
60    /// The L1 chain provider.
61    l1_provider: RootProvider,
62    /// The [RollupConfig] for the chain used to timestamp which version of the engine api to use.
63    cfg: Arc<RollupConfig>,
64}
65
66impl EngineClient {
67    /// Creates a new RPC client for the given address and JWT secret.
68    fn rpc_client<T: Network>(addr: Url, jwt: JwtSecret) -> RootProvider<T> {
69        let hyper_client = Client::builder(TokioExecutor::new()).build_http::<Full<Bytes>>();
70        let auth_layer = AuthLayer::new(jwt);
71        let service = ServiceBuilder::new().layer(auth_layer).service(hyper_client);
72        let layer_transport = HyperClient::with_service(service);
73
74        let http_hyper = Http::with_client(layer_transport, addr);
75        let rpc_client = RpcClient::new(http_hyper, false);
76        RootProvider::<T>::new(rpc_client)
77    }
78
79    /// Creates a new [`EngineClient`] from the provided [Url] and [JwtSecret].
80    pub fn new_http(
81        engine: Url,
82        l2_rpc: Url,
83        l1_rpc: Url,
84        cfg: Arc<RollupConfig>,
85        jwt: JwtSecret,
86    ) -> Self {
87        let engine = Self::rpc_client::<AnyNetwork>(engine, jwt);
88        let l2_provider = Self::rpc_client::<Optimism>(l2_rpc, jwt);
89        let l1_provider = RootProvider::new_http(l1_rpc);
90
91        Self { engine, l2_provider, l1_provider, cfg }
92    }
93
94    /// Returns a reference to the inner L2 [`RootProvider`].
95    pub const fn l2_provider(&self) -> &RootProvider<Optimism> {
96        &self.l2_provider
97    }
98
99    /// Returns a reference to the inner L1 [`RootProvider`].
100    pub const fn l1_provider(&self) -> &RootProvider {
101        &self.l1_provider
102    }
103
104    /// Returns a reference to the inner [`RollupConfig`].
105    pub fn cfg(&self) -> &RollupConfig {
106        self.cfg.as_ref()
107    }
108
109    /// Fetches the [`Block<T>`] for the given [`BlockNumberOrTag`].
110    pub async fn l2_block_by_label(
111        &self,
112        numtag: BlockNumberOrTag,
113    ) -> Result<Option<Block<Transaction>>, EngineClientError> {
114        Ok(<RootProvider<Optimism>>::get_block_by_number(&self.l2_provider, numtag).full().await?)
115    }
116
117    /// Fetches the [L2BlockInfo] by [BlockNumberOrTag].
118    pub async fn l2_block_info_by_label(
119        &self,
120        numtag: BlockNumberOrTag,
121    ) -> Result<Option<L2BlockInfo>, EngineClientError> {
122        let block =
123            <RootProvider<Optimism>>::get_block_by_number(&self.l2_provider, numtag).full().await?;
124        let Some(block) = block else {
125            return Ok(None);
126        };
127        Ok(Some(L2BlockInfo::from_block_and_genesis(&block.into_consensus(), &self.cfg.genesis)?))
128    }
129}
130
131#[async_trait::async_trait]
132impl OpEngineApi<AnyNetwork, Http<HyperAuthClient>> for EngineClient {
133    async fn new_payload_v2(
134        &self,
135        payload: ExecutionPayloadInputV2,
136    ) -> TransportResult<PayloadStatus> {
137        let call = <RootProvider<AnyNetwork> as OpEngineApi<
138            AnyNetwork,
139            Http<HyperAuthClient>,
140        >>::new_payload_v2(&self.engine, payload);
141
142        record_call_time(call, Metrics::NEW_PAYLOAD_METHOD).await
143    }
144
145    async fn new_payload_v3(
146        &self,
147        payload: ExecutionPayloadV3,
148        parent_beacon_block_root: B256,
149    ) -> TransportResult<PayloadStatus> {
150        let call = <RootProvider<AnyNetwork> as OpEngineApi<
151            AnyNetwork,
152            Http<HyperAuthClient>,
153        >>::new_payload_v3(&self.engine, payload, parent_beacon_block_root);
154
155        record_call_time(call, Metrics::NEW_PAYLOAD_METHOD).await
156    }
157
158    async fn new_payload_v4(
159        &self,
160        payload: OpExecutionPayloadV4,
161        parent_beacon_block_root: B256,
162    ) -> TransportResult<PayloadStatus> {
163        let call = <RootProvider<AnyNetwork> as OpEngineApi<
164            AnyNetwork,
165            Http<HyperAuthClient>,
166        >>::new_payload_v4(&self.engine, payload, parent_beacon_block_root);
167
168        record_call_time(call, Metrics::NEW_PAYLOAD_METHOD).await
169    }
170
171    async fn fork_choice_updated_v2(
172        &self,
173        fork_choice_state: ForkchoiceState,
174        payload_attributes: Option<OpPayloadAttributes>,
175    ) -> TransportResult<ForkchoiceUpdated> {
176        let call = <RootProvider<AnyNetwork> as OpEngineApi<
177            AnyNetwork,
178            Http<HyperAuthClient>,
179        >>::fork_choice_updated_v2(&self.engine, fork_choice_state, payload_attributes);
180
181        record_call_time(call, Metrics::FORKCHOICE_UPDATE_METHOD).await
182    }
183
184    async fn fork_choice_updated_v3(
185        &self,
186        fork_choice_state: ForkchoiceState,
187        payload_attributes: Option<OpPayloadAttributes>,
188    ) -> TransportResult<ForkchoiceUpdated> {
189        let call = <RootProvider<AnyNetwork> as OpEngineApi<
190            AnyNetwork,
191            Http<HyperAuthClient>,
192        >>::fork_choice_updated_v3(&self.engine, fork_choice_state, payload_attributes);
193
194        record_call_time(call, Metrics::FORKCHOICE_UPDATE_METHOD).await
195    }
196
197    async fn get_payload_v2(
198        &self,
199        payload_id: PayloadId,
200    ) -> TransportResult<ExecutionPayloadEnvelopeV2> {
201        let call = <RootProvider<AnyNetwork> as OpEngineApi<
202            AnyNetwork,
203            Http<HyperAuthClient>,
204        >>::get_payload_v2(&self.engine, payload_id);
205
206        record_call_time(call, Metrics::GET_PAYLOAD_METHOD).await
207    }
208
209    async fn get_payload_v3(
210        &self,
211        payload_id: PayloadId,
212    ) -> TransportResult<OpExecutionPayloadEnvelopeV3> {
213        let call = <RootProvider<AnyNetwork> as OpEngineApi<
214            AnyNetwork,
215            Http<HyperAuthClient>,
216        >>::get_payload_v3(&self.engine, payload_id);
217
218        record_call_time(call, Metrics::GET_PAYLOAD_METHOD).await
219    }
220
221    async fn get_payload_v4(
222        &self,
223        payload_id: PayloadId,
224    ) -> TransportResult<OpExecutionPayloadEnvelopeV4> {
225        let call = <RootProvider<AnyNetwork> as OpEngineApi<
226            AnyNetwork,
227            Http<HyperAuthClient>,
228        >>::get_payload_v4(&self.engine, payload_id);
229
230        record_call_time(call, Metrics::GET_PAYLOAD_METHOD).await
231    }
232
233    async fn get_payload_bodies_by_hash_v1(
234        &self,
235        block_hashes: Vec<BlockHash>,
236    ) -> TransportResult<ExecutionPayloadBodiesV1> {
237        <RootProvider<AnyNetwork> as OpEngineApi<
238            AnyNetwork,
239            Http<HyperAuthClient>,
240        >>::get_payload_bodies_by_hash_v1(&self.engine, block_hashes).await
241    }
242
243    async fn get_payload_bodies_by_range_v1(
244        &self,
245        start: u64,
246        count: u64,
247    ) -> TransportResult<ExecutionPayloadBodiesV1> {
248        <RootProvider<AnyNetwork> as OpEngineApi<
249            AnyNetwork,
250            Http<HyperAuthClient>,
251        >>::get_payload_bodies_by_range_v1(&self.engine, start, count).await
252    }
253
254    async fn get_client_version_v1(
255        &self,
256        client_version: ClientVersionV1,
257    ) -> TransportResult<Vec<ClientVersionV1>> {
258        <RootProvider<AnyNetwork> as OpEngineApi<
259            AnyNetwork,
260            Http<HyperAuthClient>,
261        >>::get_client_version_v1(&self.engine, client_version).await
262    }
263
264    async fn signal_superchain_v1(
265        &self,
266        recommended: ProtocolVersion,
267        required: ProtocolVersion,
268    ) -> TransportResult<ProtocolVersion> {
269        <RootProvider<AnyNetwork> as OpEngineApi<
270            AnyNetwork,
271            Http<HyperAuthClient>,
272        >>::signal_superchain_v1(&self.engine, recommended, required).await
273    }
274
275    async fn exchange_capabilities(
276        &self,
277        capabilities: Vec<String>,
278    ) -> TransportResult<Vec<String>> {
279        <RootProvider<AnyNetwork> as OpEngineApi<
280            AnyNetwork,
281            Http<HyperAuthClient>,
282        >>::exchange_capabilities(&self.engine, capabilities).await
283    }
284}
285
286/// Wrapper to record the time taken for a call to the engine API and log the result as a metric.
287async fn record_call_time<T>(
288    f: impl Future<Output = TransportResult<T>>,
289    metric_label: &'static str,
290) -> TransportResult<T> {
291    // Await on the future and track its duration.
292    let start = Instant::now();
293    let result = f.await?;
294    let duration = start.elapsed();
295
296    // Record the call duration.
297    kona_macros::record!(
298        histogram,
299        Metrics::ENGINE_METHOD_REQUEST_DURATION,
300        "method",
301        metric_label,
302        duration.as_secs_f64()
303    );
304    Ok(result)
305}