Skip to main content

elara_diffusion/
swarm.rs

1//! Swarm - Complete diffusion system for livestream/group
2//!
3//! Combines authority, interest, topology, and propagation.
4
5use elara_core::{NodeId, StateTime};
6use std::collections::HashMap;
7
8use crate::{
9    InterestDeclaration, InterestLevel, InterestMap, LivestreamAuthority, LivestreamInterest,
10    PropagationTopology, StarTopology, StateUpdate, TreeTopology,
11};
12
/// Tunable parameters for a livestream swarm.
#[derive(Debug, Clone)]
pub struct SwarmConfig {
    /// Largest viewer count served by the star topology; once the star holds
    /// more leaves than this, the swarm rebuilds itself as a tree.
    pub star_to_tree_threshold: usize,
    /// Fan-out passed to the tree topology when it is constructed.
    pub tree_fanout: usize,
    /// Per-viewer bandwidth budget, in bytes per second.
    pub bandwidth_per_viewer: u32,
    /// Desired spacing between keyframes, in milliseconds.
    pub keyframe_interval_ms: u32,
}

impl Default for SwarmConfig {
    /// Returns the stock configuration: star topology up to 50 viewers,
    /// tree fan-out of 5, 500 KB/s per viewer, a keyframe every 2 seconds.
    fn default() -> Self {
        const STAR_VIEWER_LIMIT: usize = 50;
        const TREE_FANOUT: usize = 5;
        const BYTES_PER_SECOND_PER_VIEWER: u32 = 500_000; // 500 KB/s
        const KEYFRAME_INTERVAL_MS: u32 = 2000;

        SwarmConfig {
            star_to_tree_threshold: STAR_VIEWER_LIMIT,
            tree_fanout: TREE_FANOUT,
            bandwidth_per_viewer: BYTES_PER_SECOND_PER_VIEWER,
            keyframe_interval_ms: KEYFRAME_INTERVAL_MS,
        }
    }
}
36
/// Lifecycle phase of a livestream swarm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SwarmState {
    /// The swarm has been created but the stream has not been started.
    Initializing,
    /// The stream is running.
    Active,
    /// The broadcaster has put the stream on hold.
    Paused,
    /// The stream has finished.
    Ended,
}
49
50/// Livestream swarm - complete system for a single livestream
51#[derive(Debug)]
52pub struct LivestreamSwarm {
53    /// Stream ID
54    pub stream_id: u64,
55    /// Configuration
56    pub config: SwarmConfig,
57    /// Current state
58    pub state: SwarmState,
59    /// Authority model
60    pub authority: LivestreamAuthority,
61    /// Interest tracking
62    pub interest: LivestreamInterest,
63    /// Current topology (star or tree)
64    topology: SwarmTopology,
65    /// Last keyframe time
66    last_keyframe: StateTime,
67    /// Sequence counter
68    sequence: u64,
69    /// Statistics
70    pub stats: SwarmStats,
71}
72
73/// Topology type
74#[derive(Debug)]
75enum SwarmTopology {
76    Star(StarTopology),
77    Tree(TreeTopology),
78}
79
80impl LivestreamSwarm {
81    /// Create a new livestream swarm
82    pub fn new(stream_id: u64, broadcaster: NodeId, config: SwarmConfig) -> Self {
83        Self {
84            stream_id,
85            config: config.clone(),
86            state: SwarmState::Initializing,
87            authority: LivestreamAuthority::new(broadcaster, stream_id),
88            interest: LivestreamInterest::new(stream_id),
89            topology: SwarmTopology::Star(StarTopology::new(broadcaster)),
90            last_keyframe: StateTime::from_millis(0),
91            sequence: 0,
92            stats: SwarmStats::new(),
93        }
94    }
95
96    /// Start the stream
97    pub fn start(&mut self) {
98        self.state = SwarmState::Active;
99    }
100
101    /// Pause the stream
102    pub fn pause(&mut self) {
103        self.state = SwarmState::Paused;
104    }
105
106    /// Resume the stream
107    pub fn resume(&mut self) {
108        self.state = SwarmState::Active;
109    }
110
111    /// End the stream
112    pub fn end(&mut self) {
113        self.state = SwarmState::Ended;
114    }
115
116    /// Add a viewer
117    pub fn add_viewer(&mut self, viewer: NodeId) {
118        self.interest.add_viewer(viewer);
119
120        match &mut self.topology {
121            SwarmTopology::Star(star) => {
122                star.add_leaf(viewer);
123
124                // Check if we need to switch to tree
125                if star.leaf_count() > self.config.star_to_tree_threshold {
126                    self.switch_to_tree();
127                }
128            }
129            SwarmTopology::Tree(tree) => {
130                tree.add_node(viewer);
131            }
132        }
133
134        self.stats.peak_viewers = self.stats.peak_viewers.max(self.viewer_count() as u32);
135    }
136
137    /// Remove a viewer
138    pub fn remove_viewer(&mut self, viewer: NodeId) {
139        self.interest.remove_viewer(viewer);
140
141        match &mut self.topology {
142            SwarmTopology::Star(star) => {
143                star.remove_leaf(viewer);
144            }
145            SwarmTopology::Tree(tree) => {
146                tree.remove_node(viewer);
147            }
148        }
149    }
150
151    /// Switch from star to tree topology
152    fn switch_to_tree(&mut self) {
153        if let SwarmTopology::Star(star) = &self.topology {
154            let mut tree = TreeTopology::new(star.center, self.config.tree_fanout);
155
156            // Add all existing leaves
157            for &leaf in &star.leaves {
158                tree.add_node(leaf);
159            }
160
161            self.topology = SwarmTopology::Tree(tree);
162        }
163    }
164
165    /// Get viewer count
166    pub fn viewer_count(&self) -> usize {
167        self.interest.viewer_count()
168    }
169
170    /// Get broadcaster
171    pub fn broadcaster(&self) -> NodeId {
172        self.authority.broadcaster
173    }
174
175    /// Check if a node can broadcast
176    pub fn can_broadcast(&self, node: NodeId) -> bool {
177        self.authority.can_mutate_visual(node)
178    }
179
180    /// Create a state update for broadcasting
181    pub fn create_update(
182        &mut self,
183        timestamp: StateTime,
184        size: usize,
185        is_keyframe: bool,
186    ) -> StateUpdate {
187        self.sequence += 1;
188
189        let mut update =
190            StateUpdate::new(self.stream_id, self.broadcaster(), self.sequence, timestamp)
191                .with_size(size);
192
193        if is_keyframe {
194            update = update.keyframe();
195            self.last_keyframe = timestamp;
196        }
197
198        update
199    }
200
201    /// Check if we need a keyframe
202    pub fn needs_keyframe(&self, current_time: StateTime) -> bool {
203        let elapsed = current_time.as_millis() - self.last_keyframe.as_millis();
204        elapsed >= self.config.keyframe_interval_ms as i64
205    }
206
207    /// Get propagation targets for an update
208    pub fn get_targets(&self) -> Vec<NodeId> {
209        match &self.topology {
210            SwarmTopology::Star(star) => star.leaves.iter().copied().collect(),
211            SwarmTopology::Tree(tree) => tree.topology.downstream(self.broadcaster()),
212        }
213    }
214}
215
/// Counters describing a swarm's activity.
#[derive(Debug, Clone, Default)]
pub struct SwarmStats {
    /// Number of updates sent so far.
    pub updates_sent: u64,
    /// Number of bytes sent so far.
    pub bytes_sent: u64,
    /// Highest concurrent viewer count observed.
    pub peak_viewers: u32,
    /// Number of distinct viewers seen over the stream's lifetime.
    pub total_viewers: u32,
    /// Length of the stream, in seconds.
    pub duration_seconds: u32,
}

