kona_node_service/actors/network/
actor.rs

1use alloy_primitives::Address;
2use async_trait::async_trait;
3use kona_p2p::P2pRpcRequest;
4use kona_rpc::NetworkAdminQuery;
5use kona_sources::BlockSignerError;
6use libp2p::TransportError;
7use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelope, OpNetworkPayloadEnvelope};
8use thiserror::Error;
9use tokio::{self, select, sync::mpsc};
10use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
11
12use crate::{
13    CancellableContext, NodeActor,
14    actors::network::{
15        builder::NetworkBuilder, driver::NetworkDriverError, error::NetworkBuilderError,
16    },
17};
18
19/// The network actor handles two core networking components of the rollup node:
20/// - *discovery*: Peer discovery over UDP using discv5.
21/// - *gossip*: Block gossip over TCP using libp2p.
22///
23/// The network actor itself is a light wrapper around the [`NetworkBuilder`].
24///
25/// ## Example
26///
27/// ```rust,ignore
28/// use kona_p2p::NetworkDriver;
29/// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
30///
31/// let chain_id = 10;
32/// let signer = Address::random();
33/// let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9099);
34///
35/// // Construct the `Network` using the builder.
36/// // let mut driver = Network::builder()
37/// //    .with_unsafe_block_signer(signer)
38/// //    .with_chain_id(chain_id)
39/// //    .with_gossip_addr(socket)
40/// //    .build()
41/// //    .unwrap();
42///
43/// // Construct the `NetworkActor` with the [`Network`].
44/// // let actor = NetworkActor::new(driver);
45/// ```
46#[derive(Debug)]
47pub struct NetworkActor {
48    /// Network driver
49    pub(super) builder: NetworkBuilder,
50    /// A channel to receive the unsafe block signer address.
51    pub(super) signer: mpsc::Receiver<Address>,
52    /// Handler for p2p RPC Requests.
53    pub(super) p2p_rpc: mpsc::Receiver<P2pRpcRequest>,
54    /// A channel to receive admin rpc requests.
55    pub(super) admin_rpc: mpsc::Receiver<NetworkAdminQuery>,
56    /// A channel to receive unsafe blocks and send them through the gossip layer.
57    pub(super) publish_rx: mpsc::Receiver<OpExecutionPayloadEnvelope>,
58}
59
60/// The inbound data for the network actor.
61#[derive(Debug)]
62pub struct NetworkInboundData {
63    /// A channel to send the unsafe block signer address to the network actor.
64    pub signer: mpsc::Sender<Address>,
65    /// Handler for p2p RPC Requests sent to the network actor.
66    pub p2p_rpc: mpsc::Sender<P2pRpcRequest>,
67    /// Handler for admin RPC Requests.
68    pub admin_rpc: mpsc::Sender<NetworkAdminQuery>,
69    /// A channel to send unsafe blocks to the network actor.
70    /// This channel should only be used by the sequencer actor/admin RPC api to forward their
71    /// newly produced unsafe blocks to the network actor.
72    pub gossip_payload_tx: mpsc::Sender<OpExecutionPayloadEnvelope>,
73}
74
75impl NetworkActor {
76    /// Constructs a new [`NetworkActor`] given the [`NetworkBuilder`]
77    pub fn new(driver: NetworkBuilder) -> (NetworkInboundData, Self) {
78        let (signer_tx, signer_rx) = mpsc::channel(16);
79        let (rpc_tx, rpc_rx) = mpsc::channel(1024);
80        let (admin_rpc_tx, admin_rpc_rx) = mpsc::channel(1024);
81        let (publish_tx, publish_rx) = tokio::sync::mpsc::channel(256);
82        let actor = Self {
83            builder: driver,
84            signer: signer_rx,
85            p2p_rpc: rpc_rx,
86            admin_rpc: admin_rpc_rx,
87            publish_rx,
88        };
89        let outbound_data = NetworkInboundData {
90            signer: signer_tx,
91            p2p_rpc: rpc_tx,
92            admin_rpc: admin_rpc_tx,
93            gossip_payload_tx: publish_tx,
94        };
95        (outbound_data, actor)
96    }
97}
98
99/// The communication context used by the network actor.
100#[derive(Debug)]
101pub struct NetworkContext {
102    /// The channel used by the sequencer actor for sending unsafe blocks to the network.
103    pub blocks: mpsc::Sender<OpExecutionPayloadEnvelope>,
104    /// Cancels the network actor.
105    pub cancellation: CancellationToken,
106}
107
108impl CancellableContext for NetworkContext {
109    fn cancelled(&self) -> WaitForCancellationFuture<'_> {
110        self.cancellation.cancelled()
111    }
112}
113
114/// An error from the network actor.
115#[derive(Debug, Error)]
116pub enum NetworkActorError {
117    /// Network builder error.
118    #[error(transparent)]
119    NetworkBuilder(#[from] NetworkBuilderError),
120    /// Network driver error.
121    #[error(transparent)]
122    NetworkDriver(#[from] NetworkDriverError),
123    /// Driver startup failed.
124    #[error(transparent)]
125    DriverStartup(#[from] TransportError<std::io::Error>),
126    /// The network driver was missing its unsafe block receiver.
127    #[error("Missing unsafe block receiver in network driver")]
128    MissingUnsafeBlockReceiver,
129    /// The network driver was missing its unsafe block signer sender.
130    #[error("Missing unsafe block signer in network driver")]
131    MissingUnsafeBlockSigner,
132    /// Channel closed unexpectedly.
133    #[error("Channel closed unexpectedly")]
134    ChannelClosed,
135    /// Failed to sign the payload.
136    #[error("Failed to sign the payload: {0}")]
137    FailedToSignPayload(#[from] BlockSignerError),
138}
139
140#[async_trait]
141impl NodeActor for NetworkActor {
142    type Error = NetworkActorError;
143    type InboundData = NetworkInboundData;
144    type OutboundData = NetworkContext;
145    type Builder = NetworkBuilder;
146
147    fn build(state: Self::Builder) -> (Self::InboundData, Self) {
148        Self::new(state)
149    }
150
151    async fn start(
152        mut self,
153        NetworkContext { blocks, cancellation }: Self::OutboundData,
154    ) -> Result<(), Self::Error> {
155        let mut handler = self.builder.build()?.start().await?;
156
157        // New unsafe block channel.
158        let (unsafe_block_tx, mut unsafe_block_rx) = tokio::sync::mpsc::unbounded_channel();
159
160        loop {
161            select! {
162                _ = cancellation.cancelled() => {
163                    info!(
164                        target: "network",
165                        "Received shutdown signal. Exiting network task."
166                    );
167                    return Ok(());
168                }
169                block = unsafe_block_rx.recv() => {
170                    let Some(block) = block else {
171                        error!(target: "node::p2p", "The unsafe block receiver channel has closed");
172                        return Err(NetworkActorError::ChannelClosed);
173                    };
174
175                    if blocks.send(block).await.is_err() {
176                        warn!(target: "network", "Failed to forward unsafe block");
177                        return Err(NetworkActorError::ChannelClosed);
178                    }
179                }
180                signer = self.signer.recv() => {
181                    let Some(signer) = signer else {
182                        warn!(
183                            target: "network",
184                            "Found no unsafe block signer on receive"
185                        );
186                        return Err(NetworkActorError::ChannelClosed);
187                    };
188                    if handler.unsafe_block_signer_sender.send(signer).is_err() {
189                        warn!(
190                            target: "network",
191                            "Failed to send unsafe block signer to network handler",
192                        );
193                    }
194                }
195                Some(block) = self.publish_rx.recv(), if !self.publish_rx.is_closed() => {
196                    let timestamp = block.execution_payload.timestamp();
197                    let selector = |handler: &kona_p2p::BlockHandler| {
198                        handler.topic(timestamp)
199                    };
200                    let Some(signer) = handler.signer.as_ref() else {
201                        warn!(target: "net", "No local signer available to sign the payload");
202                        continue;
203                    };
204
205                    let chain_id = handler.discovery.chain_id;
206
207                    let sender_address = *handler.unsafe_block_signer_sender.borrow();
208
209                    let payload_hash = block.payload_hash();
210                    let signature = signer.sign_block(payload_hash, chain_id, sender_address).await?;
211
212                    let payload = OpNetworkPayloadEnvelope {
213                        payload: block.execution_payload,
214                        parent_beacon_block_root: block.parent_beacon_block_root,
215                        signature,
216                        payload_hash,
217                    };
218
219                    match handler.gossip.publish(selector, Some(payload)) {
220                        Ok(id) => info!("Published unsafe payload | {:?}", id),
221                        Err(e) => warn!("Failed to publish unsafe payload: {:?}", e),
222                    }
223                }
224                event = handler.gossip.next() => {
225                    let Some(event) = event else {
226                        error!(target: "node::p2p", "The gossip swarm stream has ended");
227                        return Err(NetworkActorError::ChannelClosed);
228                    };
229
230                    if let Some(payload) = handler.gossip.handle_event(event) {
231                        if unsafe_block_tx.send(payload.into()).is_err() {
232                            warn!(target: "node::p2p", "Failed to send unsafe block to network handler");
233                        }
234                    }
235                },
236                enr = handler.enr_receiver.recv() => {
237                    let Some(enr) = enr else {
238                        error!(target: "node::p2p", "The enr receiver channel has closed");
239                        return Err(NetworkActorError::ChannelClosed);
240                    };
241                    handler.gossip.dial(enr);
242                },
243                _ = handler.peer_score_inspector.tick(), if handler.gossip.peer_monitoring.as_ref().is_some() => {
244                    handler.handle_peer_monitoring().await;
245                },
246                Some(NetworkAdminQuery::PostUnsafePayload { payload }) = self.admin_rpc.recv(), if !self.admin_rpc.is_closed() => {
247                    debug!(target: "node::p2p", "Broadcasting unsafe payload from admin api");
248                    if unsafe_block_tx.send(payload).is_err() {
249                        warn!(target: "node::p2p", "Failed to send unsafe block to network handler");
250                    }
251                },
252                req = self.p2p_rpc.recv(), if !self.p2p_rpc.is_closed() => {
253                    let Some(req) = req else {
254                        error!(target: "node::p2p", "The p2p rpc receiver channel has closed");
255                        return Err(NetworkActorError::ChannelClosed);
256                    };
257                    req.handle(&mut handler.gossip, &handler.discovery);
258                },
259            }
260        }
261    }
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV3};
    use alloy_signer::SignerSync;
    use alloy_signer_local::PrivateKeySigner;
    use arbitrary::Arbitrary;
    use op_alloy_rpc_types_engine::OpExecutionPayload;
    use rand::Rng;

    /// Chain id mixed into the signed message by both roundtrip tests.
    const CHAIN_ID: u64 = 1337;

    /// Returns a buffer of random bytes used as entropy for `Arbitrary` payloads.
    fn random_entropy() -> [u8; 4096] {
        let mut entropy = [0u8; 4096];
        rand::rng().fill(entropy.as_mut_slice());
        entropy
    }

    /// Signs the payload hash of `block` with `signer` and wraps everything in a network
    /// payload envelope.
    fn sign_envelope(
        signer: &PrivateKeySigner,
        block: OpExecutionPayloadEnvelope,
    ) -> OpNetworkPayloadEnvelope {
        let payload_hash = block.payload_hash();
        let signature = signer.sign_hash_sync(&payload_hash.signature_message(CHAIN_ID)).unwrap();
        OpNetworkPayloadEnvelope {
            payload: block.execution_payload,
            parent_beacon_block_root: block.parent_beacon_block_root,
            signature,
            payload_hash,
        }
    }

    /// Recovers the address that signed `envelope` from its signature and payload hash.
    fn recover_signer(envelope: OpNetworkPayloadEnvelope) -> Address {
        let message = envelope.payload_hash.signature_message(CHAIN_ID);
        envelope.signature.recover_address_from_prehash(&message).unwrap()
    }

    /// A V1 payload signed, encoded and decoded again must recover the original signer.
    #[test]
    fn test_payload_signature_roundtrip_v1() {
        let entropy = random_entropy();
        let signer = PrivateKeySigner::random();
        let expected_address = signer.address();

        let execution_payload = OpExecutionPayload::V1(
            ExecutionPayloadV1::arbitrary(&mut arbitrary::Unstructured::new(&entropy)).unwrap(),
        );
        let block = OpExecutionPayloadEnvelope { execution_payload, parent_beacon_block_root: None };

        let encoded = sign_envelope(&signer, block).encode_v1().unwrap();
        let decoded = OpNetworkPayloadEnvelope::decode_v1(&encoded).unwrap();

        assert_eq!(expected_address, recover_signer(decoded));
    }

    /// A V3 payload signed, encoded and decoded again must recover the original signer.
    #[test]
    fn test_payload_signature_roundtrip_v3() {
        let entropy = random_entropy();
        let signer = PrivateKeySigner::random();
        let expected_address = signer.address();

        let execution_payload = OpExecutionPayload::V3(
            ExecutionPayloadV3::arbitrary(&mut arbitrary::Unstructured::new(&entropy)).unwrap(),
        );
        let block = OpExecutionPayloadEnvelope {
            execution_payload,
            parent_beacon_block_root: Some(B256::random()),
        };

        let encoded = sign_envelope(&signer, block).encode_v3().unwrap();
        let decoded = OpNetworkPayloadEnvelope::decode_v3(&encoded).unwrap();

        assert_eq!(expected_address, recover_signer(decoded));
    }
}