sim_lib_stream_fabric/
holders.rs1use std::collections::{BTreeMap, BTreeSet};
8use std::sync::{Mutex, MutexGuard};
9
10use sim_kernel::{Error, Result, Symbol};
11
12use crate::{ContentKey, HeldContent};
13
14#[derive(Default)]
21pub struct HoldingIndex {
22 holders: Mutex<BTreeMap<ContentKey, BTreeSet<Symbol>>>,
23}
24
25impl HoldingIndex {
26 pub fn announce(&self, fact: HeldContent) -> Result<()> {
30 lock(&self.holders)?
31 .entry(fact.key)
32 .or_default()
33 .insert(fact.holder);
34 Ok(())
35 }
36
37 pub fn holders_of(&self, key: &ContentKey) -> Vec<Symbol> {
39 self.holders
40 .lock()
41 .ok()
42 .and_then(|holders| holders.get(key).cloned())
43 .map(|holders| holders.into_iter().collect())
44 .unwrap_or_default()
45 }
46
47 pub fn merge(&self, other: &HoldingIndex) -> Result<()> {
49 let other = lock(&other.holders)?;
50 let mut holders = lock(&self.holders)?;
51 for (key, nodes) in other.iter() {
52 holders
53 .entry(key.clone())
54 .or_default()
55 .extend(nodes.iter().cloned());
56 }
57 Ok(())
58 }
59}
60
61fn lock(
62 holders: &Mutex<BTreeMap<ContentKey, BTreeSet<Symbol>>>,
63) -> Result<MutexGuard<'_, BTreeMap<ContentKey, BTreeSet<Symbol>>>> {
64 holders
65 .lock()
66 .map_err(|_| Error::Eval("holding index mutex poisoned".to_owned()))
67}
68
69#[cfg(test)]
70mod tests {
71 use sim_kernel::{CapabilityName, Consistency, EvalMode, EvalRequest, Expr};
72
73 use super::*;
74
75 fn key(expr: &str) -> ContentKey {
76 ContentKey::from_request(&EvalRequest {
77 expr: Expr::String(expr.to_owned()),
78 result_shape: None,
79 required_capabilities: vec![CapabilityName::new("fabric.test")],
80 deadline: None,
81 consistency: Consistency::LocalFirst,
82 mode: EvalMode::Eval,
83 answer_limit: None,
84 stream_buffer: None,
85 stream: false,
86 trace: false,
87 })
88 }
89
90 #[test]
91 fn announce_is_idempotent_and_sorted() {
92 let index = HoldingIndex::default();
93 let key = key("shared");
94
95 index
96 .announce(HeldContent::new(Symbol::new("node-b"), key.clone()))
97 .unwrap();
98 index
99 .announce(HeldContent::new(Symbol::new("node-a"), key.clone()))
100 .unwrap();
101 index
102 .announce(HeldContent::new(Symbol::new("node-b"), key.clone()))
103 .unwrap();
104
105 assert_eq!(
106 index.holders_of(&key),
107 vec![Symbol::new("node-a"), Symbol::new("node-b")]
108 );
109 }
110
111 #[test]
112 fn merge_converges_by_set_union() {
113 let key = key("shared");
114 let left = HoldingIndex::default();
115 let right = HoldingIndex::default();
116
117 left.announce(HeldContent::new(Symbol::new("node-a"), key.clone()))
118 .unwrap();
119 right
120 .announce(HeldContent::new(Symbol::new("node-b"), key.clone()))
121 .unwrap();
122
123 left.merge(&right).unwrap();
124 right.merge(&left).unwrap();
125
126 let expected = vec![Symbol::new("node-a"), Symbol::new("node-b")];
127 assert_eq!(left.holders_of(&key), expected);
128 assert_eq!(right.holders_of(&key), expected);
129 }
130}