use elara_core::{NodeId, StateTime};
use std::collections::HashMap;
use crate::{
InterestDeclaration, InterestLevel, InterestMap, LivestreamAuthority, LivestreamInterest,
PropagationTopology, StarTopology, StateUpdate, TreeTopology,
};
#[derive(Debug, Clone)]
pub struct SwarmConfig {
pub star_to_tree_threshold: usize,
pub tree_fanout: usize,
pub bandwidth_per_viewer: u32,
pub keyframe_interval_ms: u32,
}
impl Default for SwarmConfig {
fn default() -> Self {
Self {
star_to_tree_threshold: 50,
tree_fanout: 5,
bandwidth_per_viewer: 500_000, keyframe_interval_ms: 2000,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SwarmState {
Initializing,
Active,
Paused,
Ended,
}
#[derive(Debug)]
pub struct LivestreamSwarm {
pub stream_id: u64,
pub config: SwarmConfig,
pub state: SwarmState,
pub authority: LivestreamAuthority,
pub interest: LivestreamInterest,
topology: SwarmTopology,
last_keyframe: StateTime,
sequence: u64,
pub stats: SwarmStats,
}
#[derive(Debug)]
enum SwarmTopology {
Star(StarTopology),
Tree(TreeTopology),
}
impl LivestreamSwarm {
pub fn new(stream_id: u64, broadcaster: NodeId, config: SwarmConfig) -> Self {
Self {
stream_id,
config: config.clone(),
state: SwarmState::Initializing,
authority: LivestreamAuthority::new(broadcaster, stream_id),
interest: LivestreamInterest::new(stream_id),
topology: SwarmTopology::Star(StarTopology::new(broadcaster)),
last_keyframe: StateTime::from_millis(0),
sequence: 0,
stats: SwarmStats::new(),
}
}
pub fn start(&mut self) {
self.state = SwarmState::Active;
}
pub fn pause(&mut self) {
self.state = SwarmState::Paused;
}
pub fn resume(&mut self) {
self.state = SwarmState::Active;
}
pub fn end(&mut self) {
self.state = SwarmState::Ended;
}
pub fn add_viewer(&mut self, viewer: NodeId) {
self.interest.add_viewer(viewer);
match &mut self.topology {
SwarmTopology::Star(star) => {
star.add_leaf(viewer);
if star.leaf_count() > self.config.star_to_tree_threshold {
self.switch_to_tree();
}
}
SwarmTopology::Tree(tree) => {
tree.add_node(viewer);
}
}
self.stats.peak_viewers = self.stats.peak_viewers.max(self.viewer_count() as u32);
}
pub fn remove_viewer(&mut self, viewer: NodeId) {
self.interest.remove_viewer(viewer);
match &mut self.topology {
SwarmTopology::Star(star) => {
star.remove_leaf(viewer);
}
SwarmTopology::Tree(tree) => {
tree.remove_node(viewer);
}
}
}
fn switch_to_tree(&mut self) {
if let SwarmTopology::Star(star) = &self.topology {
let mut tree = TreeTopology::new(star.center, self.config.tree_fanout);
for &leaf in &star.leaves {
tree.add_node(leaf);
}
self.topology = SwarmTopology::Tree(tree);
}
}
pub fn viewer_count(&self) -> usize {
self.interest.viewer_count()
}
pub fn broadcaster(&self) -> NodeId {
self.authority.broadcaster
}
pub fn can_broadcast(&self, node: NodeId) -> bool {
self.authority.can_mutate_visual(node)
}
pub fn create_update(
&mut self,
timestamp: StateTime,
size: usize,
is_keyframe: bool,
) -> StateUpdate {
self.sequence += 1;
let mut update =
StateUpdate::new(self.stream_id, self.broadcaster(), self.sequence, timestamp)
.with_size(size);
if is_keyframe {
update = update.keyframe();
self.last_keyframe = timestamp;
}
update
}
pub fn needs_keyframe(&self, current_time: StateTime) -> bool {
let elapsed = current_time.as_millis() - self.last_keyframe.as_millis();
elapsed >= self.config.keyframe_interval_ms as i64
}
pub fn get_targets(&self) -> Vec<NodeId> {
match &self.topology {
SwarmTopology::Star(star) => star.leaves.iter().copied().collect(),
SwarmTopology::Tree(tree) => tree.topology.downstream(self.broadcaster()),
}
}
}
#[derive(Debug, Clone, Default)]
pub struct SwarmStats {
pub updates_sent: u64,
pub bytes_sent: u64,
pub peak_viewers: u32,
pub total_viewers: u32,
pub duration_seconds: u32,
}
impl SwarmStats {
pub fn new() -> Self {
Self::default()
}
}
#[derive(Debug)]
pub struct GroupSwarm {
pub group_id: u64,
participants: HashMap<NodeId, ParticipantState>,
interests: InterestMap,
topology: PropagationTopology,
pub max_participants: usize,
}
#[derive(Debug, Clone)]
pub struct ParticipantState {
pub node: NodeId,
pub video_enabled: bool,
pub audio_enabled: bool,
pub screen_sharing: bool,
pub joined_at: StateTime,
}
impl GroupSwarm {
pub fn new(group_id: u64, max_participants: usize) -> Self {
Self {
group_id,
participants: HashMap::new(),
interests: InterestMap::new(),
topology: PropagationTopology::new(),
max_participants,
}
}
pub fn add_participant(&mut self, node: NodeId, joined_at: StateTime) -> bool {
if self.participants.len() >= self.max_participants {
return false;
}
let state = ParticipantState {
node,
video_enabled: true,
audio_enabled: true,
screen_sharing: false,
joined_at,
};
for &existing in self.participants.keys() {
self.topology
.add_edge(crate::PropagationEdge::new(existing, node));
self.topology
.add_edge(crate::PropagationEdge::new(node, existing));
}
for &existing in self.participants.keys() {
self.interests.register(InterestDeclaration::new(
node,
existing.0,
InterestLevel::High,
));
self.interests.register(InterestDeclaration::new(
existing,
node.0,
InterestLevel::High,
));
}
self.participants.insert(node, state);
self.topology.add_node(node);
true
}
pub fn remove_participant(&mut self, node: NodeId) {
self.participants.remove(&node);
self.topology.remove_node(node);
self.interests.remove_node(node);
}
pub fn participant_count(&self) -> usize {
self.participants.len()
}
pub fn toggle_video(&mut self, node: NodeId, enabled: bool) {
if let Some(p) = self.participants.get_mut(&node) {
p.video_enabled = enabled;
}
}
pub fn toggle_audio(&mut self, node: NodeId, enabled: bool) {
if let Some(p) = self.participants.get_mut(&node) {
p.audio_enabled = enabled;
}
}
pub fn start_screen_share(&mut self, node: NodeId) -> bool {
if self.participants.values().any(|p| p.screen_sharing) {
return false;
}
if let Some(p) = self.participants.get_mut(&node) {
p.screen_sharing = true;
return true;
}
false
}
pub fn stop_screen_share(&mut self, node: NodeId) {
if let Some(p) = self.participants.get_mut(&node) {
p.screen_sharing = false;
}
}
pub fn participants(&self) -> Vec<&ParticipantState> {
self.participants.values().collect()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_livestream_swarm() {
let broadcaster = NodeId::new(1);
let mut swarm = LivestreamSwarm::new(1000, broadcaster, SwarmConfig::default());
swarm.start();
assert_eq!(swarm.state, SwarmState::Active);
for i in 2..=10 {
swarm.add_viewer(NodeId::new(i));
}
assert_eq!(swarm.viewer_count(), 9);
assert!(swarm.can_broadcast(broadcaster));
assert!(!swarm.can_broadcast(NodeId::new(2)));
}
#[test]
fn test_livestream_topology_switch() {
let broadcaster = NodeId::new(1);
let config = SwarmConfig {
star_to_tree_threshold: 5,
..Default::default()
};
let mut swarm = LivestreamSwarm::new(1000, broadcaster, config);
for i in 2..=10 {
swarm.add_viewer(NodeId::new(i));
}
assert!(matches!(swarm.topology, SwarmTopology::Tree(_)));
}
#[test]
fn test_group_swarm() {
let mut group = GroupSwarm::new(2000, 10);
let time = StateTime::from_millis(0);
assert!(group.add_participant(NodeId::new(1), time));
assert!(group.add_participant(NodeId::new(2), time));
assert!(group.add_participant(NodeId::new(3), time));
assert_eq!(group.participant_count(), 3);
assert!(group.start_screen_share(NodeId::new(1)));
assert!(!group.start_screen_share(NodeId::new(2)));
group.stop_screen_share(NodeId::new(1));
assert!(group.start_screen_share(NodeId::new(2))); }
}