apt_swarm/p2p/
peerdb.rs

1use crate::config::Config;
2use crate::errors::*;
3use crate::p2p::proto::PeerAddr;
4use chrono::{DateTime, TimeDelta, Utc};
5use colored::{Color, Colorize};
6use serde::{Deserialize, Serialize};
7use std::borrow::Cow;
8use std::collections::btree_map::Entry;
9use std::collections::BTreeMap;
10use std::convert::Infallible;
11use std::path::PathBuf;
12use std::time::Duration;
13use tokio::fs;
14use tokio::sync::mpsc;
15use tokio::sync::mpsc::error::TrySendError;
16use tokio::time;
17
/// Number of errors since the last success a peer has to exceed before it
/// becomes eligible for expiry.
const EXPIRE_ERROR_THRESHOLD: usize = 30;
/// Peers advertised within this window (14 days) are never expired.
const EXPIRE_UNLESS_ADVERTISED_SINCE: Duration = Duration::from_secs(14 * 24 * 60 * 60);

/// How often the peerdb task looks for expired peers.
const PEERDB_EXPIRE_INTERVAL: Duration = Duration::from_secs(60);
/// Upper bound on the number of peers returned by one sample.
const PEERDB_SAMPLE_SIZE: usize = 5;
23
24#[derive(Debug)]
25pub enum Req {
26    AddAdvertisedPeers(Vec<PeerAddr>),
27    Sample {
28        max_success_age: Option<Duration>,
29        tx: mpsc::Sender<Vec<PeerAddr>>,
30    },
31    Metric {
32        metric: MetricType,
33        value: MetricValue,
34        addr: PeerAddr,
35    },
36    Write,
37}
38
39#[derive(Debug, Clone)]
40pub struct Client {
41    tx: mpsc::Sender<Req>,
42}
43
44impl Client {
45    pub fn new() -> (Self, mpsc::Receiver<Req>) {
46        let (tx, rx) = mpsc::channel(1024);
47        (Self { tx }, rx)
48    }
49
50    async fn request<T>(&self, req: Req, mut rx: mpsc::Receiver<T>) -> Result<T> {
51        self.tx
52            .send(req)
53            .await
54            .map_err(|_| anyhow!("PeerDb server disconnected"))?;
55        let ret = rx.recv().await.context("PeerDb server disconnected")?;
56        Ok(ret)
57    }
58
59    fn lossy_send(&self, req: Req) {
60        if let Err(TrySendError::Full(req)) = self.tx.try_send(req) {
61            warn!("Discarding peerdb request because backlog is full: {req:?}");
62        }
63    }
64
65    pub fn add_advertised_peers(&self, addrs: Vec<PeerAddr>) {
66        self.lossy_send(Req::AddAdvertisedPeers(addrs));
67    }
68
69    #[inline]
70    pub fn successful(&self, metric: MetricType, addr: PeerAddr) {
71        self.lossy_send(Req::Metric {
72            metric,
73            value: MetricValue::Successful,
74            addr,
75        })
76    }
77
78    #[inline]
79    pub fn error(&self, metric: MetricType, addr: PeerAddr) {
80        self.lossy_send(Req::Metric {
81            metric,
82            value: MetricValue::Error,
83            addr,
84        })
85    }
86
87    pub async fn sample(&self, max_success_age: Option<Duration>) -> Result<Vec<PeerAddr>> {
88        let (tx, rx) = mpsc::channel(1);
89        self.request(
90            Req::Sample {
91                max_success_age,
92                tx,
93            },
94            rx,
95        )
96        .await
97    }
98
99    pub fn write(&self) {
100        self.lossy_send(Req::Write);
101    }
102}
103
104pub fn format_time_opt(time: Option<DateTime<Utc>>) -> Cow<'static, str> {
105    if let Some(time) = time {
106        Cow::Owned(format_time(time))
107    } else {
108        Cow::Borrowed("-")
109    }
110}
111
112pub fn format_time(time: DateTime<Utc>) -> String {
113    time.format("%FT%T").to_string()
114}
115
116#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
117pub struct Metric {
118    pub last_attempt: Option<DateTime<Utc>>,
119    pub errors_since: usize,
120    pub last_success: Option<DateTime<Utc>>,
121}
122
123impl Metric {
124    pub fn metric(&mut self, value: MetricValue) {
125        match value {
126            MetricValue::Successful => self.successful(),
127            MetricValue::Error => self.error(),
128        }
129    }
130
131    pub fn successful(&mut self) {
132        self.errors_since = 0;
133        let now = Utc::now();
134        self.last_success = Some(now);
135        self.last_attempt = Some(now);
136    }
137
138    pub fn error(&mut self) {
139        self.errors_since += 1;
140        self.last_attempt = Some(Utc::now());
141    }
142
143    pub fn format_stats(&self) -> String {
144        format!(
145            "last_attempt={:<19}  errors_since={}  last_success={}",
146            format_time_opt(self.last_attempt).yellow(),
147            self.errors_since
148                .to_string()
149                .color(if self.errors_since == 0 {
150                    Color::Green
151                } else {
152                    Color::Red
153                }),
154            format_time_opt(self.last_success).yellow(),
155        )
156    }
157}
158
/// Selects which [`Metric`] of a peer an outcome is recorded in.
#[derive(Debug)]
pub enum MetricType {
    /// Recorded in `PeerStats::connect`.
    Connect,
    /// Recorded in `PeerStats::handshake`.
    Handshake,
}
164
/// Outcome of a single attempt.
#[derive(Debug)]
pub enum MetricValue {
    /// The attempt succeeded; resets the error counter of the metric.
    Successful,
    /// The attempt failed; increments the error counter of the metric.
    Error,
}
170
171#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
172pub struct PeerStats {
173    #[serde(default)]
174    pub connect: Metric,
175    #[serde(default)]
176    pub handshake: Metric,
177    pub last_advertised: Option<DateTime<Utc>>,
178}
179
180impl PeerStats {
181    pub fn metric(&mut self, metric: MetricType, value: MetricValue) {
182        match metric {
183            MetricType::Connect => self.connect.metric(value),
184            MetricType::Handshake => self.handshake.metric(value),
185        }
186    }
187
188    pub fn expired(&self, now: DateTime<Utc>) -> bool {
189        // only remove peers that have been advertised, but not recently
190        let Some(last_advertised) = self.last_advertised else {
191            return false;
192        };
193        if last_advertised + EXPIRE_UNLESS_ADVERTISED_SINCE > now {
194            return false;
195        }
196
197        // expire peers we couldn't connect to in a while
198        if self.connect.errors_since > EXPIRE_ERROR_THRESHOLD {
199            return true;
200        }
201
202        // expire peers we couldn't handshake with in a while
203        if self.handshake.errors_since > EXPIRE_ERROR_THRESHOLD {
204            return true;
205        }
206
207        // peer is still good
208        false
209    }
210}
211
212#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
213struct Data {
214    pub peers: BTreeMap<PeerAddr, PeerStats>,
215}
216
217pub struct PeerDb {
218    data: Data,
219    path: PathBuf,
220    new_path: PathBuf,
221}
222
223impl PeerDb {
224    /// Register a peer address in the database
225    ///
226    /// Returns true if we haven't known this address before.
227    pub fn add_peer(&mut self, addr: PeerAddr) -> (&mut PeerStats, bool) {
228        trace!("Adding address to peerdb: {addr:?}");
229        match self.data.peers.entry(addr) {
230            entry @ Entry::Vacant(_) => (entry.or_default(), true),
231            entry @ Entry::Occupied(_) => (entry.or_default(), false),
232        }
233    }
234
235    /// Register a list of peers that has been actively advertised to us
236    ///
237    /// The database should always be written afterwards to `last_advertised`
238    /// is persisted properly.
239    pub fn add_advertised_peers(&mut self, addrs: &[PeerAddr]) {
240        let now = Utc::now();
241        for addr in addrs {
242            let (peer, _new) = self.add_peer(addr.clone());
243            peer.last_advertised = Some(now);
244        }
245    }
246
247    pub fn peers(&self) -> &BTreeMap<PeerAddr, PeerStats> {
248        &self.data.peers
249    }
250
251    /// Return a sample of random peers to connect to
252    pub fn sample(&self, max_success_age: Option<Duration>) -> Vec<PeerAddr> {
253        let now = Utc::now();
254        let delta = max_success_age
255            .map(|max_success_age| TimeDelta::from_std(max_success_age).unwrap_or(TimeDelta::MAX));
256
257        // apply `max_success_age` filtering
258        let mut peers = self
259            .data
260            .peers
261            .iter()
262            .flat_map(|(addr, stats)| {
263                if let Some(delta) = delta {
264                    let last_success = stats.handshake.last_success?;
265                    if now.signed_duration_since(last_success) > delta {
266                        return None;
267                    }
268                }
269                Some(addr)
270            })
271            .collect::<Vec<_>>();
272
273        fastrand::shuffle(&mut peers);
274
275        // TODO: make this smarter
276        peers
277            .into_iter()
278            .take(PEERDB_SAMPLE_SIZE)
279            .cloned()
280            .collect()
281    }
282
283    /// Remove old peers that both:
284    ///
285    /// - we couldn't successfully connect/handshake with in a while
286    /// - haven't been advertised anymore in a while
287    ///
288    /// Peers that are still being advertised, but we couldn't
289    /// connect/handshake with in a while are still being kept around so we don't
290    /// stop toning down our connection attempts to them.
291    ///
292    /// Returns true if any peers have been removed.
293    pub fn expire_old_peers(&mut self, now: DateTime<Utc>) -> bool {
294        let before = self.data.peers.len();
295        self.data.peers.retain(|_, peer| !peer.expired(now));
296        let after = self.data.peers.len();
297        if after != before {
298            info!("Removed {} expired peers", before.saturating_sub(after));
299            true
300        } else {
301            false
302        }
303    }
304
305    /// Load the local peerdb file from disk.
306    ///
307    /// If this fails, return an empty database so we self-heal.
308    pub async fn read(config: &Config) -> Result<Self> {
309        let mut db = Self {
310            data: Data::default(),
311            path: config.peerdb_path()?,
312            new_path: config.peerdb_new_path()?,
313        };
314
315        let path = &db.path;
316        debug!("Reading peerdb from file: {path:?}");
317        let Ok(buf) = fs::read(&path).await else {
318            debug!("Failed to read peerdb file, using empty");
319            return Ok(db);
320        };
321        let Ok(data) = serde_json::from_slice(&buf) else {
322            debug!("Failed to parse peerdb file, using empty");
323            return Ok(db);
324        };
325
326        db.data = data;
327        Ok(db)
328    }
329
330    /// Write the peerdb to disk, in a way so we don't accidentally lose data
331    /// on an unexpected crash
332    pub async fn write(&self) -> Result<()> {
333        let buf = serde_json::to_string(&self.data).context("Failed to serialize peerdb")?;
334
335        let new_path = &self.new_path;
336        debug!("Writing peerdb file to disk: {new_path:?}");
337        fs::write(&new_path, &buf)
338            .await
339            .with_context(|| anyhow!("Failed to write peerdb file at {new_path:?}"))?;
340
341        let path = &self.path;
342        debug!("Moving peerdb file to final location: {path:?}");
343        fs::rename(&new_path, &path)
344            .await
345            .with_context(|| anyhow!("Failed to rename peerdb {new_path:?} to {path:?}"))?;
346
347        Ok(())
348    }
349}
350
351pub async fn spawn(mut peerdb: PeerDb, mut rx: mpsc::Receiver<Req>) -> Result<Infallible> {
352    let mut interval = time::interval(PEERDB_EXPIRE_INTERVAL);
353
354    loop {
355        tokio::select! {
356            req = rx.recv() => {
357                let Some(req) = req else { break };
358                match req {
359                    Req::AddAdvertisedPeers(addrs) => {
360                        peerdb.add_advertised_peers(&addrs);
361                        peerdb.write().await?;
362                    }
363                    Req::Sample { max_success_age, tx } => {
364                        let sample = peerdb.sample(max_success_age);
365                        tx.send(sample).await.ok();
366                    }
367                    Req::Metric { metric, value, addr } => {
368                        let (peer, _new) = peerdb.add_peer(addr);
369                        peer.metric(metric, value);
370                    }
371                    Req::Write => peerdb.write().await?,
372                }
373            }
374            _ = interval.tick() => {
375                if peerdb.expire_old_peers(Utc::now()) {
376                    peerdb.write().await?;
377                }
378            }
379        }
380    }
381    bail!("PeerDb channel has been closed");
382}
383
#[cfg(test)]
mod tests {
    use super::*;

