1use std::{
2 fmt::{Debug, Display},
3 net::SocketAddr,
4 ops::Deref,
5 str::FromStr,
6 sync::Arc,
7 time::Duration,
8};
9
10use anyhow::anyhow;
11use ctx::SharedCtx;
12use derive_more::derive::{Deref, Display, From};
13use discovery::{PeerDiscovery, PeerDiscoverySync};
14use msg::{P2pServiceId, PeerMessage};
15use neighbours::NetworkNeighbours;
16use peer::PeerConnection;
17use quinn::{Endpoint, Incoming, VarInt};
18use router::RouterTableSync;
19use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
20use serde::{Deserialize, Serialize};
21use tokio::{
22 select,
23 sync::{
24 mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
25 oneshot,
26 },
27 time::Interval,
28};
29
30use crate::quic::make_server_endpoint;
31
32mod ctx;
33mod discovery;
34mod msg;
35mod neighbours;
36mod peer;
37mod quic;
38mod requester;
39mod router;
40mod secure;
41mod service;
42mod stream;
43#[cfg(test)]
44mod tests;
45mod utils;
46
47pub use peer::PeerConnectionMetric;
48pub use requester::P2pNetworkRequester;
49pub use router::SharedRouterTable;
50pub use secure::*;
51pub use service::*;
52pub use stream::P2pQuicStream;
53pub use utils::*;
54
55#[derive(Debug, Display, Clone, Copy, From, Deref, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
56pub struct PeerId(u64);
57
58#[derive(Debug, Display, From, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
59pub struct ConnectionId(u64);
60
61impl ConnectionId {
62 pub fn rand() -> Self {
63 Self(rand::random())
64 }
65}
66
67#[derive(Debug, Clone, From, Display, Deref, PartialEq, Eq, Serialize, Deserialize)]
68pub struct NetworkAddress(SocketAddr);
69
70#[derive(Debug, Clone, From, PartialEq, Eq, Serialize, Deserialize)]
71pub struct PeerAddress(PeerId, NetworkAddress);
72
73impl PeerAddress {
74 pub fn new(p: PeerId, a: NetworkAddress) -> Self {
75 Self(p, a)
76 }
77
78 pub fn peer_id(&self) -> PeerId {
79 self.0
80 }
81
82 pub fn network_address(&self) -> &NetworkAddress {
83 &self.1
84 }
85}
86
87impl Display for PeerAddress {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 write!(f, "{}@{}", self.peer_id(), self.network_address())
90 }
91}
92
93impl FromStr for PeerAddress {
94 type Err = String;
95
96 fn from_str(s: &str) -> Result<Self, Self::Err> {
97 let parts: Vec<&str> = s.split('@').collect();
98 if parts.len() != 2 {
99 return Err("Invalid format, expected 'peer_id@network_address'".to_string());
100 }
101 let peer_id = parts[0].parse::<u64>().map(PeerId).map_err(|e| e.to_string())?;
102 let network_address = parts[1].parse::<SocketAddr>().map(NetworkAddress).map_err(|e| e.to_string())?;
103 Ok(Self(peer_id, network_address))
104 }
105}
106
/// Server name passed to `Endpoint::connect` for every outgoing connection.
pub const CERT_DOMAIN_NAME: &str = "cluster";
108
109#[derive(Debug)]
110enum PeerMainData {
111 Sync { route: RouterTableSync, advertise: PeerDiscoverySync },
112}
113
114#[allow(clippy::enum_variant_names)]
115enum MainEvent {
116 PeerConnected(ConnectionId, PeerId, u16),
117 PeerConnectError(ConnectionId, Option<PeerId>, anyhow::Error),
118 PeerData(ConnectionId, PeerId, PeerMainData),
119 PeerStats(ConnectionId, PeerId, PeerConnectionMetric),
120 PeerDisconnected(ConnectionId, PeerId),
121}
122
123enum ControlCmd {
124 Connect(PeerAddress, Option<oneshot::Sender<anyhow::Result<()>>>),
125}
126
127pub struct P2pNetworkConfig<SECURE> {
128 pub peer_id: PeerId,
129 pub listen_addr: SocketAddr,
130 pub advertise: Option<NetworkAddress>,
131 pub priv_key: PrivatePkcs8KeyDer<'static>,
132 pub cert: CertificateDer<'static>,
133 pub tick_ms: u64,
134 pub seeds: Vec<PeerAddress>,
135 pub secure: SECURE,
136}
137
138#[derive(Debug, PartialEq, Eq)]
139pub enum P2pNetworkEvent {
140 PeerConnected(ConnectionId, PeerId),
141 PeerDisconnected(ConnectionId, PeerId),
142 Continue,
143}
144
145pub struct P2pNetwork<SECURE> {
146 local_id: PeerId,
147 endpoint: Endpoint,
148 control_tx: UnboundedSender<ControlCmd>,
149 control_rx: UnboundedReceiver<ControlCmd>,
150 main_tx: Sender<MainEvent>,
151 main_rx: Receiver<MainEvent>,
152 neighbours: NetworkNeighbours,
153 ticker: Interval,
154 router: SharedRouterTable,
155 discovery: PeerDiscovery,
156 ctx: SharedCtx,
157 secure: Arc<SECURE>,
158}
159
160impl<SECURE: HandshakeProtocol> P2pNetwork<SECURE> {
161 pub async fn new(cfg: P2pNetworkConfig<SECURE>) -> anyhow::Result<Self> {
162 log::info!("[P2pNetwork] starting node {}@{}", cfg.peer_id, cfg.listen_addr);
163 let endpoint = make_server_endpoint(cfg.listen_addr, cfg.priv_key, cfg.cert)?;
164 let (main_tx, main_rx) = channel(10);
165 let (control_tx, control_rx) = unbounded_channel();
166 let mut discovery = PeerDiscovery::new(cfg.seeds);
167 let router = SharedRouterTable::new(cfg.peer_id);
168
169 if let Some(addr) = cfg.advertise {
170 discovery.enable_local(cfg.peer_id, addr);
171 }
172
173 Ok(Self {
174 local_id: cfg.peer_id,
175 endpoint,
176 neighbours: NetworkNeighbours::default(),
177 main_tx,
178 main_rx,
179 control_tx,
180 control_rx,
181 ticker: tokio::time::interval(Duration::from_millis(cfg.tick_ms)),
182 ctx: SharedCtx::new(router.clone()),
183 router,
184 discovery,
185 secure: Arc::new(cfg.secure),
186 })
187 }
188
189 pub fn create_service(&mut self, service_id: P2pServiceId) -> P2pService {
190 let (service, tx) = P2pService::build(service_id, self.ctx.clone());
191 self.ctx.set_service(service_id, tx);
192 service
193 }
194
195 pub fn requester(&mut self) -> P2pNetworkRequester {
196 P2pNetworkRequester { control_tx: self.control_tx.clone() }
197 }
198
199 pub async fn recv(&mut self) -> anyhow::Result<P2pNetworkEvent> {
200 select! {
201 _ = self.ticker.tick() => {
202 self.process_tick(now_ms())
203 }
204 connecting = self.endpoint.accept() => {
205 self.process_incoming(connecting.ok_or(anyhow!("quic crash"))?)
206 },
207 event = self.main_rx.recv() => {
208 self.process_internal(now_ms(), event.ok_or(anyhow!("internal channel crash"))?)
209 },
210 event = self.control_rx.recv() => {
211 self.process_control(event.ok_or(anyhow!("internal channel crash"))?)
212 },
213
214 }
215 }
216
217 pub fn shutdown(&mut self) {
218 self.endpoint.close(VarInt::from_u32(0), "Shutdown".as_bytes());
219 }
220
221 fn process_tick(&mut self, now_ms: u64) -> anyhow::Result<P2pNetworkEvent> {
222 self.discovery.clear_timeout(now_ms);
223 for conn in self.neighbours.connected_conns() {
224 let peer_id = conn.peer_id().expect("connected neighbours should have peer_id");
225 let conn_id = conn.conn_id();
226 let route: router::RouterTableSync = self.router.create_sync(&peer_id);
227 let advertise = self.discovery.create_sync_for(now_ms, &peer_id);
228 if let Some(alias) = self.ctx.conn(&conn_id) {
229 if let Err(e) = alias.try_send(PeerMessage::Sync { route, advertise }) {
230 log::error!("[P2pNetwork] try send message to peer {peer_id} over conn {conn_id} error {e}");
231 }
232 }
233 }
234 for addr in self.discovery.remotes() {
235 self.control_tx.send(ControlCmd::Connect(addr.clone(), None))?;
236 }
237 Ok(P2pNetworkEvent::Continue)
238 }
239
240 fn process_incoming(&mut self, incoming: Incoming) -> anyhow::Result<P2pNetworkEvent> {
241 let remote = incoming.remote_address();
242 log::info!("[P2pNetwork] incoming connect from {remote} => accept");
243 let conn = PeerConnection::new_incoming(self.secure.clone(), self.local_id, incoming, self.main_tx.clone(), self.ctx.clone());
244 self.neighbours.insert(conn.conn_id(), conn);
245 Ok(P2pNetworkEvent::Continue)
246 }
247
248 fn process_internal(&mut self, now_ms: u64, event: MainEvent) -> anyhow::Result<P2pNetworkEvent> {
249 match event {
250 MainEvent::PeerConnected(conn, peer, ttl_ms) => {
251 log::info!("[P2pNetwork] connection {conn} connected to {peer}");
252 self.router.set_direct(conn, peer, ttl_ms);
253 self.neighbours.mark_connected(&conn, peer);
254 Ok(P2pNetworkEvent::PeerConnected(conn, peer))
255 }
256 MainEvent::PeerData(conn, peer, data) => {
257 log::debug!("[P2pNetwork] connection {conn} on data {data:?} from {peer}");
258 match data {
259 PeerMainData::Sync { route, advertise } => {
260 self.router.apply_sync(conn, route);
261 self.discovery.apply_sync(now_ms, advertise);
262 }
263 }
264 Ok(P2pNetworkEvent::Continue)
265 }
266 MainEvent::PeerConnectError(conn, peer, err) => {
267 log::error!("[P2pNetwork] connection {conn} outgoing: {peer:?} error {err}");
268 self.neighbours.remove(&conn);
269 Ok(P2pNetworkEvent::Continue)
270 }
271 MainEvent::PeerDisconnected(conn, peer) => {
272 log::info!("[P2pNetwork] connection {conn} disconnected from {peer}");
273 self.router.del_direct(&conn);
274 self.neighbours.remove(&conn);
275 Ok(P2pNetworkEvent::PeerDisconnected(conn, peer))
276 }
277 MainEvent::PeerStats(conn, to_peer, metrics) => {
278 log::debug!("[P2pNetwork] conn {conn} to peer {to_peer} metrics {:?}", metrics);
279 self.ctx.update_metrics(&conn, to_peer, metrics);
280 Ok(P2pNetworkEvent::Continue)
281 }
282 }
283 }
284
285 fn process_control(&mut self, cmd: ControlCmd) -> anyhow::Result<P2pNetworkEvent> {
286 match cmd {
287 ControlCmd::Connect(addr, tx) => {
288 let res = if self.neighbours.has_peer(&addr.peer_id()) {
289 Ok(())
290 } else {
291 log::info!("[P2pNetwork] connecting to {addr}");
292 match self.endpoint.connect(*addr.network_address().deref(), CERT_DOMAIN_NAME) {
293 Ok(connecting) => {
294 let conn = PeerConnection::new_connecting(self.secure.clone(), self.local_id, addr.peer_id(), connecting, self.main_tx.clone(), self.ctx.clone());
295 self.neighbours.insert(conn.conn_id(), conn);
296 Ok(())
297 }
298 Err(err) => Err(err.into()),
299 }
300 };
301
302 if let Some(tx) = tx {
303 tx.send(res).print_on_err2("[P2pNetwork] send connect answer");
304 }
305
306 Ok(P2pNetworkEvent::Continue)
307 }
308 }
309 }
310}