kona_net/gossip/
handler.rs

1//! Block Handler
2
3use std::{
4    sync::mpsc::{channel, Receiver, Sender},
5    time::SystemTime,
6};
7
8use alloy_primitives::Address;
9use alloy_rpc_types_engine::ExecutionPayload;
10use libp2p::gossipsub::{IdentTopic, Message, MessageAcceptance, TopicHash};
11use op_alloy_rpc_types_engine::OpNetworkPayloadEnvelope;
12use tokio::sync::watch;
13
14/// This trait defines the functionality required to process incoming messages
15/// and determine their acceptance within the network.
16///
17/// Implementors of this trait can specify how messages are handled and which
18/// topics they are interested in.
19pub trait Handler: Send {
20    /// Manages validation and further processing of messages
21    fn handle(&self, msg: Message) -> MessageAcceptance;
22
23    /// Specifies which topics the handler is interested in
24    fn topics(&self) -> Vec<TopicHash>;
25}
26
27/// Responsible for managing blocks received via p2p gossip
28#[derive(Debug, Clone)]
29pub struct BlockHandler {
30    /// Chain ID of the L2 blockchain. Used to filter out gossip messages intended for other
31    /// blockchains.
32    pub chain_id: u64,
33    /// A channel sender to forward new blocks to other modules
34    pub block_sender: Sender<ExecutionPayload>,
35    /// A [Receiver] to monitor changes to the unsafe block signer.
36    pub unsafe_signer_recv: watch::Receiver<Address>,
37    /// The libp2p topic for pre Canyon/Shangai blocks.
38    pub blocks_v1_topic: IdentTopic,
39    /// The libp2p topic for Canyon/Delta blocks.
40    pub blocks_v2_topic: IdentTopic,
41    /// The libp2p topic for Ecotone V3 blocks.
42    pub blocks_v3_topic: IdentTopic,
43}
44
45impl Handler for BlockHandler {
46    /// Checks validity of a block received via p2p gossip, and sends to the block update channel if
47    /// valid.
48    fn handle(&self, msg: Message) -> MessageAcceptance {
49        tracing::debug!("received block");
50
51        let decoded = if msg.topic == self.blocks_v1_topic.hash() {
52            tracing::debug!("received v1 block");
53            OpNetworkPayloadEnvelope::decode_v1(&msg.data)
54        } else if msg.topic == self.blocks_v2_topic.hash() {
55            tracing::debug!("received v2 block");
56            OpNetworkPayloadEnvelope::decode_v2(&msg.data)
57        } else if msg.topic == self.blocks_v3_topic.hash() {
58            tracing::debug!("received v3 block");
59            OpNetworkPayloadEnvelope::decode_v3(&msg.data)
60        } else {
61            return MessageAcceptance::Reject;
62        };
63
64        match decoded {
65            Ok(envelope) => {
66                if self.block_valid(&envelope) {
67                    _ = self.block_sender.send(envelope.payload);
68                    MessageAcceptance::Accept
69                } else {
70                    tracing::warn!("invalid unsafe block");
71                    MessageAcceptance::Reject
72                }
73            }
74            Err(err) => {
75                tracing::warn!("unsafe block decode failed: {}", err);
76                MessageAcceptance::Reject
77            }
78        }
79    }
80
81    /// The gossip topics accepted for new blocks
82    fn topics(&self) -> Vec<TopicHash> {
83        vec![self.blocks_v1_topic.hash(), self.blocks_v2_topic.hash(), self.blocks_v3_topic.hash()]
84    }
85}
86
87impl BlockHandler {
88    /// Creates a new [BlockHandler] and opens a channel
89    pub fn new(
90        chain_id: u64,
91        unsafe_recv: watch::Receiver<Address>,
92    ) -> (Self, Receiver<ExecutionPayload>) {
93        let (sender, recv) = channel();
94
95        let handler = Self {
96            chain_id,
97            block_sender: sender,
98            unsafe_signer_recv: unsafe_recv,
99            blocks_v1_topic: IdentTopic::new(format!("/optimism/{}/0/blocks", chain_id)),
100            blocks_v2_topic: IdentTopic::new(format!("/optimism/{}/1/blocks", chain_id)),
101            blocks_v3_topic: IdentTopic::new(format!("/optimism/{}/2/blocks", chain_id)),
102        };
103
104        (handler, recv)
105    }
106
107    /// Determines if a block is valid.
108    ///
109    /// True if the block is less than 1 minute old, and correctly signed by the unsafe block
110    /// signer.
111    pub fn block_valid(&self, envelope: &OpNetworkPayloadEnvelope) -> bool {
112        let current_timestamp =
113            SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
114
115        let is_future = envelope.payload.timestamp() > current_timestamp + 5;
116        let is_past = envelope.payload.timestamp() < current_timestamp - 60;
117        let time_valid = !(is_future || is_past);
118
119        let msg = envelope.payload_hash.signature_message(self.chain_id);
120        let block_signer = *self.unsafe_signer_recv.borrow();
121        let Ok(msg_signer) = envelope.signature.recover_address_from_prehash(&msg) else {
122            tracing::warn!("Failed to recover address from message");
123            return false;
124        };
125
126        let signer_valid = msg_signer == block_signer;
127        time_valid && signer_valid
128    }
129}
130
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{Address, Bloom, Bytes, PrimitiveSignature, B256, U256};
    use alloy_rpc_types_engine::ExecutionPayloadV1;
    use op_alloy_rpc_types_engine::PayloadHash;

    /// Chain ID used for both signing and validation in these tests.
    const CHAIN_ID: u64 = 10;

    /// Current wall-clock time in seconds since the Unix epoch.
    fn unix_now() -> u64 {
        SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs()
    }

    /// Builds an otherwise empty V1 envelope with the given block timestamp, carrying the
    /// fixed test signature.
    fn envelope_at(timestamp: u64) -> OpNetworkPayloadEnvelope {
        let v1 = ExecutionPayloadV1 {
            parent_hash: B256::ZERO,
            fee_recipient: Address::default(),
            state_root: B256::ZERO,
            receipts_root: B256::ZERO,
            logs_bloom: Bloom::default(),
            prev_randao: B256::ZERO,
            block_number: 0,
            gas_limit: 0,
            gas_used: 0,
            timestamp,
            extra_data: Bytes::default(),
            base_fee_per_gas: U256::from(0),
            block_hash: B256::ZERO,
            transactions: vec![],
        };
        OpNetworkPayloadEnvelope {
            payload: ExecutionPayload::V1(v1),
            signature: PrimitiveSignature::test_signature(),
            payload_hash: PayloadHash(B256::ZERO),
            parent_beacon_block_root: None,
        }
    }

    /// Recovers the address that the envelope's signature corresponds to for `CHAIN_ID`.
    fn signer_of(envelope: &OpNetworkPayloadEnvelope) -> Address {
        let msg = envelope.payload_hash.signature_message(CHAIN_ID);
        envelope.signature.recover_address_from_prehash(&msg).unwrap()
    }

    /// Creates a handler whose unsafe block signer is `signer`.
    fn handler_with_signer(signer: Address) -> BlockHandler {
        let (_, unsafe_signer) = tokio::sync::watch::channel(signer);
        let (handler, _) = BlockHandler::new(CHAIN_ID, unsafe_signer);
        handler
    }

    #[test]
    fn test_block_valid() {
        let envelope = envelope_at(unix_now());
        let handler = handler_with_signer(signer_of(&envelope));

        assert!(handler.block_valid(&envelope));
    }

    #[test]
    fn test_block_valid_rejects_stale_block() {
        // Two minutes old: well beyond the 60 second window.
        let envelope = envelope_at(unix_now() - 120);
        let handler = handler_with_signer(signer_of(&envelope));

        assert!(!handler.block_valid(&envelope));
    }

    #[test]
    fn test_block_valid_rejects_future_block() {
        // One hour ahead: well beyond the 5 second allowance.
        let envelope = envelope_at(unix_now() + 3600);
        let handler = handler_with_signer(signer_of(&envelope));

        assert!(!handler.block_valid(&envelope));
    }

    #[test]
    fn test_block_valid_rejects_wrong_signer() {
        let envelope = envelope_at(unix_now());
        let wrong_signer = Address::ZERO;
        assert_ne!(wrong_signer, signer_of(&envelope));
        let handler = handler_with_signer(wrong_signer);

        assert!(!handler.block_valid(&envelope));
    }
}