kona_node_service/actors/network/
actor.rs1use alloy_primitives::Address;
2use async_trait::async_trait;
3use kona_p2p::P2pRpcRequest;
4use kona_rpc::NetworkAdminQuery;
5use kona_sources::BlockSignerError;
6use libp2p::TransportError;
7use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelope, OpNetworkPayloadEnvelope};
8use thiserror::Error;
9use tokio::{self, select, sync::mpsc};
10use tokio_util::sync::{CancellationToken, WaitForCancellationFuture};
11
12use crate::{
13 CancellableContext, NodeActor,
14 actors::network::{
15 builder::NetworkBuilder, driver::NetworkDriverError, error::NetworkBuilderError,
16 },
17};
18
19#[derive(Debug)]
47pub struct NetworkActor {
48 pub(super) builder: NetworkBuilder,
50 pub(super) signer: mpsc::Receiver<Address>,
52 pub(super) p2p_rpc: mpsc::Receiver<P2pRpcRequest>,
54 pub(super) admin_rpc: mpsc::Receiver<NetworkAdminQuery>,
56 pub(super) publish_rx: mpsc::Receiver<OpExecutionPayloadEnvelope>,
58}
59
60#[derive(Debug)]
62pub struct NetworkInboundData {
63 pub signer: mpsc::Sender<Address>,
65 pub p2p_rpc: mpsc::Sender<P2pRpcRequest>,
67 pub admin_rpc: mpsc::Sender<NetworkAdminQuery>,
69 pub gossip_payload_tx: mpsc::Sender<OpExecutionPayloadEnvelope>,
73}
74
75impl NetworkActor {
76 pub fn new(driver: NetworkBuilder) -> (NetworkInboundData, Self) {
78 let (signer_tx, signer_rx) = mpsc::channel(16);
79 let (rpc_tx, rpc_rx) = mpsc::channel(1024);
80 let (admin_rpc_tx, admin_rpc_rx) = mpsc::channel(1024);
81 let (publish_tx, publish_rx) = tokio::sync::mpsc::channel(256);
82 let actor = Self {
83 builder: driver,
84 signer: signer_rx,
85 p2p_rpc: rpc_rx,
86 admin_rpc: admin_rpc_rx,
87 publish_rx,
88 };
89 let outbound_data = NetworkInboundData {
90 signer: signer_tx,
91 p2p_rpc: rpc_tx,
92 admin_rpc: admin_rpc_tx,
93 gossip_payload_tx: publish_tx,
94 };
95 (outbound_data, actor)
96 }
97}
98
99#[derive(Debug)]
101pub struct NetworkContext {
102 pub blocks: mpsc::Sender<OpExecutionPayloadEnvelope>,
104 pub cancellation: CancellationToken,
106}
107
108impl CancellableContext for NetworkContext {
109 fn cancelled(&self) -> WaitForCancellationFuture<'_> {
110 self.cancellation.cancelled()
111 }
112}
113
114#[derive(Debug, Error)]
116pub enum NetworkActorError {
117 #[error(transparent)]
119 NetworkBuilder(#[from] NetworkBuilderError),
120 #[error(transparent)]
122 NetworkDriver(#[from] NetworkDriverError),
123 #[error(transparent)]
125 DriverStartup(#[from] TransportError<std::io::Error>),
126 #[error("Missing unsafe block receiver in network driver")]
128 MissingUnsafeBlockReceiver,
129 #[error("Missing unsafe block signer in network driver")]
131 MissingUnsafeBlockSigner,
132 #[error("Channel closed unexpectedly")]
134 ChannelClosed,
135 #[error("Failed to sign the payload: {0}")]
137 FailedToSignPayload(#[from] BlockSignerError),
138}
139
140#[async_trait]
141impl NodeActor for NetworkActor {
142 type Error = NetworkActorError;
143 type InboundData = NetworkInboundData;
144 type OutboundData = NetworkContext;
145 type Builder = NetworkBuilder;
146
147 fn build(state: Self::Builder) -> (Self::InboundData, Self) {
148 Self::new(state)
149 }
150
151 async fn start(
152 mut self,
153 NetworkContext { blocks, cancellation }: Self::OutboundData,
154 ) -> Result<(), Self::Error> {
155 let mut handler = self.builder.build()?.start().await?;
156
157 let (unsafe_block_tx, mut unsafe_block_rx) = tokio::sync::mpsc::unbounded_channel();
159
160 loop {
161 select! {
162 _ = cancellation.cancelled() => {
163 info!(
164 target: "network",
165 "Received shutdown signal. Exiting network task."
166 );
167 return Ok(());
168 }
169 block = unsafe_block_rx.recv() => {
170 let Some(block) = block else {
171 error!(target: "node::p2p", "The unsafe block receiver channel has closed");
172 return Err(NetworkActorError::ChannelClosed);
173 };
174
175 if blocks.send(block).await.is_err() {
176 warn!(target: "network", "Failed to forward unsafe block");
177 return Err(NetworkActorError::ChannelClosed);
178 }
179 }
180 signer = self.signer.recv() => {
181 let Some(signer) = signer else {
182 warn!(
183 target: "network",
184 "Found no unsafe block signer on receive"
185 );
186 return Err(NetworkActorError::ChannelClosed);
187 };
188 if handler.unsafe_block_signer_sender.send(signer).is_err() {
189 warn!(
190 target: "network",
191 "Failed to send unsafe block signer to network handler",
192 );
193 }
194 }
195 Some(block) = self.publish_rx.recv(), if !self.publish_rx.is_closed() => {
196 let timestamp = block.execution_payload.timestamp();
197 let selector = |handler: &kona_p2p::BlockHandler| {
198 handler.topic(timestamp)
199 };
200 let Some(signer) = handler.signer.as_ref() else {
201 warn!(target: "net", "No local signer available to sign the payload");
202 continue;
203 };
204
205 let chain_id = handler.discovery.chain_id;
206
207 let sender_address = *handler.unsafe_block_signer_sender.borrow();
208
209 let payload_hash = block.payload_hash();
210 let signature = signer.sign_block(payload_hash, chain_id, sender_address).await?;
211
212 let payload = OpNetworkPayloadEnvelope {
213 payload: block.execution_payload,
214 parent_beacon_block_root: block.parent_beacon_block_root,
215 signature,
216 payload_hash,
217 };
218
219 match handler.gossip.publish(selector, Some(payload)) {
220 Ok(id) => info!("Published unsafe payload | {:?}", id),
221 Err(e) => warn!("Failed to publish unsafe payload: {:?}", e),
222 }
223 }
224 event = handler.gossip.next() => {
225 let Some(event) = event else {
226 error!(target: "node::p2p", "The gossip swarm stream has ended");
227 return Err(NetworkActorError::ChannelClosed);
228 };
229
230 if let Some(payload) = handler.gossip.handle_event(event) {
231 if unsafe_block_tx.send(payload.into()).is_err() {
232 warn!(target: "node::p2p", "Failed to send unsafe block to network handler");
233 }
234 }
235 },
236 enr = handler.enr_receiver.recv() => {
237 let Some(enr) = enr else {
238 error!(target: "node::p2p", "The enr receiver channel has closed");
239 return Err(NetworkActorError::ChannelClosed);
240 };
241 handler.gossip.dial(enr);
242 },
243 _ = handler.peer_score_inspector.tick(), if handler.gossip.peer_monitoring.as_ref().is_some() => {
244 handler.handle_peer_monitoring().await;
245 },
246 Some(NetworkAdminQuery::PostUnsafePayload { payload }) = self.admin_rpc.recv(), if !self.admin_rpc.is_closed() => {
247 debug!(target: "node::p2p", "Broadcasting unsafe payload from admin api");
248 if unsafe_block_tx.send(payload).is_err() {
249 warn!(target: "node::p2p", "Failed to send unsafe block to network handler");
250 }
251 },
252 req = self.p2p_rpc.recv(), if !self.p2p_rpc.is_closed() => {
253 let Some(req) = req else {
254 error!(target: "node::p2p", "The p2p rpc receiver channel has closed");
255 return Err(NetworkActorError::ChannelClosed);
256 };
257 req.handle(&mut handler.gossip, &handler.discovery);
258 },
259 }
260 }
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::B256;
    use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV3};
    use alloy_signer::SignerSync;
    use alloy_signer_local::PrivateKeySigner;
    use arbitrary::Arbitrary;
    use op_alloy_rpc_types_engine::OpExecutionPayload;
    use rand::Rng;

    /// Chain id mixed into the signature message by every test in this module.
    const CHAIN_ID: u64 = 1337;

    /// Returns a buffer of random bytes used as entropy for generating arbitrary payloads.
    fn random_entropy() -> [u8; 4096] {
        let mut entropy = [0u8; 4096];
        rand::rng().fill(entropy.as_mut_slice());
        entropy
    }

    /// Signs the payload hash of `block` with `key` and wraps everything in a network envelope.
    fn sign_block(
        key: &PrivateKeySigner,
        block: OpExecutionPayloadEnvelope,
    ) -> OpNetworkPayloadEnvelope {
        let payload_hash = block.payload_hash();
        let signature = key.sign_hash_sync(&payload_hash.signature_message(CHAIN_ID)).unwrap();
        OpNetworkPayloadEnvelope {
            payload: block.execution_payload,
            parent_beacon_block_root: block.parent_beacon_block_root,
            signature,
            payload_hash,
        }
    }

    /// Recovers the address that signed `envelope` from its signature and payload hash.
    fn recover_signer(envelope: &OpNetworkPayloadEnvelope) -> Address {
        let message = envelope.payload_hash.signature_message(CHAIN_ID);
        envelope.signature.recover_address_from_prehash(&message).unwrap()
    }

    /// A signed V1 payload survives an encode/decode round trip with its signer recoverable.
    #[test]
    fn test_payload_signature_roundtrip_v1() {
        let entropy = random_entropy();
        let key = PrivateKeySigner::random();
        let expected_address = key.address();

        let block = OpExecutionPayloadEnvelope {
            execution_payload: OpExecutionPayload::V1(
                ExecutionPayloadV1::arbitrary(&mut arbitrary::Unstructured::new(&entropy))
                    .unwrap(),
            ),
            parent_beacon_block_root: None,
        };

        let encoded = sign_block(&key, block).encode_v1().unwrap();
        let decoded = OpNetworkPayloadEnvelope::decode_v1(&encoded).unwrap();

        assert_eq!(expected_address, recover_signer(&decoded));
    }

    /// A signed V3 payload survives an encode/decode round trip with its signer recoverable.
    #[test]
    fn test_payload_signature_roundtrip_v3() {
        let entropy = random_entropy();
        let key = PrivateKeySigner::random();
        let expected_address = key.address();

        let block = OpExecutionPayloadEnvelope {
            execution_payload: OpExecutionPayload::V3(
                ExecutionPayloadV3::arbitrary(&mut arbitrary::Unstructured::new(&entropy))
                    .unwrap(),
            ),
            parent_beacon_block_root: Some(B256::random()),
        };

        let encoded = sign_block(&key, block).encode_v3().unwrap();
        let decoded = OpNetworkPayloadEnvelope::decode_v3(&encoded).unwrap();

        assert_eq!(expected_address, recover_signer(&decoded));
    }
}