1use serde::{Deserialize, Serialize};
22use std::collections::{HashMap, HashSet};
23use std::fs::File;
24use std::io;
25use std::io::{Read, Write};
26use std::str::FromStr;
27use std::sync::{Arc, Mutex};
28use std::time::{Duration, Instant};
29
30use crate::{PeerId, PublicKey};
31use libp2prs_multiaddr::Multiaddr;
32
33pub const ADDRESS_TTL: Duration = Duration::from_secs(60 * 60);
34pub const TEMP_ADDR_TTL: Duration = Duration::from_secs(2 * 60);
35pub const PROVIDER_ADDR_TTL: Duration = Duration::from_secs(10 * 60);
36pub const RECENTLY_CONNECTED_ADDR_TTL: Duration = Duration::from_secs(10 * 60);
37pub const OWN_OBSERVED_ADDR_TTL: Duration = Duration::from_secs(10 * 60);
38
39pub const PERMANENT_ADDR_TTL: Duration = Duration::from_secs(u64::MAX - 1);
40pub const CONNECTED_ADDR_TTL: Duration = Duration::from_secs(u64::MAX - 2);
41
42#[derive(Default, Clone)]
43pub struct PeerStore {
44 inner: Arc<Mutex<HashMap<PeerId, PeerRecord>>>,
45}
46
47#[derive(Clone, Debug, Serialize, Deserialize)]
48pub struct PeerSaved {
49 addr: Multiaddr,
50 ttl: Duration,
51}
52
53#[derive(Clone)]
55struct PeerRecord {
56 pinned: bool,
60 addrs: Vec<AddrBookRecord>,
62 key: Option<PublicKey>,
64 protos: HashSet<String>,
66}
67
68impl PeerRecord {
69 fn new(addrs: Vec<AddrBookRecord>, key: Option<PublicKey>, protos: HashSet<String>) -> Self {
70 Self {
71 pinned: false,
72 addrs,
73 key,
74 protos,
75 }
76 }
77}
78
79#[derive(Clone, Debug)]
80struct AddrBookRecord {
81 addr: Multiaddr,
82 ttl: Duration,
83 expiry: Instant,
84}
85
86impl Into<Multiaddr> for AddrBookRecord {
87 fn into(self) -> Multiaddr {
88 self.addr
89 }
90}
91
92impl PeerStore {
93 pub fn save_data(&self) -> io::Result<()> {
95 let mut ds_addr_book = HashMap::new();
96
97 {
98 let guard = self.inner.lock().unwrap();
99 for (peer_id, value) in guard.iter() {
101 let key = peer_id.to_string();
102 let mut v = Vec::new();
103 for item in value.addrs.to_vec() {
105 v.push(PeerSaved {
106 addr: item.addr,
107 ttl: item.ttl,
108 })
109 }
110 ds_addr_book.insert(key, v);
111 }
112 }
113 let json_addrbook = serde_json::to_string(&ds_addr_book)?;
114
115 let mut file = File::create("./ds_addr_book.txt")?;
116 file.write_all(json_addrbook.as_bytes())
117 }
118
119 pub fn load_data(&self) -> io::Result<()> {
121 let mut file = match File::open("./ds_addr_book.txt") {
122 Ok(file) => file,
123 Err(e) => {
124 if e.kind() == io::ErrorKind::NotFound {
125 File::create("./ds_addr_book.txt")?
126 } else {
127 return Err(e);
128 }
129 }
130 };
131 let metadata = file.metadata()?;
132 let length = metadata.len() as usize;
133 if length == 0 {
134 return Ok(());
135 }
136 let mut buf = vec![0u8; length];
137
138 let _ = file.read_exact(buf.as_mut())?;
140 let json_data: HashMap<String, Vec<PeerSaved>> =
141 serde_json::from_slice(&buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
142
143 let mut guard = self.inner.lock().unwrap();
145 for (key, value) in json_data {
146 let peer_id = PeerId::from_str(&key).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
147 let mut v = Vec::new();
148 for item in value {
149 v.push(AddrBookRecord::new(item.addr, item.ttl));
150 }
151 guard.insert(peer_id, PeerRecord::new(v, None, Default::default()));
152 }
153
154 Ok(())
155 }
156
157 pub fn get_peers(&self) -> Vec<PeerId> {
159 let guard = self.inner.lock().unwrap();
160 guard.keys().cloned().collect()
161 }
162
163 pub fn pin(&self, peer_id: &PeerId) {
165 let mut guard = self.inner.lock().unwrap();
166 if let Some(pr) = guard.get_mut(peer_id) {
167 pr.pinned = true;
168 }
169 }
170
171 pub fn unpin(&self, peer_id: &PeerId) {
173 let mut guard = self.inner.lock().unwrap();
174 if let Some(pr) = guard.get_mut(peer_id) {
175 pr.pinned = false;
176 }
177 }
178
179 pub fn pinned(&self, peer_id: &PeerId) -> bool {
181 let guard = self.inner.lock().unwrap();
182 guard.get(peer_id).map_or(false, |pr| pr.pinned)
183 }
184
185 pub fn add_key(&self, peer_id: &PeerId, key: PublicKey) {
187 let mut guard = self.inner.lock().unwrap();
188 if let Some(pr) = guard.get_mut(peer_id) {
189 pr.key = Some(key);
190 }
191 }
192
193 pub fn get_key(&self, peer_id: &PeerId) -> Option<PublicKey> {
195 let guard = self.inner.lock().unwrap();
196 guard.get(peer_id).and_then(|pr| pr.key.clone())
197 }
198
199 pub fn add_addr(&self, peer_id: &PeerId, addr: Multiaddr, ttl: Duration) {
201 self.add_addrs(peer_id, vec![addr], ttl)
202 }
203
204 pub fn add_addrs(&self, peer_id: &PeerId, addrs: Vec<Multiaddr>, ttl: Duration) {
206 let mut guard = self.inner.lock().unwrap();
207 if let Some(pr) = guard.get_mut(peer_id) {
208 for addr in addrs {
209 if let Some(record) = pr.addrs.iter_mut().find(|item| item.addr == addr) {
210 record.set_ttl(ttl);
212 } else {
213 pr.addrs.push(AddrBookRecord::new(addr, ttl));
214 }
215 }
216 } else {
217 let vec = addrs.into_iter().map(|addr| AddrBookRecord::new(addr, ttl)).collect();
219 guard.insert(*peer_id, PeerRecord::new(vec, None, Default::default()));
220 }
221 }
222
223 pub fn clear_addrs(&self, peer_id: &PeerId) {
225 let mut guard = self.inner.lock().unwrap();
226 if let Some(pr) = guard.get_mut(peer_id) {
227 pr.addrs.clear();
228 }
229 }
230
231 pub fn get_addrs(&self, peer_id: &PeerId) -> Option<Vec<Multiaddr>> {
233 let guard = self.inner.lock().unwrap();
234 guard.get(peer_id).map(|pr| pr.addrs.iter().map(|a| a.clone().into()).collect())
235 }
236
237 pub fn update_addr(&self, peer_id: &PeerId, new_ttl: Duration) {
239 let mut guard = self.inner.lock().unwrap();
240
241 if let Some(pr) = guard.get_mut(peer_id) {
242 for record in pr.addrs.iter_mut() {
243 record.set_ttl(new_ttl);
244 }
245 }
246 }
247
248 pub fn remove_expired_addrs(&self) {
250 let mut to_remove = vec![];
251 let mut guard = self.inner.lock().unwrap();
252 for (peer, pr) in guard.iter_mut() {
253 if !pr.pinned {
254 log::debug!("GC attempt for {:?}", peer);
255 pr.addrs.retain(|record| record.expiry.elapsed() < record.ttl);
256 if pr.addrs.is_empty() {
258 log::debug!("remove {:?} from peerstore", peer);
259 to_remove.push(*peer);
260 }
261 }
262 }
263
264 for peer in to_remove {
265 guard.remove(&peer);
266 }
267 }
268
269 pub fn add_protocols(&self, peer_id: &PeerId, protos: Vec<String>) {
271 let mut guard = self.inner.lock().unwrap();
272 if let Some(pr) = guard.get_mut(peer_id) {
273 pr.protos.extend(protos);
274 } else {
275 let mut s = HashSet::new();
276 s.extend(protos);
277 guard.insert(*peer_id, PeerRecord::new(Default::default(), None, s));
278 }
279 }
280
281 pub fn clear_protocols(&self, peer_id: &PeerId) {
283 let mut guard = self.inner.lock().unwrap();
284 if let Some(pr) = guard.get_mut(peer_id) {
285 pr.protos.clear();
286 }
287 }
288
289 pub fn get_protocols(&self, peer_id: &PeerId) -> Option<Vec<String>> {
291 let guard = self.inner.lock().unwrap();
292 guard.get(peer_id).map(|pr| pr.protos.iter().cloned().collect())
293 }
294
295 pub fn first_supported_protocol(&self, peer_id: &PeerId, protos: Vec<String>) -> Option<String> {
297 let guard = self.inner.lock().unwrap();
298 if let Some(pr) = guard.get(peer_id) {
299 for proto in protos {
300 if pr.protos.contains(&proto) {
301 return Some(proto);
302 }
303 }
304 }
305 None
306 }
307
308 pub fn support_protocols(&self, peer_id: &PeerId, protos: Vec<String>) -> Option<Vec<String>> {
310 let guard = self.inner.lock().unwrap();
311 if let Some(pr) = guard.get(peer_id) {
312 let mut proto_list = Vec::with_capacity(protos.len());
313 for item in protos {
314 if pr.protos.contains(&item) {
315 proto_list.push(item)
316 }
317 }
318 Some(proto_list)
319 } else {
320 None
321 }
322 }
323}
324
325#[allow(dead_code)]
326impl AddrBookRecord {
327 pub fn new(addr: Multiaddr, ttl: Duration) -> Self {
328 Self {
329 addr,
330 ttl,
331 expiry: Instant::now(),
332 }
333 }
334 pub fn get_addr(&self) -> &Multiaddr {
336 &self.addr
337 }
338
339 pub fn set_ttl(&mut self, ttl: Duration) {
341 self.ttl = ttl;
342 self.expiry = Instant::now();
343 }
344}
345
346#[cfg(test)]
347mod tests {
348 use crate::identity::Keypair;
349 use crate::peerstore::{PeerStore, ADDRESS_TTL};
350 use crate::PeerId;
351 use libp2prs_multiaddr::Multiaddr;
352 use std::time::Duration;
353
354 #[test]
355 fn addr_basic() {
356 let keypair = Keypair::generate_secp256k1();
357 let peer_id = PeerId::from_public_key(keypair.public());
358
359 let peerstore = PeerStore::default();
360
361 peerstore.add_key(&peer_id, keypair.public());
362 peerstore.add_addr(&peer_id, "/memory/123456".parse().unwrap(), Duration::from_secs(1));
363
364 assert_eq!(
365 peerstore.get_addrs(&peer_id).unwrap().first().unwrap(),
366 &"/memory/123456".parse::<Multiaddr>().unwrap()
367 );
368
369 peerstore.add_addr(&peer_id, "/memory/654321".parse().unwrap(), Duration::from_secs(1));
370 let addrs = peerstore.get_addrs(&peer_id).unwrap();
371 assert_eq!(addrs.len(), 2);
372
373 peerstore.add_addr(&peer_id, "/memory/654321".parse().unwrap(), Duration::from_secs(1));
374 let addrs = peerstore.get_addrs(&peer_id).unwrap();
375 assert_eq!(addrs.len(), 2);
376
377 peerstore.clear_addrs(&peer_id);
378 assert_eq!(peerstore.get_addrs(&peer_id).unwrap().len(), 0);
379 }
380
381 #[test]
382 fn proto_basic() {
383 let keypair = Keypair::generate_secp256k1();
384 let peer_id = PeerId::from_public_key(keypair.public());
385
386 let peerstore = PeerStore::default();
387
388 let proto_list = vec!["/libp2p/secio/1.0.0".to_string(), "/libp2p/yamux/1.0.0".to_string()];
389
390 peerstore.add_key(&peer_id, keypair.public());
391 peerstore.add_protocols(&peer_id, proto_list.clone());
392
393 let p = peerstore.get_protocols(&peer_id).unwrap();
394 for i in proto_list {
397 if p.contains(&i) {
398 continue;
399 } else {
400 unreachable!()
401 }
402 }
403
404 let optional_list = vec!["/libp2p/noise/1.0.0".to_string(), "/libp2p/yamux/1.0.0".to_string()];
405 let protocol = peerstore.first_supported_protocol(&peer_id, optional_list);
406 assert_eq!(protocol.unwrap(), "/libp2p/yamux/1.0.0");
407
408 let option_support_list = vec![
409 "/libp2p/secio/1.0.0".to_string(),
410 "/libp2p/noise/1.0.0".to_string(),
411 "/libp2p/yamux/1.0.0".to_string(),
412 ];
413 let support_protocol = peerstore.support_protocols(&peer_id, option_support_list);
414 assert_eq!(
415 support_protocol.unwrap(),
416 vec!["/libp2p/secio/1.0.0".to_string(), "/libp2p/yamux/1.0.0".to_string()]
417 );
418 }
419
420 #[test]
421 fn peerstore_basic() {
422 let keypair = Keypair::generate_secp256k1();
423 let peer_id = PeerId::from_public_key(keypair.public());
424
425 let addrs = vec!["/memory/123456".parse().unwrap(), "/memory/123456".parse().unwrap()];
426 let protos = vec!["/libp2p/secio/1.0.0".to_string(), "/libp2p/yamux/1.0.0".to_string()];
427
428 let ps = PeerStore::default();
429 ps.add_key(&peer_id, keypair.public());
430 ps.add_addrs(&peer_id, addrs, ADDRESS_TTL);
431 ps.add_protocols(&peer_id, protos);
432
433 let optional_list = vec!["/libp2p/noise/1.0.0".to_string(), "/libp2p/yamux/1.0.0".to_string()];
434 let protocol = ps.first_supported_protocol(&peer_id, optional_list);
435 assert_eq!(protocol.unwrap(), "/libp2p/yamux/1.0.0");
436
437 let option_support_list = vec![
438 "/libp2p/secio/1.0.0".to_string(),
439 "/libp2p/noise/1.0.0".to_string(),
440 "/libp2p/yamux/1.0.0".to_string(),
441 ];
442 let support_protocol = ps.support_protocols(&peer_id, option_support_list);
443 assert_eq!(
444 support_protocol.unwrap(),
445 vec!["/libp2p/secio/1.0.0".to_string(), "/libp2p/yamux/1.0.0".to_string()]
446 );
447 }
448
449 #[test]
450 fn peerstore_gc() {
451 let peer_id = PeerId::random();
452 let addrs = vec!["/memory/123456".parse().unwrap()];
453
454 let ps = PeerStore::default();
455 ps.add_addrs(&peer_id, addrs, Duration::from_secs(5));
456 ps.pin(&peer_id);
457 assert!(ps.get_addrs(&peer_id).is_some());
458
459 std::thread::sleep(Duration::from_secs(5));
460 ps.remove_expired_addrs();
461 assert!(ps.get_addrs(&peer_id).is_some());
462
463 ps.unpin(&peer_id);
464 ps.remove_expired_addrs();
465 assert!(ps.get_addrs(&peer_id).is_none());
466 }
467}