impl SwarmStats {
    /// Returns a stats record with every counter at zero.
    pub fn new() -> Self {
        Default::default()
    }
}
236
237/// Group swarm - for video calls (symmetric authority)
238#[derive(Debug)]
239pub struct GroupSwarm {
240    /// Group ID
241    pub group_id: u64,
242    /// All participants
243    participants: HashMap<NodeId, ParticipantState>,
244    /// Interest map
245    interests: InterestMap,
246    /// Topology (mesh for small groups)
247    topology: PropagationTopology,
248    /// Maximum participants
249    pub max_participants: usize,
250}
251
252/// Participant state in a group
253#[derive(Debug, Clone)]
254pub struct ParticipantState {
255    /// Node ID
256    pub node: NodeId,
257    /// Is video enabled?
258    pub video_enabled: bool,
259    /// Is audio enabled?
260    pub audio_enabled: bool,
261    /// Is screen sharing?
262    pub screen_sharing: bool,
263    /// Join time
264    pub joined_at: StateTime,
265}
266
267impl GroupSwarm {
268    /// Create a new group swarm
269    pub fn new(group_id: u64, max_participants: usize) -> Self {
270        Self {
271            group_id,
272            participants: HashMap::new(),
273            interests: InterestMap::new(),
274            topology: PropagationTopology::new(),
275            max_participants,
276        }
277    }
278
279    /// Add a participant
280    pub fn add_participant(&mut self, node: NodeId, joined_at: StateTime) -> bool {
281        if self.participants.len() >= self.max_participants {
282            return false;
283        }
284
285        let state = ParticipantState {
286            node,
287            video_enabled: true,
288            audio_enabled: true,
289            screen_sharing: false,
290            joined_at,
291        };
292
293        // Add edges to/from all existing participants (mesh)
294        for &existing in self.participants.keys() {
295            self.topology
296                .add_edge(crate::PropagationEdge::new(existing, node));
297            self.topology
298                .add_edge(crate::PropagationEdge::new(node, existing));
299        }
300
301        // Register interest in all other participants' states
302        for &existing in self.participants.keys() {
303            self.interests.register(InterestDeclaration::new(
304                node,
305                existing.0,
306                InterestLevel::High,
307            ));
308            self.interests.register(InterestDeclaration::new(
309                existing,
310                node.0,
311                InterestLevel::High,
312            ));
313        }
314
315        self.participants.insert(node, state);
316        self.topology.add_node(node);
317
318        true
319    }
320
321    /// Remove a participant
322    pub fn remove_participant(&mut self, node: NodeId) {
323        self.participants.remove(&node);
324        self.topology.remove_node(node);
325        self.interests.remove_node(node);
326    }
327
328    /// Get participant count
329    pub fn participant_count(&self) -> usize {
330        self.participants.len()
331    }
332
333    /// Toggle video for a participant
334    pub fn toggle_video(&mut self, node: NodeId, enabled: bool) {
335        if let Some(p) = self.participants.get_mut(&node) {
336            p.video_enabled = enabled;
337        }
338    }
339
340    /// Toggle audio for a participant
341    pub fn toggle_audio(&mut self, node: NodeId, enabled: bool) {
342        if let Some(p) = self.participants.get_mut(&node) {
343            p.audio_enabled = enabled;
344        }
345    }
346
347    /// Start screen sharing
348    pub fn start_screen_share(&mut self, node: NodeId) -> bool {
349        // Only one person can screen share at a time
350        if self.participants.values().any(|p| p.screen_sharing) {
351            return false;
352        }
353
354        if let Some(p) = self.participants.get_mut(&node) {
355            p.screen_sharing = true;
356            return true;
357        }
358
359        false
360    }
361
362    /// Stop screen sharing
363    pub fn stop_screen_share(&mut self, node: NodeId) {
364        if let Some(p) = self.participants.get_mut(&node) {
365            p.screen_sharing = false;
366        }
367    }
368
369    /// Get all participants
370    pub fn participants(&self) -> Vec<&ParticipantState> {
371        self.participants.values().collect()
372    }
373}
374
#[cfg(test)]
mod tests {
    use super::*;

