kona_net/
driver.rs

1//! Driver for network services.
2
3use std::sync::mpsc::Receiver;
4
5use alloy_primitives::Address;
6use alloy_rpc_types_engine::ExecutionPayload;
7use libp2p::TransportError;
8use tokio::{select, sync::watch};
9
10use crate::{
11    builder::NetworkDriverBuilder, discovery::driver::DiscoveryDriver, gossip::driver::GossipDriver,
12};
13
14/// NetworkDriver
15///
16/// Contains the logic to run Optimism's consensus-layer networking stack.
17/// There are two core services that are run by the driver:
18/// - Block gossip through Gossipsub.
19/// - Peer discovery with `discv5`.
20pub struct NetworkDriver {
21    /// Channel to receive unsafe blocks.
22    pub(crate) unsafe_block_recv: Option<Receiver<ExecutionPayload>>,
23    /// Channel to send unsafe signer updates.
24    pub(crate) unsafe_block_signer_sender: Option<watch::Sender<Address>>,
25    /// The swarm instance.
26    pub gossip: GossipDriver,
27    /// The discovery service driver.
28    pub discovery: DiscoveryDriver,
29}
30
31impl NetworkDriver {
32    /// Returns a new [NetworkDriverBuilder].
33    pub fn builder() -> NetworkDriverBuilder {
34        NetworkDriverBuilder::new()
35    }
36
37    /// Take the unsafe block receiver.
38    pub fn take_unsafe_block_recv(&mut self) -> Option<Receiver<ExecutionPayload>> {
39        self.unsafe_block_recv.take()
40    }
41
42    /// Take the unsafe block signer sender.
43    pub fn take_unsafe_block_signer_sender(&mut self) -> Option<watch::Sender<Address>> {
44        self.unsafe_block_signer_sender.take()
45    }
46
47    /// Starts the Discv5 peer discovery & libp2p services
48    /// and continually listens for new peers and messages to handle
49    pub fn start(mut self) -> Result<(), TransportError<std::io::Error>> {
50        let mut peer_recv = self.discovery.start();
51        self.gossip.listen()?;
52        tokio::spawn(async move {
53            loop {
54                select! {
55                    peer = peer_recv.recv() => {
56                        self.gossip.dial_opt(peer.clone()).await;
57                        tracing::info!("Received peer: {:?} | Connected peers: {:?}", peer, self.gossip.connected_peers());
58                    },
59                    event = self.gossip.select_next_some() => {
60                        tracing::debug!("Received event: {:?}", event);
61                        self.gossip.handle_event(event);
62                    },
63                }
64            }
65        });
66
67        Ok(())
68    }
69}