apt_swarm/p2p/
peering.rs

1use crate::db::DatabaseClient;
2use crate::errors::*;
3use crate::keyring::Keyring;
4use crate::net;
5use crate::p2p;
6use crate::p2p::peerdb::{self, MetricType};
7use crate::p2p::proto::{PeerAddr, SyncRequest};
8use crate::sync;
9use crate::timers::EasedInterval;
10use ipnetwork::IpNetwork;
11use sequoia_openpgp::Fingerprint;
12use std::collections::VecDeque;
13use std::convert::Infallible;
14use std::net::{IpAddr, SocketAddr};
15use std::num::NonZeroUsize;
16use std::sync::LazyLock;
17use std::time::Duration;
18use tokio::io::AsyncWriteExt;
19use tokio::sync::mpsc;
20use tokio::time;
21
22pub static P2P_BLOCK_LIST: LazyLock<Vec<IpNetwork>> = LazyLock::new(|| {
23    vec![
24        "127.0.0.1/8".parse().unwrap(),
25        "10.0.0.1/8".parse().unwrap(),
26        "172.16.0.0/12".parse().unwrap(),
27        "192.168.0.0/16".parse().unwrap(),
28        "169.254.0.0/16".parse().unwrap(),
29        "224.0.0.0/4".parse().unwrap(),
30    ]
31});
32pub static P2P_ILLEGAL_PORTS: &[u16] = &[
33    21, 22, 23, 25, 53, 80, 110, 143, 389, 443, 587, 993, 995, 1194, 3128, 3389, 5900, 6667, 6669,
34    6697, 8080,
35];
36
37/// When an ip is in cooldown, this port is still allowed, until the specific port goes into cooldown too
38pub const STANDARD_P2P_PORT: u16 = 16169;
39
40/// How often to connect to one of our known peers
41const P2P_SYNC_CONNECT_INTERVAL: Duration = Duration::from_secs(60 * 10); // 10min
42/// Time until we make our first connection to an already known peer
43const P2P_SYNC_CONNECT_DELAY: Duration = Duration::from_secs(30); // 30sec
44const P2P_SYNC_CONNECT_JITTER: Duration = Duration::from_secs(3);
45
46pub const COOLDOWN_LRU_SIZE: usize = 16_384;
47pub const COOLDOWN_PORT_AFTER_SUCCESS: Duration = Duration::from_secs(60 * 5); // 5min
48pub const COOLDOWN_PORT_AFTER_ERROR: Duration = Duration::from_secs(60 * 60); // 1hour
49pub const COOLDOWN_HOST_AFTER_ERROR: Duration = Duration::from_secs(60 * 60); // 1hour
50pub const COOLDOWN_HOST_THRESHOLD: usize = 10;
51
52pub async fn pull_from_peer<D: DatabaseClient + Sync + Send>(
53    db: &mut D,
54    keyring: &Keyring,
55    peerdb: &peerdb::Client,
56    fingerprints: &[Fingerprint],
57    addr: &PeerAddr,
58    proxy: Option<SocketAddr>,
59) -> Result<()> {
60    // setup connection
61    let mut sock = match net::connect(addr, proxy).await {
62        Ok(sock) => {
63            peerdb.successful(MetricType::Connect, addr.clone());
64            sock
65        }
66        Err(err) => {
67            peerdb.error(MetricType::Connect, addr.clone());
68            return Err(err);
69        }
70    };
71    let (mut rx, mut tx) = sock.split();
72
73    // perform handshake
74    match net::handshake(&mut rx, &mut tx).await {
75        Ok(_) => {
76            peerdb.successful(MetricType::Handshake, addr.clone());
77        }
78        Err(err) => {
79            peerdb.error(MetricType::Handshake, addr.clone());
80            tx.shutdown().await.ok();
81            return Err(err);
82        }
83    }
84    peerdb.write();
85
86    // sync from peer
87    let result = sync::sync_pull(db, keyring, fingerprints, false, &mut tx, rx).await;
88
89    // shutdown connection
90    tx.shutdown().await.ok();
91    // peer.sync.successful();
92    result
93}
94
95#[derive(Debug, Default)]
96pub struct CooldownEntry {
97    tries: VecDeque<time::Instant>,
98}
99
100impl CooldownEntry {
101    fn filter(&mut self) {
102        let now = time::Instant::now();
103        self.tries.retain(|e| now < *e);
104    }
105
106    pub fn has_capacity(&mut self) -> bool {
107        self.filter();
108        self.tries.len() < COOLDOWN_HOST_THRESHOLD
109    }
110
111    pub fn mark_bad(&mut self) {
112        self.filter();
113        self.tries
114            .push_back(time::Instant::now() + COOLDOWN_HOST_AFTER_ERROR);
115    }
116}
117
118#[derive(Debug)]
119pub struct Cooldowns {
120    ip_cache: lru::LruCache<IpAddr, CooldownEntry>,
121    port_cache: lru::LruCache<PeerAddr, time::Instant>,
122}
123
124impl Cooldowns {
125    pub fn new() -> Self {
126        let ip_cache = lru::LruCache::new(NonZeroUsize::new(COOLDOWN_LRU_SIZE).unwrap());
127        let port_cache = lru::LruCache::new(NonZeroUsize::new(COOLDOWN_LRU_SIZE).unwrap());
128        Cooldowns {
129            ip_cache,
130            port_cache,
131        }
132    }
133
134    pub fn can_approach(&mut self, addr: &PeerAddr) -> bool {
135        let now = time::Instant::now();
136
137        if let PeerAddr::Inet(addr) = &addr {
138            if addr.port() != STANDARD_P2P_PORT {
139                if let Some(entry) = self.ip_cache.get_mut(&addr.ip()) {
140                    if !entry.has_capacity() {
141                        return false;
142                    }
143                }
144            }
145        }
146
147        if let Some(entry) = self.port_cache.get(addr) {
148            now >= *entry
149        } else {
150            true
151        }
152    }
153
154    pub fn mark_ok(&mut self, addr: PeerAddr) {
155        self.port_cache
156            .put(addr, time::Instant::now() + COOLDOWN_PORT_AFTER_SUCCESS);
157    }
158
159    pub fn mark_bad(&mut self, addr: PeerAddr) {
160        if let PeerAddr::Inet(addr) = &addr {
161            self.ip_cache
162                .get_or_insert_mut(addr.ip(), CooldownEntry::default)
163                .mark_bad();
164        }
165
166        self.port_cache
167            .put(addr, time::Instant::now() + COOLDOWN_PORT_AFTER_ERROR);
168    }
169}
170
171impl Default for Cooldowns {
172    fn default() -> Self {
173        Self::new()
174    }
175}
176
177pub async fn spawn<D: DatabaseClient + Sync + Send>(
178    db: &mut D,
179    keyring: Keyring,
180    peerdb: peerdb::Client,
181    proxy: Option<SocketAddr>,
182    mut rx: mpsc::Receiver<p2p::proto::SyncRequest>,
183) -> Result<Infallible> {
184    // keep track of connection attempts to avoid flooding
185    let mut cooldown = Cooldowns::new();
186
187    let mut interval = EasedInterval::new(P2P_SYNC_CONNECT_DELAY, P2P_SYNC_CONNECT_INTERVAL);
188    loop {
189        // Wait for request, or automatically connect to known peer
190        let req = tokio::select! {
191            req = rx.recv() => {
192                let Some(req) = req else { break };
193
194                // register all addresses as known before attempting to sync
195                peerdb.add_advertised_peers(req.addrs.clone());
196
197                req
198            }
199            _ = interval.tick() => {
200                // Automatically pick a known peer
201                let addrs = peerdb.sample(None).await?;
202                debug!("Automatically selected peers for periodic sync: {addrs:?}");
203                SyncRequest {
204                    hint: None,
205                    addrs,
206                }
207            }
208        };
209
210        // TODO: allow concurrent syncs
211
212        // sync from addresses
213        for addr in req.addrs {
214            // only connect if we're not already in sync
215            if let Some(hint) = &req.hint {
216                let fp = &hint.fp;
217                let (idx, _num) = db
218                    .index_from_scan(&sync::TreeQuery {
219                        fp: fp.clone(),
220                        hash_algo: "sha256".to_string(),
221                        prefix: None,
222                    })
223                    .await?;
224
225                if *hint.idx == idx {
226                    debug!(
227                        "We're already in sync with peer: addr={addr:?}, fp={fp:?}, idx={idx:?}"
228                    );
229                    continue;
230                }
231            }
232
233            // prepare connection
234            if let PeerAddr::Inet(addr) = &addr {
235                for block in P2P_BLOCK_LIST.iter() {
236                    if block.contains(addr.ip()) {
237                        debug!(
238                            "Address is on a blocklist, skipping: addr={addr:?}, block={block:?}"
239                        );
240                        continue;
241                    }
242                }
243                if P2P_ILLEGAL_PORTS.contains(&addr.port()) {
244                    debug!("Port is on blocklist, skipping: addr={addr:?}");
245                    continue;
246                }
247            }
248
249            if !cooldown.can_approach(&addr) {
250                debug!("Address is still in cooldown, skipping for now: {addr:?}");
251                continue;
252            }
253
254            p2p::random_jitter(P2P_SYNC_CONNECT_JITTER).await;
255
256            info!("Syncing from remote peer: {addr:?}");
257            let ret = pull_from_peer(db, &keyring, &peerdb, &[], &addr, proxy).await;
258            debug!("Connection to {addr:?} has been closed");
259            match ret {
260                Ok(_) => {
261                    cooldown.mark_ok(addr);
262                    break;
263                }
264                Err(err) => {
265                    warn!("Error while syncing from peer {addr:?}: {err:#}");
266                    cooldown.mark_bad(addr);
267                }
268            }
269            peerdb.write();
270        }
271    }
272
273    bail!("Peering task has crashed")
274}