libp2prs_core/
peerstore.rs

1// Copyright 2020 Netwarps Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use serde::{Deserialize, Serialize};
22use std::collections::{HashMap, HashSet};
23use std::fs::File;
24use std::io;
25use std::io::{Read, Write};
26use std::str::FromStr;
27use std::sync::{Arc, Mutex};
28use std::time::{Duration, Instant};
29
30use crate::{PeerId, PublicKey};
31use libp2prs_multiaddr::Multiaddr;
32
/// Time-to-live of one hour (3600 seconds).
pub const ADDRESS_TTL: Duration = Duration::from_secs(3_600);
/// Time-to-live of two minutes (120 seconds).
pub const TEMP_ADDR_TTL: Duration = Duration::from_secs(120);
/// Time-to-live of ten minutes (600 seconds).
pub const PROVIDER_ADDR_TTL: Duration = Duration::from_secs(600);
/// Time-to-live of ten minutes (600 seconds).
pub const RECENTLY_CONNECTED_ADDR_TTL: Duration = Duration::from_secs(600);
/// Time-to-live of ten minutes (600 seconds).
pub const OWN_OBSERVED_ADDR_TTL: Duration = Duration::from_secs(600);

/// Near-maximal time-to-live (`u64::MAX - 1` seconds), i.e. effectively never expiring.
pub const PERMANENT_ADDR_TTL: Duration = Duration::from_secs(u64::MAX - 1);
/// Near-maximal time-to-live (`u64::MAX - 2` seconds); one second shorter than
/// [`PERMANENT_ADDR_TTL`] so the two values remain distinguishable.
pub const CONNECTED_ADDR_TTL: Duration = Duration::from_secs(u64::MAX - 2);
41
42#[derive(Default, Clone)]
43pub struct PeerStore {
44    inner: Arc<Mutex<HashMap<PeerId, PeerRecord>>>,
45}
46
47#[derive(Clone, Debug, Serialize, Deserialize)]
48pub struct PeerSaved {
49    addr: Multiaddr,
50    ttl: Duration,
51}
52
53/// The PeerInfo represents a remote peer and its elements.
54#[derive(Clone)]
55struct PeerRecord {
56    /// Indicates if this record is currently pinned in peer store.
57    ///
58    /// PeerStore GC will not recycle a pinned record.
59    pinned: bool,
60    /// The multiaddr owned by this peer.
61    addrs: Vec<AddrBookRecord>,
62    /// The public key of the peer.
63    key: Option<PublicKey>,
64    /// The protocols supported by the peer.
65    protos: HashSet<String>,
66}
67
68impl PeerRecord {
69    fn new(addrs: Vec<AddrBookRecord>, key: Option<PublicKey>, protos: HashSet<String>) -> Self {
70        Self {
71            pinned: false,
72            addrs,
73            key,
74            protos,
75        }
76    }
77}
78
79#[derive(Clone, Debug)]
80struct AddrBookRecord {
81    addr: Multiaddr,
82    ttl: Duration,
83    expiry: Instant,
84}
85
86impl Into<Multiaddr> for AddrBookRecord {
87    fn into(self) -> Multiaddr {
88        self.addr
89    }
90}
91
92impl PeerStore {
93    /// Save addr_book when closing swarm
94    pub fn save_data(&self) -> io::Result<()> {
95        let mut ds_addr_book = HashMap::new();
96
97        {
98            let guard = self.inner.lock().unwrap();
99            // Transfer peer_id to String and insert into a new HashMap
100            for (peer_id, value) in guard.iter() {
101                let key = peer_id.to_string();
102                let mut v = Vec::new();
103                // save address info
104                for item in value.addrs.to_vec() {
105                    v.push(PeerSaved {
106                        addr: item.addr,
107                        ttl: item.ttl,
108                    })
109                }
110                ds_addr_book.insert(key, v);
111            }
112        }
113        let json_addrbook = serde_json::to_string(&ds_addr_book)?;
114
115        let mut file = File::create("./ds_addr_book.txt")?;
116        file.write_all(json_addrbook.as_bytes())
117    }
118
119    /// Load addr_book when initializing swarm
120    pub fn load_data(&self) -> io::Result<()> {
121        let mut file = match File::open("./ds_addr_book.txt") {
122            Ok(file) => file,
123            Err(e) => {
124                if e.kind() == io::ErrorKind::NotFound {
125                    File::create("./ds_addr_book.txt")?
126                } else {
127                    return Err(e);
128                }
129            }
130        };
131        let metadata = file.metadata()?;
132        let length = metadata.len() as usize;
133        if length == 0 {
134            return Ok(());
135        }
136        let mut buf = vec![0u8; length];
137
138        // Read data from file and deserialize
139        let _ = file.read_exact(buf.as_mut())?;
140        let json_data: HashMap<String, Vec<PeerSaved>> =
141            serde_json::from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
142
143        // Iter and insert into hashmap
144        let mut guard = self.inner.lock().unwrap();
145        for (key, value) in json_data {
146            let peer_id = PeerId::from_str(&key).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
147            let mut v = Vec::new();
148            for item in value {
149                v.push(AddrBookRecord::new(item.addr, item.ttl));
150            }
151            guard.insert(peer_id, PeerRecord::new(v, None, Default::default()));
152        }
153
154        Ok(())
155    }
156
157    /// Gets all peer Ids in peer store.
158    pub fn get_peers(&self) -> Vec<PeerId> {
159        let guard = self.inner.lock().unwrap();
160        guard.keys().cloned().collect()
161    }
162
163    /// Pins the peer Id so that GC wouldn't recycle the multiaddr of the peer.
164    pub fn pin(&self, peer_id: &PeerId) {
165        let mut guard = self.inner.lock().unwrap();
166        if let Some(pr) = guard.get_mut(peer_id) {
167            pr.pinned = true;
168        }
169    }
170
171    /// Unpins the peer Id.
172    pub fn unpin(&self, peer_id: &PeerId) {
173        let mut guard = self.inner.lock().unwrap();
174        if let Some(pr) = guard.get_mut(peer_id) {
175            pr.pinned = false;
176        }
177    }
178
179    /// Checks if the peer is currently being pinned in peer store.
180    pub fn pinned(&self, peer_id: &PeerId) -> bool {
181        let guard = self.inner.lock().unwrap();
182        guard.get(peer_id).map_or(false, |pr| pr.pinned)
183    }
184
185    /// Adds public key by peer_id.
186    pub fn add_key(&self, peer_id: &PeerId, key: PublicKey) {
187        let mut guard = self.inner.lock().unwrap();
188        if let Some(pr) = guard.get_mut(peer_id) {
189            pr.key = Some(key);
190        }
191    }
192
193    /// Gets public key by peer_id.
194    pub fn get_key(&self, peer_id: &PeerId) -> Option<PublicKey> {
195        let guard = self.inner.lock().unwrap();
196        guard.get(peer_id).and_then(|pr| pr.key.clone())
197    }
198
199    /// Add address to address_book by peer_id, if exists, update rtt.
200    pub fn add_addr(&self, peer_id: &PeerId, addr: Multiaddr, ttl: Duration) {
201        self.add_addrs(peer_id, vec![addr], ttl)
202    }
203
204    /// Adds many new addresses if they're not already in the Address Book.
205    pub fn add_addrs(&self, peer_id: &PeerId, addrs: Vec<Multiaddr>, ttl: Duration) {
206        let mut guard = self.inner.lock().unwrap();
207        if let Some(pr) = guard.get_mut(peer_id) {
208            for addr in addrs {
209                if let Some(record) = pr.addrs.iter_mut().find(|item| item.addr == addr) {
210                    // addr exists, update ttl & expiry
211                    record.set_ttl(ttl);
212                } else {
213                    pr.addrs.push(AddrBookRecord::new(addr, ttl));
214                }
215            }
216        } else {
217            // Peer_id non-exists, create a new PeerRecord and fill with a new AddrBookRecord.
218            let vec = addrs.into_iter().map(|addr| AddrBookRecord::new(addr, ttl)).collect();
219            guard.insert(*peer_id, PeerRecord::new(vec, None, Default::default()));
220        }
221    }
222
223    /// Removes all multiaddr of a peer from peer store.
224    pub fn clear_addrs(&self, peer_id: &PeerId) {
225        let mut guard = self.inner.lock().unwrap();
226        if let Some(pr) = guard.get_mut(peer_id) {
227            pr.addrs.clear();
228        }
229    }
230
231    /// Retrieves the all multiaddr of a peer from the peer store.
232    pub fn get_addrs(&self, peer_id: &PeerId) -> Option<Vec<Multiaddr>> {
233        let guard = self.inner.lock().unwrap();
234        guard.get(peer_id).map(|pr| pr.addrs.iter().map(|a| a.clone().into()).collect())
235    }
236
237    /// Updates the ttl of the multiaddr of the peer.
238    pub fn update_addr(&self, peer_id: &PeerId, new_ttl: Duration) {
239        let mut guard = self.inner.lock().unwrap();
240
241        if let Some(pr) = guard.get_mut(peer_id) {
242            for record in pr.addrs.iter_mut() {
243                record.set_ttl(new_ttl);
244            }
245        }
246    }
247
248    /// Removes all expired address.
249    pub fn remove_expired_addrs(&self) {
250        let mut to_remove = vec![];
251        let mut guard = self.inner.lock().unwrap();
252        for (peer, pr) in guard.iter_mut() {
253            if !pr.pinned {
254                log::debug!("GC attempt for {:?}", peer);
255                pr.addrs.retain(|record| record.expiry.elapsed() < record.ttl);
256                // delete this peer if no addr at all
257                if pr.addrs.is_empty() {
258                    log::debug!("remove {:?} from peerstore", peer);
259                    to_remove.push(*peer);
260                }
261            }
262        }
263
264        for peer in to_remove {
265            guard.remove(&peer);
266        }
267    }
268
269    /// Adds the supported protocols of a peer to the peer store.
270    pub fn add_protocols(&self, peer_id: &PeerId, protos: Vec<String>) {
271        let mut guard = self.inner.lock().unwrap();
272        if let Some(pr) = guard.get_mut(peer_id) {
273            pr.protos.extend(protos);
274        } else {
275            let mut s = HashSet::new();
276            s.extend(protos);
277            guard.insert(*peer_id, PeerRecord::new(Default::default(), None, s));
278        }
279    }
280
281    /// Clears the protocols by peer_id
282    pub fn clear_protocols(&self, peer_id: &PeerId) {
283        let mut guard = self.inner.lock().unwrap();
284        if let Some(pr) = guard.get_mut(peer_id) {
285            pr.protos.clear();
286        }
287    }
288
289    /// Gets the protocols by peer_id.
290    pub fn get_protocols(&self, peer_id: &PeerId) -> Option<Vec<String>> {
291        let guard = self.inner.lock().unwrap();
292        guard.get(peer_id).map(|pr| pr.protos.iter().cloned().collect())
293    }
294
295    /// Get the first protocol which is matched by the given protocols.
296    pub fn first_supported_protocol(&self, peer_id: &PeerId, protos: Vec<String>) -> Option<String> {
297        let guard = self.inner.lock().unwrap();
298        if let Some(pr) = guard.get(peer_id) {
299            for proto in protos {
300                if pr.protos.contains(&proto) {
301                    return Some(proto);
302                }
303            }
304        }
305        None
306    }
307
308    /// Searches all protocols and return an option that matches by the given protocols.
309    pub fn support_protocols(&self, peer_id: &PeerId, protos: Vec<String>) -> Option<Vec<String>> {
310        let guard = self.inner.lock().unwrap();
311        if let Some(pr) = guard.get(peer_id) {
312            let mut proto_list = Vec::with_capacity(protos.len());
313            for item in protos {
314                if pr.protos.contains(&item) {
315                    proto_list.push(item)
316                }
317            }
318            Some(proto_list)
319        } else {
320            None
321        }
322    }
323}
324
325#[allow(dead_code)]
326impl AddrBookRecord {
327    pub fn new(addr: Multiaddr, ttl: Duration) -> Self {
328        Self {
329            addr,
330            ttl,
331            expiry: Instant::now(),
332        }
333    }
334    /// Get the multiaddr.
335    pub fn get_addr(&self) -> &Multiaddr {
336        &self.addr
337    }
338
339    /// Set the time-to-live. It would also reset the 'expiry'.
340    pub fn set_ttl(&mut self, ttl: Duration) {
341        self.ttl = ttl;
342        self.expiry = Instant::now();
343    }
344}
345
#[cfg(test)]
mod tests {
    use crate::identity::Keypair;
    use crate::peerstore::{PeerStore, ADDRESS_TTL};
    use crate::PeerId;
    use libp2prs_multiaddr::Multiaddr;
    use std::time::Duration;

