dynomite/cluster/capability/negotiator.rs
1//! Negotiation logic shared between [`CapabilityRegistry`] and
2//! its tests.
3//!
4//! The flow is:
5//!
6//! 1. For every capability the local node has registered, look
7//! up the matching entry in the peer ad (by capability name).
8//! 2. If the peer advertises the capability, ask the local cap
9//! to merge: this returns the highest local value also
10//! supported by the peer, or `None` if there is no overlap.
11//! 3. If the peer does not advertise the capability, or the
12//! merge returned `None`, fall back to the local "floor"
13//! value (the lowest-preference local value).
14//!
15//! Capabilities that the peer advertises but the local node has
16//! never registered are silently ignored: there is nothing the
17//! local node can pick because it does not know how to decode
18//! the values.
19//!
20//! [`CapabilityRegistry`]: super::CapabilityRegistry
21
22use std::collections::HashMap;
23
24use crate::cluster::capability::registry::{CapabilityAd, CapabilityRegistry};
25
/// Result of one negotiation round.
///
/// Holds one entry per negotiated capability, keyed by capability
/// name. The stored value is the *encoded* choice; callers decode it
/// through [`crate::cluster::capability::Capability::decode_value`]
/// of the capability with the same name.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct NegotiatedCapabilities {
    /// Capability name -> encoded chosen value.
    chosen: HashMap<String, Vec<u8>>,
}

impl NegotiatedCapabilities {
    /// Create a result that holds no entries yet.
    #[must_use]
    pub fn new() -> Self {
        Self {
            chosen: HashMap::new(),
        }
    }

    /// Record `value` as the choice for `name`.
    ///
    /// A previous choice stored under the same name is replaced and
    /// dropped.
    pub fn insert(&mut self, name: String, value: Vec<u8>) {
        let _replaced = self.chosen.insert(name, value);
    }

    /// Encoded value chosen for `name`, or `None` when no entry
    /// exists under that name.
    #[must_use]
    pub fn get(&self, name: &str) -> Option<&[u8]> {
        let bytes = self.chosen.get(name)?;
        Some(bytes.as_slice())
    }

    /// How many capabilities have a chosen value.
    #[must_use]
    pub fn len(&self) -> usize {
        self.chosen.len()
    }

    /// Whether no capability has a chosen value.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.chosen.is_empty()
    }

    /// Borrowing iterator over `(name, encoded value)` pairs.
    ///
    /// Entries come straight from the backing `HashMap`, so the
    /// iteration order is unspecified.
    pub fn iter(&self) -> impl Iterator<Item = (&String, &Vec<u8>)> {
        self.chosen.iter()
    }
}
72
73/// Compute the negotiated value for every locally registered
74/// capability, falling back to the floor when the peer does not
75/// advertise overlap.
76///
77/// The function is exported as `pub` at the module level so
78/// alternative storage layouts (or future test harnesses that
79/// wrap a registry) can reuse it without duplicating the
80/// floor-fallback logic.
81pub(crate) fn negotiate_with_floor(
82 registry: &CapabilityRegistry,
83 peer_ad: &CapabilityAd,
84) -> NegotiatedCapabilities {
85 let slots = registry.slots_for_negotiation();
86 // Build a peer lookup once: name -> peer-supplied byte
87 // blobs.
88 let mut peer_by_name: HashMap<&str, &[Vec<u8>]> =
89 HashMap::with_capacity(peer_ad.entries().len());
90 for entry in peer_ad.entries() {
91 peer_by_name.insert(entry.name(), entry.supported());
92 }
93 let mut out = NegotiatedCapabilities::new();
94 for (name, slot) in slots {
95 let chosen = if let Some(peer_supported) = peer_by_name.get(name) {
96 match slot.merge_bytes(peer_supported) {
97 Some(v) => v,
98 None => slot.floor_bytes().to_vec(),
99 }
100 } else {
101 // Peer never declared this capability - the safest
102 // assumption is "peer only supports the floor". The
103 // local node treats the floor as authoritative.
104 slot.floor_bytes().to_vec()
105 };
106 out.insert((*name).to_string(), chosen);
107 }
108 // Capabilities the peer ships but the local node does not
109 // know about are ignored: there is no way to pick a value
110 // for an unknown cap because the registry cannot decode it.
111 out
112}