1use ethrex_blockchain::{
2 error::{ChainError, InvalidForkChoice},
3 fork_choice::apply_fork_choice,
4 payload::{BuildPayloadArgs, create_payload},
5};
6use ethrex_common::types::{BlockHeader, ELASTICITY_MULTIPLIER};
7use ethrex_p2p::sync::SyncMode;
8use serde_json::Value;
9use tracing::{debug, info, warn};
10
11use crate::{
12 rpc::{RpcApiContext, RpcHandler},
13 subscription_manager::SubscriptionManagerProtocol,
14 types::{
15 fork_choice::{
16 ForkChoiceResponse, ForkChoiceState, PayloadAttributesV3, PayloadAttributesV4,
17 },
18 payload::PayloadStatus,
19 },
20 utils::RpcErr,
21 utils::RpcRequest,
22};
23
24#[derive(Debug)]
25pub struct ForkChoiceUpdatedV1 {
26 pub fork_choice_state: ForkChoiceState,
27 pub payload_attributes: Option<PayloadAttributesV3>,
28}
29
30impl RpcHandler for ForkChoiceUpdatedV1 {
31 fn parse(params: &Option<Vec<Value>>) -> Result<Self, RpcErr> {
32 let (fork_choice_state, payload_attributes) = parse(params, false)?;
33 Ok(ForkChoiceUpdatedV1 {
34 fork_choice_state,
35 payload_attributes,
36 })
37 }
38
39 async fn handle(&self, context: RpcApiContext) -> Result<Value, RpcErr> {
40 let (head_block_opt, mut response) =
41 handle_forkchoice(&self.fork_choice_state, context.clone(), 1).await?;
42 if let (Some(head_block), Some(attributes)) = (head_block_opt, &self.payload_attributes) {
43 let chain_config = context.storage.get_chain_config();
44 if chain_config.is_cancun_activated(attributes.timestamp) {
45 return Err(RpcErr::UnsupportedFork(
46 "forkChoiceV1 used to build Cancun payload".to_string(),
47 ));
48 }
49 validate_attributes_v1(attributes, &head_block)?;
50 let payload_id = build_payload(attributes, context, &self.fork_choice_state, 1).await?;
51 response.set_id(payload_id);
52 }
53 serde_json::to_value(response).map_err(|error| RpcErr::Internal(error.to_string()))
54 }
55}
56
57#[derive(Debug)]
58pub struct ForkChoiceUpdatedV2 {
59 pub fork_choice_state: ForkChoiceState,
60 pub payload_attributes: Option<PayloadAttributesV3>,
61}
62
63impl RpcHandler for ForkChoiceUpdatedV2 {
64 fn parse(params: &Option<Vec<Value>>) -> Result<Self, RpcErr> {
65 let (fork_choice_state, payload_attributes) = parse(params, false)?;
66 Ok(ForkChoiceUpdatedV2 {
67 fork_choice_state,
68 payload_attributes,
69 })
70 }
71
72 async fn handle(&self, context: RpcApiContext) -> Result<Value, RpcErr> {
73 let (head_block_opt, mut response) =
74 handle_forkchoice(&self.fork_choice_state, context.clone(), 2).await?;
75 if let (Some(head_block), Some(attributes)) = (head_block_opt, &self.payload_attributes) {
76 let chain_config = context.storage.get_chain_config();
77 if chain_config.is_cancun_activated(attributes.timestamp) {
78 return Err(RpcErr::UnsupportedFork(
79 "forkChoiceV2 used to build Cancun payload".to_string(),
80 ));
81 } else if chain_config.is_shanghai_activated(attributes.timestamp) {
82 validate_attributes_v2(attributes, &head_block)?;
83 } else {
84 validate_attributes_v2_pre_shanghai(attributes, &head_block)?;
85 }
86 let payload_id = build_payload(attributes, context, &self.fork_choice_state, 2).await?;
87 response.set_id(payload_id);
88 }
89 serde_json::to_value(response).map_err(|error| RpcErr::Internal(error.to_string()))
90 }
91}
92
93#[derive(Debug)]
94pub struct ForkChoiceUpdatedV3 {
95 pub fork_choice_state: ForkChoiceState,
96 pub payload_attributes: Option<PayloadAttributesV3>,
97}
98
99impl From<ForkChoiceUpdatedV3> for RpcRequest {
100 fn from(val: ForkChoiceUpdatedV3) -> Self {
101 RpcRequest {
102 method: "engine_forkchoiceUpdatedV3".to_string(),
103 params: Some(vec![
104 serde_json::json!(val.fork_choice_state),
105 serde_json::json!(val.payload_attributes),
106 ]),
107 ..Default::default()
108 }
109 }
110}
111
112impl RpcHandler for ForkChoiceUpdatedV3 {
113 fn parse(params: &Option<Vec<Value>>) -> Result<Self, RpcErr> {
114 let (fork_choice_state, payload_attributes) = parse(params, true)?;
115 Ok(ForkChoiceUpdatedV3 {
116 fork_choice_state,
117 payload_attributes,
118 })
119 }
120
121 async fn handle(&self, context: RpcApiContext) -> Result<Value, RpcErr> {
122 let (head_block_opt, mut response) =
123 handle_forkchoice(&self.fork_choice_state, context.clone(), 3).await?;
124 if let (Some(head_block), Some(attributes)) = (head_block_opt, &self.payload_attributes) {
125 validate_attributes_v3(attributes, &head_block, &context)?;
126 let payload_id = build_payload(attributes, context, &self.fork_choice_state, 3).await?;
127 response.set_id(payload_id);
128 }
129 serde_json::to_value(response).map_err(|error| RpcErr::Internal(error.to_string()))
130 }
131}
132
133#[derive(Debug)]
134pub struct ForkChoiceUpdatedV4 {
135 pub fork_choice_state: ForkChoiceState,
136 pub payload_attributes: Option<PayloadAttributesV4>,
137}
138
139impl From<ForkChoiceUpdatedV4> for RpcRequest {
140 fn from(val: ForkChoiceUpdatedV4) -> Self {
141 RpcRequest {
142 method: "engine_forkchoiceUpdatedV4".to_string(),
143 params: Some(vec![
144 serde_json::json!(val.fork_choice_state),
145 serde_json::json!(val.payload_attributes),
146 ]),
147 ..Default::default()
148 }
149 }
150}
151
152impl RpcHandler for ForkChoiceUpdatedV4 {
153 fn parse(params: &Option<Vec<Value>>) -> Result<Self, RpcErr> {
154 let (fork_choice_state, payload_attributes) = parse_v4(params)?;
155 Ok(ForkChoiceUpdatedV4 {
156 fork_choice_state,
157 payload_attributes,
158 })
159 }
160
161 async fn handle(&self, context: RpcApiContext) -> Result<Value, RpcErr> {
162 let (head_block_opt, mut response) =
163 handle_forkchoice(&self.fork_choice_state, context.clone(), 4).await?;
164 if let (Some(head_block), Some(attributes)) = (head_block_opt, &self.payload_attributes) {
165 validate_attributes_v4(attributes, &head_block, &context)?;
166 let payload_id = build_payload_v4(attributes, context, &self.fork_choice_state).await?;
167 response.set_id(payload_id);
168 }
169 serde_json::to_value(response).map_err(|error| RpcErr::Internal(error.to_string()))
170 }
171}
172
173fn parse(
174 params: &Option<Vec<Value>>,
175 is_v3: bool,
176) -> Result<(ForkChoiceState, Option<PayloadAttributesV3>), RpcErr> {
177 let params = params
178 .as_ref()
179 .ok_or(RpcErr::BadParams("No params provided".to_owned()))?;
180
181 if params.len() != 2 && params.len() != 1 {
182 return Err(RpcErr::BadParams("Expected 2 or 1 params".to_owned()));
183 }
184
185 let forkchoice_state: ForkChoiceState = serde_json::from_value(params[0].clone())?;
186 let mut payload_attributes: Option<PayloadAttributesV3> = None;
187 if params.len() == 2 {
188 payload_attributes =
190 match serde_json::from_value::<Option<PayloadAttributesV3>>(params[1].clone()) {
191 Ok(attributes) => attributes,
192 Err(error) => {
193 warn!("Could not parse payload attributes {}", error);
194 None
195 }
196 };
197 }
198
199 if payload_attributes
200 .as_ref()
201 .is_some_and(|attr| !is_v3 && attr.parent_beacon_block_root.is_some())
202 {
203 return Err(RpcErr::InvalidPayloadAttributes(
204 "Attribute parent_beacon_block_root is non-null".to_string(),
205 ));
206 }
207 Ok((forkchoice_state, payload_attributes))
208}
209
210async fn handle_forkchoice(
211 fork_choice_state: &ForkChoiceState,
212 context: RpcApiContext,
213 version: usize,
214) -> Result<(Option<BlockHeader>, ForkChoiceResponse), RpcErr> {
215 let Some(syncer) = &context.syncer else {
216 return Err(RpcErr::Internal(
217 "Fork choice requested but syncer is not initialized".to_string(),
218 ));
219 };
220 debug!(
221 version = %format!("v{}", version),
222 head = %format!("{:#x}", fork_choice_state.head_block_hash),
223 safe = %format!("{:#x}", fork_choice_state.safe_block_hash),
224 finalized = %format!("{:#x}", fork_choice_state.finalized_block_hash),
225 "New fork choice update",
226 );
227
228 if let Some(latest_valid_hash) = context
229 .storage
230 .get_latest_valid_ancestor(fork_choice_state.head_block_hash)
231 .await?
232 {
233 return Ok((
234 None,
235 ForkChoiceResponse::from(PayloadStatus::invalid_with(
236 latest_valid_hash,
237 InvalidForkChoice::InvalidAncestor(latest_valid_hash).to_string(),
238 )),
239 ));
240 }
241
242 if let Some(head_block) = context
244 .storage
245 .get_block_header_by_hash(fork_choice_state.head_block_hash)?
246 && let Some(latest_valid_hash) = context
247 .storage
248 .get_latest_valid_ancestor(head_block.parent_hash)
249 .await?
250 {
251 context
253 .storage
254 .set_latest_valid_ancestor(head_block.hash(), latest_valid_hash)
255 .await?;
256 return Ok((
257 None,
258 ForkChoiceResponse::from(PayloadStatus::invalid_with(
259 latest_valid_hash,
260 InvalidForkChoice::InvalidAncestor(latest_valid_hash).to_string(),
261 )),
262 ));
263 }
264
265 if syncer.sync_mode() == SyncMode::Snap {
270 syncer.sync_to_head(fork_choice_state.head_block_hash);
271 return Ok((None, PayloadStatus::syncing().into()));
272 }
273
274 match apply_fork_choice(
275 &context.storage,
276 fork_choice_state.head_block_hash,
277 fork_choice_state.safe_block_hash,
278 fork_choice_state.finalized_block_hash,
279 )
280 .await
281 {
282 Ok(head) => {
283 context.blockchain.set_synced();
285 match context.storage.get_block_by_hash(head.hash()).await {
288 Ok(Some(block)) => {
289 context
291 .blockchain
292 .remove_block_transactions_from_pool(&block)?;
293 if let Err(err) = context.blockchain.remove_stale_blob_txs(block.hash()) {
299 warn!(
300 "Failed to prune stale blob txs from mempool after fork choice: {err}"
301 );
302 }
303 }
304 Ok(None) => {
305 warn!(
306 "Couldn't get block by hash to remove transactions from the mempool. This is expected in a reconstruted network"
307 )
308 }
309 Err(_) => {
310 return Err(RpcErr::Internal(
311 "Failed to get block by hash to remove transactions from the mempool"
312 .to_string(),
313 ));
314 }
315 };
316
317 if let Some(ws) = &context.ws {
319 let _ = ws.subscription_manager.new_head(head.clone());
320 }
321
322 Ok((
323 Some(head),
324 ForkChoiceResponse::from(PayloadStatus::valid_with_hash(
325 fork_choice_state.head_block_hash,
326 )),
327 ))
328 }
329 Err(forkchoice_error) => {
330 let forkchoice_response = match forkchoice_error {
331 InvalidForkChoice::NewHeadAlreadyCanonical => {
332 context.blockchain.set_synced();
338 return Ok((
339 None,
340 ForkChoiceResponse::from(PayloadStatus::valid_with_hash(
341 fork_choice_state.head_block_hash,
342 )),
343 ));
344 }
345 InvalidForkChoice::Syncing => {
346 syncer.sync_to_head(fork_choice_state.head_block_hash);
348 ForkChoiceResponse::from(PayloadStatus::syncing())
349 }
350 InvalidForkChoice::StateNotReachable => {
352 syncer.sync_to_head(fork_choice_state.head_block_hash);
361 ForkChoiceResponse::from(PayloadStatus::syncing())
362 }
363 InvalidForkChoice::Disconnected(_, _) | InvalidForkChoice::ElementNotFound(_) => {
364 warn!("Invalid fork choice state. Reason: {:?}", forkchoice_error);
365 return Err(RpcErr::InvalidForkChoiceState(forkchoice_error.to_string()));
366 }
367 InvalidForkChoice::TooDeepReorg { .. } => {
368 warn!("Rejecting fork choice update. Reason: {forkchoice_error}");
369 return Err(RpcErr::TooDeepReorg(forkchoice_error.to_string()));
370 }
371 InvalidForkChoice::InvalidAncestor(last_valid_hash) => {
372 ForkChoiceResponse::from(PayloadStatus::invalid_with(
373 last_valid_hash,
374 InvalidForkChoice::InvalidAncestor(last_valid_hash).to_string(),
375 ))
376 }
377 reason => {
378 warn!(
379 "Invalid fork choice payload. Reason: {}",
380 reason.to_string()
381 );
382 let latest_valid_hash = context
383 .storage
384 .get_latest_canonical_block_hash()
385 .await?
386 .ok_or(RpcErr::Internal(
387 "Missing latest canonical block".to_owned(),
388 ))?;
389 ForkChoiceResponse::from(PayloadStatus::invalid_with(
390 latest_valid_hash,
391 reason.to_string(),
392 ))
393 }
394 };
395 Ok((None, forkchoice_response))
396 }
397 }
398}
399
400fn validate_attributes_v1(
401 attributes: &PayloadAttributesV3,
402 head_block: &BlockHeader,
403) -> Result<(), RpcErr> {
404 if attributes.withdrawals.is_some() {
405 return Err(RpcErr::WrongParam("withdrawals".to_string()));
406 }
407 validate_timestamp(attributes, head_block)
408}
409
410fn validate_attributes_v2(
411 attributes: &PayloadAttributesV3,
412 head_block: &BlockHeader,
413) -> Result<(), RpcErr> {
414 if attributes.withdrawals.is_none() {
415 return Err(RpcErr::InvalidPayloadAttributes("withdrawals".to_string()));
416 }
417 validate_timestamp(attributes, head_block)
418}
419
420fn validate_attributes_v2_pre_shanghai(
421 attributes: &PayloadAttributesV3,
422 head_block: &BlockHeader,
423) -> Result<(), RpcErr> {
424 if attributes.withdrawals.is_some() {
425 return Err(RpcErr::InvalidPayloadAttributes("withdrawals".to_string()));
426 }
427 validate_timestamp(attributes, head_block)
428}
429
430fn validate_attributes_v3(
431 attributes: &PayloadAttributesV3,
432 head_block: &BlockHeader,
433 context: &RpcApiContext,
434) -> Result<(), RpcErr> {
435 let chain_config = context.storage.get_chain_config();
436 if attributes.withdrawals.is_none() {
439 return Err(RpcErr::InvalidPayloadAttributes("withdrawals".to_string()));
440 }
441 if attributes.parent_beacon_block_root.is_none() {
442 return Err(RpcErr::InvalidPayloadAttributes(
443 "Attribute parent_beacon_block_root is null".to_string(),
444 ));
445 }
446 if !chain_config.is_cancun_activated(attributes.timestamp) {
447 return Err(RpcErr::UnsupportedFork(
448 "forkChoiceV3 used to build pre-Cancun payload".to_string(),
449 ));
450 }
451 validate_timestamp(attributes, head_block)
452}
453
454fn validate_timestamp(
455 attributes: &PayloadAttributesV3,
456 head_block: &BlockHeader,
457) -> Result<(), RpcErr> {
458 if attributes.timestamp <= head_block.timestamp {
459 return Err(RpcErr::InvalidPayloadAttributes(
460 "invalid timestamp".to_string(),
461 ));
462 }
463 Ok(())
464}
465
466async fn build_payload(
467 attributes: &PayloadAttributesV3,
468 context: RpcApiContext,
469 fork_choice_state: &ForkChoiceState,
470 version: u8,
471) -> Result<u64, RpcErr> {
472 let args = BuildPayloadArgs {
473 parent: fork_choice_state.head_block_hash,
474 timestamp: attributes.timestamp,
475 fee_recipient: attributes.suggested_fee_recipient,
476 random: attributes.prev_randao,
477 withdrawals: attributes.withdrawals.clone(),
478 beacon_root: attributes.parent_beacon_block_root,
479 slot_number: None,
480 version,
481 elasticity_multiplier: ELASTICITY_MULTIPLIER,
482 gas_ceil: context.gas_ceil,
483 };
484 let payload_id = args
485 .id()
486 .map_err(|error| RpcErr::Internal(error.to_string()))?;
487
488 info!(
489 id = payload_id,
490 "Fork choice updated includes payload attributes. Creating a new payload"
491 );
492 let payload = match create_payload(&args, &context.storage, context.node_data.extra_data) {
493 Ok(payload) => payload,
494 Err(ChainError::EvmError(error)) => return Err(error.into()),
495 Err(error) => return Err(RpcErr::Internal(error.to_string())),
498 };
499 context
500 .blockchain
501 .initiate_payload_build(payload, payload_id)
502 .await;
503 Ok(payload_id)
504}
505
506fn parse_v4(
507 params: &Option<Vec<Value>>,
508) -> Result<(ForkChoiceState, Option<PayloadAttributesV4>), RpcErr> {
509 let params = params
510 .as_ref()
511 .ok_or(RpcErr::BadParams("No params provided".to_owned()))?;
512
513 if params.len() != 2 && params.len() != 1 {
514 return Err(RpcErr::BadParams("Expected 2 or 1 params".to_owned()));
515 }
516
517 let forkchoice_state: ForkChoiceState = serde_json::from_value(params[0].clone())?;
518 let mut payload_attributes: Option<PayloadAttributesV4> = None;
519 if params.len() == 2 {
520 payload_attributes =
521 match serde_json::from_value::<Option<PayloadAttributesV4>>(params[1].clone()) {
522 Ok(attributes) => attributes,
523 Err(error) => {
524 warn!("Could not parse payload attributes {}", error);
525 None
526 }
527 };
528 }
529 Ok((forkchoice_state, payload_attributes))
530}
531
532fn validate_attributes_v4(
533 attributes: &PayloadAttributesV4,
534 head_block: &BlockHeader,
535 context: &RpcApiContext,
536) -> Result<(), RpcErr> {
537 let chain_config = context.storage.get_chain_config();
539 if !chain_config.is_amsterdam_activated(attributes.timestamp) {
540 return Err(RpcErr::InvalidPayloadAttributes(
541 "V4 payload attributes used for pre-Amsterdam timestamp".to_string(),
542 ));
543 }
544 if attributes.withdrawals.is_none() {
545 return Err(RpcErr::InvalidPayloadAttributes(
546 "V4 payload attributes missing withdrawals".to_string(),
547 ));
548 }
549 if attributes.parent_beacon_block_root.is_none() {
550 return Err(RpcErr::InvalidPayloadAttributes(
551 "V4 payload attributes missing parent_beacon_block_root".to_string(),
552 ));
553 }
554 validate_timestamp_v4(attributes, head_block)
555}
556
557fn validate_timestamp_v4(
558 attributes: &PayloadAttributesV4,
559 head_block: &BlockHeader,
560) -> Result<(), RpcErr> {
561 if attributes.timestamp <= head_block.timestamp {
562 return Err(RpcErr::InvalidPayloadAttributes(
563 "invalid timestamp".to_string(),
564 ));
565 }
566 Ok(())
567}
568
569async fn build_payload_v4(
570 attributes: &PayloadAttributesV4,
571 context: RpcApiContext,
572 fork_choice_state: &ForkChoiceState,
573) -> Result<u64, RpcErr> {
574 let args = BuildPayloadArgs {
575 parent: fork_choice_state.head_block_hash,
576 timestamp: attributes.timestamp,
577 fee_recipient: attributes.suggested_fee_recipient,
578 random: attributes.prev_randao,
579 withdrawals: attributes.withdrawals.clone(),
580 beacon_root: attributes.parent_beacon_block_root,
581 slot_number: Some(attributes.slot_number),
582 version: 4,
583 elasticity_multiplier: ELASTICITY_MULTIPLIER,
584 gas_ceil: context.gas_ceil,
585 };
586 let payload_id = args
587 .id()
588 .map_err(|error| RpcErr::Internal(error.to_string()))?;
589
590 info!(
591 id = payload_id,
592 slot = attributes.slot_number,
593 "Fork choice updated V4 includes payload attributes. Creating a new payload"
594 );
595 let payload = match create_payload(&args, &context.storage, context.node_data.extra_data) {
596 Ok(payload) => payload,
597 Err(ChainError::EvmError(error)) => return Err(error.into()),
598 Err(error) => return Err(RpcErr::Internal(error.to_string())),
599 };
600 context
601 .blockchain
602 .initiate_payload_build(payload, payload_id)
603 .await;
604 Ok(payload_id)
605}
606
#[cfg(test)]
mod tests {
    use super::{validate_attributes_v2, validate_attributes_v2_pre_shanghai};
    use crate::types::fork_choice::PayloadAttributesV3;
    use crate::utils::RpcErr;
    use ethrex_common::types::{BlockHeader, Withdrawal};