    /// Adding, re-adding and clearing the addresses of a single peer.
    #[test]
    fn addr_basic() {
        let keypair = Keypair::generate_secp256k1();
        let peer_id = PeerId::from_public_key(keypair.public());

        let peerstore = PeerStore::default();

        // The peer has no record yet, so `add_key` does nothing here.
        peerstore.add_key(&peer_id, keypair.public());
        peerstore.add_addr(&peer_id, "/memory/123456".parse().unwrap(), Duration::from_secs(1));

        assert_eq!(
            peerstore.get_addrs(&peer_id).unwrap().first().unwrap(),
            &"/memory/123456".parse::<Multiaddr>().unwrap()
        );

        peerstore.add_addr(&peer_id, "/memory/654321".parse().unwrap(), Duration::from_secs(1));
        let addrs = peerstore.get_addrs(&peer_id).unwrap();
        assert_eq!(addrs.len(), 2);

        // Adding an address that is already known must not create a duplicate.
        peerstore.add_addr(&peer_id, "/memory/654321".parse().unwrap(), Duration::from_secs(1));
        let addrs = peerstore.get_addrs(&peer_id).unwrap();
        assert_eq!(addrs.len(), 2);

        peerstore.clear_addrs(&peer_id);
        assert_eq!(peerstore.get_addrs(&peer_id).unwrap().len(), 0);
    }

