Skip to main content

dynomite/cluster/capability/
negotiator.rs

1//! Negotiation logic shared between [`CapabilityRegistry`] and
2//! its tests.
3//!
4//! The flow is:
5//!
6//! 1. For every capability the local node has registered, look
7//!    up the matching entry in the peer ad (by capability name).
8//! 2. If the peer advertises the capability, ask the local cap
9//!    to merge: this returns the highest local value also
10//!    supported by the peer, or `None` if there is no overlap.
11//! 3. If the peer does not advertise the capability, or the
12//!    merge returned `None`, fall back to the local "floor"
13//!    value (the lowest-preference local value).
14//!
15//! Capabilities that the peer advertises but the local node has
16//! never registered are silently ignored: there is nothing the
17//! local node can pick because it does not know how to decode
18//! the values.
19//!
20//! [`CapabilityRegistry`]: super::CapabilityRegistry
21
22use std::collections::HashMap;
23
24use crate::cluster::capability::registry::{CapabilityAd, CapabilityRegistry};
25
26/// Outcome of a single round of negotiation.
27///
28/// Each entry is `(capability name, encoded chosen value)`. The
29/// caller decodes the value via the registered
30/// [`crate::cluster::capability::Capability::decode_value`] of
31/// the matching cap.
32#[derive(Clone, Debug, Default, Eq, PartialEq)]
33pub struct NegotiatedCapabilities {
34    chosen: HashMap<String, Vec<u8>>,
35}
36
37impl NegotiatedCapabilities {
38    /// Construct an empty result.
39    #[must_use]
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    /// Insert or overwrite the chosen value for `name`.
45    pub fn insert(&mut self, name: String, value: Vec<u8>) {
46        self.chosen.insert(name, value);
47    }
48
49    /// Look up the encoded value picked for `name`.
50    #[must_use]
51    pub fn get(&self, name: &str) -> Option<&[u8]> {
52        self.chosen.get(name).map(Vec::as_slice)
53    }
54
55    /// True when the result holds no entries.
56    #[must_use]
57    pub fn is_empty(&self) -> bool {
58        self.chosen.is_empty()
59    }
60
61    /// Number of negotiated entries.
62    #[must_use]
63    pub fn len(&self) -> usize {
64        self.chosen.len()
65    }
66
67    /// Iterate over `(name, value-bytes)` pairs.
68    pub fn iter(&self) -> impl Iterator<Item = (&String, &Vec<u8>)> {
69        self.chosen.iter()
70    }
71}
72
73/// Compute the negotiated value for every locally registered
74/// capability, falling back to the floor when the peer does not
75/// advertise overlap.
76///
77/// The function is exported as `pub` at the module level so
78/// alternative storage layouts (or future test harnesses that
79/// wrap a registry) can reuse it without duplicating the
80/// floor-fallback logic.
81pub(crate) fn negotiate_with_floor(
82    registry: &CapabilityRegistry,
83    peer_ad: &CapabilityAd,
84) -> NegotiatedCapabilities {
85    let slots = registry.slots_for_negotiation();
86    // Build a peer lookup once: name -> peer-supplied byte
87    // blobs.
88    let mut peer_by_name: HashMap<&str, &[Vec<u8>]> =
89        HashMap::with_capacity(peer_ad.entries().len());
90    for entry in peer_ad.entries() {
91        peer_by_name.insert(entry.name(), entry.supported());
92    }
93    let mut out = NegotiatedCapabilities::new();
94    for (name, slot) in slots {
95        let chosen = if let Some(peer_supported) = peer_by_name.get(name) {
96            match slot.merge_bytes(peer_supported) {
97                Some(v) => v,
98                None => slot.floor_bytes().to_vec(),
99            }
100        } else {
101            // Peer never declared this capability - the safest
102            // assumption is "peer only supports the floor". The
103            // local node treats the floor as authoritative.
104            slot.floor_bytes().to_vec()
105        };
106        out.insert((*name).to_string(), chosen);
107    }
108    // Capabilities the peer ships but the local node does not
109    // know about are ignored: there is no way to pick a value
110    // for an unknown cap because the registry cannot decode it.
111    out
112}