use elara_core::NodeId;
use std::collections::{HashMap, HashSet};
/// Strength of a node's interest in a piece of state.
///
/// Variants are declared from weakest to strongest, so the derived ordering
/// gives `None < Low < Medium < High < Critical`. The explicit discriminants
/// follow the same order.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum InterestLevel {
    /// No interest at all; this is the default level.
    #[default]
    None = 0,
    /// Weakest non-empty interest level.
    Low = 1,
    /// Interest level between `Low` and `High`.
    Medium = 2,
    /// Interest level between `Medium` and `Critical`.
    High = 3,
    /// Strongest interest level.
    Critical = 4,
}
/// A single node's statement of interest in one state id.
#[derive(Debug, Clone)]
pub struct InterestDeclaration {
    /// Node making the declaration.
    pub node: NodeId,
    /// Identifier of the state the declaration refers to.
    pub state_id: u64,
    /// Declared interest level.
    pub level: InterestLevel,
    /// Reference time the TTL is measured from; compared against the
    /// `current_time` passed to `is_expired`.
    // NOTE(review): `ttl_ms` is added to this value directly, so it is
    // presumably expressed in milliseconds — confirm against callers.
    pub timestamp: i64,
    /// Time-to-live in milliseconds; `0` means the declaration never expires.
    pub ttl_ms: u32,
}
impl InterestDeclaration {
    /// Creates a declaration for `node` on `state_id` at the given `level`.
    ///
    /// `timestamp` starts at `0` and `ttl_ms` at `0`, so the declaration
    /// never expires until a TTL is set.
    pub fn new(node: NodeId, state_id: u64, level: InterestLevel) -> Self {
        Self {
            node,
            state_id,
            level,
            timestamp: 0,
            ttl_ms: 0,
        }
    }

    /// Returns the declaration with its time-to-live set to `ttl_ms`
    /// milliseconds. A value of `0` means "never expires".
    pub fn with_ttl(mut self, ttl_ms: u32) -> Self {
        self.ttl_ms = ttl_ms;
        self
    }

    /// Reports whether the declaration has expired at `current_time`.
    ///
    /// A declaration with `ttl_ms == 0` never expires. Otherwise it is
    /// expired once `current_time` is strictly greater than
    /// `timestamp + ttl_ms`.
    pub fn is_expired(&self, current_time: i64) -> bool {
        if self.ttl_ms == 0 {
            return false;
        }
        // Saturate instead of overflowing: a timestamp close to `i64::MAX`
        // would otherwise panic in debug builds and wrap to a negative
        // deadline (i.e. "already expired") in release builds.
        let deadline = self.timestamp.saturating_add(i64::from(self.ttl_ms));
        current_time > deadline
    }
}
/// Two-way index of which nodes are interested in which states.
#[derive(Debug, Default, Clone)]
pub struct InterestMap {
    /// Forward index: state id -> (node -> declared interest level).
    interests: HashMap<u64, HashMap<NodeId, InterestLevel>>,
    /// Reverse index: node -> set of state ids it has declared interest in.
    node_interests: HashMap<NodeId, HashSet<u64>>,
}
impl InterestMap {
    /// Creates an empty interest map.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the interest described by `decl`, replacing any previous
    /// level the same node declared for the same state.
    ///
    /// Declaring `InterestLevel::None` withdraws the interest, exactly like
    /// [`InterestMap::unregister`]; no explicit `None` entry is stored, so
    /// withdrawn nodes never show up in any query.
    ///
    /// Only `node`, `state_id` and `level` are used; the declaration's
    /// `timestamp` and `ttl_ms` are not stored.
    pub fn register(&mut self, decl: InterestDeclaration) {
        if decl.level == InterestLevel::None {
            self.unregister(decl.node, decl.state_id);
            return;
        }
        self.interests
            .entry(decl.state_id)
            .or_default()
            .insert(decl.node, decl.level);
        self.node_interests
            .entry(decl.node)
            .or_default()
            .insert(decl.state_id);
    }

