1use crate::{
10 bootstrap::{InitialBootstrap, InitialBootstrapTrigger},
11 circular_vec::CircularVec,
12 driver::NodeBehaviour,
13 error::{NetworkError, Result},
14 event::NetworkEvent,
15 external_address::ExternalAddressManager,
16 network_discovery::NetworkDiscovery,
17 record_store::{NodeRecordStore, NodeRecordStoreConfig},
18 relay_manager::RelayManager,
19 replication_fetcher::ReplicationFetcher,
20 time::Instant,
21 transport, Network, SwarmDriver, CLOSE_GROUP_SIZE,
22};
23#[cfg(feature = "open-metrics")]
24use crate::{
25 metrics::service::run_metrics_server, metrics::NetworkMetricsRecorder, MetricsRegistries,
26};
27use ant_bootstrap::BootstrapCacheStore;
28use ant_protocol::{
29 version::{get_network_id_str, IDENTIFY_PROTOCOL_STR, REQ_RESPONSE_VERSION_STR},
30 NetworkAddress, PrettyPrintKBucketKey,
31};
32use futures::future::Either;
33use libp2p::Transport as _;
34use libp2p::{core::muxing::StreamMuxerBox, relay};
35use libp2p::{
36 identity::Keypair,
37 kad,
38 multiaddr::Protocol,
39 request_response::{self, Config as RequestResponseConfig, ProtocolSupport},
40 swarm::{StreamProtocol, Swarm},
41 Multiaddr, PeerId,
42};
43#[cfg(feature = "open-metrics")]
44use prometheus_client::metrics::info::Info;
45use rand::Rng;
46use std::{
47 convert::TryInto,
48 fmt::Debug,
49 fs,
50 io::{Read, Write},
51 net::SocketAddr,
52 num::NonZeroUsize,
53 path::PathBuf,
54 time::Duration,
55};
56use tokio::sync::mpsc;
57
58const REQUEST_TIMEOUT_DEFAULT_S: Duration = Duration::from_secs(30);
60const CONNECTION_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);
62
63const RESEND_IDENTIFY_INVERVAL: Duration = Duration::from_secs(3600);
65
66const NETWORKING_CHANNEL_SIZE: usize = 10_000;
67
68const KAD_QUERY_TIMEOUT_S: Duration = Duration::from_secs(10);
70
71const REPLICATION_FACTOR: NonZeroUsize = match NonZeroUsize::new(CLOSE_GROUP_SIZE + 2) {
74 Some(v) => v,
75 None => panic!("CLOSE_GROUP_SIZE should not be zero"),
76};
77
78const KAD_STREAM_PROTOCOL_ID: StreamProtocol = StreamProtocol::new("/autonomi/kad/1.0.0");
79
80pub const MAX_PACKET_SIZE: usize = 1024 * 1024 * 5; const PERIODIC_KAD_BOOTSTRAP_INTERVAL_MAX_S: u64 = 21600;
87
88#[derive(Debug)]
89pub struct NetworkBuilder {
90 bootstrap_cache: Option<BootstrapCacheStore>,
91 concurrency_limit: Option<usize>,
92 initial_contacts: Vec<Multiaddr>,
93 keypair: Keypair,
94 listen_addr: Option<SocketAddr>,
95 local: bool,
96 #[cfg(feature = "open-metrics")]
97 metrics_registries: Option<MetricsRegistries>,
98 #[cfg(feature = "open-metrics")]
99 metrics_server_port: Option<u16>,
100 no_upnp: bool,
101 relay_client: bool,
102 request_timeout: Option<Duration>,
103}
104
105impl NetworkBuilder {
106 pub fn new(keypair: Keypair, local: bool, initial_contacts: Vec<Multiaddr>) -> Self {
107 Self {
108 bootstrap_cache: None,
109 concurrency_limit: None,
110 initial_contacts,
111 keypair,
112 listen_addr: None,
113 local,
114 #[cfg(feature = "open-metrics")]
115 metrics_registries: None,
116 #[cfg(feature = "open-metrics")]
117 metrics_server_port: None,
118 no_upnp: true,
119 relay_client: false,
120 request_timeout: None,
121 }
122 }
123
124 pub fn bootstrap_cache(&mut self, bootstrap_cache: BootstrapCacheStore) {
125 self.bootstrap_cache = Some(bootstrap_cache);
126 }
127
128 pub fn relay_client(&mut self, relay_client: bool) {
129 self.relay_client = relay_client;
130 }
131
132 pub fn listen_addr(&mut self, listen_addr: SocketAddr) {
133 self.listen_addr = Some(listen_addr);
134 }
135
136 pub fn request_timeout(&mut self, request_timeout: Duration) {
137 self.request_timeout = Some(request_timeout);
138 }
139
140 pub fn concurrency_limit(&mut self, concurrency_limit: usize) {
141 self.concurrency_limit = Some(concurrency_limit);
142 }
143
144 #[cfg(feature = "open-metrics")]
147 pub fn metrics_registries(&mut self, registries: MetricsRegistries) {
148 self.metrics_registries = Some(registries);
149 }
150
151 #[cfg(feature = "open-metrics")]
152 pub fn metrics_server_port(&mut self, port: Option<u16>) {
154 self.metrics_server_port = port;
155 }
156
157 pub fn no_upnp(&mut self, no_upnp: bool) {
158 self.no_upnp = no_upnp;
159 }
160
161 pub fn build_node(
175 self,
176 root_dir: PathBuf,
177 ) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
178 let bootstrap_interval = rand::thread_rng().gen_range(
179 PERIODIC_KAD_BOOTSTRAP_INTERVAL_MAX_S / 2..PERIODIC_KAD_BOOTSTRAP_INTERVAL_MAX_S,
180 );
181
182 let mut kad_cfg = kad::Config::new(KAD_STREAM_PROTOCOL_ID);
183 let _ = kad_cfg
184 .set_kbucket_inserts(libp2p::kad::BucketInserts::Manual)
185 .set_replication_interval(None)
189 .set_publication_interval(None)
192 .set_max_packet_size(MAX_PACKET_SIZE)
194 .set_replication_factor(REPLICATION_FACTOR)
196 .set_query_timeout(KAD_QUERY_TIMEOUT_S)
197 .set_record_ttl(None)
202 .set_periodic_bootstrap_interval(Some(Duration::from_secs(bootstrap_interval)))
203 .set_provider_publication_interval(None);
208
209 let store_cfg = {
210 let storage_dir_path = root_dir.join("record_store");
211 check_and_wipe_storage_dir_if_necessary(
215 root_dir.clone(),
216 storage_dir_path.clone(),
217 get_network_id_str(),
218 )?;
219
220 if let Err(error) = std::fs::create_dir_all(&storage_dir_path) {
224 return Err(NetworkError::FailedToCreateRecordStoreDir {
225 path: storage_dir_path,
226 source: error,
227 });
228 }
229 let peer_id = PeerId::from(self.keypair.public());
230 let encryption_seed: [u8; 16] = peer_id
231 .to_bytes()
232 .get(..16)
233 .expect("Cann't get encryption_seed from keypair")
234 .try_into()
235 .expect("Cann't get 16 bytes from serialised key_pair");
236 NodeRecordStoreConfig {
237 max_value_bytes: MAX_PACKET_SIZE, storage_dir: storage_dir_path,
239 historic_quote_dir: root_dir.clone(),
240 encryption_seed,
241 ..Default::default()
242 }
243 };
244
245 let listen_addr = self.listen_addr;
246
247 let (network, events_receiver, mut swarm_driver) =
248 self.build(kad_cfg, store_cfg, ProtocolSupport::Full);
249
250 let listen_socket_addr = listen_addr.ok_or(NetworkError::ListenAddressNotProvided)?;
252
253 let addr_quic = Multiaddr::from(listen_socket_addr.ip())
255 .with(Protocol::Udp(listen_socket_addr.port()))
256 .with(Protocol::QuicV1);
257 swarm_driver
258 .listen_on(addr_quic)
259 .expect("Multiaddr should be supported by our configured transports");
260
261 Ok((network, events_receiver, swarm_driver))
262 }
263
264 fn build(
266 self,
267 kad_cfg: kad::Config,
268 record_store_cfg: NodeRecordStoreConfig,
269 req_res_protocol: ProtocolSupport,
270 ) -> (Network, mpsc::Receiver<NetworkEvent>, SwarmDriver) {
271 let identify_protocol_str = IDENTIFY_PROTOCOL_STR
272 .read()
273 .expect("Failed to obtain read lock for IDENTIFY_PROTOCOL_STR")
274 .clone();
275
276 let peer_id = PeerId::from(self.keypair.public());
277 info!(
279 "Process (PID: {}) with PeerId: {peer_id}",
280 std::process::id()
281 );
282 info!(
283 "Self PeerID {peer_id} is represented as kbucket_key {:?}",
284 PrettyPrintKBucketKey(NetworkAddress::from(peer_id).as_kbucket_key())
285 );
286
287 #[cfg(feature = "open-metrics")]
288 let mut metrics_registries = self.metrics_registries.unwrap_or_default();
289
290 #[cfg(feature = "open-metrics")]
292 let main_transport = transport::build_transport(&self.keypair, &mut metrics_registries);
293 #[cfg(not(feature = "open-metrics"))]
294 let main_transport = transport::build_transport(&self.keypair);
295 let transport = if !self.local {
296 debug!("Preventing non-global dials");
297 libp2p::core::transport::global_only::Transport::new(main_transport).boxed()
299 } else {
300 main_transport
301 };
302
303 let (relay_transport, relay_behaviour) =
304 libp2p::relay::client::new(self.keypair.public().to_peer_id());
305 let relay_transport = relay_transport
306 .upgrade(libp2p::core::upgrade::Version::V1Lazy)
307 .authenticate(
308 libp2p::noise::Config::new(&self.keypair)
309 .expect("Signing libp2p-noise static DH keypair failed."),
310 )
311 .multiplex(libp2p::yamux::Config::default())
312 .or_transport(transport);
313
314 let transport = relay_transport
315 .map(|either_output, _| match either_output {
316 Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
317 Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
318 })
319 .boxed();
320
321 #[cfg(feature = "open-metrics")]
322 let metrics_recorder = if let Some(port) = self.metrics_server_port {
323 let metrics_recorder = NetworkMetricsRecorder::new(&mut metrics_registries);
324 let metadata_sub_reg = metrics_registries
325 .metadata
326 .sub_registry_with_prefix("ant_networking");
327
328 metadata_sub_reg.register(
329 "peer_id",
330 "Identifier of a peer of the network",
331 Info::new(vec![("peer_id".to_string(), peer_id.to_string())]),
332 );
333 metadata_sub_reg.register(
334 "identify_protocol_str",
335 "The protocol version string that is used to connect to the correct network",
336 Info::new(vec![(
337 "identify_protocol_str".to_string(),
338 identify_protocol_str.clone(),
339 )]),
340 );
341
342 run_metrics_server(metrics_registries, port);
343 Some(metrics_recorder)
344 } else {
345 None
346 };
347
348 let request_response = {
350 let cfg = RequestResponseConfig::default()
351 .with_request_timeout(self.request_timeout.unwrap_or(REQUEST_TIMEOUT_DEFAULT_S));
352 let req_res_version_str = REQ_RESPONSE_VERSION_STR
353 .read()
354 .expect("Failed to obtain read lock for REQ_RESPONSE_VERSION_STR")
355 .clone();
356
357 info!("Building request response with {req_res_version_str:?}",);
358 request_response::cbor::Behaviour::new(
359 [(
360 StreamProtocol::try_from_owned(req_res_version_str)
361 .expect("StreamProtocol should start with a /"),
362 req_res_protocol,
363 )],
364 cfg,
365 )
366 };
367
368 let (network_event_sender, network_event_receiver) = mpsc::channel(NETWORKING_CHANNEL_SIZE);
369 let (network_swarm_cmd_sender, network_swarm_cmd_receiver) =
370 mpsc::channel(NETWORKING_CHANNEL_SIZE);
371 let (local_swarm_cmd_sender, local_swarm_cmd_receiver) =
372 mpsc::channel(NETWORKING_CHANNEL_SIZE);
373
374 let kademlia = {
376 #[cfg(feature = "open-metrics")]
377 let record_stored_metrics = metrics_recorder.as_ref().map(|r| r.records_stored.clone());
378 let node_record_store = NodeRecordStore::with_config(
379 peer_id,
380 record_store_cfg,
381 network_event_sender.clone(),
382 local_swarm_cmd_sender.clone(),
383 #[cfg(feature = "open-metrics")]
384 record_stored_metrics,
385 );
386
387 let store = node_record_store;
388 debug!("Using Kademlia with NodeRecordStore!");
389 kad::Behaviour::with_config(peer_id, store, kad_cfg)
390 };
391
392 let agent_version =
393 ant_protocol::version::construct_node_user_agent(env!("CARGO_PKG_VERSION").to_string());
394
395 info!("Building Identify with identify_protocol_str: {identify_protocol_str:?} and agent_version: {agent_version:?}");
397 let identify = {
398 let cfg = libp2p::identify::Config::new(identify_protocol_str, self.keypair.public())
399 .with_agent_version(agent_version)
400 .with_interval(RESEND_IDENTIFY_INVERVAL)
402 .with_hide_listen_addrs(true);
403 libp2p::identify::Behaviour::new(cfg)
404 };
405
406 let upnp = if !self.local && !self.no_upnp && !self.relay_client {
407 debug!("Enabling UPnP port opening behavior");
408 Some(libp2p::upnp::tokio::Behaviour::default())
409 } else {
410 None
411 }
412 .into(); let relay_server = if !self.relay_client {
415 let relay_server_cfg = relay::Config {
416 max_reservations: 128, max_circuits: 1024, max_circuits_per_peer: 256, circuit_src_rate_limiters: vec![], max_circuit_bytes: MAX_PACKET_SIZE as u64,
422 ..Default::default()
423 };
424 Some(libp2p::relay::Behaviour::new(peer_id, relay_server_cfg))
425 } else {
426 None
427 }
428 .into();
429
430 let behaviour = NodeBehaviour {
431 blocklist: libp2p::allow_block_list::Behaviour::default(),
432 do_not_disturb: crate::behaviour::do_not_disturb::Behaviour::default(),
433 relay_client: relay_behaviour,
436 relay_server,
437 upnp,
438 request_response,
439 kademlia,
440 identify,
441 };
442
443 let swarm_config = libp2p::swarm::Config::with_tokio_executor()
444 .with_idle_connection_timeout(CONNECTION_KEEP_ALIVE_TIMEOUT);
445
446 let swarm = Swarm::new(transport, behaviour, peer_id, swarm_config);
447
448 let replication_fetcher = ReplicationFetcher::new(peer_id, network_event_sender.clone());
449
450 let relay_manager = if self.relay_client {
452 let relay_manager = RelayManager::new(peer_id);
453 #[cfg(feature = "open-metrics")]
454 let mut relay_manager = relay_manager;
455 #[cfg(feature = "open-metrics")]
456 if let Some(metrics_recorder) = &metrics_recorder {
457 relay_manager.set_reservation_health_metrics(
458 metrics_recorder.relay_reservation_health.clone(),
459 );
460 }
461 Some(relay_manager)
462 } else {
463 info!("Relay manager is disabled for this node.");
464 None
465 };
466
467 let external_address_manager = if !self.local && !self.relay_client {
471 Some(ExternalAddressManager::new(peer_id))
472 } else {
473 info!("External address manager is disabled for this node.");
474 None
475 };
476
477 let is_upnp_enabled = swarm.behaviour().upnp.is_enabled();
478 let swarm_driver = SwarmDriver {
479 swarm,
480 self_peer_id: peer_id,
481 local: self.local,
482 is_relay_client: self.relay_client,
483 #[cfg(feature = "open-metrics")]
484 close_group: Vec::with_capacity(CLOSE_GROUP_SIZE),
485 peers_in_rt: 0,
486 initial_bootstrap: InitialBootstrap::new(self.initial_contacts),
487 initial_bootstrap_trigger: InitialBootstrapTrigger::new(is_upnp_enabled),
488 bootstrap_cache: self.bootstrap_cache,
489 dial_queue: Default::default(),
490 relay_manager,
491 connected_relay_clients: Default::default(),
492 external_address_manager,
493 replication_fetcher,
494 #[cfg(feature = "open-metrics")]
495 metrics_recorder,
496 network_cmd_sender: network_swarm_cmd_sender.clone(),
499 network_cmd_receiver: network_swarm_cmd_receiver,
500 local_cmd_sender: local_swarm_cmd_sender.clone(),
501 local_cmd_receiver: local_swarm_cmd_receiver,
502 event_sender: network_event_sender,
503 pending_get_closest_peers: Default::default(),
504 pending_requests: Default::default(),
505 dialed_peers: CircularVec::new(255),
508 network_discovery: NetworkDiscovery::new(&peer_id),
509 live_connected_peers: Default::default(),
510 latest_established_connection_ids: Default::default(),
511 handling_statistics: Default::default(),
512 handled_times: 0,
513 hard_disk_write_error: 0,
514 bad_nodes: Default::default(),
515 quotes_history: Default::default(),
516 replication_targets: Default::default(),
517 last_replication: None,
518 last_connection_pruning_time: Instant::now(),
519 peers_version: Default::default(),
520 };
521
522 let network = Network::new(
523 network_swarm_cmd_sender,
524 local_swarm_cmd_sender,
525 peer_id,
526 self.keypair,
527 );
528
529 (network, network_event_receiver, swarm_driver)
530 }
531}
532
533fn check_and_wipe_storage_dir_if_necessary(
534 root_dir: PathBuf,
535 storage_dir_path: PathBuf,
536 cur_version_str: String,
537) -> Result<()> {
538 let mut prev_version_str = String::new();
539 let version_file = root_dir.join("network_key_version");
540 {
541 match fs::File::open(version_file.clone()) {
542 Ok(mut file) => {
543 file.read_to_string(&mut prev_version_str)?;
544 }
545 Err(err) => {
546 warn!("Failed in accessing version file {version_file:?}: {err:?}");
547 info!("Creating a new version file at {version_file:?}");
549 fs::File::create(version_file.clone())?;
550 }
551 }
552 }
553
554 if cur_version_str != prev_version_str {
558 warn!("Trying to wipe out storage dir {storage_dir_path:?}, as cur_version {cur_version_str:?} doesn't match prev_version {prev_version_str:?}");
559 let _ = fs::remove_dir_all(storage_dir_path);
560
561 let mut file = fs::OpenOptions::new()
562 .write(true)
563 .truncate(true)
564 .open(version_file.clone())?;
565 info!("Writing cur_version {cur_version_str:?} into version file at {version_file:?}");
566 file.write_all(cur_version_str.as_bytes())?;
567 }
568
569 Ok(())
570}
571
#[cfg(test)]
mod tests {
    use super::check_and_wipe_storage_dir_if_necessary;
    use std::{
        fs,
        io::Read,
        path::{Path, PathBuf},
    };

