1use crate::Metrics;
4use alloy_eips::eip1898::BlockNumberOrTag;
5use alloy_network::{AnyNetwork, Network};
6use alloy_primitives::{B256, BlockHash, Bytes};
7use alloy_provider::{Provider, RootProvider};
8use alloy_rpc_client::RpcClient;
9use alloy_rpc_types_engine::{
10 ClientVersionV1, ExecutionPayloadBodiesV1, ExecutionPayloadEnvelopeV2, ExecutionPayloadInputV2,
11 ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, JwtSecret, PayloadId, PayloadStatus,
12};
13use alloy_rpc_types_eth::Block;
14use alloy_transport::{RpcError, TransportErrorKind, TransportResult};
15use alloy_transport_http::{
16 AuthLayer, AuthService, Http, HyperClient,
17 hyper_util::{
18 client::legacy::{Client, connect::HttpConnector},
19 rt::TokioExecutor,
20 },
21};
22use derive_more::Deref;
23use http_body_util::Full;
24use kona_genesis::RollupConfig;
25use kona_protocol::{FromBlockError, L2BlockInfo};
26use op_alloy_network::Optimism;
27use op_alloy_provider::ext::engine::OpEngineApi;
28use op_alloy_rpc_types::Transaction;
29use op_alloy_rpc_types_engine::{
30 OpExecutionPayloadEnvelopeV3, OpExecutionPayloadEnvelopeV4, OpExecutionPayloadV4,
31 OpPayloadAttributes, ProtocolVersion,
32};
33use std::{sync::Arc, time::Instant};
34use thiserror::Error;
35use tower::ServiceBuilder;
36use url::Url;
37
38#[derive(Error, Debug)]
40pub enum EngineClientError {
41 #[error("An RPC error occurred: {0}")]
43 RpcError(#[from] RpcError<TransportErrorKind>),
44
45 #[error("An error occurred while decoding the payload: {0}")]
47 BlockInfoDecodeError(#[from] FromBlockError),
48}
49type HyperAuthClient<B = Full<Bytes>> = HyperClient<B, AuthService<Client<HttpConnector, B>>>;
51
52#[derive(Debug, Deref, Clone)]
54pub struct EngineClient {
55 #[deref]
57 engine: RootProvider<AnyNetwork>,
58 l2_provider: RootProvider<Optimism>,
60 l1_provider: RootProvider,
62 cfg: Arc<RollupConfig>,
64}
65
66impl EngineClient {
67 fn rpc_client<T: Network>(addr: Url, jwt: JwtSecret) -> RootProvider<T> {
69 let hyper_client = Client::builder(TokioExecutor::new()).build_http::<Full<Bytes>>();
70 let auth_layer = AuthLayer::new(jwt);
71 let service = ServiceBuilder::new().layer(auth_layer).service(hyper_client);
72 let layer_transport = HyperClient::with_service(service);
73
74 let http_hyper = Http::with_client(layer_transport, addr);
75 let rpc_client = RpcClient::new(http_hyper, false);
76 RootProvider::<T>::new(rpc_client)
77 }
78
79 pub fn new_http(
81 engine: Url,
82 l2_rpc: Url,
83 l1_rpc: Url,
84 cfg: Arc<RollupConfig>,
85 jwt: JwtSecret,
86 ) -> Self {
87 let engine = Self::rpc_client::<AnyNetwork>(engine, jwt);
88 let l2_provider = Self::rpc_client::<Optimism>(l2_rpc, jwt);
89 let l1_provider = RootProvider::new_http(l1_rpc);
90
91 Self { engine, l2_provider, l1_provider, cfg }
92 }
93
94 pub const fn l2_provider(&self) -> &RootProvider<Optimism> {
96 &self.l2_provider
97 }
98
99 pub const fn l1_provider(&self) -> &RootProvider {
101 &self.l1_provider
102 }
103
104 pub fn cfg(&self) -> &RollupConfig {
106 self.cfg.as_ref()
107 }
108
109 pub async fn l2_block_by_label(
111 &self,
112 numtag: BlockNumberOrTag,
113 ) -> Result<Option<Block<Transaction>>, EngineClientError> {
114 Ok(<RootProvider<Optimism>>::get_block_by_number(&self.l2_provider, numtag).full().await?)
115 }
116
117 pub async fn l2_block_info_by_label(
119 &self,
120 numtag: BlockNumberOrTag,
121 ) -> Result<Option<L2BlockInfo>, EngineClientError> {
122 let block =
123 <RootProvider<Optimism>>::get_block_by_number(&self.l2_provider, numtag).full().await?;
124 let Some(block) = block else {
125 return Ok(None);
126 };
127 Ok(Some(L2BlockInfo::from_block_and_genesis(&block.into_consensus(), &self.cfg.genesis)?))
128 }
129}
130
131#[async_trait::async_trait]
132impl OpEngineApi<AnyNetwork, Http<HyperAuthClient>> for EngineClient {
133 async fn new_payload_v2(
134 &self,
135 payload: ExecutionPayloadInputV2,
136 ) -> TransportResult<PayloadStatus> {
137 let call = <RootProvider<AnyNetwork> as OpEngineApi<
138 AnyNetwork,
139 Http<HyperAuthClient>,
140 >>::new_payload_v2(&self.engine, payload);
141
142 record_call_time(call, Metrics::NEW_PAYLOAD_METHOD).await
143 }
144
145 async fn new_payload_v3(
146 &self,
147 payload: ExecutionPayloadV3,
148 parent_beacon_block_root: B256,
149 ) -> TransportResult<PayloadStatus> {
150 let call = <RootProvider<AnyNetwork> as OpEngineApi<
151 AnyNetwork,
152 Http<HyperAuthClient>,
153 >>::new_payload_v3(&self.engine, payload, parent_beacon_block_root);
154
155 record_call_time(call, Metrics::NEW_PAYLOAD_METHOD).await
156 }
157
158 async fn new_payload_v4(
159 &self,
160 payload: OpExecutionPayloadV4,
161 parent_beacon_block_root: B256,
162 ) -> TransportResult<PayloadStatus> {
163 let call = <RootProvider<AnyNetwork> as OpEngineApi<
164 AnyNetwork,
165 Http<HyperAuthClient>,
166 >>::new_payload_v4(&self.engine, payload, parent_beacon_block_root);
167
168 record_call_time(call, Metrics::NEW_PAYLOAD_METHOD).await
169 }
170
171 async fn fork_choice_updated_v2(
172 &self,
173 fork_choice_state: ForkchoiceState,
174 payload_attributes: Option<OpPayloadAttributes>,
175 ) -> TransportResult<ForkchoiceUpdated> {
176 let call = <RootProvider<AnyNetwork> as OpEngineApi<
177 AnyNetwork,
178 Http<HyperAuthClient>,
179 >>::fork_choice_updated_v2(&self.engine, fork_choice_state, payload_attributes);
180
181 record_call_time(call, Metrics::FORKCHOICE_UPDATE_METHOD).await
182 }
183
184 async fn fork_choice_updated_v3(
185 &self,
186 fork_choice_state: ForkchoiceState,
187 payload_attributes: Option<OpPayloadAttributes>,
188 ) -> TransportResult<ForkchoiceUpdated> {
189 let call = <RootProvider<AnyNetwork> as OpEngineApi<
190 AnyNetwork,
191 Http<HyperAuthClient>,
192 >>::fork_choice_updated_v3(&self.engine, fork_choice_state, payload_attributes);
193
194 record_call_time(call, Metrics::FORKCHOICE_UPDATE_METHOD).await
195 }
196
197 async fn get_payload_v2(
198 &self,
199 payload_id: PayloadId,
200 ) -> TransportResult<ExecutionPayloadEnvelopeV2> {
201 let call = <RootProvider<AnyNetwork> as OpEngineApi<
202 AnyNetwork,
203 Http<HyperAuthClient>,
204 >>::get_payload_v2(&self.engine, payload_id);
205
206 record_call_time(call, Metrics::GET_PAYLOAD_METHOD).await
207 }
208
209 async fn get_payload_v3(
210 &self,
211 payload_id: PayloadId,
212 ) -> TransportResult<OpExecutionPayloadEnvelopeV3> {
213 let call = <RootProvider<AnyNetwork> as OpEngineApi<
214 AnyNetwork,
215 Http<HyperAuthClient>,
216 >>::get_payload_v3(&self.engine, payload_id);
217
218 record_call_time(call, Metrics::GET_PAYLOAD_METHOD).await
219 }
220
221 async fn get_payload_v4(
222 &self,
223 payload_id: PayloadId,
224 ) -> TransportResult<OpExecutionPayloadEnvelopeV4> {
225 let call = <RootProvider<AnyNetwork> as OpEngineApi<
226 AnyNetwork,
227 Http<HyperAuthClient>,
228 >>::get_payload_v4(&self.engine, payload_id);
229
230 record_call_time(call, Metrics::GET_PAYLOAD_METHOD).await
231 }
232
233 async fn get_payload_bodies_by_hash_v1(
234 &self,
235 block_hashes: Vec<BlockHash>,
236 ) -> TransportResult<ExecutionPayloadBodiesV1> {
237 <RootProvider<AnyNetwork> as OpEngineApi<
238 AnyNetwork,
239 Http<HyperAuthClient>,
240 >>::get_payload_bodies_by_hash_v1(&self.engine, block_hashes).await
241 }
242
243 async fn get_payload_bodies_by_range_v1(
244 &self,
245 start: u64,
246 count: u64,
247 ) -> TransportResult<ExecutionPayloadBodiesV1> {
248 <RootProvider<AnyNetwork> as OpEngineApi<
249 AnyNetwork,
250 Http<HyperAuthClient>,
251 >>::get_payload_bodies_by_range_v1(&self.engine, start, count).await
252 }
253
254 async fn get_client_version_v1(
255 &self,
256 client_version: ClientVersionV1,
257 ) -> TransportResult<Vec<ClientVersionV1>> {
258 <RootProvider<AnyNetwork> as OpEngineApi<
259 AnyNetwork,
260 Http<HyperAuthClient>,
261 >>::get_client_version_v1(&self.engine, client_version).await
262 }
263
264 async fn signal_superchain_v1(
265 &self,
266 recommended: ProtocolVersion,
267 required: ProtocolVersion,
268 ) -> TransportResult<ProtocolVersion> {
269 <RootProvider<AnyNetwork> as OpEngineApi<
270 AnyNetwork,
271 Http<HyperAuthClient>,
272 >>::signal_superchain_v1(&self.engine, recommended, required).await
273 }
274
275 async fn exchange_capabilities(
276 &self,
277 capabilities: Vec<String>,
278 ) -> TransportResult<Vec<String>> {
279 <RootProvider<AnyNetwork> as OpEngineApi<
280 AnyNetwork,
281 Http<HyperAuthClient>,
282 >>::exchange_capabilities(&self.engine, capabilities).await
283 }
284}
285
286async fn record_call_time<T>(
288 f: impl Future<Output = TransportResult<T>>,
289 metric_label: &'static str,
290) -> TransportResult<T> {
291 let start = Instant::now();
293 let result = f.await?;
294 let duration = start.elapsed();
295
296 kona_macros::record!(
298 histogram,
299 Metrics::ENGINE_METHOD_REQUEST_DURATION,
300 "method",
301 metric_label,
302 duration.as_secs_f64()
303 );
304 Ok(result)
305}