    /// Storing protocols and querying which of a candidate list is supported.
    #[test]
    fn proto_basic() {
        let keypair = Keypair::generate_secp256k1();
        let peer_id = PeerId::from_public_key(keypair.public());

        let peerstore = PeerStore::default();

        let proto_list = vec!["/libp2p/secio/1.0.0".to_string(), "/libp2p/yamux/1.0.0".to_string()];

        peerstore.add_key(&peer_id, keypair.public());
        peerstore.add_protocols(&peer_id, proto_list.clone());

        let p = peerstore.get_protocols(&peer_id).unwrap();
        for proto in &proto_list {
            assert!(p.contains(proto), "protocol {} missing from peer store", proto);
        }

        let optional_list = vec!["/libp2p/noise/1.0.0".to_string(), "/libp2p/yamux/1.0.0".to_string()];
        let protocol = peerstore.first_supported_protocol(&peer_id, optional_list);
        assert_eq!(protocol.unwrap(), "/libp2p/yamux/1.0.0");

        let option_support_list = vec![
            "/libp2p/secio/1.0.0".to_string(),
            "/libp2p/noise/1.0.0".to_string(),
            "/libp2p/yamux/1.0.0".to_string(),
        ];
        let support_protocol = peerstore.support_protocols(&peer_id, option_support_list);
        assert_eq!(
            support_protocol.unwrap(),
            vec!["/libp2p/secio/1.0.0".to_string(), "/libp2p/yamux/1.0.0".to_string()]
        );
    }