    /// Removes the wrapped directory on drop, so the test cleans up after
    /// itself even when an assertion fails.
    struct TempDirGuard(PathBuf);

    impl Drop for TempDirGuard {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    /// Returns the full contents of the version file.
    fn read_version_file(version_file: &Path) -> String {
        let mut content_str = String::new();
        let mut file = fs::OpenOptions::new()
            .read(true)
            .open(version_file)
            .expect("Failed to open version file");
        file.read_to_string(&mut content_str)
            .expect("Failed to read from version file");
        content_str
    }

    #[test]
    fn version_file_update() {
        let root_dir = std::env::temp_dir().join(uuid::Uuid::new_v4().to_string());
        fs::create_dir_all(&root_dir).expect("Failed to create root directory");
        let _guard = TempDirGuard(root_dir.clone());

        let version_file = root_dir.join("network_key_version");
        let storage_dir = root_dir.join("record_store");

        // First run: no version file yet, so it is created with the current version.
        let cur_version = uuid::Uuid::new_v4().to_string();
        assert!(check_and_wipe_storage_dir_if_necessary(
            root_dir.clone(),
            storage_dir.clone(),
            cur_version.clone()
        )
        .is_ok());
        assert_eq!(read_version_file(&version_file), cur_version);

        // Same version again: the storage dir must survive.
        fs::create_dir_all(&storage_dir).expect("Failed to create storage directory");
        assert!(fs::metadata(&storage_dir).is_ok());
        assert!(check_and_wipe_storage_dir_if_necessary(
            root_dir.clone(),
            storage_dir.clone(),
            cur_version.clone()
        )
        .is_ok());
        assert!(fs::metadata(&storage_dir).is_ok());
        assert_eq!(read_version_file(&version_file), cur_version);

        // New version: the storage dir is wiped and the version file updated.
        let cur_version = uuid::Uuid::new_v4().to_string();
        assert!(check_and_wipe_storage_dir_if_necessary(
            root_dir.clone(),
            storage_dir.clone(),
            cur_version.clone()
        )
        .is_ok());
        assert_eq!(read_version_file(&version_file), cur_version);
        assert!(fs::metadata(&storage_dir).is_err());
    }
}