// zebra_network/peer_set/inventory_registry.rs

//! Inventory Registry Implementation
//!
//! [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html

use std::{
    pin::Pin,
    task::{Context, Poll},
};

use futures::{FutureExt, Stream, StreamExt};
use indexmap::IndexMap;
use tokio::{
    sync::broadcast,
    time::{self, Instant},
};
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream, IntervalStream};

use zebra_chain::serialization::AtLeastOne;

use crate::{
    constants::INVENTORY_ROTATION_INTERVAL,
    protocol::{external::InventoryHash, internal::InventoryResponse},
    BoxError, PeerSocketAddr,
};

use self::update::Update;

/// Underlying type for the alias InventoryStatus::*
use InventoryResponse::*;

pub mod update;

#[cfg(test)]
mod tests;

/// The maximum number of inventory hashes tracked per map, for a single peer.
///
/// # Security
///
/// Bounds known memory denial of service attacks, like <https://invdos.net/>, to a total of:
/// ```text
/// 1000 inventory * 2 maps * 32-64 bytes per inventory = less than 1 MB
/// 1000 inventory * 70 peers * 2 maps * 6-18 bytes per address = up to 3 MB
/// ```
///
/// The inventory registry is only an efficiency optimisation — lookups fall back to a
/// random peer — so a small number of hashes is enough for available inventory.
///
/// However, we still want to track a significant amount of missing inventory,
/// so that queries for inventory that is missing network-wide stay limited.
//
// TODO: split this into available (25) and missing (1000 or more?)
pub const MAX_INV_PER_MAP: usize = 1000;

/// The maximum number of peers tracked for each inventory hash.
///
/// # Security
///
/// Bounds known memory denial of service attacks. See [`MAX_INV_PER_MAP`] for details.
///
/// The inventory registry is only an efficiency optimisation — lookups fall back to a
/// random peer — so a small number of peers per inv is enough for available inventory.
///
/// However, we still want to track missing inventory for almost all our peers,
/// so we only query a few peers for inventory that is genuinely missing from the network.
//
// TODO: split this into available (25) and missing (70)
pub const MAX_PEERS_PER_INV: usize = 70;

