tycho_network/network/
mod.rs

1use std::net::{SocketAddr, ToSocketAddrs};
2use std::sync::{Arc, Weak};
3
4use anyhow::Result;
5use everscale_crypto::ed25519;
6use tokio::sync::{broadcast, mpsc, oneshot};
7
8use self::config::EndpointConfig;
9pub use self::config::{NetworkConfig, QuicConfig};
10pub use self::connection::{Connection, RecvStream, SendStream};
11pub use self::connection_manager::{
12    ActivePeers, KnownPeerHandle, KnownPeers, KnownPeersError, PeerBannedError, WeakActivePeers,
13    WeakKnownPeerHandle,
14};
15use self::connection_manager::{ConnectionManager, ConnectionManagerRequest};
16use self::endpoint::Endpoint;
17pub use self::peer::Peer;
18use crate::types::{
19    Address, DisconnectReason, PeerEvent, PeerId, PeerInfo, Response, Service, ServiceExt,
20    ServiceRequest,
21};
22
23mod config;
24mod connection;
25mod connection_manager;
26mod crypto;
27mod endpoint;
28mod peer;
29mod request_handler;
30mod wire;
31
32pub struct NetworkBuilder<MandatoryFields = ([u8; 32],)> {
33    mandatory_fields: MandatoryFields,
34    optional_fields: BuilderFields,
35}
36
37#[derive(Default)]
38struct BuilderFields {
39    config: Option<NetworkConfig>,
40    remote_addr: Option<Address>,
41}
42
43impl<MandatoryFields> NetworkBuilder<MandatoryFields> {
44    pub fn with_config(mut self, config: NetworkConfig) -> Self {
45        self.optional_fields.config = Some(config);
46        self
47    }
48
49    pub fn with_remote_addr<T: Into<Address>>(mut self, addr: T) -> Self {
50        self.optional_fields.remote_addr = Some(addr.into());
51        self
52    }
53}
54
55impl NetworkBuilder<((),)> {
56    pub fn with_private_key(self, private_key: [u8; 32]) -> NetworkBuilder<([u8; 32],)> {
57        NetworkBuilder {
58            mandatory_fields: (private_key,),
59            optional_fields: self.optional_fields,
60        }
61    }
62
63    pub fn with_random_private_key(self) -> NetworkBuilder<([u8; 32],)> {
64        self.with_private_key(rand::random())
65    }
66}
67
68impl NetworkBuilder {
69    pub fn build<T: ToSocket, S>(self, bind_address: T, service: S) -> Result<Network>
70    where
71        S: Send + Sync + Clone + 'static,
72        S: Service<ServiceRequest, QueryResponse = Response>,
73    {
74        let config = self.optional_fields.config.unwrap_or_default();
75        let quic_config = config.quic.clone().unwrap_or_default();
76        let (private_key,) = self.mandatory_fields;
77
78        let keypair = ed25519::KeyPair::from(&ed25519::SecretKey::from_bytes(private_key));
79
80        let endpoint_config = EndpointConfig::builder()
81            .with_private_key(private_key)
82            .with_0rtt_enabled(config.enable_0rtt)
83            .with_transport_config(quic_config.make_transport_config())
84            .build()?;
85
86        let socket = bind_address.to_socket().map(socket2::Socket::from)?;
87
88        if let Some(send_buffer_size) = quic_config.socket_send_buffer_size {
89            if let Err(e) = socket.set_send_buffer_size(send_buffer_size) {
90                tracing::warn!(
91                    send_buffer_size,
92                    "failed to set socket send buffer size: {e:?}"
93                );
94            }
95        }
96
97        if let Some(recv_buffer_size) = quic_config.socket_recv_buffer_size {
98            if let Err(e) = socket.set_recv_buffer_size(recv_buffer_size) {
99                tracing::warn!(
100                    recv_buffer_size,
101                    "failed to set socket recv buffer size: {e:?}"
102                );
103            }
104        }
105
106        let config = Arc::new(config);
107        let endpoint = Arc::new(Endpoint::new(endpoint_config, socket.into())?);
108        let active_peers = ActivePeers::new(config.active_peers_event_channel_capacity);
109        let weak_active_peers = ActivePeers::downgrade(&active_peers);
110        let known_peers = KnownPeers::new();
111
112        let remote_addr = self.optional_fields.remote_addr.unwrap_or_else(|| {
113            let addr = endpoint.local_addr();
114            tracing::debug!(%addr, "using local address as remote address");
115            addr.into()
116        });
117
118        let inner = Arc::new_cyclic(move |_weak| {
119            let service = service.boxed_clone();
120
121            let (connection_manager, connection_manager_handle) = ConnectionManager::new(
122                config.clone(),
123                endpoint.clone(),
124                active_peers,
125                known_peers.clone(),
126                service,
127            );
128
129            tokio::spawn(connection_manager.start());
130
131            NetworkInner {
132                config,
133                remote_addr,
134                endpoint,
135                active_peers: weak_active_peers,
136                known_peers,
137                connection_manager_handle,
138                keypair,
139            }
140        });
141
142        Ok(Network(inner))
143    }
144}
145
146#[derive(Clone)]
147#[repr(transparent)]
148pub struct WeakNetwork(Weak<NetworkInner>);
149
150impl WeakNetwork {
151    pub fn upgrade(&self) -> Option<Network> {
152        self.0
153            .upgrade()
154            .map(Network)
155            .and_then(|network| (!network.is_closed()).then_some(network))
156    }
157}
158
159#[derive(Clone)]
160#[repr(transparent)]
161pub struct Network(Arc<NetworkInner>);
162
163impl Network {
164    pub fn builder() -> NetworkBuilder<((),)> {
165        NetworkBuilder {
166            mandatory_fields: ((),),
167            optional_fields: Default::default(),
168        }
169    }
170
171    pub fn remote_addr(&self) -> &Address {
172        self.0.remote_addr()
173    }
174
175    pub fn local_addr(&self) -> SocketAddr {
176        self.0.local_addr()
177    }
178
179    pub fn peer_id(&self) -> &PeerId {
180        self.0.peer_id()
181    }
182
183    pub fn peer(&self, peer_id: &PeerId) -> Option<Peer> {
184        self.0.peer(peer_id)
185    }
186
187    pub fn known_peers(&self) -> &KnownPeers {
188        self.0.known_peers()
189    }
190
191    pub fn subscribe(&self) -> Result<broadcast::Receiver<PeerEvent>> {
192        let active_peers = self.0.active_peers.upgrade().ok_or(NetworkShutdownError)?;
193        Ok(active_peers.subscribe())
194    }
195
196    pub async fn connect<T>(&self, addr: T, peer_id: &PeerId) -> Result<PeerId>
197    where
198        T: Into<Address>,
199    {
200        self.0.connect(addr.into(), peer_id).await
201    }
202
203    pub fn disconnect(&self, peer_id: &PeerId) -> Result<()> {
204        self.0.disconnect(peer_id)
205    }
206
207    pub async fn shutdown(&self) -> Result<()> {
208        self.0.shutdown().await
209    }
210
211    pub fn is_closed(&self) -> bool {
212        self.0.is_closed()
213    }
214
215    pub fn sign_tl<T: tl_proto::TlWrite>(&self, data: T) -> [u8; 64] {
216        self.0.keypair.sign(data)
217    }
218
219    pub fn sign_raw(&self, data: &[u8]) -> [u8; 64] {
220        self.0.keypair.sign_raw(data)
221    }
222
223    pub fn sign_peer_info(&self, now: u32, ttl: u32) -> PeerInfo {
224        let mut res = PeerInfo {
225            id: *self.0.peer_id(),
226            address_list: vec![self.remote_addr().clone()].into_boxed_slice(),
227            created_at: now,
228            expires_at: now.saturating_add(ttl),
229            signature: Box::new([0; 64]),
230        };
231        *res.signature = self.sign_tl(&res);
232        res
233    }
234
235    pub fn downgrade(this: &Self) -> WeakNetwork {
236        WeakNetwork(Arc::downgrade(&this.0))
237    }
238
239    /// returns the maximum size which can be potentially sent in a single frame
240    pub fn max_frame_size(&self) -> usize {
241        self.0.config.max_frame_size.0 as usize
242    }
243}
244
245struct NetworkInner {
246    config: Arc<NetworkConfig>,
247    remote_addr: Address,
248    endpoint: Arc<Endpoint>,
249    active_peers: WeakActivePeers,
250    known_peers: KnownPeers,
251    connection_manager_handle: mpsc::Sender<ConnectionManagerRequest>,
252    keypair: ed25519::KeyPair,
253}
254
255impl NetworkInner {
256    fn remote_addr(&self) -> &Address {
257        &self.remote_addr
258    }
259
260    fn local_addr(&self) -> SocketAddr {
261        self.endpoint.local_addr()
262    }
263
264    fn peer_id(&self) -> &PeerId {
265        self.endpoint.peer_id()
266    }
267
268    fn known_peers(&self) -> &KnownPeers {
269        &self.known_peers
270    }
271
272    async fn connect(&self, addr: Address, peer_id: &PeerId) -> Result<PeerId> {
273        #[derive(thiserror::Error, Debug)]
274        #[error("connection error: {0}")]
275        struct ConnectionError(#[source] Arc<anyhow::Error>);
276
277        let (tx, rx) = oneshot::channel();
278        self.connection_manager_handle
279            .send(ConnectionManagerRequest::Connect(addr, *peer_id, tx))
280            .await
281            .map_err(|_e| NetworkShutdownError)?;
282
283        let res = rx.await?;
284        res.map_err(|e| anyhow::Error::new(ConnectionError(e)))
285    }
286
287    fn disconnect(&self, peer_id: &PeerId) -> Result<()> {
288        let Some(active_peers) = self.active_peers.upgrade() else {
289            anyhow::bail!("network has been shutdown");
290        };
291        active_peers.remove(peer_id, DisconnectReason::Requested);
292        Ok(())
293    }
294
295    fn peer(&self, peer_id: &PeerId) -> Option<Peer> {
296        let active_peers = self.active_peers.upgrade()?;
297        let connection = active_peers.get(peer_id)?;
298        Some(Peer::new(connection, self.config.clone()))
299    }
300
301    async fn shutdown(&self) -> Result<()> {
302        let (sender, receiver) = oneshot::channel();
303        self.connection_manager_handle
304            .send(ConnectionManagerRequest::Shutdown(sender))
305            .await
306            .map_err(|_e| NetworkShutdownError)?;
307        receiver.await.map_err(Into::into)
308    }
309
310    fn is_closed(&self) -> bool {
311        self.connection_manager_handle.is_closed()
312    }
313}
314
315impl Drop for NetworkInner {
316    fn drop(&mut self) {
317        tracing::debug!("network dropped");
318    }
319}
320
321pub trait ToSocket {
322    fn to_socket(self) -> Result<std::net::UdpSocket>;
323}
324
325impl ToSocket for std::net::UdpSocket {
326    fn to_socket(self) -> Result<std::net::UdpSocket> {
327        Ok(self)
328    }
329}
330
331macro_rules! impl_to_socket_for_addr {
332    ($($ty:ty),*$(,)?) => {$(
333        impl ToSocket for $ty {
334            fn to_socket(self) -> Result<std::net::UdpSocket> {
335                bind_socket_to_addr(self)
336            }
337        }
338    )*};
339}
340
341impl_to_socket_for_addr! {
342    SocketAddr,
343    std::net::SocketAddrV4,
344    std::net::SocketAddrV6,
345    (std::net::IpAddr, u16),
346    (std::net::Ipv4Addr, u16),
347    (std::net::Ipv6Addr, u16),
348    (&str, u16),
349    (String, u16),
350    &str,
351    String,
352    &[SocketAddr],
353    Address,
354}
355
356fn bind_socket_to_addr<T: ToSocketAddrs>(bind_address: T) -> Result<std::net::UdpSocket> {
357    use socket2::{Domain, Protocol, Socket, Type};
358
359    let mut err = anyhow::anyhow!("no addresses to bind to");
360    for addr in bind_address.to_socket_addrs()? {
361        let s = Socket::new(Domain::for_address(addr), Type::DGRAM, Some(Protocol::UDP))?;
362        if let Err(e) = s.bind(&socket2::SockAddr::from(addr)) {
363            err = e.into();
364        } else {
365            return Ok(s.into());
366        }
367    }
368    Err(err)
369}
370
371#[derive(thiserror::Error, Debug)]
372#[error("network has been shutdown")]
373struct NetworkShutdownError;
374
375#[cfg(test)]
376mod tests {
377    use futures_util::stream::FuturesUnordered;
378    use futures_util::StreamExt;
379
380    use super::*;
381    use crate::types::{service_message_fn, service_query_fn, BoxCloneService, PeerInfo, Request};
382    use crate::util::NetworkExt;
383
384    fn echo_service() -> BoxCloneService<ServiceRequest, Response> {
385        let handle = |request: ServiceRequest| async move {
386            tracing::trace!("received: {}", request.body.escape_ascii());
387            let response = Response {
388                version: Default::default(),
389                body: request.body,
390            };
391            Some(response)
392        };
393        service_query_fn(handle).boxed_clone()
394    }
395
396    fn make_network() -> Result<Network> {
397        Network::builder()
398            .with_config(NetworkConfig {
399                enable_0rtt: true,
400                ..Default::default()
401            })
402            .with_random_private_key()
403            .build("127.0.0.1:0", echo_service())
404    }
405
406    fn make_peer_info(network: &Network) -> Arc<PeerInfo> {
407        Arc::new(PeerInfo {
408            id: *network.peer_id(),
409            address_list: vec![network.remote_addr().clone()].into_boxed_slice(),
410            created_at: 0,
411            expires_at: u32::MAX,
412            signature: Box::new([0; 64]),
413        })
414    }
415
416    #[tokio::test]
417    async fn connection_manager_works() -> Result<()> {
418        tycho_util::test::init_logger("connection_manager_works", "debug");
419
420        let peer1 = make_network()?;
421        let peer2 = make_network()?;
422
423        peer1
424            .connect(peer2.local_addr(), peer2.peer_id())
425            .await
426            .unwrap();
427        peer2
428            .connect(peer1.local_addr(), peer1.peer_id())
429            .await
430            .unwrap();
431
432        Ok(())
433    }
434
435    #[tokio::test]
436    async fn simultaneous_queries() -> Result<()> {
437        tycho_util::test::init_logger("simultaneous_queries", "debug");
438
439        for _ in 0..10 {
440            let peer1 = make_network()?;
441            let peer2 = make_network()?;
442
443            let _peer1_peer2_handle = peer1.known_peers().insert(make_peer_info(&peer2), false)?;
444            let _peer2_peer1_handle = peer2.known_peers().insert(make_peer_info(&peer1), false)?;
445
446            let req = Request {
447                version: Default::default(),
448                body: "hello".into(),
449            };
450            let peer1_fut = std::pin::pin!(peer1.query(peer2.peer_id(), req.clone()));
451            let peer2_fut = std::pin::pin!(peer2.query(peer1.peer_id(), req.clone()));
452
453            let (res1, res2) = futures_util::future::join(peer1_fut, peer2_fut).await;
454            assert_eq!(res1?.body, req.body);
455            assert_eq!(res2?.body, req.body);
456        }
457
458        Ok(())
459    }
460
461    #[tokio::test(flavor = "multi_thread")]
462    async fn uni_message_handler() -> Result<()> {
463        tycho_util::test::init_logger("uni_message_handler", "debug");
464
465        fn noop_service() -> BoxCloneService<ServiceRequest, Response> {
466            let handle = |request: ServiceRequest| async move {
467                tracing::trace!("received: {} bytes", request.body.len());
468            };
469            service_message_fn(handle).boxed_clone()
470        }
471
472        fn make_network() -> Result<Network> {
473            Network::builder()
474                .with_config(NetworkConfig {
475                    enable_0rtt: true,
476                    ..Default::default()
477                })
478                .with_random_private_key()
479                .build("127.0.0.1:0", noop_service())
480        }
481
482        let left = make_network()?;
483        let right = make_network()?;
484
485        let _left_to_right = left.known_peers().insert(make_peer_info(&right), false)?;
486        let _right_to_left = right.known_peers().insert(make_peer_info(&left), false)?;
487
488        let req = Request {
489            version: Default::default(),
490            body: vec![0xff; 750 * 1024].into(),
491        };
492
493        for _ in 0..10 {
494            let mut futures = FuturesUnordered::new();
495            for _ in 0..100 {
496                futures.push(left.send(right.peer_id(), req.clone()));
497            }
498
499            while let Some(res) = futures.next().await {
500                res?;
501            }
502        }
503
504        Ok(())
505    }
506}