Skip to main content

sim_lib_stream_fabric/
holders.rs

1//! Grow-only content holder index.
2//!
3//! A holder index answers "who holds content id X" for
4//! [`ContentAddressedFabric`](crate::ContentAddressedFabric). It carries only
5//! holder facts, never content values or transport policy.
6
7use std::collections::{BTreeMap, BTreeSet};
8use std::sync::{Mutex, MutexGuard};
9
10use sim_kernel::{Error, Result, Symbol};
11
12use crate::{ContentKey, HeldContent};
13
14/// A grow-only index of which nodes hold which content ids.
15///
16/// Content never mutates, so a holder fact is monotonic: it is either useful or
17/// stale because the holder is unreachable. Merge is set union, giving the
18/// index convergence without coordination or consensus. A stale announcement
19/// degrades to a peer miss and the fabric continues to any other holder.
20#[derive(Default)]
21pub struct HoldingIndex {
22    holders: Mutex<BTreeMap<ContentKey, BTreeSet<Symbol>>>,
23}
24
25impl HoldingIndex {
26    /// Announces that `fact.holder` holds `fact.key`.
27    ///
28    /// Re-announcing the same fact is idempotent.
29    pub fn announce(&self, fact: HeldContent) -> Result<()> {
30        lock(&self.holders)?
31            .entry(fact.key)
32            .or_default()
33            .insert(fact.holder);
34        Ok(())
35    }
36
37    /// Returns holders announced for `key`, sorted by symbol order.
38    pub fn holders_of(&self, key: &ContentKey) -> Vec<Symbol> {
39        self.holders
40            .lock()
41            .ok()
42            .and_then(|holders| holders.get(key).cloned())
43            .map(|holders| holders.into_iter().collect())
44            .unwrap_or_default()
45    }
46
47    /// Merges all facts from `other` by grow-only set union.
48    pub fn merge(&self, other: &HoldingIndex) -> Result<()> {
49        let other = lock(&other.holders)?;
50        let mut holders = lock(&self.holders)?;
51        for (key, nodes) in other.iter() {
52            holders
53                .entry(key.clone())
54                .or_default()
55                .extend(nodes.iter().cloned());
56        }
57        Ok(())
58    }
59}
60
61fn lock(
62    holders: &Mutex<BTreeMap<ContentKey, BTreeSet<Symbol>>>,
63) -> Result<MutexGuard<'_, BTreeMap<ContentKey, BTreeSet<Symbol>>>> {
64    holders
65        .lock()
66        .map_err(|_| Error::Eval("holding index mutex poisoned".to_owned()))
67}
68
#[cfg(test)]
mod tests {
    use sim_kernel::{CapabilityName, Consistency, EvalMode, EvalRequest, Expr};

    use super::*;

    /// Builds a content key from a minimal evaluation request over `expr`.
    fn content_key(expr: &str) -> ContentKey {
        let request = EvalRequest {
            expr: Expr::String(expr.to_owned()),
            result_shape: None,
            required_capabilities: vec![CapabilityName::new("fabric.test")],
            deadline: None,
            consistency: Consistency::LocalFirst,
            mode: EvalMode::Eval,
            answer_limit: None,
            stream_buffer: None,
            stream: false,
            trace: false,
        };
        ContentKey::from_request(&request)
    }

    /// Announces to `index` that `node` holds `key`, panicking on failure.
    fn announce(index: &HoldingIndex, node: &'static str, key: &ContentKey) {
        index
            .announce(HeldContent::new(Symbol::new(node), key.clone()))
            .unwrap();
    }

    #[test]
    fn announce_is_idempotent_and_sorted() {
        let index = HoldingIndex::default();
        let shared = content_key("shared");

        // Out of order and with a duplicate: the result must still be sorted
        // and free of repeats.
        for node in ["node-b", "node-a", "node-b"] {
            announce(&index, node, &shared);
        }

        let expected = vec![Symbol::new("node-a"), Symbol::new("node-b")];
        assert_eq!(index.holders_of(&shared), expected);
    }

    #[test]
    fn merge_converges_by_set_union() {
        let shared = content_key("shared");
        let left = HoldingIndex::default();
        let right = HoldingIndex::default();

        announce(&left, "node-a", &shared);
        announce(&right, "node-b", &shared);

        // Merge in both directions; each side must end with the full union.
        left.merge(&right).unwrap();
        right.merge(&left).unwrap();

        let expected = vec![Symbol::new("node-a"), Symbol::new("node-b")];
        assert_eq!(left.holders_of(&shared), expected);
        assert_eq!(right.holders_of(&shared), expected);
    }
}