atm0s_small_p2p/
lib.rs

1use std::{
2    fmt::{Debug, Display},
3    net::SocketAddr,
4    ops::Deref,
5    str::FromStr,
6    sync::Arc,
7    time::Duration,
8};
9
10use anyhow::anyhow;
11use ctx::SharedCtx;
12use derive_more::derive::{Deref, Display, From};
13use discovery::{PeerDiscovery, PeerDiscoverySync};
14use msg::{P2pServiceId, PeerMessage};
15use neighbours::NetworkNeighbours;
16use peer::PeerConnection;
17use quinn::{Endpoint, Incoming, VarInt};
18use router::RouterTableSync;
19use rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
20use serde::{Deserialize, Serialize};
21use tokio::{
22    select,
23    sync::{
24        mpsc::{channel, unbounded_channel, Receiver, Sender, UnboundedReceiver, UnboundedSender},
25        oneshot,
26    },
27    time::Interval,
28};
29
30use crate::quic::make_server_endpoint;
31
32mod ctx;
33mod discovery;
34mod msg;
35mod neighbours;
36mod peer;
37mod quic;
38mod requester;
39mod router;
40mod secure;
41mod service;
42mod stream;
43#[cfg(test)]
44mod tests;
45mod utils;
46
47pub use peer::PeerConnectionMetric;
48pub use requester::P2pNetworkRequester;
49pub use router::SharedRouterTable;
50pub use secure::*;
51pub use service::*;
52pub use stream::P2pQuicStream;
53pub use utils::*;
54
55#[derive(Debug, Display, Clone, Copy, From, Deref, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
56pub struct PeerId(u64);
57
58#[derive(Debug, Display, From, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
59pub struct ConnectionId(u64);
60
61impl ConnectionId {
62    pub fn rand() -> Self {
63        Self(rand::random())
64    }
65}
66
67#[derive(Debug, Clone, From, Display, Deref, PartialEq, Eq, Serialize, Deserialize)]
68pub struct NetworkAddress(SocketAddr);
69
70#[derive(Debug, Clone, From, PartialEq, Eq, Serialize, Deserialize)]
71pub struct PeerAddress(PeerId, NetworkAddress);
72
73impl PeerAddress {
74    pub fn new(p: PeerId, a: NetworkAddress) -> Self {
75        Self(p, a)
76    }
77
78    pub fn peer_id(&self) -> PeerId {
79        self.0
80    }
81
82    pub fn network_address(&self) -> &NetworkAddress {
83        &self.1
84    }
85}
86
87impl Display for PeerAddress {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        write!(f, "{}@{}", self.peer_id(), self.network_address())
90    }
91}
92
93impl FromStr for PeerAddress {
94    type Err = String;
95
96    fn from_str(s: &str) -> Result<Self, Self::Err> {
97        let parts: Vec<&str> = s.split('@').collect();
98        if parts.len() != 2 {
99            return Err("Invalid format, expected 'peer_id@network_address'".to_string());
100        }
101        let peer_id = parts[0].parse::<u64>().map(PeerId).map_err(|e| e.to_string())?;
102        let network_address = parts[1].parse::<SocketAddr>().map(NetworkAddress).map_err(|e| e.to_string())?;
103        Ok(Self(peer_id, network_address))
104    }
105}
106
107pub const CERT_DOMAIN_NAME: &str = "cluster";
108
109#[derive(Debug)]
110enum PeerMainData {
111    Sync { route: RouterTableSync, advertise: PeerDiscoverySync },
112}
113
114#[allow(clippy::enum_variant_names)]
115enum MainEvent {
116    PeerConnected(ConnectionId, PeerId, u16),
117    PeerConnectError(ConnectionId, Option<PeerId>, anyhow::Error),
118    PeerData(ConnectionId, PeerId, PeerMainData),
119    PeerStats(ConnectionId, PeerId, PeerConnectionMetric),
120    PeerDisconnected(ConnectionId, PeerId),
121}
122
123enum ControlCmd {
124    Connect(PeerAddress, Option<oneshot::Sender<anyhow::Result<()>>>),
125}
126
127pub struct P2pNetworkConfig<SECURE> {
128    pub peer_id: PeerId,
129    pub listen_addr: SocketAddr,
130    pub advertise: Option<NetworkAddress>,
131    pub priv_key: PrivatePkcs8KeyDer<'static>,
132    pub cert: CertificateDer<'static>,
133    pub tick_ms: u64,
134    pub seeds: Vec<PeerAddress>,
135    pub secure: SECURE,
136}
137
138#[derive(Debug, PartialEq, Eq)]
139pub enum P2pNetworkEvent {
140    PeerConnected(ConnectionId, PeerId),
141    PeerDisconnected(ConnectionId, PeerId),
142    Continue,
143}
144
145pub struct P2pNetwork<SECURE> {
146    local_id: PeerId,
147    endpoint: Endpoint,
148    control_tx: UnboundedSender<ControlCmd>,
149    control_rx: UnboundedReceiver<ControlCmd>,
150    main_tx: Sender<MainEvent>,
151    main_rx: Receiver<MainEvent>,
152    neighbours: NetworkNeighbours,
153    ticker: Interval,
154    router: SharedRouterTable,
155    discovery: PeerDiscovery,
156    ctx: SharedCtx,
157    secure: Arc<SECURE>,
158}
159
160impl<SECURE: HandshakeProtocol> P2pNetwork<SECURE> {
161    pub async fn new(cfg: P2pNetworkConfig<SECURE>) -> anyhow::Result<Self> {
162        log::info!("[P2pNetwork] starting node {}@{}", cfg.peer_id, cfg.listen_addr);
163        let endpoint = make_server_endpoint(cfg.listen_addr, cfg.priv_key, cfg.cert)?;
164        let (main_tx, main_rx) = channel(10);
165        let (control_tx, control_rx) = unbounded_channel();
166        let mut discovery = PeerDiscovery::new(cfg.seeds);
167        let router = SharedRouterTable::new(cfg.peer_id);
168
169        if let Some(addr) = cfg.advertise {
170            discovery.enable_local(cfg.peer_id, addr);
171        }
172
173        Ok(Self {
174            local_id: cfg.peer_id,
175            endpoint,
176            neighbours: NetworkNeighbours::default(),
177            main_tx,
178            main_rx,
179            control_tx,
180            control_rx,
181            ticker: tokio::time::interval(Duration::from_millis(cfg.tick_ms)),
182            ctx: SharedCtx::new(router.clone()),
183            router,
184            discovery,
185            secure: Arc::new(cfg.secure),
186        })
187    }
188
189    pub fn create_service(&mut self, service_id: P2pServiceId) -> P2pService {
190        let (service, tx) = P2pService::build(service_id, self.ctx.clone());
191        self.ctx.set_service(service_id, tx);
192        service
193    }
194
195    pub fn requester(&mut self) -> P2pNetworkRequester {
196        P2pNetworkRequester { control_tx: self.control_tx.clone() }
197    }
198
199    pub async fn recv(&mut self) -> anyhow::Result<P2pNetworkEvent> {
200        select! {
201            _ = self.ticker.tick() => {
202                self.process_tick(now_ms())
203            }
204            connecting = self.endpoint.accept() => {
205                self.process_incoming(connecting.ok_or(anyhow!("quic crash"))?)
206            },
207            event = self.main_rx.recv() => {
208                self.process_internal(now_ms(), event.ok_or(anyhow!("internal channel crash"))?)
209            },
210            event = self.control_rx.recv() => {
211                self.process_control(event.ok_or(anyhow!("internal channel crash"))?)
212            },
213
214        }
215    }
216
217    pub fn shutdown(&mut self) {
218        self.endpoint.close(VarInt::from_u32(0), "Shutdown".as_bytes());
219    }
220
221    fn process_tick(&mut self, now_ms: u64) -> anyhow::Result<P2pNetworkEvent> {
222        self.discovery.clear_timeout(now_ms);
223        for conn in self.neighbours.connected_conns() {
224            let peer_id = conn.peer_id().expect("connected neighbours should have peer_id");
225            let conn_id = conn.conn_id();
226            let route: router::RouterTableSync = self.router.create_sync(&peer_id);
227            let advertise = self.discovery.create_sync_for(now_ms, &peer_id);
228            if let Some(alias) = self.ctx.conn(&conn_id) {
229                if let Err(e) = alias.try_send(PeerMessage::Sync { route, advertise }) {
230                    log::error!("[P2pNetwork] try send message to peer {peer_id} over conn {conn_id} error {e}");
231                }
232            }
233        }
234        for addr in self.discovery.remotes() {
235            self.control_tx.send(ControlCmd::Connect(addr.clone(), None))?;
236        }
237        Ok(P2pNetworkEvent::Continue)
238    }
239
240    fn process_incoming(&mut self, incoming: Incoming) -> anyhow::Result<P2pNetworkEvent> {
241        let remote = incoming.remote_address();
242        log::info!("[P2pNetwork] incoming connect from {remote} => accept");
243        let conn = PeerConnection::new_incoming(self.secure.clone(), self.local_id, incoming, self.main_tx.clone(), self.ctx.clone());
244        self.neighbours.insert(conn.conn_id(), conn);
245        Ok(P2pNetworkEvent::Continue)
246    }
247
248    fn process_internal(&mut self, now_ms: u64, event: MainEvent) -> anyhow::Result<P2pNetworkEvent> {
249        match event {
250            MainEvent::PeerConnected(conn, peer, ttl_ms) => {
251                log::info!("[P2pNetwork] connection {conn} connected to {peer}");
252                self.router.set_direct(conn, peer, ttl_ms);
253                self.neighbours.mark_connected(&conn, peer);
254                Ok(P2pNetworkEvent::PeerConnected(conn, peer))
255            }
256            MainEvent::PeerData(conn, peer, data) => {
257                log::debug!("[P2pNetwork] connection {conn} on data {data:?} from {peer}");
258                match data {
259                    PeerMainData::Sync { route, advertise } => {
260                        self.router.apply_sync(conn, route);
261                        self.discovery.apply_sync(now_ms, advertise);
262                    }
263                }
264                Ok(P2pNetworkEvent::Continue)
265            }
266            MainEvent::PeerConnectError(conn, peer, err) => {
267                log::error!("[P2pNetwork] connection {conn} outgoing: {peer:?} error {err}");
268                self.neighbours.remove(&conn);
269                Ok(P2pNetworkEvent::Continue)
270            }
271            MainEvent::PeerDisconnected(conn, peer) => {
272                log::info!("[P2pNetwork] connection {conn} disconnected from {peer}");
273                self.router.del_direct(&conn);
274                self.neighbours.remove(&conn);
275                Ok(P2pNetworkEvent::PeerDisconnected(conn, peer))
276            }
277            MainEvent::PeerStats(conn, to_peer, metrics) => {
278                log::debug!("[P2pNetwork] conn {conn} to peer {to_peer} metrics {:?}", metrics);
279                self.ctx.update_metrics(&conn, to_peer, metrics);
280                Ok(P2pNetworkEvent::Continue)
281            }
282        }
283    }
284
285    fn process_control(&mut self, cmd: ControlCmd) -> anyhow::Result<P2pNetworkEvent> {
286        match cmd {
287            ControlCmd::Connect(addr, tx) => {
288                let res = if self.neighbours.has_peer(&addr.peer_id()) {
289                    Ok(())
290                } else {
291                    log::info!("[P2pNetwork] connecting to {addr}");
292                    match self.endpoint.connect(*addr.network_address().deref(), CERT_DOMAIN_NAME) {
293                        Ok(connecting) => {
294                            let conn = PeerConnection::new_connecting(self.secure.clone(), self.local_id, addr.peer_id(), connecting, self.main_tx.clone(), self.ctx.clone());
295                            self.neighbours.insert(conn.conn_id(), conn);
296                            Ok(())
297                        }
298                        Err(err) => Err(err.into()),
299                    }
300                };
301
302                if let Some(tx) = tx {
303                    tx.send(res).print_on_err2("[P2pNetwork] send connect answer");
304                }
305
306                Ok(P2pNetworkEvent::Continue)
307            }
308        }
309    }
310}