zebra_network/peer_set/inventory_registry.rs
1//! Inventory Registry Implementation
2//!
3//! [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html
4
5use std::{
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use futures::{FutureExt, Stream, StreamExt};
11use indexmap::IndexMap;
12use tokio::{
13 sync::broadcast,
14 time::{self, Instant},
15};
16use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream, IntervalStream};
17
18use zebra_chain::serialization::AtLeastOne;
19
20use crate::{
21 constants::INVENTORY_ROTATION_INTERVAL,
22 protocol::{external::InventoryHash, internal::InventoryResponse},
23 BoxError, PeerSocketAddr,
24};
25
26use self::update::Update;
27
28/// Underlying type for the alias InventoryStatus::*
29use InventoryResponse::*;
30
31pub mod update;
32
33#[cfg(test)]
34mod tests;
35
36/// The maximum number of inventory hashes we will track from a single peer.
37///
38/// # Security
39///
40/// This limits known memory denial of service attacks like <https://invdos.net/> to a total of:
41/// ```text
42/// 1000 inventory * 2 maps * 32-64 bytes per inventory = less than 1 MB
43/// 1000 inventory * 70 peers * 2 maps * 6-18 bytes per address = up to 3 MB
44/// ```
45///
46/// Since the inventory registry is an efficiency optimisation, which falls back to a
47/// random peer, we only need to track a small number of hashes for available inventory.
48///
49/// But we want to be able to track a significant amount of missing inventory,
50/// to limit queries for globally missing inventory.
51//
52// TODO: split this into available (25) and missing (1000 or more?)
53pub const MAX_INV_PER_MAP: usize = 1000;
54
55/// The maximum number of peers we will track inventory for.
56///
57/// # Security
58///
59/// This limits known memory denial of service attacks. See [`MAX_INV_PER_MAP`] for details.
60///
61/// Since the inventory registry is an efficiency optimisation, which falls back to a
62/// random peer, we only need to track a small number of peers per inv for available inventory.
63///
64/// But we want to be able to track missing inventory for almost all our peers,
65/// so we only query a few peers for inventory that is genuinely missing from the network.
66//
67// TODO: split this into available (25) and missing (70)
68pub const MAX_PEERS_PER_INV: usize = 70;
69
70/// A peer inventory status, which tracks a hash for both available and missing inventory.
71pub type InventoryStatus<T> = InventoryResponse<T, T>;
72
73/// A peer inventory status change, used in the inventory status channel.
74///
75/// For performance reasons, advertisements should only be tracked
76/// for hashes that are rare on the network.
77/// So Zebra only tracks single-block inventory messages.
78///
79/// For security reasons, all `notfound` rejections should be tracked.
80/// This also helps with performance, if the hash is rare on the network.
81pub type InventoryChange = InventoryStatus<(AtLeastOne<InventoryHash>, PeerSocketAddr)>;
82
83/// An internal marker used in inventory status hash maps.
84type InventoryMarker = InventoryStatus<()>;
85
86/// An Inventory Registry for tracking recent inventory advertisements and missing inventory.
87///
88/// For more details please refer to the [RFC].
89///
90/// [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html
91pub struct InventoryRegistry {
92 /// Map tracking the latest inventory status from the current interval
93 /// period.
94 //
95 // TODO: split maps into available and missing, so we can limit them separately.
96 current: IndexMap<InventoryHash, IndexMap<PeerSocketAddr, InventoryMarker>>,
97
98 /// Map tracking inventory statuses from the previous interval period.
99 prev: IndexMap<InventoryHash, IndexMap<PeerSocketAddr, InventoryMarker>>,
100
101 /// Stream of incoming inventory statuses to register.
102 inv_stream: Pin<
103 Box<dyn Stream<Item = Result<InventoryChange, BroadcastStreamRecvError>> + Send + 'static>,
104 >,
105
106 /// Interval tracking when we should next rotate our maps.
107 interval: IntervalStream,
108}
109
110impl std::fmt::Debug for InventoryRegistry {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.debug_struct("InventoryRegistry")
113 .field("current", &self.current)
114 .field("prev", &self.prev)
115 .finish()
116 }
117}
118
119impl InventoryChange {
120 /// Returns a new available inventory change from a single hash.
121 pub fn new_available(hash: InventoryHash, peer: PeerSocketAddr) -> Self {
122 let bv = AtLeastOne::from_vec(vec![hash]).expect("bounded vec must fit");
123 InventoryStatus::Available((bv, peer))
124 }
125
126 /// Returns a new missing inventory change from a single hash.
127 #[allow(dead_code)]
128 pub fn new_missing(hash: InventoryHash, peer: PeerSocketAddr) -> Self {
129 let bv = AtLeastOne::from_vec(vec![hash]).expect("bounded vec must fit");
130 InventoryStatus::Missing((bv, peer))
131 }
132
133 /// Returns a new available multiple inventory change, if `hashes` contains at least one change.
134 pub fn new_available_multi<'a>(
135 hashes: impl IntoIterator<Item = &'a InventoryHash>,
136 peer: PeerSocketAddr,
137 ) -> Option<Self> {
138 let mut hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
139
140 // # Security
141 //
142 // Don't send more hashes than we're going to store.
143 // It doesn't matter which hashes we choose, because this is an efficiency optimisation.
144 //
145 // This limits known memory denial of service attacks to:
146 // `1000 hashes * 200 peers/channel capacity * 32-64 bytes = up to 12 MB`
147 hashes.truncate(MAX_INV_PER_MAP);
148
149 let hashes = hashes.try_into().ok();
150
151 hashes.map(|hashes| InventoryStatus::Available((hashes, peer)))
152 }
153
154 /// Returns a new missing multiple inventory change, if `hashes` contains at least one change.
155 pub fn new_missing_multi<'a>(
156 hashes: impl IntoIterator<Item = &'a InventoryHash>,
157 peer: PeerSocketAddr,
158 ) -> Option<Self> {
159 let mut hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
160
161 // # Security
162 //
163 // Don't send more hashes than we're going to store.
164 // It doesn't matter which hashes we choose, because this is an efficiency optimisation.
165 hashes.truncate(MAX_INV_PER_MAP);
166
167 let hashes = hashes.try_into().ok();
168
169 hashes.map(|hashes| InventoryStatus::Missing((hashes, peer)))
170 }
171}
172
173impl<T> InventoryStatus<T> {
174 /// Get a marker for the status, without any associated data.
175 pub fn marker(&self) -> InventoryMarker {
176 self.as_ref().map(|_inner| ())
177 }
178
179 /// Maps an `InventoryStatus<T>` to `InventoryStatus<U>` by applying a function to a contained value.
180 pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> InventoryStatus<U> {
181 // Based on Option::map from https://doc.rust-lang.org/src/core/option.rs.html#844
182 match self {
183 Available(item) => Available(f(item)),
184 Missing(item) => Missing(f(item)),
185 }
186 }
187}
188
189impl<T: Clone> InventoryStatus<T> {
190 /// Returns a clone of the inner item, regardless of status.
191 pub fn to_inner(&self) -> T {
192 match self {
193 Available(item) | Missing(item) => item.clone(),
194 }
195 }
196}
197
198impl InventoryRegistry {
199 /// Returns a new Inventory Registry for `inv_stream`.
200 pub fn new(inv_stream: broadcast::Receiver<InventoryChange>) -> Self {
201 let interval = INVENTORY_ROTATION_INTERVAL;
202
203 // Don't do an immediate rotation, current and prev are already empty.
204 let mut interval = tokio::time::interval_at(Instant::now() + interval, interval);
205 // # Security
206 //
207 // If the rotation time is late, execute as many ticks as needed to catch up.
208 // This is a tradeoff between memory usage and quickly accessing remote data
209 // under heavy load. Bursting prioritises lower memory usage.
210 //
211 // Skipping or delaying could keep peer inventory in memory for a longer time,
212 // further increasing memory load or delays due to virtual memory swapping.
213 interval.set_missed_tick_behavior(time::MissedTickBehavior::Burst);
214
215 Self {
216 current: Default::default(),
217 prev: Default::default(),
218 inv_stream: BroadcastStream::new(inv_stream).boxed(),
219 interval: IntervalStream::new(interval),
220 }
221 }
222
223 /// Returns an iterator over addrs of peers that have recently advertised `hash` in their inventory.
224 pub fn advertising_peers(&self, hash: InventoryHash) -> impl Iterator<Item = &PeerSocketAddr> {
225 self.status_peers(hash)
226 .filter_map(|addr_status| addr_status.available())
227 }
228
229 /// Returns an iterator over addrs of peers that have recently missed `hash` in their inventory.
230 #[allow(dead_code)]
231 pub fn missing_peers(&self, hash: InventoryHash) -> impl Iterator<Item = &PeerSocketAddr> {
232 self.status_peers(hash)
233 .filter_map(|addr_status| addr_status.missing())
234 }
235
236 /// Returns an iterator over peer inventory statuses for `hash`.
237 ///
238 /// Prefers current statuses to previously rotated statuses for the same peer.
239 pub fn status_peers(
240 &self,
241 hash: InventoryHash,
242 ) -> impl Iterator<Item = InventoryStatus<&PeerSocketAddr>> {
243 let prev = self.prev.get(&hash);
244 let current = self.current.get(&hash);
245
246 // # Security
247 //
248 // Prefer `current` statuses for the same peer over previously rotated statuses.
249 // This makes sure Zebra is using the most up-to-date network information.
250 let prev = prev
251 .into_iter()
252 .flatten()
253 .filter(move |(addr, _status)| !self.has_current_status(hash, **addr));
254 let current = current.into_iter().flatten();
255
256 current
257 .chain(prev)
258 .map(|(addr, status)| status.map(|()| addr))
259 }
260
261 /// Returns true if there is a current status entry for `hash` and `addr`.
262 pub fn has_current_status(&self, hash: InventoryHash, addr: PeerSocketAddr) -> bool {
263 self.current
264 .get(&hash)
265 .and_then(|current| current.get(&addr))
266 .is_some()
267 }
268
269 /// Returns an iterator over peer inventory status hashes.
270 ///
271 /// Yields current statuses first, then previously rotated statuses.
272 /// This can include multiple statuses for the same hash.
273 #[allow(dead_code)]
274 pub fn status_hashes(
275 &self,
276 ) -> impl Iterator<Item = (&InventoryHash, &IndexMap<PeerSocketAddr, InventoryMarker>)> {
277 self.current.iter().chain(self.prev.iter())
278 }
279
280 /// Returns a future that waits for new registry updates.
281 #[allow(dead_code)]
282 pub fn update(&mut self) -> Update<'_> {
283 Update::new(self)
284 }
285
286 /// Drive periodic inventory tasks.
287 ///
288 /// Rotates the inventory HashMaps on every timer tick.
289 /// Drains the inv_stream channel and registers all advertised inventory.
290 ///
291 /// Returns an error if the inventory channel is closed.
292 ///
293 /// Otherwise, returns `Ok` if it performed at least one update or rotation, or `Poll::Pending`
294 /// if there was no inventory change. Always registers a wakeup for the next inventory update
295 /// or rotation, even when it returns `Ok`.
296 pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
297 let mut result = Poll::Pending;
298
299 // # Correctness
300 //
301 // Registers the current task for wakeup when the timer next becomes ready.
302 // (But doesn't return, because we also want to register the task for wakeup when more
303 // inventory arrives.)
304 //
305 // # Security
306 //
307 // Only rotate one inventory per peer request, to give the next inventory
308 // time to gather some peer advertisements. This is a tradeoff between
309 // memory usage and quickly accessing remote data under heavy load.
310 //
311 // This prevents a burst edge case where all inventory is emptied after
312 // two interval ticks are delayed.
313 if Pin::new(&mut self.interval).poll_next(cx).is_ready() {
314 self.rotate();
315 result = Poll::Ready(Ok(()));
316 }
317
318 // This module uses a broadcast channel instead of an mpsc channel, even
319 // though there's a single consumer of inventory advertisements, because
320 // the broadcast channel has ring-buffer behavior: when the channel is
321 // full, sending a new message displaces the oldest message in the
322 // channel.
323 //
324 // This is the behavior we want for inventory advertisements, because we
325 // want to have a bounded buffer of unprocessed advertisements, and we
326 // want to prioritize new inventory (which is likely only at a specific
327 // peer) over old inventory (which is likely more widely distributed).
328 //
329 // The broadcast channel reports dropped messages by returning
330 // `RecvError::Lagged`. It's crucial that we handle that error here
331 // rather than propagating it through the peer set's Service::poll_ready
332 // implementation, where reporting a failure means reporting a permanent
333 // failure of the peer set.
334
335 // Returns Pending if all messages are processed, but the channel could get more.
336 loop {
337 let channel_result = self.inv_stream.next().poll_unpin(cx);
338
339 match channel_result {
340 Poll::Ready(Some(Ok(change))) => {
341 self.register(change);
342 result = Poll::Ready(Ok(()));
343 }
344 Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(count)))) => {
345 // This isn't a fatal inventory error, it's expected behaviour when Zebra is
346 // under load from peers.
347 metrics::counter!("pool.inventory.dropped").increment(1);
348 metrics::counter!("pool.inventory.dropped.messages").increment(count);
349
350 // If this message happens a lot, we should improve inventory registry
351 // performance, or poll the registry or peer set in a separate task.
352 info!(count, "dropped lagged inventory advertisements");
353 }
354 Poll::Ready(None) => {
355 // If the channel is empty and returns None, all senders, including the one in
356 // the handshaker, have been dropped, which really is a permanent failure.
357 result = Poll::Ready(Err(broadcast::error::RecvError::Closed.into()));
358 }
359 Poll::Pending => {
360 break;
361 }
362 }
363 }
364
365 result
366 }
367
368 /// Record the given inventory `change` for the peer `addr`.
369 ///
370 /// `Missing` markers are not updated until the registry rotates, for security reasons.
371 fn register(&mut self, change: InventoryChange) {
372 let new_status = change.marker();
373 let (invs, addr) = change.to_inner();
374
375 for inv in invs {
376 use InventoryHash::*;
377 assert!(
378 matches!(inv, Block(_) | Tx(_) | Wtx(_)),
379 "unexpected inventory type: {inv:?} from peer: {addr:?}",
380 );
381
382 let hash_peers = self.current.entry(inv).or_default();
383
384 // # Security
385 //
386 // Prefer `missing` over `advertised`, so malicious peers can't reset their own entries,
387 // and funnel multiple failing requests to themselves.
388 if let Some(old_status) = hash_peers.get(&addr) {
389 if old_status.is_missing() && new_status.is_available() {
390 debug!(?new_status, ?old_status, ?addr, ?inv, "skipping new status");
391 continue;
392 }
393
394 debug!(
395 ?new_status,
396 ?old_status,
397 ?addr,
398 ?inv,
399 "keeping both new and old status"
400 );
401 }
402
403 let replaced_status = hash_peers.insert(addr, new_status);
404
405 debug!(
406 ?new_status,
407 ?replaced_status,
408 ?addr,
409 ?inv,
410 "inserted new status"
411 );
412
413 // # Security
414 //
415 // Limit the number of stored peers per hash, removing the oldest entries,
416 // because newer entries are likely to be more relevant.
417 //
418 // TODO: do random or weighted random eviction instead?
419 if hash_peers.len() > MAX_PEERS_PER_INV {
420 // Performance: `MAX_PEERS_PER_INV` is small, so O(n) performance is acceptable.
421 hash_peers.shift_remove_index(0);
422 }
423
424 // # Security
425 //
426 // Limit the number of stored inventory hashes, removing the oldest entries,
427 // because newer entries are likely to be more relevant.
428 //
429 // TODO: do random or weighted random eviction instead?
430 if self.current.len() > MAX_INV_PER_MAP {
431 // Performance: `MAX_INV_PER_MAP` is small, so O(n) performance is acceptable.
432 self.current.shift_remove_index(0);
433 }
434 }
435 }
436
437 /// Replace the prev HashMap with current's and replace current with an empty
438 /// HashMap
439 fn rotate(&mut self) {
440 self.prev = std::mem::take(&mut self.current);
441 }
442}