    /// Time of the most recent attempt used by the expiry fixtures.
    const ATTEMPT: &str = "2025-02-17T00:45:00Z";
    /// An advertisement old enough to no longer protect a peer from expiry.
    const ADVERTISED_LONG_AGO: &str = "2025-01-01T13:37:00Z";
    /// An advertisement recent enough to protect a peer from expiry.
    const ADVERTISED_RECENTLY: &str = "2025-02-14T13:37:00Z";

    fn datetime(s: &str) -> DateTime<Utc> {
        s.parse::<DateTime<Utc>>().unwrap()
    }

    /// A peer we keep failing to connect to.
    fn connect_errors(last_advertised: Option<&str>) -> PeerStats {
        PeerStats {
            connect: Metric {
                last_attempt: Some(datetime(ATTEMPT)),
                errors_since: 500,
                last_success: None,
            },
            handshake: Metric::default(),
            last_advertised: last_advertised.map(datetime),
        }
    }

    /// A peer we can connect to, but keep failing to handshake with.
    fn handshake_errors(last_advertised: Option<&str>) -> PeerStats {
        PeerStats {
            connect: Metric {
                last_attempt: Some(datetime(ATTEMPT)),
                errors_since: 0,
                last_success: Some(datetime(ATTEMPT)),
            },
            handshake: Metric {
                last_attempt: Some(datetime(ATTEMPT)),
                errors_since: 500,
                last_success: Some(datetime("2025-01-14T00:45:00Z")),
            },
            last_advertised: last_advertised.map(datetime),
        }
    }

