1use crate::Metrics;
4use alloy_eips::eip1898::BlockNumberOrTag;
5use alloy_network::Network;
6use alloy_primitives::{B256, BlockHash, Bytes};
7use alloy_provider::{Provider, RootProvider};
8use alloy_rpc_client::RpcClient;
9use alloy_rpc_types_engine::{
10 ClientVersionV1, ExecutionPayloadBodiesV1, ExecutionPayloadEnvelopeV2, ExecutionPayloadInputV2,
11 ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, JwtSecret, PayloadId, PayloadStatus,
12};
13use alloy_rpc_types_eth::Block;
14use alloy_transport::{RpcError, TransportErrorKind, TransportResult};
15use alloy_transport_http::{
16 AuthLayer, AuthService, Http, HyperClient,
17 hyper_util::{
18 client::legacy::{Client, connect::HttpConnector},
19 rt::TokioExecutor,
20 },
21};
22use derive_more::Deref;
23use http_body_util::Full;
24use kona_genesis::RollupConfig;
25use kona_protocol::{FromBlockError, L2BlockInfo};
26use op_alloy_network::Optimism;
27use op_alloy_provider::ext::engine::OpEngineApi;
28use op_alloy_rpc_types::Transaction;
29use op_alloy_rpc_types_engine::{
30 OpExecutionPayloadEnvelopeV3, OpExecutionPayloadEnvelopeV4, OpExecutionPayloadV4,
31 OpPayloadAttributes, ProtocolVersion,
32};
33use std::{sync::Arc, time::Instant};
34use thiserror::Error;
35use tower::ServiceBuilder;
36use url::Url;
37
/// Errors returned by the block lookup helpers on [`EngineClient`].
#[derive(Error, Debug)]
pub enum EngineClientError {
    /// The underlying RPC request failed. Wraps the transport-level [`RpcError`];
    /// the `#[from]` attribute lets `?` convert it automatically.
    #[error("An RPC error occurred: {0}")]
    RpcError(#[from] RpcError<TransportErrorKind>),

    /// A fetched block could not be converted into an [`L2BlockInfo`]. Wraps the
    /// [`FromBlockError`] produced by that conversion.
    #[error("An error occurred while decoding the payload: {0}")]
    BlockInfoDecodeError(#[from] FromBlockError),
}
/// A [`HyperClient`] whose inner service is the hyper legacy [`Client`] (over an
/// [`HttpConnector`]) wrapped in an [`AuthService`]. The request body type `B`
/// defaults to `Full<Bytes>`.
type HyperAuthClient<B = Full<Bytes>> = HyperClient<B, AuthService<Client<HttpConnector, B>>>;
51
/// Bundles the provider for the L2 engine, a provider for L1, and the rollup
/// configuration.
///
/// The struct derives `Deref` onto the `engine` field, so methods of
/// [`RootProvider<Optimism>`] can be called directly on an `EngineClient`.
#[derive(Debug, Deref, Clone)]
pub struct EngineClient {
    /// Provider for the L2 engine endpoint; this is the `Deref` target.
    #[deref]
    engine: RootProvider<Optimism>,
    /// Provider for the L1 RPC endpoint.
    l1_provider: RootProvider,
    /// Shared rollup configuration; its `genesis` is read when building an
    /// [`L2BlockInfo`] from a fetched block.
    cfg: Arc<RollupConfig>,
}
87
88impl EngineClient {
89 fn rpc_client<T: Network>(addr: Url, jwt: JwtSecret) -> RootProvider<T> {
91 let hyper_client = Client::builder(TokioExecutor::new()).build_http::<Full<Bytes>>();
92 let auth_layer = AuthLayer::new(jwt);
93 let service = ServiceBuilder::new().layer(auth_layer).service(hyper_client);
94 let layer_transport = HyperClient::with_service(service);
95
96 let http_hyper = Http::with_client(layer_transport, addr);
97 let rpc_client = RpcClient::new(http_hyper, false);
98 RootProvider::<T>::new(rpc_client)
99 }
100
101 pub fn new_http(engine: Url, l1_rpc: Url, cfg: Arc<RollupConfig>, jwt: JwtSecret) -> Self {
113 let engine = Self::rpc_client::<Optimism>(engine, jwt);
114 let l1_provider = RootProvider::new_http(l1_rpc);
115
116 Self { engine, l1_provider, cfg }
117 }
118
119 pub const fn l2_engine(&self) -> &RootProvider<Optimism> {
121 &self.engine
122 }
123
124 pub const fn l1_provider(&self) -> &RootProvider {
126 &self.l1_provider
127 }
128
129 pub fn cfg(&self) -> &RollupConfig {
131 self.cfg.as_ref()
132 }
133
134 pub async fn l2_block_by_label(
136 &self,
137 numtag: BlockNumberOrTag,
138 ) -> Result<Option<Block<Transaction>>, EngineClientError> {
139 Ok(<RootProvider<Optimism>>::get_block_by_number(&self.engine, numtag).full().await?)
140 }
141
142 pub async fn l2_block_info_by_label(
144 &self,
145 numtag: BlockNumberOrTag,
146 ) -> Result<Option<L2BlockInfo>, EngineClientError> {
147 let block =
148 <RootProvider<Optimism>>::get_block_by_number(&self.engine, numtag).full().await?;
149 let Some(block) = block else {
150 return Ok(None);
151 };
152 Ok(Some(L2BlockInfo::from_block_and_genesis(&block.into_consensus(), &self.cfg.genesis)?))
153 }
154}
155
156#[async_trait::async_trait]
157impl OpEngineApi<Optimism, Http<HyperAuthClient>> for EngineClient {
158 async fn new_payload_v2(
159 &self,
160 payload: ExecutionPayloadInputV2,
161 ) -> TransportResult<PayloadStatus> {
162 let call = <RootProvider<Optimism> as OpEngineApi<
163 Optimism,
164 Http<HyperAuthClient>,
165 >>::new_payload_v2(&self.engine, payload);
166
167 record_call_time(call, Metrics::NEW_PAYLOAD_METHOD).await
168 }
169
170 async fn new_payload_v3(
171 &self,
172 payload: ExecutionPayloadV3,
173 parent_beacon_block_root: B256,
174 ) -> TransportResult<PayloadStatus> {
175 let call = <RootProvider<Optimism> as OpEngineApi<
176 Optimism,
177 Http<HyperAuthClient>,
178 >>::new_payload_v3(&self.engine, payload, parent_beacon_block_root);
179
180 record_call_time(call, Metrics::NEW_PAYLOAD_METHOD).await
181 }
182
183 async fn new_payload_v4(
184 &self,
185 payload: OpExecutionPayloadV4,
186 parent_beacon_block_root: B256,
187 ) -> TransportResult<PayloadStatus> {
188 let call = <RootProvider<Optimism> as OpEngineApi<
189 Optimism,
190 Http<HyperAuthClient>,
191 >>::new_payload_v4(&self.engine, payload, parent_beacon_block_root);
192
193 record_call_time(call, Metrics::NEW_PAYLOAD_METHOD).await
194 }
195
196 async fn fork_choice_updated_v2(
197 &self,
198 fork_choice_state: ForkchoiceState,
199 payload_attributes: Option<OpPayloadAttributes>,
200 ) -> TransportResult<ForkchoiceUpdated> {
201 let call = <RootProvider<Optimism> as OpEngineApi<
202 Optimism,
203 Http<HyperAuthClient>,
204 >>::fork_choice_updated_v2(&self.engine, fork_choice_state, payload_attributes);
205
206 record_call_time(call, Metrics::FORKCHOICE_UPDATE_METHOD).await
207 }
208
209 async fn fork_choice_updated_v3(
210 &self,
211 fork_choice_state: ForkchoiceState,
212 payload_attributes: Option<OpPayloadAttributes>,
213 ) -> TransportResult<ForkchoiceUpdated> {
214 let call = <RootProvider<Optimism> as OpEngineApi<
215 Optimism,
216 Http<HyperAuthClient>,
217 >>::fork_choice_updated_v3(&self.engine, fork_choice_state, payload_attributes);
218
219 record_call_time(call, Metrics::FORKCHOICE_UPDATE_METHOD).await
220 }
221
222 async fn get_payload_v2(
223 &self,
224 payload_id: PayloadId,
225 ) -> TransportResult<ExecutionPayloadEnvelopeV2> {
226 let call = <RootProvider<Optimism> as OpEngineApi<
227 Optimism,
228 Http<HyperAuthClient>,
229 >>::get_payload_v2(&self.engine, payload_id);
230
231 record_call_time(call, Metrics::GET_PAYLOAD_METHOD).await
232 }
233
234 async fn get_payload_v3(
235 &self,
236 payload_id: PayloadId,
237 ) -> TransportResult<OpExecutionPayloadEnvelopeV3> {
238 let call = <RootProvider<Optimism> as OpEngineApi<
239 Optimism,
240 Http<HyperAuthClient>,
241 >>::get_payload_v3(&self.engine, payload_id);
242
243 record_call_time(call, Metrics::GET_PAYLOAD_METHOD).await
244 }
245
246 async fn get_payload_v4(
247 &self,
248 payload_id: PayloadId,
249 ) -> TransportResult<OpExecutionPayloadEnvelopeV4> {
250 let call = <RootProvider<Optimism> as OpEngineApi<
251 Optimism,
252 Http<HyperAuthClient>,
253 >>::get_payload_v4(&self.engine, payload_id);
254
255 record_call_time(call, Metrics::GET_PAYLOAD_METHOD).await
256 }
257
258 async fn get_payload_bodies_by_hash_v1(
259 &self,
260 block_hashes: Vec<BlockHash>,
261 ) -> TransportResult<ExecutionPayloadBodiesV1> {
262 <RootProvider<Optimism> as OpEngineApi<
263 Optimism,
264 Http<HyperAuthClient>,
265 >>::get_payload_bodies_by_hash_v1(&self.engine, block_hashes).await
266 }
267
268 async fn get_payload_bodies_by_range_v1(
269 &self,
270 start: u64,
271 count: u64,
272 ) -> TransportResult<ExecutionPayloadBodiesV1> {
273 <RootProvider<Optimism> as OpEngineApi<
274 Optimism,
275 Http<HyperAuthClient>,
276 >>::get_payload_bodies_by_range_v1(&self.engine, start, count).await
277 }
278
279 async fn get_client_version_v1(
280 &self,
281 client_version: ClientVersionV1,
282 ) -> TransportResult<Vec<ClientVersionV1>> {
283 <RootProvider<Optimism> as OpEngineApi<
284 Optimism,
285 Http<HyperAuthClient>,
286 >>::get_client_version_v1(&self.engine, client_version).await
287 }
288
289 async fn signal_superchain_v1(
290 &self,
291 recommended: ProtocolVersion,
292 required: ProtocolVersion,
293 ) -> TransportResult<ProtocolVersion> {
294 <RootProvider<Optimism> as OpEngineApi<
295 Optimism,
296 Http<HyperAuthClient>,
297 >>::signal_superchain_v1(&self.engine, recommended, required).await
298 }
299
300 async fn exchange_capabilities(
301 &self,
302 capabilities: Vec<String>,
303 ) -> TransportResult<Vec<String>> {
304 <RootProvider<Optimism> as OpEngineApi<
305 Optimism,
306 Http<HyperAuthClient>,
307 >>::exchange_capabilities(&self.engine, capabilities).await
308 }
309}
310
311async fn record_call_time<T>(
313 f: impl Future<Output = TransportResult<T>>,
314 metric_label: &'static str,
315) -> TransportResult<T> {
316 let start = Instant::now();
318 let result = f.await?;
319 let duration = start.elapsed();
320
321 kona_macros::record!(
323 histogram,
324 Metrics::ENGINE_METHOD_REQUEST_DURATION,
325 "method",
326 metric_label,
327 duration.as_secs_f64()
328 );
329 Ok(result)
330}