    /// Removes `node`'s interest in `state_id`, if any.
    ///
    /// Buckets that become empty are dropped from both indices so the map
    /// does not accumulate dead entries over time.
    pub fn unregister(&mut self, node: NodeId, state_id: u64) {
        if let Some(nodes) = self.interests.get_mut(&state_id) {
            nodes.remove(&node);
            if nodes.is_empty() {
                self.interests.remove(&state_id);
            }
        }
        if let Some(states) = self.node_interests.get_mut(&node) {
            states.remove(&state_id);
            if states.is_empty() {
                self.node_interests.remove(&node);
            }
        }
    }

    /// Returns every node with a non-`None` interest in `state_id`, paired
    /// with its level. The order of the result is unspecified.
    pub fn interested_nodes(&self, state_id: u64) -> Vec<(NodeId, InterestLevel)> {
        self.interests
            .get(&state_id)
            .map(|nodes| {
                nodes
                    .iter()
                    .filter(|(_, level)| **level != InterestLevel::None)
                    .map(|(node, level)| (*node, *level))
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Returns the nodes whose interest in `state_id` is at least
    /// `min_level`. The order of the result is unspecified.
    pub fn nodes_with_interest(&self, state_id: u64, min_level: InterestLevel) -> Vec<NodeId> {
        self.interests
            .get(&state_id)
            .map(|nodes| {
                nodes
                    .iter()
                    .filter(|(_, level)| **level >= min_level)
                    .map(|(node, _)| *node)
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Returns the state ids `node` currently has an interest in.
    /// The order of the result is unspecified.
    pub fn node_states(&self, node: NodeId) -> Vec<u64> {
        self.node_interests
            .get(&node)
            .map(|states| states.iter().copied().collect())
            .unwrap_or_default()
    }

    /// Returns `node`'s interest level in `state_id`, or
    /// `InterestLevel::None` when nothing is registered.
    pub fn get_interest(&self, node: NodeId, state_id: u64) -> InterestLevel {
        self.interests
            .get(&state_id)
            .and_then(|nodes| nodes.get(&node))
            .copied()
            .unwrap_or(InterestLevel::None)
    }

    /// Returns how many nodes have a non-`None` interest in `state_id`.
    pub fn interest_count(&self, state_id: u64) -> usize {
        self.interests
            .get(&state_id)
            .map(|nodes| {
                nodes
                    .values()
                    .filter(|level| **level != InterestLevel::None)
                    .count()
            })
            .unwrap_or(0)
    }

    /// Removes every interest registered by `node`.
    ///
    /// The reverse index lists exactly the states the node appears under,
    /// so only those buckets are visited instead of scanning every state.
    pub fn remove_node(&mut self, node: NodeId) {
        if let Some(states) = self.node_interests.remove(&node) {
            for state_id in states {
                if let Some(nodes) = self.interests.get_mut(&state_id) {
                    nodes.remove(&node);
                    if nodes.is_empty() {
                        self.interests.remove(&state_id);
                    }
                }
            }
        }
    }
}
/// Audience bookkeeping for a single livestream.
#[derive(Debug, Clone)]
pub struct LivestreamInterest {
    /// Identifier of the stream; also used as the state id under which
    /// audience interest is registered.
    pub stream_id: u64,
    /// Interest levels registered by the stream's audience.
    pub interests: InterestMap,
    /// Nodes added as active viewers.
    pub active_viewers: HashSet<NodeId>,
    /// Nodes added as lurkers.
    pub lurkers: HashSet<NodeId>,
}
impl LivestreamInterest {
    /// Creates an empty audience record for `stream_id`.
    pub fn new(stream_id: u64) -> Self {
        Self {
            stream_id,
            interests: InterestMap::new(),
            active_viewers: HashSet::new(),
            lurkers: HashSet::new(),
        }
    }

    /// State id adjacent to the stream's own id (`stream_id + 1`), which
    /// active viewers are also registered for.
    ///
    /// Wrapping arithmetic keeps `stream_id == u64::MAX` from panicking in
    /// debug builds.
    // NOTE(review): presumably a companion state of the stream — confirm
    // what `stream_id + 1` identifies.
    fn secondary_state_id(&self) -> u64 {
        self.stream_id.wrapping_add(1)
    }

    /// Marks `node` as an active viewer (removing it from the lurkers) and
    /// registers `High` interest in both the stream state and the
    /// secondary state.
    pub fn add_viewer(&mut self, node: NodeId) {
        self.active_viewers.insert(node);
        self.lurkers.remove(&node);
        let secondary = self.secondary_state_id();
        for state_id in [self.stream_id, secondary] {
            self.interests.register(InterestDeclaration::new(
                node,
                state_id,
                InterestLevel::High,
            ));
        }
    }

    /// Marks `node` as a lurker (removing it from the active viewers) and
    /// registers `Low` interest in the stream state only.
    ///
    /// Any interest in the secondary state left over from an earlier
    /// `add_viewer` call is withdrawn, so a demoted viewer ends up in the
    /// same state as a node that joined as a lurker.
    pub fn add_lurker(&mut self, node: NodeId) {
        self.lurkers.insert(node);
        self.active_viewers.remove(&node);
        self.interests.register(InterestDeclaration::new(
            node,
            self.stream_id,
            InterestLevel::Low,
        ));
        let secondary = self.secondary_state_id();
        self.interests.unregister(node, secondary);
    }

    /// Removes `node` from the audience entirely, including every interest
    /// it registered.
    pub fn remove_viewer(&mut self, node: NodeId) {
        self.active_viewers.remove(&node);
        self.lurkers.remove(&node);
        self.interests.remove_node(node);
    }

    /// Total audience size: active viewers plus lurkers.
    pub fn viewer_count(&self) -> usize {
        self.active_viewers.len() + self.lurkers.len()
    }

    /// Number of active viewers.
    pub fn active_count(&self) -> usize {
        self.active_viewers.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// State id shared by the `InterestMap` tests.
    const STATE_ID: u64 = 100;

    /// Registers each `(node, level)` pair in `entries` under `state_id`,
    /// in the order given.
    fn register_all(map: &mut InterestMap, state_id: u64, entries: &[(NodeId, InterestLevel)]) {
        for &(node, level) in entries {
            map.register(InterestDeclaration::new(node, state_id, level));
        }
    }

    /// Two registrations on one state are counted and readable per node.
    #[test]
    fn test_interest_registration() {
        let first = NodeId::new(1);
        let second = NodeId::new(2);

        let mut map = InterestMap::new();
        register_all(
            &mut map,
            STATE_ID,
            &[
                (first, InterestLevel::High),
                (second, InterestLevel::Medium),
            ],
        );

        assert_eq!(map.interest_count(STATE_ID), 2);
        assert_eq!(map.get_interest(first, STATE_ID), InterestLevel::High);
        assert_eq!(map.get_interest(second, STATE_ID), InterestLevel::Medium);
    }

    /// Threshold queries return only nodes at or above the minimum level.
    #[test]
    fn test_nodes_with_interest() {
        let mut map = InterestMap::new();
        register_all(
            &mut map,
            STATE_ID,
            &[
                (NodeId::new(1), InterestLevel::High),
                (NodeId::new(2), InterestLevel::Medium),
                (NodeId::new(3), InterestLevel::Low),
            ],
        );

        let at_least_high = map.nodes_with_interest(STATE_ID, InterestLevel::High);
        assert_eq!(at_least_high.len(), 1);

        let at_least_medium = map.nodes_with_interest(STATE_ID, InterestLevel::Medium);
        assert_eq!(at_least_medium.len(), 2);
    }

    /// Viewer and lurker counts follow additions and removals.
    #[test]
    fn test_livestream_interest() {
        let first_viewer = NodeId::new(1);
        let second_viewer = NodeId::new(2);
        let lurker = NodeId::new(3);

        let mut stream = LivestreamInterest::new(1000);
        for viewer in [first_viewer, second_viewer] {
            stream.add_viewer(viewer);
        }
        stream.add_lurker(lurker);

        assert_eq!(stream.viewer_count(), 3);
        assert_eq!(stream.active_count(), 2);

        stream.remove_viewer(first_viewer);
        assert_eq!(stream.viewer_count(), 2);
    }
}