    #[test]
    fn parse_basic_db() {
        let data = r#"
        {"peers":{"[2001:db8::]:16169":{}}}
        "#;
        let parsed = serde_json::from_str::<Data>(data).unwrap();

        let mut peers = BTreeMap::new();
        peers.insert("[2001:db8::]:16169".parse().unwrap(), PeerStats::default());
        assert_eq!(parsed, Data { peers });
    }

    #[test]
    fn test_expired_peers() {
        let now = datetime("2025-02-17T01:00:00Z");

        let cases = [
            ("empty", PeerStats::default(), false),
            (
                "connect errors",
                connect_errors(Some(ADVERTISED_LONG_AGO)),
                true,
            ),
            (
                "handshake errors",
                handshake_errors(Some(ADVERTISED_LONG_AGO)),
                true,
            ),
            (
                "connect errors but recently advertised",
                connect_errors(Some(ADVERTISED_RECENTLY)),
                false,
            ),
            (
                "handshake errors but recently advertised",
                handshake_errors(Some(ADVERTISED_RECENTLY)),
                false,
            ),
            ("connect errors but never advertised", connect_errors(None), false),
            (
                "handshake errors but never advertised",
                handshake_errors(None),
                false,
            ),
        ];

        for (label, peer, expected) in cases {
            assert_eq!(peer.expired(now), expected, "{label}");
        }
    }
}