ant_networking/network_builder.rs

// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{
    bootstrap::{InitialBootstrap, InitialBootstrapTrigger},
    circular_vec::CircularVec,
    driver::NodeBehaviour,
    error::{NetworkError, Result},
    event::NetworkEvent,
    external_address::ExternalAddressManager,
    network_discovery::NetworkDiscovery,
    record_store::{NodeRecordStore, NodeRecordStoreConfig},
    relay_manager::RelayManager,
    replication_fetcher::ReplicationFetcher,
    time::Instant,
    transport, Network, SwarmDriver, CLOSE_GROUP_SIZE,
};
#[cfg(feature = "open-metrics")]
use crate::{
    metrics::service::run_metrics_server, metrics::NetworkMetricsRecorder, MetricsRegistries,
};
use ant_bootstrap::BootstrapCacheStore;
use ant_protocol::{
    version::{get_network_id_str, IDENTIFY_PROTOCOL_STR, REQ_RESPONSE_VERSION_STR},
    NetworkAddress, PrettyPrintKBucketKey,
};
use futures::future::Either;
use libp2p::Transport as _;
use libp2p::{core::muxing::StreamMuxerBox, relay};
use libp2p::{
    identity::Keypair,
    kad,
    multiaddr::Protocol,
    request_response::{self, Config as RequestResponseConfig, ProtocolSupport},
    swarm::{StreamProtocol, Swarm},
    Multiaddr, PeerId,
};
#[cfg(feature = "open-metrics")]
use prometheus_client::metrics::info::Info;
use rand::Rng;
use std::{
    convert::TryInto,
    fmt::Debug,
    fs,
    io::{Read, Write},
    net::SocketAddr,
    num::NonZeroUsize,
    path::PathBuf,
    time::Duration,
};
use tokio::sync::mpsc;

// Timeout for requests sent/received through the request_response behaviour.
const REQUEST_TIMEOUT_DEFAULT_S: Duration = Duration::from_secs(30);
// Sets the keep-alive timeout of idle connections.
const CONNECTION_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(10);

// Interval for re-sending identify to connected peers.
const RESEND_IDENTIFY_INTERVAL: Duration = Duration::from_secs(3600);

const NETWORKING_CHANNEL_SIZE: usize = 10_000;

/// Time before a Kad query times out if no response is received
const KAD_QUERY_TIMEOUT_S: Duration = Duration::from_secs(10);

// Initialised at compile time, instead of a runtime error that should never happen.
// Option<T>::expect will be stabilised as const in the future (https://github.com/rust-lang/rust/issues/67441)
const REPLICATION_FACTOR: NonZeroUsize = match NonZeroUsize::new(CLOSE_GROUP_SIZE + 2) {
    Some(v) => v,
    None => panic!("CLOSE_GROUP_SIZE should not be zero"),
};

const KAD_STREAM_PROTOCOL_ID: StreamProtocol = StreamProtocol::new("/autonomi/kad/1.0.0");

/// The largest packet to send over the network.
/// Records larger than this will be rejected.
pub const MAX_PACKET_SIZE: usize = 1024 * 1024 * 5; // the chunk size is 1 MB, so this should be higher than that to prevent failures

/// Interval to trigger native libp2p::kad bootstrap.
/// This is the maximum interval; the minimum interval at any node will be half of this.
const PERIODIC_KAD_BOOTSTRAP_INTERVAL_MAX_S: u64 = 21600;

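/// Builder for the node's networking layer. Configures the libp2p transport, the Kademlia
/// record store and the request/response behaviours, and produces a `Network` handle, a
/// `NetworkEvent` receiver and the `SwarmDriver` that drives the swarm.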
#[derive(Debug)]
pub struct NetworkBuilder {
    bootstrap_cache: Option<BootstrapCacheStore>,
    concurrency_limit: Option<usize>,
    initial_contacts: Vec<Multiaddr>,
    keypair: Keypair,
    listen_addr: Option<SocketAddr>,
    local: bool,
    #[cfg(feature = "open-metrics")]
    metrics_registries: Option<MetricsRegistries>,
    #[cfg(feature = "open-metrics")]
    metrics_server_port: Option<u16>,
    no_upnp: bool,
    relay_client: bool,
    request_timeout: Option<Duration>,
}

impl NetworkBuilder {
    pub fn new(keypair: Keypair, local: bool, initial_contacts: Vec<Multiaddr>) -> Self {
        Self {
            bootstrap_cache: None,
            concurrency_limit: None,
            initial_contacts,
            keypair,
            listen_addr: None,
            local,
            #[cfg(feature = "open-metrics")]
            metrics_registries: None,
            #[cfg(feature = "open-metrics")]
            metrics_server_port: None,
            no_upnp: true,
            relay_client: false,
            request_timeout: None,
        }
    }

    pub fn bootstrap_cache(&mut self, bootstrap_cache: BootstrapCacheStore) {
        self.bootstrap_cache = Some(bootstrap_cache);
    }

    pub fn relay_client(&mut self, relay_client: bool) {
        self.relay_client = relay_client;
    }

    pub fn listen_addr(&mut self, listen_addr: SocketAddr) {
        self.listen_addr = Some(listen_addr);
    }

    pub fn request_timeout(&mut self, request_timeout: Duration) {
        self.request_timeout = Some(request_timeout);
    }

    pub fn concurrency_limit(&mut self, concurrency_limit: usize) {
        self.concurrency_limit = Some(concurrency_limit);
    }

    /// Set the registries used inside the metrics server.
    /// Configure the `metrics_server_port` to enable the metrics server.
    #[cfg(feature = "open-metrics")]
    pub fn metrics_registries(&mut self, registries: MetricsRegistries) {
        self.metrics_registries = Some(registries);
    }

    #[cfg(feature = "open-metrics")]
    /// The metrics server is enabled only if the port is provided.
    pub fn metrics_server_port(&mut self, port: Option<u16>) {
        self.metrics_server_port = port;
    }

    pub fn no_upnp(&mut self, no_upnp: bool) {
        self.no_upnp = no_upnp;
    }

    /// Creates a new `SwarmDriver` instance, along with a `Network` handle
    /// for sending commands and an `mpsc::Receiver<NetworkEvent>` for receiving
    /// network events. It initializes the swarm, sets up the transport, and
    /// configures the Kademlia behaviour for peer discovery.
    ///
    /// # Returns
    ///
    /// A tuple containing a `Network` handle, an `mpsc::Receiver<NetworkEvent>`,
    /// and a `SwarmDriver` instance.
    ///
    /// # Errors
    ///
    /// Returns an error if no listen address was provided, or if the record store
    /// directory cannot be created or wiped for a new network version.
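    ///
    /// # Example
    ///
    /// A minimal usage sketch (illustrative only: the keypair, contacts, listen address
    /// and root directory below are placeholder values, and the snippet is not compiled
    /// as a doctest):
    ///
    /// ```ignore
    /// use libp2p::identity::Keypair;
    /// use std::path::PathBuf;
    ///
    /// let keypair = Keypair::generate_ed25519();
    /// let mut builder = NetworkBuilder::new(keypair, /* local */ false, /* initial_contacts */ vec![]);
    /// builder.listen_addr("0.0.0.0:12000".parse()?);
    /// let (network, mut network_events, swarm_driver) =
    ///     builder.build_node(PathBuf::from("./node_root"))?;
    /// // Drive the swarm in a background task; use `network` to issue commands
    /// // and `network_events` to receive `NetworkEvent`s.
    /// ```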
    pub fn build_node(
        self,
        root_dir: PathBuf,
    ) -> Result<(Network, mpsc::Receiver<NetworkEvent>, SwarmDriver)> {
        let bootstrap_interval = rand::thread_rng().gen_range(
            PERIODIC_KAD_BOOTSTRAP_INTERVAL_MAX_S / 2..PERIODIC_KAD_BOOTSTRAP_INTERVAL_MAX_S,
        );

        let mut kad_cfg = kad::Config::new(KAD_STREAM_PROTOCOL_ID);
        let _ = kad_cfg
            .set_kbucket_inserts(libp2p::kad::BucketInserts::Manual)
            // How often a node replicates the records it stores, i.e. copies the key-value pairs to other nodes.
            // This is a heavier operation than publication, so it is done less frequently.
            // Set to `None` to ensure periodic replication is disabled.
            .set_replication_interval(None)
            // How often a node publishes a record key, i.e. tells other nodes it exists.
            // Set to `None` to ensure periodic publication is disabled.
            .set_publication_interval(None)
            // 5 MB max packet size
            .set_max_packet_size(MAX_PACKET_SIZE)
            // How many nodes _should_ store data.
            .set_replication_factor(REPLICATION_FACTOR)
            .set_query_timeout(KAD_QUERY_TIMEOUT_S)
            // We may consider using disjoint paths for increased resiliency in the presence of potentially adversarial nodes.
            // However, this risks libp2p reporting back partially-correct results when the peer query failure rate is high.
            // .disjoint_query_paths(true)
            // Records never expire
            .set_record_ttl(None)
            .set_periodic_bootstrap_interval(Some(Duration::from_secs(bootstrap_interval)))
            // Emit PUT events for validation prior to insertion into the RecordStore.
            // This is no longer needed as record_storage::put can now carry out the validation.
            // .set_record_filtering(KademliaStoreInserts::FilterBoth)
            // Disable the provider records publication job
            .set_provider_publication_interval(None);

        let store_cfg = {
            let storage_dir_path = root_dir.join("record_store");
            // In case the node instance is restarted for a different version of the network,
            // the previous storage folder shall be wiped out,
            // to avoid bringing old data into the new network.
            check_and_wipe_storage_dir_if_necessary(
                root_dir.clone(),
                storage_dir_path.clone(),
                get_network_id_str(),
            )?;

            // Configures the disk store to store records under the provided path and increases the max record size.
            // The storage dir is suffixed with the key_version string to avoid bringing records from an old network into a new one.

            if let Err(error) = std::fs::create_dir_all(&storage_dir_path) {
                return Err(NetworkError::FailedToCreateRecordStoreDir {
                    path: storage_dir_path,
                    source: error,
                });
            }
            let peer_id = PeerId::from(self.keypair.public());
            let encryption_seed: [u8; 16] = peer_id
                .to_bytes()
                .get(..16)
                .expect("Can't get encryption_seed from keypair")
                .try_into()
                .expect("Can't get 16 bytes from the serialised keypair");
            NodeRecordStoreConfig {
                max_value_bytes: MAX_PACKET_SIZE, // TODO: does this need to be _less_ than MAX_PACKET_SIZE?
                storage_dir: storage_dir_path,
                historic_quote_dir: root_dir.clone(),
                encryption_seed,
                ..Default::default()
            }
        };

        let listen_addr = self.listen_addr;

        let (network, events_receiver, mut swarm_driver) =
            self.build(kad_cfg, store_cfg, ProtocolSupport::Full);

        // Listen on the provided address
        let listen_socket_addr = listen_addr.ok_or(NetworkError::ListenAddressNotProvided)?;

        // Listen on QUIC
        let addr_quic = Multiaddr::from(listen_socket_addr.ip())
            .with(Protocol::Udp(listen_socket_addr.port()))
            .with(Protocol::QuicV1);
        swarm_driver
            .listen_on(addr_quic)
            .expect("Multiaddr should be supported by our configured transports");

        Ok((network, events_receiver, swarm_driver))
    }

    /// Private helper to create the network components with the provided config and req/res behaviour
    fn build(
        self,
        kad_cfg: kad::Config,
        record_store_cfg: NodeRecordStoreConfig,
        req_res_protocol: ProtocolSupport,
    ) -> (Network, mpsc::Receiver<NetworkEvent>, SwarmDriver) {
        let identify_protocol_str = IDENTIFY_PROTOCOL_STR
            .read()
            .expect("Failed to obtain read lock for IDENTIFY_PROTOCOL_STR")
            .clone();

        let peer_id = PeerId::from(self.keypair.public());
        // vdash metric (if modified please notify at https://github.com/happybeing/vdash/issues):
        info!(
            "Process (PID: {}) with PeerId: {peer_id}",
            std::process::id()
        );
        info!(
            "Self PeerID {peer_id} is represented as kbucket_key {:?}",
            PrettyPrintKBucketKey(NetworkAddress::from(peer_id).as_kbucket_key())
        );

        #[cfg(feature = "open-metrics")]
        let mut metrics_registries = self.metrics_registries.unwrap_or_default();

        // ==== Transport ====
        #[cfg(feature = "open-metrics")]
        let main_transport = transport::build_transport(&self.keypair, &mut metrics_registries);
        #[cfg(not(feature = "open-metrics"))]
        let main_transport = transport::build_transport(&self.keypair);
        let transport = if !self.local {
            debug!("Preventing non-global dials");
            // Wrap the main transport in one that prevents dialing local addresses.
            libp2p::core::transport::global_only::Transport::new(main_transport).boxed()
        } else {
            main_transport
        };

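        // Compose the relay client transport with the main transport: relayed connections
        // are upgraded with Noise + Yamux, and both variants are boxed into a single
        // stream-muxed transport below.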
        let (relay_transport, relay_behaviour) =
            libp2p::relay::client::new(self.keypair.public().to_peer_id());
        let relay_transport = relay_transport
            .upgrade(libp2p::core::upgrade::Version::V1Lazy)
            .authenticate(
                libp2p::noise::Config::new(&self.keypair)
                    .expect("Signing libp2p-noise static DH keypair failed."),
            )
            .multiplex(libp2p::yamux::Config::default())
            .or_transport(transport);

        let transport = relay_transport
            .map(|either_output, _| match either_output {
                Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
                Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
            })
            .boxed();

        #[cfg(feature = "open-metrics")]
        let metrics_recorder = if let Some(port) = self.metrics_server_port {
            let metrics_recorder = NetworkMetricsRecorder::new(&mut metrics_registries);
            let metadata_sub_reg = metrics_registries
                .metadata
                .sub_registry_with_prefix("ant_networking");

            metadata_sub_reg.register(
                "peer_id",
                "Identifier of a peer of the network",
                Info::new(vec![("peer_id".to_string(), peer_id.to_string())]),
            );
            metadata_sub_reg.register(
                "identify_protocol_str",
                "The protocol version string that is used to connect to the correct network",
                Info::new(vec![(
                    "identify_protocol_str".to_string(),
                    identify_protocol_str.clone(),
                )]),
            );

            run_metrics_server(metrics_registries, port);
            Some(metrics_recorder)
        } else {
            None
        };

        // RequestResponse Behaviour
        let request_response = {
            let cfg = RequestResponseConfig::default()
                .with_request_timeout(self.request_timeout.unwrap_or(REQUEST_TIMEOUT_DEFAULT_S));
            let req_res_version_str = REQ_RESPONSE_VERSION_STR
                .read()
                .expect("Failed to obtain read lock for REQ_RESPONSE_VERSION_STR")
                .clone();

            info!("Building request response with {req_res_version_str:?}");
            request_response::cbor::Behaviour::new(
                [(
                    StreamProtocol::try_from_owned(req_res_version_str)
                        .expect("StreamProtocol should start with a /"),
                    req_res_protocol,
                )],
                cfg,
            )
        };

        let (network_event_sender, network_event_receiver) = mpsc::channel(NETWORKING_CHANNEL_SIZE);
        let (network_swarm_cmd_sender, network_swarm_cmd_receiver) =
            mpsc::channel(NETWORKING_CHANNEL_SIZE);
        let (local_swarm_cmd_sender, local_swarm_cmd_receiver) =
            mpsc::channel(NETWORKING_CHANNEL_SIZE);

        // Kademlia Behaviour
        let kademlia = {
            #[cfg(feature = "open-metrics")]
            let record_stored_metrics = metrics_recorder.as_ref().map(|r| r.records_stored.clone());
            let node_record_store = NodeRecordStore::with_config(
                peer_id,
                record_store_cfg,
                network_event_sender.clone(),
                local_swarm_cmd_sender.clone(),
                #[cfg(feature = "open-metrics")]
                record_stored_metrics,
            );

            let store = node_record_store;
            debug!("Using Kademlia with NodeRecordStore!");
            kad::Behaviour::with_config(peer_id, store, kad_cfg)
        };

        let agent_version =
            ant_protocol::version::construct_node_user_agent(env!("CARGO_PKG_VERSION").to_string());

        // Identify Behaviour
        info!("Building Identify with identify_protocol_str: {identify_protocol_str:?} and agent_version: {agent_version:?}");
        let identify = {
            let cfg = libp2p::identify::Config::new(identify_protocol_str, self.keypair.public())
                .with_agent_version(agent_version)
                // Lengthen the identify interval from the default 5 minutes to 1 hour.
                .with_interval(RESEND_IDENTIFY_INTERVAL)
                .with_hide_listen_addrs(true);
            libp2p::identify::Behaviour::new(cfg)
        };

        let upnp = if !self.local && !self.no_upnp && !self.relay_client {
            debug!("Enabling UPnP port opening behavior");
            Some(libp2p::upnp::tokio::Behaviour::default())
        } else {
            None
        }
        .into(); // Into `Toggle<T>`

        let relay_server = if !self.relay_client {
            let relay_server_cfg = relay::Config {
                max_reservations: 128,             // Number of peers we are relaying for
                max_circuits: 1024, // Total number of relayed connections at any given moment
                max_circuits_per_peer: 256, // Number of relayed connections per peer (both dst and src)
                circuit_src_rate_limiters: vec![], // No extra rate limiting for now
                // We should at least be able to relay packets with chunks etc.
                max_circuit_bytes: MAX_PACKET_SIZE as u64,
                ..Default::default()
            };
            Some(libp2p::relay::Behaviour::new(peer_id, relay_server_cfg))
        } else {
            None
        }
        .into();

        let behaviour = NodeBehaviour {
            blocklist: libp2p::allow_block_list::Behaviour::default(),
            do_not_disturb: crate::behaviour::do_not_disturb::Behaviour::default(),
            // `Relay client Behaviour` is enabled for all nodes. This is required for normal nodes to connect to relay
            // clients.
            relay_client: relay_behaviour,
            relay_server,
            upnp,
            request_response,
            kademlia,
            identify,
        };

        let swarm_config = libp2p::swarm::Config::with_tokio_executor()
            .with_idle_connection_timeout(CONNECTION_KEEP_ALIVE_TIMEOUT);

        let swarm = Swarm::new(transport, behaviour, peer_id, swarm_config);

        let replication_fetcher = ReplicationFetcher::new(peer_id, network_event_sender.clone());

        // Enable the relay manager to allow the node to act as a relay client and connect to the network via relay servers
        let relay_manager = if self.relay_client {
            let relay_manager = RelayManager::new(peer_id);
            #[cfg(feature = "open-metrics")]
            let mut relay_manager = relay_manager;
            #[cfg(feature = "open-metrics")]
            if let Some(metrics_recorder) = &metrics_recorder {
                relay_manager.set_reservation_health_metrics(
                    metrics_recorder.relay_reservation_health.clone(),
                );
            }
            Some(relay_manager)
        } else {
            info!("Relay manager is disabled for this node.");
            None
        };

        // Enable the external address manager for public nodes that are not behind NAT.
        // We don't get a peer's address from the external address list anymore, but we should still
        // advertise our external addresses, as older nodes still rely on the old mechanism.
        let external_address_manager = if !self.local && !self.relay_client {
            Some(ExternalAddressManager::new(peer_id))
        } else {
            info!("External address manager is disabled for this node.");
            None
        };

        let is_upnp_enabled = swarm.behaviour().upnp.is_enabled();
        let swarm_driver = SwarmDriver {
            swarm,
            self_peer_id: peer_id,
            local: self.local,
            is_relay_client: self.relay_client,
            #[cfg(feature = "open-metrics")]
            close_group: Vec::with_capacity(CLOSE_GROUP_SIZE),
            peers_in_rt: 0,
            initial_bootstrap: InitialBootstrap::new(self.initial_contacts),
            initial_bootstrap_trigger: InitialBootstrapTrigger::new(is_upnp_enabled),
            bootstrap_cache: self.bootstrap_cache,
            dial_queue: Default::default(),
            relay_manager,
            connected_relay_clients: Default::default(),
            external_address_manager,
            replication_fetcher,
            #[cfg(feature = "open-metrics")]
            metrics_recorder,
            // Kept here to ensure we can push messages to the channel
            // and not block the processing thread unintentionally.
            network_cmd_sender: network_swarm_cmd_sender.clone(),
            network_cmd_receiver: network_swarm_cmd_receiver,
            local_cmd_sender: local_swarm_cmd_sender.clone(),
            local_cmd_receiver: local_swarm_cmd_receiver,
            event_sender: network_event_sender,
            pending_get_closest_peers: Default::default(),
            pending_requests: Default::default(),
            // We use 255 here, which allows covering a network larger than 64k without any rotation.
            // This is based on the libp2p kad k-bucket peer distribution.
            dialed_peers: CircularVec::new(255),
            network_discovery: NetworkDiscovery::new(&peer_id),
            live_connected_peers: Default::default(),
            latest_established_connection_ids: Default::default(),
            handling_statistics: Default::default(),
            handled_times: 0,
            hard_disk_write_error: 0,
            bad_nodes: Default::default(),
            quotes_history: Default::default(),
            replication_targets: Default::default(),
            last_replication: None,
            last_connection_pruning_time: Instant::now(),
            peers_version: Default::default(),
        };

        let network = Network::new(
            network_swarm_cmd_sender,
            local_swarm_cmd_sender,
            peer_id,
            self.keypair,
        );

        (network, network_event_receiver, swarm_driver)
    }
}

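/// Compares the current network id string against the one recorded in the `network_key_version`
/// file under `root_dir`; on mismatch, wipes `storage_dir_path` and rewrites the version file.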
fn check_and_wipe_storage_dir_if_necessary(
    root_dir: PathBuf,
    storage_dir_path: PathBuf,
    cur_version_str: String,
) -> Result<()> {
    let mut prev_version_str = String::new();
    let version_file = root_dir.join("network_key_version");
    {
        match fs::File::open(version_file.clone()) {
            Ok(mut file) => {
                file.read_to_string(&mut prev_version_str)?;
            }
            Err(err) => {
                warn!("Failed to access version file {version_file:?}: {err:?}");
                // Assuming the file was not created yet
                info!("Creating a new version file at {version_file:?}");
                fs::File::create(version_file.clone())?;
            }
        }
    }

    // In case of a version mismatch:
    //   * the storage_dir shall be wiped out
    //   * the version file shall be updated
    if cur_version_str != prev_version_str {
        warn!("Trying to wipe out storage dir {storage_dir_path:?}, as cur_version {cur_version_str:?} doesn't match prev_version {prev_version_str:?}");
        let _ = fs::remove_dir_all(storage_dir_path);

        let mut file = fs::OpenOptions::new()
            .write(true)
            .truncate(true)
            .open(version_file.clone())?;
        info!("Writing cur_version {cur_version_str:?} into version file at {version_file:?}");
        file.write_all(cur_version_str.as_bytes())?;
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::check_and_wipe_storage_dir_if_necessary;
    use std::{fs, io::Read};

    #[tokio::test]
    async fn version_file_update() {
        let temp_dir = std::env::temp_dir();
        let unique_dir_name = uuid::Uuid::new_v4().to_string();
        let root_dir = temp_dir.join(unique_dir_name);
        fs::create_dir_all(&root_dir).expect("Failed to create root directory");

        let version_file = root_dir.join("network_key_version");
        let storage_dir = root_dir.join("record_store");

        let cur_version = uuid::Uuid::new_v4().to_string();
        assert!(check_and_wipe_storage_dir_if_necessary(
            root_dir.clone(),
            storage_dir.clone(),
            cur_version.clone()
        )
        .is_ok());
        {
            let mut content_str = String::new();
            let mut file = fs::OpenOptions::new()
                .read(true)
                .open(version_file.clone())
                .expect("Failed to open version file");
            file.read_to_string(&mut content_str)
                .expect("Failed to read from version file");
            assert_eq!(content_str, cur_version);

            drop(file);
        }

        fs::create_dir_all(&storage_dir).expect("Failed to create storage directory");
        assert!(fs::metadata(storage_dir.clone()).is_ok());

        let cur_version = uuid::Uuid::new_v4().to_string();
        assert!(check_and_wipe_storage_dir_if_necessary(
            root_dir.clone(),
            storage_dir.clone(),
            cur_version.clone()
        )
        .is_ok());
        {
            let mut content_str = String::new();
            let mut file = fs::OpenOptions::new()
                .read(true)
                .open(version_file.clone())
                .expect("Failed to open version file");
            file.read_to_string(&mut content_str)
                .expect("Failed to read from version file");
            assert_eq!(content_str, cur_version);

            drop(file);
        }
        // The storage_dir shall be removed as the version key changed
        assert!(fs::metadata(storage_dir.clone()).is_err());
    }
632}