    /// A started swarm reports its viewers and only lets the broadcaster
    /// broadcast.
    #[test]
    fn test_livestream_swarm() {
        let source = NodeId::new(1);
        let mut swarm = LivestreamSwarm::new(1000, source, SwarmConfig::default());

        swarm.start();
        assert_eq!(swarm.state, SwarmState::Active);

        // Nine viewers: nodes 2 through 10.
        (2..=10).for_each(|id| swarm.add_viewer(NodeId::new(id)));

        assert_eq!(swarm.viewer_count(), 9);
        assert!(swarm.can_broadcast(source));
        assert!(!swarm.can_broadcast(NodeId::new(2)));
    }

    /// Exceeding the star threshold rebuilds the swarm as a tree.
    #[test]
    fn test_livestream_topology_switch() {
        let source = NodeId::new(1);
        let small_star = SwarmConfig {
            star_to_tree_threshold: 5,
            ..Default::default()
        };
        let mut swarm = LivestreamSwarm::new(1000, source, small_star);

        // Nine viewers is well past the threshold of five.
        (2..=10).for_each(|id| swarm.add_viewer(NodeId::new(id)));

        assert!(matches!(swarm.topology, SwarmTopology::Tree(_)));
    }

    /// Participants can join, and only one of them may share a screen at once.
    #[test]
    fn test_group_swarm() {
        let mut group = GroupSwarm::new(2000, 10);
        let joined = StateTime::from_millis(0);

        let first = NodeId::new(1);
        let second = NodeId::new(2);
        let third = NodeId::new(3);

        assert!(group.add_participant(first, joined));
        assert!(group.add_participant(second, joined));
        assert!(group.add_participant(third, joined));

        assert_eq!(group.participant_count(), 3);

        // The first sharer wins; a second request is refused.
        assert!(group.start_screen_share(first));
        assert!(!group.start_screen_share(second));

        // Once the first sharer stops, the second may start.
        group.stop_screen_share(first);
        assert!(group.start_screen_share(second));
    }
}
434}