1use elara_core::NodeId;
7use std::collections::{HashMap, HashSet};
8
/// How strongly a node cares about a piece of state.
///
/// Variants are declared from weakest to strongest, so the derived ordering
/// supports threshold checks such as `level >= InterestLevel::Medium`.
/// The explicit discriminants pin the integer value of every variant.
#[derive(Debug, Default, Clone, Copy, Hash)]
#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub enum InterestLevel {
    /// No interest at all; this is the default level.
    #[default]
    None = 0,
    /// Weakest level that still counts as interest.
    Low = 1,
    /// Ranks above `Low` and below `High`.
    Medium = 2,
    /// Ranks above `Medium` and below `Critical`.
    High = 3,
    /// Strongest level.
    Critical = 4,
}
24
25#[derive(Debug, Clone)]
27pub struct InterestDeclaration {
28 pub node: NodeId,
30 pub state_id: u64,
32 pub level: InterestLevel,
34 pub timestamp: i64,
36 pub ttl_ms: u32,
38}
39
40impl InterestDeclaration {
41 pub fn new(node: NodeId, state_id: u64, level: InterestLevel) -> Self {
43 Self {
44 node,
45 state_id,
46 level,
47 timestamp: 0,
48 ttl_ms: 0,
49 }
50 }
51
52 pub fn with_ttl(mut self, ttl_ms: u32) -> Self {
54 self.ttl_ms = ttl_ms;
55 self
56 }
57
58 pub fn is_expired(&self, current_time: i64) -> bool {
60 if self.ttl_ms == 0 {
61 return false;
62 }
63 current_time > self.timestamp + self.ttl_ms as i64
64 }
65}
66
67#[derive(Debug, Clone, Default)]
69pub struct InterestMap {
70 interests: HashMap<u64, HashMap<NodeId, InterestLevel>>,
72
73 node_interests: HashMap<NodeId, HashSet<u64>>,
75}
76
77impl InterestMap {
78 pub fn new() -> Self {
80 Self::default()
81 }
82
83 pub fn register(&mut self, decl: InterestDeclaration) {
85 self.interests
87 .entry(decl.state_id)
88 .or_default()
89 .insert(decl.node, decl.level);
90
91 if decl.level != InterestLevel::None {
93 self.node_interests
94 .entry(decl.node)
95 .or_default()
96 .insert(decl.state_id);
97 } else {
98 if let Some(states) = self.node_interests.get_mut(&decl.node) {
100 states.remove(&decl.state_id);
101 }
102 }
103 }
104
105 pub fn unregister(&mut self, node: NodeId, state_id: u64) {
107 if let Some(nodes) = self.interests.get_mut(&state_id) {
108 nodes.remove(&node);
109 }
110 if let Some(states) = self.node_interests.get_mut(&node) {
111 states.remove(&state_id);
112 }
113 }
114
115 pub fn interested_nodes(&self, state_id: u64) -> Vec<(NodeId, InterestLevel)> {
117 self.interests
118 .get(&state_id)
119 .map(|nodes| {
120 nodes
121 .iter()
122 .filter(|(_, level)| **level != InterestLevel::None)
123 .map(|(node, level)| (*node, *level))
124 .collect()
125 })
126 .unwrap_or_default()
127 }
128
129 pub fn nodes_with_interest(&self, state_id: u64, min_level: InterestLevel) -> Vec<NodeId> {
131 self.interests
132 .get(&state_id)
133 .map(|nodes| {
134 nodes
135 .iter()
136 .filter(|(_, level)| **level >= min_level)
137 .map(|(node, _)| *node)
138 .collect()
139 })
140 .unwrap_or_default()
141 }
142
143 pub fn node_states(&self, node: NodeId) -> Vec<u64> {
145 self.node_interests
146 .get(&node)
147 .map(|states| states.iter().copied().collect())
148 .unwrap_or_default()
149 }
150
151 pub fn get_interest(&self, node: NodeId, state_id: u64) -> InterestLevel {
153 self.interests
154 .get(&state_id)
155 .and_then(|nodes| nodes.get(&node))
156 .copied()
157 .unwrap_or(InterestLevel::None)
158 }
159
160 pub fn interest_count(&self, state_id: u64) -> usize {
162 self.interests
163 .get(&state_id)
164 .map(|nodes| {
165 nodes
166 .values()
167 .filter(|l| **l != InterestLevel::None)
168 .count()
169 })
170 .unwrap_or(0)
171 }
172
173 pub fn remove_node(&mut self, node: NodeId) {
175 for nodes in self.interests.values_mut() {
177 nodes.remove(&node);
178 }
179
180 self.node_interests.remove(&node);
182 }
183}
184
185#[derive(Debug, Clone)]
187pub struct LivestreamInterest {
188 pub stream_id: u64,
190
191 pub interests: InterestMap,
193
194 pub active_viewers: HashSet<NodeId>,
196
197 pub lurkers: HashSet<NodeId>,
199}
200
201impl LivestreamInterest {
202 pub fn new(stream_id: u64) -> Self {
204 Self {
205 stream_id,
206 interests: InterestMap::new(),
207 active_viewers: HashSet::new(),
208 lurkers: HashSet::new(),
209 }
210 }
211
212 pub fn add_viewer(&mut self, node: NodeId) {
214 self.active_viewers.insert(node);
215 self.lurkers.remove(&node);
216
217 self.interests.register(InterestDeclaration::new(
219 node,
220 self.stream_id,
221 InterestLevel::High,
222 ));
223 self.interests.register(InterestDeclaration::new(
224 node,
225 self.stream_id + 1,
226 InterestLevel::High,
227 ));
228 }
229
230 pub fn add_lurker(&mut self, node: NodeId) {
232 self.lurkers.insert(node);
233 self.active_viewers.remove(&node);
234
235 self.interests.register(InterestDeclaration::new(
237 node,
238 self.stream_id,
239 InterestLevel::Low,
240 ));
241 }
242
243 pub fn remove_viewer(&mut self, node: NodeId) {
245 self.active_viewers.remove(&node);
246 self.lurkers.remove(&node);
247 self.interests.remove_node(node);
248 }
249
250 pub fn viewer_count(&self) -> usize {
252 self.active_viewers.len() + self.lurkers.len()
253 }
254
255 pub fn active_count(&self) -> usize {
257 self.active_viewers.len()
258 }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// State id shared by the map-level tests.
    const STATE: u64 = 100;

    /// Builds a map holding one declaration for `STATE` per `(node, level)` pair.
    fn map_with(entries: &[(NodeId, InterestLevel)]) -> InterestMap {
        let mut map = InterestMap::new();
        for &(node, level) in entries {
            map.register(InterestDeclaration::new(node, STATE, level));
        }
        map
    }

    /// Registered levels are counted and can be read back per node.
    #[test]
    fn test_interest_registration() {
        let first = NodeId::new(1);
        let second = NodeId::new(2);
        let map = map_with(&[
            (first, InterestLevel::High),
            (second, InterestLevel::Medium),
        ]);

        assert_eq!(map.interest_count(STATE), 2);
        assert_eq!(map.get_interest(first, STATE), InterestLevel::High);
        assert_eq!(map.get_interest(second, STATE), InterestLevel::Medium);
    }

    /// Threshold queries include every node at or above the requested level.
    #[test]
    fn test_nodes_with_interest() {
        let map = map_with(&[
            (NodeId::new(1), InterestLevel::High),
            (NodeId::new(2), InterestLevel::Medium),
            (NodeId::new(3), InterestLevel::Low),
        ]);

        let at_least_high = map.nodes_with_interest(STATE, InterestLevel::High);
        assert_eq!(at_least_high.len(), 1);

        let at_least_medium = map.nodes_with_interest(STATE, InterestLevel::Medium);
        assert_eq!(at_least_medium.len(), 2);
    }

    /// Audience counts track viewers, lurkers and removals.
    #[test]
    fn test_livestream_interest() {
        let mut stream = LivestreamInterest::new(1000);
        let departing = NodeId::new(1);

        stream.add_viewer(departing);
        stream.add_viewer(NodeId::new(2));
        stream.add_lurker(NodeId::new(3));

        assert_eq!(stream.viewer_count(), 3);
        assert_eq!(stream.active_count(), 2);

        stream.remove_viewer(departing);
        assert_eq!(stream.viewer_count(), 2);
    }
}