1use std::net::{SocketAddr, ToSocketAddrs};
2use std::sync::{Arc, Weak};
3
4use anyhow::Result;
5use everscale_crypto::ed25519;
6use tokio::sync::{broadcast, mpsc, oneshot};
7
8use self::config::EndpointConfig;
9pub use self::config::{NetworkConfig, QuicConfig};
10pub use self::connection::{Connection, RecvStream, SendStream};
11pub use self::connection_manager::{
12 ActivePeers, KnownPeerHandle, KnownPeers, KnownPeersError, PeerBannedError, WeakActivePeers,
13 WeakKnownPeerHandle,
14};
15use self::connection_manager::{ConnectionManager, ConnectionManagerRequest};
16use self::endpoint::Endpoint;
17pub use self::peer::Peer;
18use crate::types::{
19 Address, DisconnectReason, PeerEvent, PeerId, PeerInfo, Response, Service, ServiceExt,
20 ServiceRequest,
21};
22
23mod config;
24mod connection;
25mod connection_manager;
26mod crypto;
27mod endpoint;
28mod peer;
29mod request_handler;
30mod wire;
31
32pub struct NetworkBuilder<MandatoryFields = ([u8; 32],)> {
33 mandatory_fields: MandatoryFields,
34 optional_fields: BuilderFields,
35}
36
37#[derive(Default)]
38struct BuilderFields {
39 config: Option<NetworkConfig>,
40 remote_addr: Option<Address>,
41}
42
43impl<MandatoryFields> NetworkBuilder<MandatoryFields> {
44 pub fn with_config(mut self, config: NetworkConfig) -> Self {
45 self.optional_fields.config = Some(config);
46 self
47 }
48
49 pub fn with_remote_addr<T: Into<Address>>(mut self, addr: T) -> Self {
50 self.optional_fields.remote_addr = Some(addr.into());
51 self
52 }
53}
54
55impl NetworkBuilder<((),)> {
56 pub fn with_private_key(self, private_key: [u8; 32]) -> NetworkBuilder<([u8; 32],)> {
57 NetworkBuilder {
58 mandatory_fields: (private_key,),
59 optional_fields: self.optional_fields,
60 }
61 }
62
63 pub fn with_random_private_key(self) -> NetworkBuilder<([u8; 32],)> {
64 self.with_private_key(rand::random())
65 }
66}
67
68impl NetworkBuilder {
69 pub fn build<T: ToSocket, S>(self, bind_address: T, service: S) -> Result<Network>
70 where
71 S: Send + Sync + Clone + 'static,
72 S: Service<ServiceRequest, QueryResponse = Response>,
73 {
74 let config = self.optional_fields.config.unwrap_or_default();
75 let quic_config = config.quic.clone().unwrap_or_default();
76 let (private_key,) = self.mandatory_fields;
77
78 let keypair = ed25519::KeyPair::from(&ed25519::SecretKey::from_bytes(private_key));
79
80 let endpoint_config = EndpointConfig::builder()
81 .with_private_key(private_key)
82 .with_0rtt_enabled(config.enable_0rtt)
83 .with_transport_config(quic_config.make_transport_config())
84 .build()?;
85
86 let socket = bind_address.to_socket().map(socket2::Socket::from)?;
87
88 if let Some(send_buffer_size) = quic_config.socket_send_buffer_size {
89 if let Err(e) = socket.set_send_buffer_size(send_buffer_size) {
90 tracing::warn!(
91 send_buffer_size,
92 "failed to set socket send buffer size: {e:?}"
93 );
94 }
95 }
96
97 if let Some(recv_buffer_size) = quic_config.socket_recv_buffer_size {
98 if let Err(e) = socket.set_recv_buffer_size(recv_buffer_size) {
99 tracing::warn!(
100 recv_buffer_size,
101 "failed to set socket recv buffer size: {e:?}"
102 );
103 }
104 }
105
106 let config = Arc::new(config);
107 let endpoint = Arc::new(Endpoint::new(endpoint_config, socket.into())?);
108 let active_peers = ActivePeers::new(config.active_peers_event_channel_capacity);
109 let weak_active_peers = ActivePeers::downgrade(&active_peers);
110 let known_peers = KnownPeers::new();
111
112 let remote_addr = self.optional_fields.remote_addr.unwrap_or_else(|| {
113 let addr = endpoint.local_addr();
114 tracing::debug!(%addr, "using local address as remote address");
115 addr.into()
116 });
117
118 let inner = Arc::new_cyclic(move |_weak| {
119 let service = service.boxed_clone();
120
121 let (connection_manager, connection_manager_handle) = ConnectionManager::new(
122 config.clone(),
123 endpoint.clone(),
124 active_peers,
125 known_peers.clone(),
126 service,
127 );
128
129 tokio::spawn(connection_manager.start());
130
131 NetworkInner {
132 config,
133 remote_addr,
134 endpoint,
135 active_peers: weak_active_peers,
136 known_peers,
137 connection_manager_handle,
138 keypair,
139 }
140 });
141
142 Ok(Network(inner))
143 }
144}
145
146#[derive(Clone)]
147#[repr(transparent)]
148pub struct WeakNetwork(Weak<NetworkInner>);
149
150impl WeakNetwork {
151 pub fn upgrade(&self) -> Option<Network> {
152 self.0
153 .upgrade()
154 .map(Network)
155 .and_then(|network| (!network.is_closed()).then_some(network))
156 }
157}
158
159#[derive(Clone)]
160#[repr(transparent)]
161pub struct Network(Arc<NetworkInner>);
162
163impl Network {
164 pub fn builder() -> NetworkBuilder<((),)> {
165 NetworkBuilder {
166 mandatory_fields: ((),),
167 optional_fields: Default::default(),
168 }
169 }
170
171 pub fn remote_addr(&self) -> &Address {
172 self.0.remote_addr()
173 }
174
175 pub fn local_addr(&self) -> SocketAddr {
176 self.0.local_addr()
177 }
178
179 pub fn peer_id(&self) -> &PeerId {
180 self.0.peer_id()
181 }
182
183 pub fn peer(&self, peer_id: &PeerId) -> Option<Peer> {
184 self.0.peer(peer_id)
185 }
186
187 pub fn known_peers(&self) -> &KnownPeers {
188 self.0.known_peers()
189 }
190
191 pub fn subscribe(&self) -> Result<broadcast::Receiver<PeerEvent>> {
192 let active_peers = self.0.active_peers.upgrade().ok_or(NetworkShutdownError)?;
193 Ok(active_peers.subscribe())
194 }
195
196 pub async fn connect<T>(&self, addr: T, peer_id: &PeerId) -> Result<PeerId>
197 where
198 T: Into<Address>,
199 {
200 self.0.connect(addr.into(), peer_id).await
201 }
202
203 pub fn disconnect(&self, peer_id: &PeerId) -> Result<()> {
204 self.0.disconnect(peer_id)
205 }
206
207 pub async fn shutdown(&self) -> Result<()> {
208 self.0.shutdown().await
209 }
210
211 pub fn is_closed(&self) -> bool {
212 self.0.is_closed()
213 }
214
215 pub fn sign_tl<T: tl_proto::TlWrite>(&self, data: T) -> [u8; 64] {
216 self.0.keypair.sign(data)
217 }
218
219 pub fn sign_raw(&self, data: &[u8]) -> [u8; 64] {
220 self.0.keypair.sign_raw(data)
221 }
222
223 pub fn sign_peer_info(&self, now: u32, ttl: u32) -> PeerInfo {
224 let mut res = PeerInfo {
225 id: *self.0.peer_id(),
226 address_list: vec![self.remote_addr().clone()].into_boxed_slice(),
227 created_at: now,
228 expires_at: now.saturating_add(ttl),
229 signature: Box::new([0; 64]),
230 };
231 *res.signature = self.sign_tl(&res);
232 res
233 }
234
235 pub fn downgrade(this: &Self) -> WeakNetwork {
236 WeakNetwork(Arc::downgrade(&this.0))
237 }
238
239 pub fn max_frame_size(&self) -> usize {
241 self.0.config.max_frame_size.0 as usize
242 }
243}
244
245struct NetworkInner {
246 config: Arc<NetworkConfig>,
247 remote_addr: Address,
248 endpoint: Arc<Endpoint>,
249 active_peers: WeakActivePeers,
250 known_peers: KnownPeers,
251 connection_manager_handle: mpsc::Sender<ConnectionManagerRequest>,
252 keypair: ed25519::KeyPair,
253}
254
255impl NetworkInner {
256 fn remote_addr(&self) -> &Address {
257 &self.remote_addr
258 }
259
260 fn local_addr(&self) -> SocketAddr {
261 self.endpoint.local_addr()
262 }
263
264 fn peer_id(&self) -> &PeerId {
265 self.endpoint.peer_id()
266 }
267
268 fn known_peers(&self) -> &KnownPeers {
269 &self.known_peers
270 }
271
272 async fn connect(&self, addr: Address, peer_id: &PeerId) -> Result<PeerId> {
273 #[derive(thiserror::Error, Debug)]
274 #[error("connection error: {0}")]
275 struct ConnectionError(#[source] Arc<anyhow::Error>);
276
277 let (tx, rx) = oneshot::channel();
278 self.connection_manager_handle
279 .send(ConnectionManagerRequest::Connect(addr, *peer_id, tx))
280 .await
281 .map_err(|_e| NetworkShutdownError)?;
282
283 let res = rx.await?;
284 res.map_err(|e| anyhow::Error::new(ConnectionError(e)))
285 }
286
287 fn disconnect(&self, peer_id: &PeerId) -> Result<()> {
288 let Some(active_peers) = self.active_peers.upgrade() else {
289 anyhow::bail!("network has been shutdown");
290 };
291 active_peers.remove(peer_id, DisconnectReason::Requested);
292 Ok(())
293 }
294
295 fn peer(&self, peer_id: &PeerId) -> Option<Peer> {
296 let active_peers = self.active_peers.upgrade()?;
297 let connection = active_peers.get(peer_id)?;
298 Some(Peer::new(connection, self.config.clone()))
299 }
300
301 async fn shutdown(&self) -> Result<()> {
302 let (sender, receiver) = oneshot::channel();
303 self.connection_manager_handle
304 .send(ConnectionManagerRequest::Shutdown(sender))
305 .await
306 .map_err(|_e| NetworkShutdownError)?;
307 receiver.await.map_err(Into::into)
308 }
309
310 fn is_closed(&self) -> bool {
311 self.connection_manager_handle.is_closed()
312 }
313}
314
315impl Drop for NetworkInner {
316 fn drop(&mut self) {
317 tracing::debug!("network dropped");
318 }
319}
320
321pub trait ToSocket {
322 fn to_socket(self) -> Result<std::net::UdpSocket>;
323}
324
325impl ToSocket for std::net::UdpSocket {
326 fn to_socket(self) -> Result<std::net::UdpSocket> {
327 Ok(self)
328 }
329}
330
331macro_rules! impl_to_socket_for_addr {
332 ($($ty:ty),*$(,)?) => {$(
333 impl ToSocket for $ty {
334 fn to_socket(self) -> Result<std::net::UdpSocket> {
335 bind_socket_to_addr(self)
336 }
337 }
338 )*};
339}
340
341impl_to_socket_for_addr! {
342 SocketAddr,
343 std::net::SocketAddrV4,
344 std::net::SocketAddrV6,
345 (std::net::IpAddr, u16),
346 (std::net::Ipv4Addr, u16),
347 (std::net::Ipv6Addr, u16),
348 (&str, u16),
349 (String, u16),
350 &str,
351 String,
352 &[SocketAddr],
353 Address,
354}
355
356fn bind_socket_to_addr<T: ToSocketAddrs>(bind_address: T) -> Result<std::net::UdpSocket> {
357 use socket2::{Domain, Protocol, Socket, Type};
358
359 let mut err = anyhow::anyhow!("no addresses to bind to");
360 for addr in bind_address.to_socket_addrs()? {
361 let s = Socket::new(Domain::for_address(addr), Type::DGRAM, Some(Protocol::UDP))?;
362 if let Err(e) = s.bind(&socket2::SockAddr::from(addr)) {
363 err = e.into();
364 } else {
365 return Ok(s.into());
366 }
367 }
368 Err(err)
369}
370
371#[derive(thiserror::Error, Debug)]
372#[error("network has been shutdown")]
373struct NetworkShutdownError;
374
375#[cfg(test)]
376mod tests {
377 use futures_util::stream::FuturesUnordered;
378 use futures_util::StreamExt;
379
380 use super::*;
381 use crate::types::{service_message_fn, service_query_fn, BoxCloneService, PeerInfo, Request};
382 use crate::util::NetworkExt;
383
384 fn echo_service() -> BoxCloneService<ServiceRequest, Response> {
385 let handle = |request: ServiceRequest| async move {
386 tracing::trace!("received: {}", request.body.escape_ascii());
387 let response = Response {
388 version: Default::default(),
389 body: request.body,
390 };
391 Some(response)
392 };
393 service_query_fn(handle).boxed_clone()
394 }
395
396 fn make_network() -> Result<Network> {
397 Network::builder()
398 .with_config(NetworkConfig {
399 enable_0rtt: true,
400 ..Default::default()
401 })
402 .with_random_private_key()
403 .build("127.0.0.1:0", echo_service())
404 }
405
406 fn make_peer_info(network: &Network) -> Arc<PeerInfo> {
407 Arc::new(PeerInfo {
408 id: *network.peer_id(),
409 address_list: vec![network.remote_addr().clone()].into_boxed_slice(),
410 created_at: 0,
411 expires_at: u32::MAX,
412 signature: Box::new([0; 64]),
413 })
414 }
415
416 #[tokio::test]
417 async fn connection_manager_works() -> Result<()> {
418 tycho_util::test::init_logger("connection_manager_works", "debug");
419
420 let peer1 = make_network()?;
421 let peer2 = make_network()?;
422
423 peer1
424 .connect(peer2.local_addr(), peer2.peer_id())
425 .await
426 .unwrap();
427 peer2
428 .connect(peer1.local_addr(), peer1.peer_id())
429 .await
430 .unwrap();
431
432 Ok(())
433 }
434
435 #[tokio::test]
436 async fn simultaneous_queries() -> Result<()> {
437 tycho_util::test::init_logger("simultaneous_queries", "debug");
438
439 for _ in 0..10 {
440 let peer1 = make_network()?;
441 let peer2 = make_network()?;
442
443 let _peer1_peer2_handle = peer1.known_peers().insert(make_peer_info(&peer2), false)?;
444 let _peer2_peer1_handle = peer2.known_peers().insert(make_peer_info(&peer1), false)?;
445
446 let req = Request {
447 version: Default::default(),
448 body: "hello".into(),
449 };
450 let peer1_fut = std::pin::pin!(peer1.query(peer2.peer_id(), req.clone()));
451 let peer2_fut = std::pin::pin!(peer2.query(peer1.peer_id(), req.clone()));
452
453 let (res1, res2) = futures_util::future::join(peer1_fut, peer2_fut).await;
454 assert_eq!(res1?.body, req.body);
455 assert_eq!(res2?.body, req.body);
456 }
457
458 Ok(())
459 }
460
461 #[tokio::test(flavor = "multi_thread")]
462 async fn uni_message_handler() -> Result<()> {
463 tycho_util::test::init_logger("uni_message_handler", "debug");
464
465 fn noop_service() -> BoxCloneService<ServiceRequest, Response> {
466 let handle = |request: ServiceRequest| async move {
467 tracing::trace!("received: {} bytes", request.body.len());
468 };
469 service_message_fn(handle).boxed_clone()
470 }
471
472 fn make_network() -> Result<Network> {
473 Network::builder()
474 .with_config(NetworkConfig {
475 enable_0rtt: true,
476 ..Default::default()
477 })
478 .with_random_private_key()
479 .build("127.0.0.1:0", noop_service())
480 }
481
482 let left = make_network()?;
483 let right = make_network()?;
484
485 let _left_to_right = left.known_peers().insert(make_peer_info(&right), false)?;
486 let _right_to_left = right.known_peers().insert(make_peer_info(&left), false)?;
487
488 let req = Request {
489 version: Default::default(),
490 body: vec![0xff; 750 * 1024].into(),
491 };
492
493 for _ in 0..10 {
494 let mut futures = FuturesUnordered::new();
495 for _ in 0..100 {
496 futures.push(left.send(right.peer_id(), req.clone()));
497 }
498
499 while let Some(res) = futures.next().await {
500 res?;
501 }
502 }
503
504 Ok(())
505 }
506}