1use crate::config::Config;
2use crate::errors::*;
3use crate::p2p::proto::PeerAddr;
4use chrono::{DateTime, TimeDelta, Utc};
5use colored::{Color, Colorize};
6use serde::{Deserialize, Serialize};
7use std::borrow::Cow;
8use std::collections::btree_map::Entry;
9use std::collections::BTreeMap;
10use std::convert::Infallible;
11use std::path::PathBuf;
12use std::time::Duration;
13use tokio::fs;
14use tokio::sync::mpsc;
15use tokio::sync::mpsc::error::TrySendError;
16use tokio::time;
17
18const EXPIRE_ERROR_THRESHOLD: usize = 30;
19const EXPIRE_UNLESS_ADVERTISED_SINCE: Duration = Duration::from_secs(3600 * 24 * 14);
20
21const PEERDB_EXPIRE_INTERVAL: Duration = Duration::from_secs(60);
22const PEERDB_SAMPLE_SIZE: usize = 5;
23
24#[derive(Debug)]
25pub enum Req {
26 AddAdvertisedPeers(Vec<PeerAddr>),
27 Sample {
28 max_success_age: Option<Duration>,
29 tx: mpsc::Sender<Vec<PeerAddr>>,
30 },
31 Metric {
32 metric: MetricType,
33 value: MetricValue,
34 addr: PeerAddr,
35 },
36 Write,
37}
38
39#[derive(Debug, Clone)]
40pub struct Client {
41 tx: mpsc::Sender<Req>,
42}
43
44impl Client {
45 pub fn new() -> (Self, mpsc::Receiver<Req>) {
46 let (tx, rx) = mpsc::channel(1024);
47 (Self { tx }, rx)
48 }
49
50 async fn request<T>(&self, req: Req, mut rx: mpsc::Receiver<T>) -> Result<T> {
51 self.tx
52 .send(req)
53 .await
54 .map_err(|_| anyhow!("PeerDb server disconnected"))?;
55 let ret = rx.recv().await.context("PeerDb server disconnected")?;
56 Ok(ret)
57 }
58
59 fn lossy_send(&self, req: Req) {
60 if let Err(TrySendError::Full(req)) = self.tx.try_send(req) {
61 warn!("Discarding peerdb request because backlog is full: {req:?}");
62 }
63 }
64
65 pub fn add_advertised_peers(&self, addrs: Vec<PeerAddr>) {
66 self.lossy_send(Req::AddAdvertisedPeers(addrs));
67 }
68
69 #[inline]
70 pub fn successful(&self, metric: MetricType, addr: PeerAddr) {
71 self.lossy_send(Req::Metric {
72 metric,
73 value: MetricValue::Successful,
74 addr,
75 })
76 }
77
78 #[inline]
79 pub fn error(&self, metric: MetricType, addr: PeerAddr) {
80 self.lossy_send(Req::Metric {
81 metric,
82 value: MetricValue::Error,
83 addr,
84 })
85 }
86
87 pub async fn sample(&self, max_success_age: Option<Duration>) -> Result<Vec<PeerAddr>> {
88 let (tx, rx) = mpsc::channel(1);
89 self.request(
90 Req::Sample {
91 max_success_age,
92 tx,
93 },
94 rx,
95 )
96 .await
97 }
98
99 pub fn write(&self) {
100 self.lossy_send(Req::Write);
101 }
102}
103
104pub fn format_time_opt(time: Option<DateTime<Utc>>) -> Cow<'static, str> {
105 if let Some(time) = time {
106 Cow::Owned(format_time(time))
107 } else {
108 Cow::Borrowed("-")
109 }
110}
111
112pub fn format_time(time: DateTime<Utc>) -> String {
113 time.format("%FT%T").to_string()
114}
115
116#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
117pub struct Metric {
118 pub last_attempt: Option<DateTime<Utc>>,
119 pub errors_since: usize,
120 pub last_success: Option<DateTime<Utc>>,
121}
122
123impl Metric {
124 pub fn metric(&mut self, value: MetricValue) {
125 match value {
126 MetricValue::Successful => self.successful(),
127 MetricValue::Error => self.error(),
128 }
129 }
130
131 pub fn successful(&mut self) {
132 self.errors_since = 0;
133 let now = Utc::now();
134 self.last_success = Some(now);
135 self.last_attempt = Some(now);
136 }
137
138 pub fn error(&mut self) {
139 self.errors_since += 1;
140 self.last_attempt = Some(Utc::now());
141 }
142
143 pub fn format_stats(&self) -> String {
144 format!(
145 "last_attempt={:<19} errors_since={} last_success={}",
146 format_time_opt(self.last_attempt).yellow(),
147 self.errors_since
148 .to_string()
149 .color(if self.errors_since == 0 {
150 Color::Green
151 } else {
152 Color::Red
153 }),
154 format_time_opt(self.last_success).yellow(),
155 )
156 }
157}
158
159#[derive(Debug)]
160pub enum MetricType {
161 Connect,
162 Handshake,
163}
164
165#[derive(Debug)]
166pub enum MetricValue {
167 Successful,
168 Error,
169}
170
171#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
172pub struct PeerStats {
173 #[serde(default)]
174 pub connect: Metric,
175 #[serde(default)]
176 pub handshake: Metric,
177 pub last_advertised: Option<DateTime<Utc>>,
178}
179
180impl PeerStats {
181 pub fn metric(&mut self, metric: MetricType, value: MetricValue) {
182 match metric {
183 MetricType::Connect => self.connect.metric(value),
184 MetricType::Handshake => self.handshake.metric(value),
185 }
186 }
187
188 pub fn expired(&self, now: DateTime<Utc>) -> bool {
189 let Some(last_advertised) = self.last_advertised else {
191 return false;
192 };
193 if last_advertised + EXPIRE_UNLESS_ADVERTISED_SINCE > now {
194 return false;
195 }
196
197 if self.connect.errors_since > EXPIRE_ERROR_THRESHOLD {
199 return true;
200 }
201
202 if self.handshake.errors_since > EXPIRE_ERROR_THRESHOLD {
204 return true;
205 }
206
207 false
209 }
210}
211
212#[derive(Debug, Default, PartialEq, Serialize, Deserialize)]
213struct Data {
214 pub peers: BTreeMap<PeerAddr, PeerStats>,
215}
216
217pub struct PeerDb {
218 data: Data,
219 path: PathBuf,
220 new_path: PathBuf,
221}
222
223impl PeerDb {
224 pub fn add_peer(&mut self, addr: PeerAddr) -> (&mut PeerStats, bool) {
228 trace!("Adding address to peerdb: {addr:?}");
229 match self.data.peers.entry(addr) {
230 entry @ Entry::Vacant(_) => (entry.or_default(), true),
231 entry @ Entry::Occupied(_) => (entry.or_default(), false),
232 }
233 }
234
235 pub fn add_advertised_peers(&mut self, addrs: &[PeerAddr]) {
240 let now = Utc::now();
241 for addr in addrs {
242 let (peer, _new) = self.add_peer(addr.clone());
243 peer.last_advertised = Some(now);
244 }
245 }
246
247 pub fn peers(&self) -> &BTreeMap<PeerAddr, PeerStats> {
248 &self.data.peers
249 }
250
251 pub fn sample(&self, max_success_age: Option<Duration>) -> Vec<PeerAddr> {
253 let now = Utc::now();
254 let delta = max_success_age
255 .map(|max_success_age| TimeDelta::from_std(max_success_age).unwrap_or(TimeDelta::MAX));
256
257 let mut peers = self
259 .data
260 .peers
261 .iter()
262 .flat_map(|(addr, stats)| {
263 if let Some(delta) = delta {
264 let last_success = stats.handshake.last_success?;
265 if now.signed_duration_since(last_success) > delta {
266 return None;
267 }
268 }
269 Some(addr)
270 })
271 .collect::<Vec<_>>();
272
273 fastrand::shuffle(&mut peers);
274
275 peers
277 .into_iter()
278 .take(PEERDB_SAMPLE_SIZE)
279 .cloned()
280 .collect()
281 }
282
283 pub fn expire_old_peers(&mut self, now: DateTime<Utc>) -> bool {
294 let before = self.data.peers.len();
295 self.data.peers.retain(|_, peer| !peer.expired(now));
296 let after = self.data.peers.len();
297 if after != before {
298 info!("Removed {} expired peers", before.saturating_sub(after));
299 true
300 } else {
301 false
302 }
303 }
304
305 pub async fn read(config: &Config) -> Result<Self> {
309 let mut db = Self {
310 data: Data::default(),
311 path: config.peerdb_path()?,
312 new_path: config.peerdb_new_path()?,
313 };
314
315 let path = &db.path;
316 debug!("Reading peerdb from file: {path:?}");
317 let Ok(buf) = fs::read(&path).await else {
318 debug!("Failed to read peerdb file, using empty");
319 return Ok(db);
320 };
321 let Ok(data) = serde_json::from_slice(&buf) else {
322 debug!("Failed to parse peerdb file, using empty");
323 return Ok(db);
324 };
325
326 db.data = data;
327 Ok(db)
328 }
329
330 pub async fn write(&self) -> Result<()> {
333 let buf = serde_json::to_string(&self.data).context("Failed to serialize peerdb")?;
334
335 let new_path = &self.new_path;
336 debug!("Writing peerdb file to disk: {new_path:?}");
337 fs::write(&new_path, &buf)
338 .await
339 .with_context(|| anyhow!("Failed to write peerdb file at {new_path:?}"))?;
340
341 let path = &self.path;
342 debug!("Moving peerdb file to final location: {path:?}");
343 fs::rename(&new_path, &path)
344 .await
345 .with_context(|| anyhow!("Failed to rename peerdb {new_path:?} to {path:?}"))?;
346
347 Ok(())
348 }
349}
350
351pub async fn spawn(mut peerdb: PeerDb, mut rx: mpsc::Receiver<Req>) -> Result<Infallible> {
352 let mut interval = time::interval(PEERDB_EXPIRE_INTERVAL);
353
354 loop {
355 tokio::select! {
356 req = rx.recv() => {
357 let Some(req) = req else { break };
358 match req {
359 Req::AddAdvertisedPeers(addrs) => {
360 peerdb.add_advertised_peers(&addrs);
361 peerdb.write().await?;
362 }
363 Req::Sample { max_success_age, tx } => {
364 let sample = peerdb.sample(max_success_age);
365 tx.send(sample).await.ok();
366 }
367 Req::Metric { metric, value, addr } => {
368 let (peer, _new) = peerdb.add_peer(addr);
369 peer.metric(metric, value);
370 }
371 Req::Write => peerdb.write().await?,
372 }
373 }
374 _ = interval.tick() => {
375 if peerdb.expire_old_peers(Utc::now()) {
376 peerdb.write().await?;
377 }
378 }
379 }
380 }
381 bail!("PeerDb channel has been closed");
382}
383
384#[cfg(test)]
385mod tests {
386 use super::*;
387
388 #[test]
389 fn parse_basic_db() {
390 let data = r#"
391 {"peers":{"[2001:db8::]:16169":{}}}
392 "#;
393 let data = serde_json::from_str::<Data>(data).unwrap();
394 assert_eq!(
395 data,
396 Data {
397 peers: [("[2001:db8::]:16169".parse().unwrap(), PeerStats::default())]
398 .into_iter()
399 .collect(),
400 }
401 );
402 }
403
404 #[test]
405 fn test_expired_peers() {
406 fn datetime(s: &str) -> DateTime<Utc> {
407 s.parse::<DateTime<Utc>>().unwrap()
408 }
409 let now = datetime("2025-02-17T01:00:00Z");
410
411 assert!(!PeerStats {
413 connect: Metric::default(),
414 handshake: Metric::default(),
415 last_advertised: None,
416 }
417 .expired(now));
418
419 assert!(PeerStats {
421 connect: Metric {
422 last_attempt: Some(datetime("2025-02-17T00:45:00Z")),
423 errors_since: 500,
424 last_success: None,
425 },
426 handshake: Metric::default(),
427 last_advertised: Some(datetime("2025-01-01T13:37:00Z")),
428 }
429 .expired(now));
430
431 assert!(PeerStats {
433 connect: Metric {
434 last_attempt: Some(datetime("2025-02-17T00:45:00Z")),
435 errors_since: 0,
436 last_success: Some(datetime("2025-02-17T00:45:00Z")),
437 },
438 handshake: Metric {
439 last_attempt: Some(datetime("2025-02-17T00:45:00Z")),
440 errors_since: 500,
441 last_success: Some(datetime("2025-01-14T00:45:00Z")),
442 },
443 last_advertised: Some(datetime("2025-01-01T13:37:00Z")),
444 }
445 .expired(now));
446
447 assert!(!PeerStats {
449 connect: Metric {
450 last_attempt: Some(datetime("2025-02-17T00:45:00Z")),
451 errors_since: 500,
452 last_success: None,
453 },
454 handshake: Metric::default(),
455 last_advertised: Some(datetime("2025-02-14T13:37:00Z")),
456 }
457 .expired(now));
458
459 assert!(!PeerStats {
461 connect: Metric {
462 last_attempt: Some(datetime("2025-02-17T00:45:00Z")),
463 errors_since: 0,
464 last_success: Some(datetime("2025-02-17T00:45:00Z")),
465 },
466 handshake: Metric {
467 last_attempt: Some(datetime("2025-02-17T00:45:00Z")),
468 errors_since: 500,
469 last_success: Some(datetime("2025-01-14T00:45:00Z")),
470 },
471 last_advertised: Some(datetime("2025-02-14T13:37:00Z")),
472 }
473 .expired(now));
474
475 assert!(!PeerStats {
477 connect: Metric {
478 last_attempt: Some(datetime("2025-02-17T00:45:00Z")),
479 errors_since: 500,
480 last_success: None,
481 },
482 handshake: Metric::default(),
483 last_advertised: None,
484 }
485 .expired(now));
486
487 assert!(!PeerStats {
489 connect: Metric {
490 last_attempt: Some(datetime("2025-02-17T00:45:00Z")),
491 errors_since: 0,
492 last_success: Some(datetime("2025-02-17T00:45:00Z")),
493 },
494 handshake: Metric {
495 last_attempt: Some(datetime("2025-02-17T00:45:00Z")),
496 errors_since: 500,
497 last_success: Some(datetime("2025-01-14T00:45:00Z")),
498 },
499 last_advertised: None,
500 }
501 .expired(now));
502 }
503}