kona_net/gossip/
handler.rs1use std::{
4 sync::mpsc::{channel, Receiver, Sender},
5 time::SystemTime,
6};
7
8use alloy_primitives::Address;
9use alloy_rpc_types_engine::ExecutionPayload;
10use libp2p::gossipsub::{IdentTopic, Message, MessageAcceptance, TopicHash};
11use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope;
12use tokio::sync::watch;
13
14pub trait Handler: Send {
20 fn handle(&self, msg: Message) -> MessageAcceptance;
22
23 fn topics(&self) -> Vec<TopicHash>;
25}
26
27#[derive(Debug, Clone)]
29pub struct BlockHandler {
30 pub chain_id: u64,
33 pub block_sender: Sender<ExecutionPayload>,
35 pub unsafe_signer_recv: watch::Receiver<Address>,
37 pub blocks_v1_topic: IdentTopic,
39 pub blocks_v2_topic: IdentTopic,
41 pub blocks_v3_topic: IdentTopic,
43}
44
45impl Handler for BlockHandler {
46 fn handle(&self, msg: Message) -> MessageAcceptance {
49 tracing::debug!("received block");
50
51 let decoded = if msg.topic == self.blocks_v1_topic.hash() {
52 tracing::debug!("received v1 block");
53 OpNetworkPayloadEnvelope::decode_v1(&msg.data)
54 } else if msg.topic == self.blocks_v2_topic.hash() {
55 tracing::debug!("received v2 block");
56 OpNetworkPayloadEnvelope::decode_v2(&msg.data)
57 } else if msg.topic == self.blocks_v3_topic.hash() {
58 tracing::debug!("received v3 block");
59 OpNetworkPayloadEnvelope::decode_v3(&msg.data)
60 } else {
61 return MessageAcceptance::Reject;
62 };
63
64 match decoded {
65 Ok(envelope) => {
66 if self.block_valid(&envelope) {
67 _ = self.block_sender.send(envelope.payload);
68 MessageAcceptance::Accept
69 } else {
70 tracing::warn!("invalid unsafe block");
71 MessageAcceptance::Reject
72 }
73 }
74 Err(err) => {
75 tracing::warn!("unsafe block decode failed: {}", err);
76 MessageAcceptance::Reject
77 }
78 }
79 }
80
81 fn topics(&self) -> Vec<TopicHash> {
83 vec![self.blocks_v1_topic.hash(), self.blocks_v2_topic.hash(), self.blocks_v3_topic.hash()]
84 }
85}
86
87impl BlockHandler {
88 pub fn new(
90 chain_id: u64,
91 unsafe_recv: watch::Receiver<Address>,
92 ) -> (Self, Receiver<ExecutionPayload>) {
93 let (sender, recv) = channel();
94
95 let handler = Self {
96 chain_id,
97 block_sender: sender,
98 unsafe_signer_recv: unsafe_recv,
99 blocks_v1_topic: IdentTopic::new(format!("/optimism/{}/0/blocks", chain_id)),
100 blocks_v2_topic: IdentTopic::new(format!("/optimism/{}/1/blocks", chain_id)),
101 blocks_v3_topic: IdentTopic::new(format!("/optimism/{}/2/blocks", chain_id)),
102 };
103
104 (handler, recv)
105 }
106
107 pub fn block_valid(&self, envelope: &OpNetworkPayloadEnvelope) -> bool {
112 let current_timestamp =
113 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
114
115 let is_future = envelope.payload.timestamp() > current_timestamp + 5;
116 let is_past = envelope.payload.timestamp() < current_timestamp - 60;
117 let time_valid = !(is_future || is_past);
118
119 let msg = envelope.payload_hash.signature_message(self.chain_id);
120 let block_signer = *self.unsafe_signer_recv.borrow();
121 let Ok(msg_signer) = envelope.signature.recover_address_from_prehash(&msg) else {
122 tracing::warn!("Failed to recover address from message");
123 return false;
124 };
125
126 let signer_valid = msg_signer == block_signer;
127 time_valid && signer_valid
128 }
129}
130
131#[cfg(test)]
132mod tests {
133 use super::*;
134 use alloy_primitives::{Address, Bloom, Bytes, PrimitiveSignature, B256, U256};
135 use alloy_rpc_types_engine::ExecutionPayloadV1;
136 use op_alloy_rpc_types_engine::PayloadHash;
137
138 #[test]
139 fn test_block_valid() {
140 let current_timestamp =
141 SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
142
143 let v1 = ExecutionPayloadV1 {
144 parent_hash: B256::ZERO,
145 fee_recipient: Address::default(),
146 state_root: B256::ZERO,
147 receipts_root: B256::ZERO,
148 logs_bloom: Bloom::default(),
149 prev_randao: B256::ZERO,
150 block_number: 0,
151 gas_limit: 0,
152 gas_used: 0,
153 timestamp: current_timestamp,
154 extra_data: Bytes::default(),
155 base_fee_per_gas: U256::from(0),
156 block_hash: B256::ZERO,
157 transactions: vec![],
158 };
159 let payload = ExecutionPayload::V1(v1);
160 let envelope = OpNetworkPayloadEnvelope {
161 payload,
162 signature: PrimitiveSignature::test_signature(),
163 payload_hash: PayloadHash(B256::ZERO),
164 parent_beacon_block_root: None,
165 };
166
167 let msg = envelope.payload_hash.signature_message(10);
168 let signer = envelope.signature.recover_address_from_prehash(&msg).unwrap();
169 let (_, unsafe_signer) = tokio::sync::watch::channel(signer);
170 let (handler, _) = BlockHandler::new(10, unsafe_signer);
171
172 assert!(handler.block_valid(&envelope));
173 }
174}