    /// Head block at timestamp 1, older than the attributes built by `attributes_with`.
    fn head_block() -> BlockHeader {
        BlockHeader {
            timestamp: 1,
            ..Default::default()
        }
    }

    /// Attributes at timestamp 2 (valid against `head_block`) with the given withdrawals.
    fn attributes_with(withdrawals: Option<Vec<Withdrawal>>) -> PayloadAttributesV3 {
        PayloadAttributesV3 {
            timestamp: 2,
            withdrawals,
            ..Default::default()
        }
    }

    // V2 validation at a Shanghai timestamp requires withdrawals.
    #[test]
    fn forkchoice_updated_v2_returns_invalid_payload_attributes_when_withdrawals_missing() {
        let result = validate_attributes_v2(&attributes_with(None), &head_block());

        assert!(matches!(
            result,
            Err(RpcErr::InvalidPayloadAttributes(_))
        ));
    }

    // V2 validation before Shanghai forbids withdrawals, even an empty list.
    #[test]
    fn forkchoice_updated_v2_returns_invalid_payload_attributes_pre_shanghai_with_withdrawals() {
        let attributes = attributes_with(Some(Vec::new()));

        let result = validate_attributes_v2_pre_shanghai(&attributes, &head_block());

        assert!(matches!(
            result,
            Err(RpcErr::InvalidPayloadAttributes(_))
        ));
    }
}