solana_oasis_node/
network.rs

1use anyhow::{anyhow, Result};
2use libp2p::{
3    core::upgrade,
4    gossipsub::{
5        Gossipsub, GossipsubConfigBuilder, GossipsubEvent, IdentTopic, MessageAuthenticity,
6    },
7    kad::{store::MemoryStore, Kademlia, KademliaEvent, QueryResult},
8    mdns::{Mdns, MdnsConfig, MdnsEvent},
9    noise,
10    swarm::NetworkBehaviourEventProcess,
11    swarm::SwarmEvent,
12    tcp::TokioTcpConfig,
13    yamux, NetworkBehaviour, Transport,
14};
15use std::time::Duration;
16
17use crate::config::NetworkConfig;
18use crate::types::{Block, Message, Transaction};
19
/// Gossipsub topic name used when publishing `Message::Transaction` payloads.
const TRANSACTION_TOPIC: &str = "transactions";
/// Gossipsub topic name used when publishing `Message::Block` payloads.
const BLOCK_TOPIC: &str = "blocks";
/// Gossipsub topic name used when publishing `Message::State` payloads.
const STATE_TOPIC: &str = "state";
/// Gossipsub topic name used when publishing `Message::Compute` payloads.
const COMPUTE_TOPIC: &str = "compute";
24
/// Composite libp2p behaviour bundling the three protocols this node uses.
///
/// The `NetworkBehaviour` derive combines the fields below into a single
/// behaviour whose declared output event type is [`NetworkEvent`].
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "NetworkEvent")]
pub struct NetworkBehavior {
    /// Gossipsub publish/subscribe behaviour.
    gossipsub: Gossipsub,
    /// Kademlia DHT behaviour backed by an in-memory record store.
    kademlia: Kademlia<MemoryStore>,
    /// mDNS behaviour; its discovered/expired events are logged in this file.
    mdns: Mdns,
}
32
33impl NetworkBehaviourEventProcess<GossipsubEvent> for NetworkBehavior {
34    fn inject_event(&mut self, event: GossipsubEvent) {
35        if let GossipsubEvent::Message { message, .. } = event {
36            match bincode::deserialize::<Message>(&message.data) {
37                Ok(msg) => log::info!("Received message: {:?}", msg),
38                Err(e) => log::error!("Failed to deserialize message: {}", e),
39            }
40        }
41    }
42}
43
44impl NetworkBehaviourEventProcess<KademliaEvent> for NetworkBehavior {
45    fn inject_event(&mut self, event: KademliaEvent) {
46        if let KademliaEvent::OutboundQueryCompleted {
47            result: QueryResult::GetClosestPeers(Ok(peers)),
48            ..
49        } = event
50        {
51            for peer_id in peers.peers {
52                log::info!("Found peer: {}", peer_id);
53            }
54        }
55    }
56}
57
58impl NetworkBehaviourEventProcess<MdnsEvent> for NetworkBehavior {
59    fn inject_event(&mut self, event: MdnsEvent) {
60        match event {
61            MdnsEvent::Discovered(list) => {
62                for (peer, _) in list {
63                    log::info!("mDNS discovered peer: {}", peer);
64                }
65            }
66            MdnsEvent::Expired(list) => {
67                for (peer, _) in list {
68                    log::info!("mDNS expired peer: {}", peer);
69                }
70            }
71        }
72    }
73}
74
/// Output event type declared for [`NetworkBehavior`].
///
/// Each variant wraps the raw event of one inner protocol; the `From` impls
/// in this file perform the wrapping.
#[derive(Debug)]
pub enum NetworkEvent {
    /// An event produced by the gossipsub behaviour.
    Gossipsub(GossipsubEvent),
    /// An event produced by the Kademlia behaviour.
    Kademlia(KademliaEvent),
    /// An event produced by the mDNS behaviour.
    Mdns(MdnsEvent),
}
81
82impl From<GossipsubEvent> for NetworkEvent {
83    fn from(event: GossipsubEvent) -> Self {
84        NetworkEvent::Gossipsub(event)
85    }
86}
87
88impl From<KademliaEvent> for NetworkEvent {
89    fn from(event: KademliaEvent) -> Self {
90        NetworkEvent::Kademlia(event)
91    }
92}
93
94impl From<MdnsEvent> for NetworkEvent {
95    fn from(event: MdnsEvent) -> Self {
96        NetworkEvent::Mdns(event)
97    }
98}
99
/// Node-side networking handle: owns the configuration and the protocol
/// behaviours built by `Network::new`.
// NOTE(review): `dead_code` is presumably allowed because `config`, `kademlia`
// and `mdns` are stored but never read by the methods in this file — confirm.
#[allow(dead_code)]
pub struct Network {
    /// Configuration this network was created from.
    config: NetworkConfig,
    /// Gossipsub behaviour used by the `broadcast_*` methods to publish.
    gossipsub: Gossipsub,
    /// Kademlia DHT behaviour backed by an in-memory record store.
    kademlia: Kademlia<MemoryStore>,
    /// mDNS behaviour created with the default `MdnsConfig`.
    mdns: Mdns,
}
107
108impl Network {
109    pub async fn new(config: NetworkConfig) -> Result<Self> {
110        let local_key = config.identity.clone();
111        let local_peer_id = local_key.public().to_peer_id();
112
113        let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
114            .into_authentic(&local_key)
115            .expect("Failed to create noise keys");
116
117        let _transport = TokioTcpConfig::new()
118            .upgrade(upgrade::Version::V1)
119            .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
120            .multiplex(yamux::YamuxConfig::default())
121            .boxed();
122
123        let gossipsub_config = GossipsubConfigBuilder::default()
124            .heartbeat_interval(Duration::from_secs(1))
125            .build()
126            .expect("Valid config");
127
128        let mut gossipsub = Gossipsub::new(
129            MessageAuthenticity::Signed(local_key.clone()),
130            gossipsub_config,
131        )
132        .expect("Failed to create gossipsub");
133
134        let transaction_topic = IdentTopic::new(TRANSACTION_TOPIC);
135        let block_topic = IdentTopic::new(BLOCK_TOPIC);
136        let state_topic = IdentTopic::new(STATE_TOPIC);
137        let compute_topic = IdentTopic::new(COMPUTE_TOPIC);
138
139        gossipsub
140            .subscribe(&transaction_topic)
141            .expect("Failed to subscribe to transaction topic");
142        gossipsub
143            .subscribe(&block_topic)
144            .expect("Failed to subscribe to block topic");
145        gossipsub
146            .subscribe(&state_topic)
147            .expect("Failed to subscribe to state topic");
148        gossipsub
149            .subscribe(&compute_topic)
150            .expect("Failed to subscribe to compute topic");
151
152        let store = MemoryStore::new(local_peer_id);
153        let kademlia = Kademlia::new(local_peer_id, store);
154
155        let mdns = Mdns::new(MdnsConfig::default()).await?;
156
157        Ok(Self {
158            config,
159            gossipsub,
160            kademlia,
161            mdns,
162        })
163    }
164
165    pub async fn broadcast_transaction(&mut self, transaction: Transaction) -> Result<()> {
166        let message = Message::Transaction(transaction);
167        let data = bincode::serialize(&message)?;
168        self.gossipsub
169            .publish(IdentTopic::new(TRANSACTION_TOPIC), data)
170            .map_err(|e| anyhow!("Failed to publish transaction: {:?}", e))?;
171        Ok(())
172    }
173
174    pub async fn broadcast_block(&mut self, block: Block) -> Result<()> {
175        let message = Message::Block(block);
176        let data = bincode::serialize(&message)?;
177        self.gossipsub
178            .publish(IdentTopic::new(BLOCK_TOPIC), data)
179            .map_err(|e| anyhow!("Failed to publish block: {:?}", e))?;
180        Ok(())
181    }
182
183    pub async fn broadcast_state(&mut self, state: Vec<u8>) -> Result<()> {
184        let message = Message::State(state);
185        let data = bincode::serialize(&message)?;
186        self.gossipsub
187            .publish(IdentTopic::new(STATE_TOPIC), data)
188            .map_err(|e| anyhow!("Failed to publish state: {:?}", e))?;
189        Ok(())
190    }
191
192    pub async fn broadcast_compute(&mut self, compute: Vec<u8>) -> Result<()> {
193        let message = Message::Compute(compute);
194        let data = bincode::serialize(&message)?;
195        self.gossipsub
196            .publish(IdentTopic::new(COMPUTE_TOPIC), data)
197            .map_err(|e| anyhow!("Failed to publish compute: {:?}", e))?;
198        Ok(())
199    }
200
201    pub async fn handle_event(
202        &mut self,
203        event: SwarmEvent<NetworkEvent, impl std::error::Error>,
204    ) -> Result<Option<Message>> {
205        match event {
206            SwarmEvent::Behaviour(NetworkEvent::Gossipsub(GossipsubEvent::Message {
207                message,
208                ..
209            })) => match bincode::deserialize::<Message>(&message.data) {
210                Ok(msg) => Ok(Some(msg)),
211                Err(e) => {
212                    log::error!("Failed to deserialize message: {}", e);
213                    Ok(None)
214                }
215            },
216            SwarmEvent::Behaviour(NetworkEvent::Kademlia(
217                KademliaEvent::OutboundQueryCompleted {
218                    result: QueryResult::GetClosestPeers(Ok(peers)),
219                    ..
220                },
221            )) => {
222                for peer_id in peers.peers {
223                    log::info!("Found peer: {}", peer_id);
224                }
225                Ok(None)
226            }
227            _ => Ok(None),
228        }
229    }
230}