70/// A peer inventory status, which tracks a hash for both available and missing inventory.
71pub type InventoryStatus<T> = InventoryResponse<T, T>;
72
73/// A peer inventory status change, used in the inventory status channel.
74///
75/// For performance reasons, advertisements should only be tracked
76/// for hashes that are rare on the network.
77/// So Zebra only tracks single-block inventory messages.
78///
79/// For security reasons, all `notfound` rejections should be tracked.
80/// This also helps with performance, if the hash is rare on the network.
81pub type InventoryChange = InventoryStatus<(AtLeastOne<InventoryHash>, PeerSocketAddr)>;
82
83/// An internal marker used in inventory status hash maps.
84type InventoryMarker = InventoryStatus<()>;
85
86/// An Inventory Registry for tracking recent inventory advertisements and missing inventory.
87///
88/// For more details please refer to the [RFC].
89///
90/// [RFC]: https://zebra.zfnd.org/dev/rfcs/0003-inventory-tracking.html
91pub struct InventoryRegistry {
92    /// Map tracking the latest inventory status from the current interval
93    /// period.
94    //
95    // TODO: split maps into available and missing, so we can limit them separately.
96    current: IndexMap<InventoryHash, IndexMap<PeerSocketAddr, InventoryMarker>>,
97
98    /// Map tracking inventory statuses from the previous interval period.
99    prev: IndexMap<InventoryHash, IndexMap<PeerSocketAddr, InventoryMarker>>,
100
101    /// Stream of incoming inventory statuses to register.
102    inv_stream: Pin<
103        Box<dyn Stream<Item = Result<InventoryChange, BroadcastStreamRecvError>> + Send + 'static>,
104    >,
105
106    /// Interval tracking when we should next rotate our maps.
107    interval: IntervalStream,
108}
109
110impl std::fmt::Debug for InventoryRegistry {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        f.debug_struct("InventoryRegistry")
113            .field("current", &self.current)
114            .field("prev", &self.prev)
115            .finish()
116    }
117}
118
119impl InventoryChange {
120    /// Returns a new available inventory change from a single hash.
121    pub fn new_available(hash: InventoryHash, peer: PeerSocketAddr) -> Self {
122        let bv = AtLeastOne::from_vec(vec![hash]).expect("bounded vec must fit");
123        InventoryStatus::Available((bv, peer))
124    }
125
126    /// Returns a new missing inventory change from a single hash.
127    #[allow(dead_code)]
128    pub fn new_missing(hash: InventoryHash, peer: PeerSocketAddr) -> Self {
129        let bv = AtLeastOne::from_vec(vec![hash]).expect("bounded vec must fit");
130        InventoryStatus::Missing((bv, peer))
131    }
132
133    /// Returns a new available multiple inventory change, if `hashes` contains at least one change.
134    pub fn new_available_multi<'a>(
135        hashes: impl IntoIterator<Item = &'a InventoryHash>,
136        peer: PeerSocketAddr,
137    ) -> Option<Self> {
138        let mut hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
139
140        // # Security
141        //
142        // Don't send more hashes than we're going to store.
143        // It doesn't matter which hashes we choose, because this is an efficiency optimisation.
144        //
145        //  This limits known memory denial of service attacks to:
146        // `1000 hashes * 200 peers/channel capacity * 32-64 bytes = up to 12 MB`
147        hashes.truncate(MAX_INV_PER_MAP);
148
149        let hashes = hashes.try_into().ok();
150
151        hashes.map(|hashes| InventoryStatus::Available((hashes, peer)))
152    }
153
154    /// Returns a new missing multiple inventory change, if `hashes` contains at least one change.
155    pub fn new_missing_multi<'a>(
156        hashes: impl IntoIterator<Item = &'a InventoryHash>,
157        peer: PeerSocketAddr,
158    ) -> Option<Self> {
159        let mut hashes: Vec<InventoryHash> = hashes.into_iter().copied().collect();
160
161        // # Security
162        //
163        // Don't send more hashes than we're going to store.
164        // It doesn't matter which hashes we choose, because this is an efficiency optimisation.
165        hashes.truncate(MAX_INV_PER_MAP);
166
167        let hashes = hashes.try_into().ok();
168
169        hashes.map(|hashes| InventoryStatus::Missing((hashes, peer)))
170    }
171}
172
173impl<T> InventoryStatus<T> {
174    /// Get a marker for the status, without any associated data.
175    pub fn marker(&self) -> InventoryMarker {
176        self.as_ref().map(|_inner| ())
177    }
178
179    /// Maps an `InventoryStatus<T>` to `InventoryStatus<U>` by applying a function to a contained value.
180    pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> InventoryStatus<U> {
181        // Based on Option::map from https://doc.rust-lang.org/src/core/option.rs.html#844
182        match self {
183            Available(item) => Available(f(item)),
184            Missing(item) => Missing(f(item)),
185        }
186    }
187}
188
189impl<T: Clone> InventoryStatus<T> {
190    /// Returns a clone of the inner item, regardless of status.
191    pub fn to_inner(&self) -> T {
192        match self {
193            Available(item) | Missing(item) => item.clone(),
194        }
195    }
196}
197
198impl InventoryRegistry {
199    /// Returns a new Inventory Registry for `inv_stream`.
200    pub fn new(inv_stream: broadcast::Receiver<InventoryChange>) -> Self {
201        let interval = INVENTORY_ROTATION_INTERVAL;
202
203        // Don't do an immediate rotation, current and prev are already empty.
204        let mut interval = tokio::time::interval_at(Instant::now() + interval, interval);
205        // # Security
206        //
207        // If the rotation time is late, execute as many ticks as needed to catch up.
208        // This is a tradeoff between memory usage and quickly accessing remote data
209        // under heavy load. Bursting prioritises lower memory usage.
210        //
211        // Skipping or delaying could keep peer inventory in memory for a longer time,
212        // further increasing memory load or delays due to virtual memory swapping.
213        interval.set_missed_tick_behavior(time::MissedTickBehavior::Burst);
214
215        Self {
216            current: Default::default(),
217            prev: Default::default(),
218            inv_stream: BroadcastStream::new(inv_stream).boxed(),
219            interval: IntervalStream::new(interval),
220        }
221    }
222
223    /// Returns an iterator over addrs of peers that have recently advertised `hash` in their inventory.
224    pub fn advertising_peers(&self, hash: InventoryHash) -> impl Iterator<Item = &PeerSocketAddr> {
225        self.status_peers(hash)
226            .filter_map(|addr_status| addr_status.available())
227    }
228
229    /// Returns an iterator over addrs of peers that have recently missed `hash` in their inventory.
230    #[allow(dead_code)]
231    pub fn missing_peers(&self, hash: InventoryHash) -> impl Iterator<Item = &PeerSocketAddr> {
232        self.status_peers(hash)
233            .filter_map(|addr_status| addr_status.missing())
234    }
235
236    /// Returns an iterator over peer inventory statuses for `hash`.
237    ///
238    /// Prefers current statuses to previously rotated statuses for the same peer.
239    pub fn status_peers(
240        &self,
241        hash: InventoryHash,
242    ) -> impl Iterator<Item = InventoryStatus<&PeerSocketAddr>> {
243        let prev = self.prev.get(&hash);
244        let current = self.current.get(&hash);
245
246        // # Security
247        //
248        // Prefer `current` statuses for the same peer over previously rotated statuses.
249        // This makes sure Zebra is using the most up-to-date network information.
250        let prev = prev
251            .into_iter()
252            .flatten()
253            .filter(move |(addr, _status)| !self.has_current_status(hash, **addr));
254        let current = current.into_iter().flatten();
255
256        current
257            .chain(prev)
258            .map(|(addr, status)| status.map(|()| addr))
259    }
260
261    /// Returns true if there is a current status entry for `hash` and `addr`.
262    pub fn has_current_status(&self, hash: InventoryHash, addr: PeerSocketAddr) -> bool {
263        self.current
264            .get(&hash)
265            .and_then(|current| current.get(&addr))
266            .is_some()
267    }
268
269    /// Returns an iterator over peer inventory status hashes.
270    ///
271    /// Yields current statuses first, then previously rotated statuses.
272    /// This can include multiple statuses for the same hash.
273    #[allow(dead_code)]
274    pub fn status_hashes(
275        &self,
276    ) -> impl Iterator<Item = (&InventoryHash, &IndexMap<PeerSocketAddr, InventoryMarker>)> {
277        self.current.iter().chain(self.prev.iter())
278    }
279
280    /// Returns a future that waits for new registry updates.
281    #[allow(dead_code)]
282    pub fn update(&mut self) -> Update<'_> {
283        Update::new(self)
284    }
285
286    /// Drive periodic inventory tasks.
287    ///
288    /// Rotates the inventory HashMaps on every timer tick.
289    /// Drains the inv_stream channel and registers all advertised inventory.
290    ///
291    /// Returns an error if the inventory channel is closed.
292    ///
293    /// Otherwise, returns `Ok` if it performed at least one update or rotation, or `Poll::Pending`
294    /// if there was no inventory change. Always registers a wakeup for the next inventory update
295    /// or rotation, even when it returns `Ok`.
296    pub fn poll_inventory(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), BoxError>> {
297        let mut result = Poll::Pending;
298
299        // # Correctness
300        //
301        // Registers the current task for wakeup when the timer next becomes ready.
302        // (But doesn't return, because we also want to register the task for wakeup when more
303        // inventory arrives.)
304        //
305        // # Security
306        //
307        // Only rotate one inventory per peer request, to give the next inventory
308        // time to gather some peer advertisements. This is a tradeoff between
309        // memory usage and quickly accessing remote data under heavy load.
310        //
311        // This prevents a burst edge case where all inventory is emptied after
312        // two interval ticks are delayed.
313        if Pin::new(&mut self.interval).poll_next(cx).is_ready() {
314            self.rotate();
315            result = Poll::Ready(Ok(()));
316        }
317
318        // This module uses a broadcast channel instead of an mpsc channel, even
319        // though there's a single consumer of inventory advertisements, because
320        // the broadcast channel has ring-buffer behavior: when the channel is
321        // full, sending a new message displaces the oldest message in the
322        // channel.
323        //
324        // This is the behavior we want for inventory advertisements, because we
325        // want to have a bounded buffer of unprocessed advertisements, and we
326        // want to prioritize new inventory (which is likely only at a specific
327        // peer) over old inventory (which is likely more widely distributed).
328        //
329        // The broadcast channel reports dropped messages by returning
330        // `RecvError::Lagged`. It's crucial that we handle that error here
331        // rather than propagating it through the peer set's Service::poll_ready
332        // implementation, where reporting a failure means reporting a permanent
333        // failure of the peer set.
334
335        // Returns Pending if all messages are processed, but the channel could get more.
336        loop {
337            let channel_result = self.inv_stream.next().poll_unpin(cx);
338
339            match channel_result {
340                Poll::Ready(Some(Ok(change))) => {
341                    self.register(change);
342                    result = Poll::Ready(Ok(()));
343                }
344                Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(count)))) => {
345                    // This isn't a fatal inventory error, it's expected behaviour when Zebra is
346                    // under load from peers.
347                    metrics::counter!("pool.inventory.dropped").increment(1);
348                    metrics::counter!("pool.inventory.dropped.messages").increment(count);
349
350                    // If this message happens a lot, we should improve inventory registry
351                    // performance, or poll the registry or peer set in a separate task.
352                    info!(count, "dropped lagged inventory advertisements");
353                }
354                Poll::Ready(None) => {
355                    // If the channel is empty and returns None, all senders, including the one in
356                    // the handshaker, have been dropped, which really is a permanent failure.
357                    result = Poll::Ready(Err(broadcast::error::RecvError::Closed.into()));
358                }
359                Poll::Pending => {
360                    break;
361                }
362            }
363        }
364
365        result
366    }
367
368    /// Record the given inventory `change` for the peer `addr`.
369    ///
370    /// `Missing` markers are not updated until the registry rotates, for security reasons.
371    fn register(&mut self, change: InventoryChange) {
372        let new_status = change.marker();
373        let (invs, addr) = change.to_inner();
374
375        for inv in invs {
376            use InventoryHash::*;
377            assert!(
378                matches!(inv, Block(_) | Tx(_) | Wtx(_)),
379                "unexpected inventory type: {inv:?} from peer: {addr:?}",
380            );
381
382            let hash_peers = self.current.entry(inv).or_default();
383
384            // # Security
385            //
386            // Prefer `missing` over `advertised`, so malicious peers can't reset their own entries,
387            // and funnel multiple failing requests to themselves.
388            if let Some(old_status) = hash_peers.get(&addr) {
389                if old_status.is_missing() && new_status.is_available() {
390                    debug!(?new_status, ?old_status, ?addr, ?inv, "skipping new status");
391                    continue;
392                }
393
394                debug!(
395                    ?new_status,
396                    ?old_status,
397                    ?addr,
398                    ?inv,
399                    "keeping both new and old status"
400                );
401            }
402
403            let replaced_status = hash_peers.insert(addr, new_status);
404
405            debug!(
406                ?new_status,
407                ?replaced_status,
408                ?addr,
409                ?inv,
410                "inserted new status"
411            );
412
413            // # Security
414            //
415            // Limit the number of stored peers per hash, removing the oldest entries,
416            // because newer entries are likely to be more relevant.
417            //
418            // TODO: do random or weighted random eviction instead?
419            if hash_peers.len() > MAX_PEERS_PER_INV {
420                // Performance: `MAX_PEERS_PER_INV` is small, so O(n) performance is acceptable.
421                hash_peers.shift_remove_index(0);
422            }
423
424            // # Security
425            //
426            // Limit the number of stored inventory hashes, removing the oldest entries,
427            // because newer entries are likely to be more relevant.
428            //
429            // TODO: do random or weighted random eviction instead?
430            if self.current.len() > MAX_INV_PER_MAP {
431                // Performance: `MAX_INV_PER_MAP` is small, so O(n) performance is acceptable.
432                self.current.shift_remove_index(0);
433            }
434        }
435    }
436
437    /// Replace the prev HashMap with current's and replace current with an empty
438    /// HashMap
439    fn rotate(&mut self) {
440        self.prev = std::mem::take(&mut self.current);
441    }
442}