    /// Addresses and protocols stored side by side for the same peer.
    #[test]
    fn peerstore_basic() {
        let keypair = Keypair::generate_secp256k1();
        let peer_id = PeerId::from_public_key(keypair.public());

        let addrs = vec!["/memory/123456".parse().unwrap(), "/memory/123456".parse().unwrap()];
        let protos = vec!["/libp2p/secio/1.0.0".to_string(), "/libp2p/yamux/1.0.0".to_string()];

        let ps = PeerStore::default();
        ps.add_key(&peer_id, keypair.public());
        ps.add_addrs(&peer_id, addrs, ADDRESS_TTL);
        ps.add_protocols(&peer_id, protos);

        let optional_list = vec!["/libp2p/noise/1.0.0".to_string(), "/libp2p/yamux/1.0.0".to_string()];
        let protocol = ps.first_supported_protocol(&peer_id, optional_list);
        assert_eq!(protocol.unwrap(), "/libp2p/yamux/1.0.0");

        let option_support_list = vec![
            "/libp2p/secio/1.0.0".to_string(),
            "/libp2p/noise/1.0.0".to_string(),
            "/libp2p/yamux/1.0.0".to_string(),
        ];
        let support_protocol = ps.support_protocols(&peer_id, option_support_list);
        assert_eq!(
            support_protocol.unwrap(),
            vec!["/libp2p/secio/1.0.0".to_string(), "/libp2p/yamux/1.0.0".to_string()]
        );
    }

    /// GC must keep a pinned peer and drop it once unpinned and expired.
    #[test]
    fn peerstore_gc() {
        // A short ttl keeps the test fast; the sleep only has to outlast it.
        let ttl = Duration::from_millis(50);
        let peer_id = PeerId::random();
        let addrs = vec!["/memory/123456".parse().unwrap()];

        let ps = PeerStore::default();
        ps.add_addrs(&peer_id, addrs, ttl);
        ps.pin(&peer_id);
        assert!(ps.get_addrs(&peer_id).is_some());

        std::thread::sleep(ttl + Duration::from_millis(10));
        ps.remove_expired_addrs();
        assert!(ps.get_addrs(&peer_id).is_some());

        ps.unpin(&peer_id);
        ps.remove_expired_addrs();
        assert!(ps.get_addrs(&peer_id).is_none());
    }
}
467}