1use crate::db::DatabaseClient;
2use crate::errors::*;
3use crate::keyring::Keyring;
4use crate::net;
5use crate::p2p;
6use crate::p2p::peerdb::{self, MetricType};
7use crate::p2p::proto::{PeerAddr, SyncRequest};
8use crate::sync;
9use crate::timers::EasedInterval;
10use ipnetwork::IpNetwork;
11use sequoia_openpgp::Fingerprint;
12use std::collections::VecDeque;
13use std::convert::Infallible;
14use std::net::{IpAddr, SocketAddr};
15use std::num::NonZeroUsize;
16use std::sync::LazyLock;
17use std::time::Duration;
18use tokio::io::AsyncWriteExt;
19use tokio::sync::mpsc;
20use tokio::time;
21
22pub static P2P_BLOCK_LIST: LazyLock<Vec<IpNetwork>> = LazyLock::new(|| {
23 vec![
24 "127.0.0.1/8".parse().unwrap(),
25 "10.0.0.1/8".parse().unwrap(),
26 "172.16.0.0/12".parse().unwrap(),
27 "192.168.0.0/16".parse().unwrap(),
28 "169.254.0.0/16".parse().unwrap(),
29 "224.0.0.0/4".parse().unwrap(),
30 ]
31});
/// Ports that `spawn` refuses to connect to when they show up in an inet peer address.
///
/// These are numbers commonly assigned to unrelated services (for example
/// 22/SSH, 25/SMTP, 80/HTTP, 443/HTTPS, 6667/IRC).
pub static P2P_ILLEGAL_PORTS: &[u16] = &[
    // low, well-known service ports
    21, 22, 23, 25, 53, 80, 110, 143, 389, 443, 587, 993, 995,
    // registered ports of other popular services
    1194, 3128, 3389, 5900, 6667, 6669, 6697, 8080,
];

/// The default port of the p2p protocol; `Cooldowns::can_approach` skips the
/// per-host failure check for addresses using it.
pub const STANDARD_P2P_PORT: u16 = 16169;
39
/// Second argument passed to `EasedInterval::new` in `spawn` (10 minutes).
const P2P_SYNC_CONNECT_INTERVAL: Duration = Duration::from_secs(10 * 60);
/// First argument passed to `EasedInterval::new` in `spawn` (30 seconds).
const P2P_SYNC_CONNECT_DELAY: Duration = Duration::from_secs(30);
/// Upper bound handed to `p2p::random_jitter` before each outbound connection.
const P2P_SYNC_CONNECT_JITTER: Duration = Duration::from_secs(3);

