solana_oasis_node/
network.rs1use anyhow::{anyhow, Result};
2use libp2p::{
3 core::upgrade,
4 gossipsub::{
5 Gossipsub, GossipsubConfigBuilder, GossipsubEvent, IdentTopic, MessageAuthenticity,
6 },
7 kad::{store::MemoryStore, Kademlia, KademliaEvent, QueryResult},
8 mdns::{Mdns, MdnsConfig, MdnsEvent},
9 noise,
10 swarm::NetworkBehaviourEventProcess,
11 swarm::SwarmEvent,
12 tcp::TokioTcpConfig,
13 yamux, NetworkBehaviour, Transport,
14};
15use std::time::Duration;
16
17use crate::config::NetworkConfig;
18use crate::types::{Block, Message, Transaction};
19
20const TRANSACTION_TOPIC: &str = "transactions";
21const BLOCK_TOPIC: &str = "blocks";
22const STATE_TOPIC: &str = "state";
23const COMPUTE_TOPIC: &str = "compute";
24
25#[derive(NetworkBehaviour)]
26#[behaviour(out_event = "NetworkEvent")]
27pub struct NetworkBehavior {
28 gossipsub: Gossipsub,
29 kademlia: Kademlia<MemoryStore>,
30 mdns: Mdns,
31}
32
33impl NetworkBehaviourEventProcess<GossipsubEvent> for NetworkBehavior {
34 fn inject_event(&mut self, event: GossipsubEvent) {
35 if let GossipsubEvent::Message { message, .. } = event {
36 match bincode::deserialize::<Message>(&message.data) {
37 Ok(msg) => log::info!("Received message: {:?}", msg),
38 Err(e) => log::error!("Failed to deserialize message: {}", e),
39 }
40 }
41 }
42}
43
44impl NetworkBehaviourEventProcess<KademliaEvent> for NetworkBehavior {
45 fn inject_event(&mut self, event: KademliaEvent) {
46 if let KademliaEvent::OutboundQueryCompleted {
47 result: QueryResult::GetClosestPeers(Ok(peers)),
48 ..
49 } = event
50 {
51 for peer_id in peers.peers {
52 log::info!("Found peer: {}", peer_id);
53 }
54 }
55 }
56}
57
58impl NetworkBehaviourEventProcess<MdnsEvent> for NetworkBehavior {
59 fn inject_event(&mut self, event: MdnsEvent) {
60 match event {
61 MdnsEvent::Discovered(list) => {
62 for (peer, _) in list {
63 log::info!("mDNS discovered peer: {}", peer);
64 }
65 }
66 MdnsEvent::Expired(list) => {
67 for (peer, _) in list {
68 log::info!("mDNS expired peer: {}", peer);
69 }
70 }
71 }
72 }
73}
74
75#[derive(Debug)]
76pub enum NetworkEvent {
77 Gossipsub(GossipsubEvent),
78 Kademlia(KademliaEvent),
79 Mdns(MdnsEvent),
80}
81
82impl From<GossipsubEvent> for NetworkEvent {
83 fn from(event: GossipsubEvent) -> Self {
84 NetworkEvent::Gossipsub(event)
85 }
86}
87
88impl From<KademliaEvent> for NetworkEvent {
89 fn from(event: KademliaEvent) -> Self {
90 NetworkEvent::Kademlia(event)
91 }
92}
93
94impl From<MdnsEvent> for NetworkEvent {
95 fn from(event: MdnsEvent) -> Self {
96 NetworkEvent::Mdns(event)
97 }
98}
99
100#[allow(dead_code)]
101pub struct Network {
102 config: NetworkConfig,
103 gossipsub: Gossipsub,
104 kademlia: Kademlia<MemoryStore>,
105 mdns: Mdns,
106}
107
108impl Network {
109 pub async fn new(config: NetworkConfig) -> Result<Self> {
110 let local_key = config.identity.clone();
111 let local_peer_id = local_key.public().to_peer_id();
112
113 let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
114 .into_authentic(&local_key)
115 .expect("Failed to create noise keys");
116
117 let _transport = TokioTcpConfig::new()
118 .upgrade(upgrade::Version::V1)
119 .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
120 .multiplex(yamux::YamuxConfig::default())
121 .boxed();
122
123 let gossipsub_config = GossipsubConfigBuilder::default()
124 .heartbeat_interval(Duration::from_secs(1))
125 .build()
126 .expect("Valid config");
127
128 let mut gossipsub = Gossipsub::new(
129 MessageAuthenticity::Signed(local_key.clone()),
130 gossipsub_config,
131 )
132 .expect("Failed to create gossipsub");
133
134 let transaction_topic = IdentTopic::new(TRANSACTION_TOPIC);
135 let block_topic = IdentTopic::new(BLOCK_TOPIC);
136 let state_topic = IdentTopic::new(STATE_TOPIC);
137 let compute_topic = IdentTopic::new(COMPUTE_TOPIC);
138
139 gossipsub
140 .subscribe(&transaction_topic)
141 .expect("Failed to subscribe to transaction topic");
142 gossipsub
143 .subscribe(&block_topic)
144 .expect("Failed to subscribe to block topic");
145 gossipsub
146 .subscribe(&state_topic)
147 .expect("Failed to subscribe to state topic");
148 gossipsub
149 .subscribe(&compute_topic)
150 .expect("Failed to subscribe to compute topic");
151
152 let store = MemoryStore::new(local_peer_id);
153 let kademlia = Kademlia::new(local_peer_id, store);
154
155 let mdns = Mdns::new(MdnsConfig::default()).await?;
156
157 Ok(Self {
158 config,
159 gossipsub,
160 kademlia,
161 mdns,
162 })
163 }
164
165 pub async fn broadcast_transaction(&mut self, transaction: Transaction) -> Result<()> {
166 let message = Message::Transaction(transaction);
167 let data = bincode::serialize(&message)?;
168 self.gossipsub
169 .publish(IdentTopic::new(TRANSACTION_TOPIC), data)
170 .map_err(|e| anyhow!("Failed to publish transaction: {:?}", e))?;
171 Ok(())
172 }
173
174 pub async fn broadcast_block(&mut self, block: Block) -> Result<()> {
175 let message = Message::Block(block);
176 let data = bincode::serialize(&message)?;
177 self.gossipsub
178 .publish(IdentTopic::new(BLOCK_TOPIC), data)
179 .map_err(|e| anyhow!("Failed to publish block: {:?}", e))?;
180 Ok(())
181 }
182
183 pub async fn broadcast_state(&mut self, state: Vec<u8>) -> Result<()> {
184 let message = Message::State(state);
185 let data = bincode::serialize(&message)?;
186 self.gossipsub
187 .publish(IdentTopic::new(STATE_TOPIC), data)
188 .map_err(|e| anyhow!("Failed to publish state: {:?}", e))?;
189 Ok(())
190 }
191
192 pub async fn broadcast_compute(&mut self, compute: Vec<u8>) -> Result<()> {
193 let message = Message::Compute(compute);
194 let data = bincode::serialize(&message)?;
195 self.gossipsub
196 .publish(IdentTopic::new(COMPUTE_TOPIC), data)
197 .map_err(|e| anyhow!("Failed to publish compute: {:?}", e))?;
198 Ok(())
199 }
200
201 pub async fn handle_event(
202 &mut self,
203 event: SwarmEvent<NetworkEvent, impl std::error::Error>,
204 ) -> Result<Option<Message>> {
205 match event {
206 SwarmEvent::Behaviour(NetworkEvent::Gossipsub(GossipsubEvent::Message {
207 message,
208 ..
209 })) => match bincode::deserialize::<Message>(&message.data) {
210 Ok(msg) => Ok(Some(msg)),
211 Err(e) => {
212 log::error!("Failed to deserialize message: {}", e);
213 Ok(None)
214 }
215 },
216 SwarmEvent::Behaviour(NetworkEvent::Kademlia(
217 KademliaEvent::OutboundQueryCompleted {
218 result: QueryResult::GetClosestPeers(Ok(peers)),
219 ..
220 },
221 )) => {
222 for peer_id in peers.peers {
223 log::info!("Found peer: {}", peer_id);
224 }
225 Ok(None)
226 }
227 _ => Ok(None),
228 }
229 }
230}