Skip to main content

elara_diffusion/
interest.rs

1//! Interest Model - Who wants to observe what state
2//!
3//! Interest determines which nodes receive state updates.
4//! This enables efficient propagation - only send to those who care.
5
6use elara_core::NodeId;
7use std::collections::{HashMap, HashSet};
8
9/// Interest level - how much a node cares about a state
10#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
11pub enum InterestLevel {
12    /// No interest - don't send updates
13    #[default]
14    None = 0,
15    /// Low interest - send major updates only
16    Low = 1,
17    /// Medium interest - send regular updates
18    Medium = 2,
19    /// High interest - send all updates with low latency
20    High = 3,
21    /// Critical interest - prioritize above all else
22    Critical = 4,
23}
24
25/// Interest declaration from a node
26#[derive(Debug, Clone)]
27pub struct InterestDeclaration {
28    /// The node declaring interest
29    pub node: NodeId,
30    /// The state they're interested in
31    pub state_id: u64,
32    /// Level of interest
33    pub level: InterestLevel,
34    /// Timestamp of declaration
35    pub timestamp: i64,
36    /// Time-to-live in milliseconds (0 = permanent)
37    pub ttl_ms: u32,
38}
39
40impl InterestDeclaration {
41    /// Create a new interest declaration
42    pub fn new(node: NodeId, state_id: u64, level: InterestLevel) -> Self {
43        Self {
44            node,
45            state_id,
46            level,
47            timestamp: 0,
48            ttl_ms: 0,
49        }
50    }
51
52    /// Set TTL
53    pub fn with_ttl(mut self, ttl_ms: u32) -> Self {
54        self.ttl_ms = ttl_ms;
55        self
56    }
57
58    /// Check if this declaration has expired
59    pub fn is_expired(&self, current_time: i64) -> bool {
60        if self.ttl_ms == 0 {
61            return false;
62        }
63        current_time > self.timestamp + self.ttl_ms as i64
64    }
65}
66
67/// Interest map - tracks who is interested in what
68#[derive(Debug, Clone, Default)]
69pub struct InterestMap {
70    /// State ID -> (Node ID -> Interest Level)
71    interests: HashMap<u64, HashMap<NodeId, InterestLevel>>,
72
73    /// Node ID -> Set of state IDs they're interested in
74    node_interests: HashMap<NodeId, HashSet<u64>>,
75}
76
77impl InterestMap {
78    /// Create a new interest map
79    pub fn new() -> Self {
80        Self::default()
81    }
82
83    /// Register interest
84    pub fn register(&mut self, decl: InterestDeclaration) {
85        // Add to state -> nodes map
86        self.interests
87            .entry(decl.state_id)
88            .or_default()
89            .insert(decl.node, decl.level);
90
91        // Add to node -> states map
92        if decl.level != InterestLevel::None {
93            self.node_interests
94                .entry(decl.node)
95                .or_default()
96                .insert(decl.state_id);
97        } else {
98            // Remove if interest is None
99            if let Some(states) = self.node_interests.get_mut(&decl.node) {
100                states.remove(&decl.state_id);
101            }
102        }
103    }
104
105    /// Unregister interest
106    pub fn unregister(&mut self, node: NodeId, state_id: u64) {
107        if let Some(nodes) = self.interests.get_mut(&state_id) {
108            nodes.remove(&node);
109        }
110        if let Some(states) = self.node_interests.get_mut(&node) {
111            states.remove(&state_id);
112        }
113    }
114
115    /// Get all nodes interested in a state
116    pub fn interested_nodes(&self, state_id: u64) -> Vec<(NodeId, InterestLevel)> {
117        self.interests
118            .get(&state_id)
119            .map(|nodes| {
120                nodes
121                    .iter()
122                    .filter(|(_, level)| **level != InterestLevel::None)
123                    .map(|(node, level)| (*node, *level))
124                    .collect()
125            })
126            .unwrap_or_default()
127    }
128
129    /// Get nodes with at least a certain interest level
130    pub fn nodes_with_interest(&self, state_id: u64, min_level: InterestLevel) -> Vec<NodeId> {
131        self.interests
132            .get(&state_id)
133            .map(|nodes| {
134                nodes
135                    .iter()
136                    .filter(|(_, level)| **level >= min_level)
137                    .map(|(node, _)| *node)
138                    .collect()
139            })
140            .unwrap_or_default()
141    }
142
143    /// Get all states a node is interested in
144    pub fn node_states(&self, node: NodeId) -> Vec<u64> {
145        self.node_interests
146            .get(&node)
147            .map(|states| states.iter().copied().collect())
148            .unwrap_or_default()
149    }
150
151    /// Get interest level for a specific node and state
152    pub fn get_interest(&self, node: NodeId, state_id: u64) -> InterestLevel {
153        self.interests
154            .get(&state_id)
155            .and_then(|nodes| nodes.get(&node))
156            .copied()
157            .unwrap_or(InterestLevel::None)
158    }
159
160    /// Count interested nodes for a state
161    pub fn interest_count(&self, state_id: u64) -> usize {
162        self.interests
163            .get(&state_id)
164            .map(|nodes| {
165                nodes
166                    .values()
167                    .filter(|l| **l != InterestLevel::None)
168                    .count()
169            })
170            .unwrap_or(0)
171    }
172
173    /// Remove a node entirely (they disconnected)
174    pub fn remove_node(&mut self, node: NodeId) {
175        // Remove from all state interest maps
176        for nodes in self.interests.values_mut() {
177            nodes.remove(&node);
178        }
179
180        // Remove their interest set
181        self.node_interests.remove(&node);
182    }
183}
184
185/// Livestream interest - specialized for streaming scenarios
186#[derive(Debug, Clone)]
187pub struct LivestreamInterest {
188    /// Stream ID
189    pub stream_id: u64,
190
191    /// Interest map for this stream
192    pub interests: InterestMap,
193
194    /// Active viewers (high interest in visual/audio)
195    pub active_viewers: HashSet<NodeId>,
196
197    /// Lurkers (low interest, just presence)
198    pub lurkers: HashSet<NodeId>,
199}
200
201impl LivestreamInterest {
202    /// Create a new livestream interest tracker
203    pub fn new(stream_id: u64) -> Self {
204        Self {
205            stream_id,
206            interests: InterestMap::new(),
207            active_viewers: HashSet::new(),
208            lurkers: HashSet::new(),
209        }
210    }
211
212    /// Add an active viewer
213    pub fn add_viewer(&mut self, node: NodeId) {
214        self.active_viewers.insert(node);
215        self.lurkers.remove(&node);
216
217        // Register high interest in visual and audio
218        self.interests.register(InterestDeclaration::new(
219            node,
220            self.stream_id,
221            InterestLevel::High,
222        ));
223        self.interests.register(InterestDeclaration::new(
224            node,
225            self.stream_id + 1,
226            InterestLevel::High,
227        ));
228    }
229
230    /// Add a lurker (low bandwidth mode)
231    pub fn add_lurker(&mut self, node: NodeId) {
232        self.lurkers.insert(node);
233        self.active_viewers.remove(&node);
234
235        // Register low interest
236        self.interests.register(InterestDeclaration::new(
237            node,
238            self.stream_id,
239            InterestLevel::Low,
240        ));
241    }
242
243    /// Remove a viewer
244    pub fn remove_viewer(&mut self, node: NodeId) {
245        self.active_viewers.remove(&node);
246        self.lurkers.remove(&node);
247        self.interests.remove_node(node);
248    }
249
250    /// Get total viewer count
251    pub fn viewer_count(&self) -> usize {
252        self.active_viewers.len() + self.lurkers.len()
253    }
254
255    /// Get active viewer count
256    pub fn active_count(&self) -> usize {
257        self.active_viewers.len()
258    }
259}
260
261#[cfg(test)]
262mod tests {
263    use super::*;
264
265    #[test]
266    fn test_interest_registration() {
267        let mut map = InterestMap::new();
268
269        let node1 = NodeId::new(1);
270        let node2 = NodeId::new(2);
271        let state_id = 100;
272
273        map.register(InterestDeclaration::new(
274            node1,
275            state_id,
276            InterestLevel::High,
277        ));
278        map.register(InterestDeclaration::new(
279            node2,
280            state_id,
281            InterestLevel::Medium,
282        ));
283
284        assert_eq!(map.interest_count(state_id), 2);
285        assert_eq!(map.get_interest(node1, state_id), InterestLevel::High);
286        assert_eq!(map.get_interest(node2, state_id), InterestLevel::Medium);
287    }
288
289    #[test]
290    fn test_nodes_with_interest() {
291        let mut map = InterestMap::new();
292
293        let node1 = NodeId::new(1);
294        let node2 = NodeId::new(2);
295        let node3 = NodeId::new(3);
296        let state_id = 100;
297
298        map.register(InterestDeclaration::new(
299            node1,
300            state_id,
301            InterestLevel::High,
302        ));
303        map.register(InterestDeclaration::new(
304            node2,
305            state_id,
306            InterestLevel::Medium,
307        ));
308        map.register(InterestDeclaration::new(
309            node3,
310            state_id,
311            InterestLevel::Low,
312        ));
313
314        let high_nodes = map.nodes_with_interest(state_id, InterestLevel::High);
315        assert_eq!(high_nodes.len(), 1);
316
317        let medium_nodes = map.nodes_with_interest(state_id, InterestLevel::Medium);
318        assert_eq!(medium_nodes.len(), 2);
319    }
320
321    #[test]
322    fn test_livestream_interest() {
323        let mut stream = LivestreamInterest::new(1000);
324
325        let viewer1 = NodeId::new(1);
326        let viewer2 = NodeId::new(2);
327        let lurker = NodeId::new(3);
328
329        stream.add_viewer(viewer1);
330        stream.add_viewer(viewer2);
331        stream.add_lurker(lurker);
332
333        assert_eq!(stream.viewer_count(), 3);
334        assert_eq!(stream.active_count(), 2);
335
336        stream.remove_viewer(viewer1);
337        assert_eq!(stream.viewer_count(), 2);
338    }
339}