/// Capacity of each LRU cache held by `Cooldowns`.
pub const COOLDOWN_LRU_SIZE: usize = 16_384;
/// How long an exact peer address is left alone after a successful sync (5 minutes).
pub const COOLDOWN_PORT_AFTER_SUCCESS: Duration = Duration::from_secs(5 * 60);
/// How long an exact peer address is left alone after a failed sync (1 hour).
pub const COOLDOWN_PORT_AFTER_ERROR: Duration = Duration::from_secs(60 * 60);
/// How long a single failure keeps counting against its host IP (1 hour).
pub const COOLDOWN_HOST_AFTER_ERROR: Duration = Duration::from_secs(60 * 60);
/// Number of still-counting failures at which a host IP has no capacity left.
pub const COOLDOWN_HOST_THRESHOLD: usize = 10;
51
52pub async fn pull_from_peer<D: DatabaseClient + Sync + Send>(
53 db: &mut D,
54 keyring: &Keyring,
55 peerdb: &peerdb::Client,
56 fingerprints: &[Fingerprint],
57 addr: &PeerAddr,
58 proxy: Option<SocketAddr>,
59) -> Result<()> {
60 let mut sock = match net::connect(addr, proxy).await {
62 Ok(sock) => {
63 peerdb.successful(MetricType::Connect, addr.clone());
64 sock
65 }
66 Err(err) => {
67 peerdb.error(MetricType::Connect, addr.clone());
68 return Err(err);
69 }
70 };
71 let (mut rx, mut tx) = sock.split();
72
73 match net::handshake(&mut rx, &mut tx).await {
75 Ok(_) => {
76 peerdb.successful(MetricType::Handshake, addr.clone());
77 }
78 Err(err) => {
79 peerdb.error(MetricType::Handshake, addr.clone());
80 tx.shutdown().await.ok();
81 return Err(err);
82 }
83 }
84 peerdb.write();
85
86 let result = sync::sync_pull(db, keyring, fingerprints, false, &mut tx, rx).await;
88
89 tx.shutdown().await.ok();
91 result
93}
94
95#[derive(Debug, Default)]
96pub struct CooldownEntry {
97 tries: VecDeque<time::Instant>,
98}
99
100impl CooldownEntry {
101 fn filter(&mut self) {
102 let now = time::Instant::now();
103 self.tries.retain(|e| now < *e);
104 }
105
106 pub fn has_capacity(&mut self) -> bool {
107 self.filter();
108 self.tries.len() < COOLDOWN_HOST_THRESHOLD
109 }
110
111 pub fn mark_bad(&mut self) {
112 self.filter();
113 self.tries
114 .push_back(time::Instant::now() + COOLDOWN_HOST_AFTER_ERROR);
115 }
116}
117
118#[derive(Debug)]
119pub struct Cooldowns {
120 ip_cache: lru::LruCache<IpAddr, CooldownEntry>,
121 port_cache: lru::LruCache<PeerAddr, time::Instant>,
122}
123
124impl Cooldowns {
125 pub fn new() -> Self {
126 let ip_cache = lru::LruCache::new(NonZeroUsize::new(COOLDOWN_LRU_SIZE).unwrap());
127 let port_cache = lru::LruCache::new(NonZeroUsize::new(COOLDOWN_LRU_SIZE).unwrap());
128 Cooldowns {
129 ip_cache,
130 port_cache,
131 }
132 }
133
134 pub fn can_approach(&mut self, addr: &PeerAddr) -> bool {
135 let now = time::Instant::now();
136
137 if let PeerAddr::Inet(addr) = &addr {
138 if addr.port() != STANDARD_P2P_PORT {
139 if let Some(entry) = self.ip_cache.get_mut(&addr.ip()) {
140 if !entry.has_capacity() {
141 return false;
142 }
143 }
144 }
145 }
146
147 if let Some(entry) = self.port_cache.get(addr) {
148 now >= *entry
149 } else {
150 true
151 }
152 }
153
154 pub fn mark_ok(&mut self, addr: PeerAddr) {
155 self.port_cache
156 .put(addr, time::Instant::now() + COOLDOWN_PORT_AFTER_SUCCESS);
157 }
158
159 pub fn mark_bad(&mut self, addr: PeerAddr) {
160 if let PeerAddr::Inet(addr) = &addr {
161 self.ip_cache
162 .get_or_insert_mut(addr.ip(), CooldownEntry::default)
163 .mark_bad();
164 }
165
166 self.port_cache
167 .put(addr, time::Instant::now() + COOLDOWN_PORT_AFTER_ERROR);
168 }
169}
170
171impl Default for Cooldowns {
172 fn default() -> Self {
173 Self::new()
174 }
175}
176
177pub async fn spawn<D: DatabaseClient + Sync + Send>(
178 db: &mut D,
179 keyring: Keyring,
180 peerdb: peerdb::Client,
181 proxy: Option<SocketAddr>,
182 mut rx: mpsc::Receiver<p2p::proto::SyncRequest>,
183) -> Result<Infallible> {
184 let mut cooldown = Cooldowns::new();
186
187 let mut interval = EasedInterval::new(P2P_SYNC_CONNECT_DELAY, P2P_SYNC_CONNECT_INTERVAL);
188 loop {
189 let req = tokio::select! {
191 req = rx.recv() => {
192 let Some(req) = req else { break };
193
194 peerdb.add_advertised_peers(req.addrs.clone());
196
197 req
198 }
199 _ = interval.tick() => {
200 let addrs = peerdb.sample(None).await?;
202 debug!("Automatically selected peers for periodic sync: {addrs:?}");
203 SyncRequest {
204 hint: None,
205 addrs,
206 }
207 }
208 };
209
210 for addr in req.addrs {
214 if let Some(hint) = &req.hint {
216 let fp = &hint.fp;
217 let (idx, _num) = db
218 .index_from_scan(&sync::TreeQuery {
219 fp: fp.clone(),
220 hash_algo: "sha256".to_string(),
221 prefix: None,
222 })
223 .await?;
224
225 if *hint.idx == idx {
226 debug!(
227 "We're already in sync with peer: addr={addr:?}, fp={fp:?}, idx={idx:?}"
228 );
229 continue;
230 }
231 }
232
233 if let PeerAddr::Inet(addr) = &addr {
235 for block in P2P_BLOCK_LIST.iter() {
236 if block.contains(addr.ip()) {
237 debug!(
238 "Address is on a blocklist, skipping: addr={addr:?}, block={block:?}"
239 );
240 continue;
241 }
242 }
243 if P2P_ILLEGAL_PORTS.contains(&addr.port()) {
244 debug!("Port is on blocklist, skipping: addr={addr:?}");
245 continue;
246 }
247 }
248
249 if !cooldown.can_approach(&addr) {
250 debug!("Address is still in cooldown, skipping for now: {addr:?}");
251 continue;
252 }
253
254 p2p::random_jitter(P2P_SYNC_CONNECT_JITTER).await;
255
256 info!("Syncing from remote peer: {addr:?}");
257 let ret = pull_from_peer(db, &keyring, &peerdb, &[], &addr, proxy).await;
258 debug!("Connection to {addr:?} has been closed");
259 match ret {
260 Ok(_) => {
261 cooldown.mark_ok(addr);
262 break;
263 }
264 Err(err) => {
265 warn!("Error while syncing from peer {addr:?}: {err:#}");
266 cooldown.mark_bad(addr);
267 }
268 }
269 peerdb.write();
270 }
271 }
272
273 bail!("Peering task has crashed")
274}