1use elara_core::{NodeId, StateTime};
6use std::collections::HashMap;
7
8use crate::{
9 InterestDeclaration, InterestLevel, InterestMap, LivestreamAuthority, LivestreamInterest,
10 PropagationTopology, StarTopology, StateUpdate, TreeTopology,
11};
12
13#[derive(Debug, Clone)]
15pub struct SwarmConfig {
16 pub star_to_tree_threshold: usize,
18 pub tree_fanout: usize,
20 pub bandwidth_per_viewer: u32,
22 pub keyframe_interval_ms: u32,
24}
25
26impl Default for SwarmConfig {
27 fn default() -> Self {
28 Self {
29 star_to_tree_threshold: 50,
30 tree_fanout: 5,
31 bandwidth_per_viewer: 500_000, keyframe_interval_ms: 2000,
33 }
34 }
35}
36
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub enum SwarmState {
40 Initializing,
42 Active,
44 Paused,
46 Ended,
48}
49
50#[derive(Debug)]
52pub struct LivestreamSwarm {
53 pub stream_id: u64,
55 pub config: SwarmConfig,
57 pub state: SwarmState,
59 pub authority: LivestreamAuthority,
61 pub interest: LivestreamInterest,
63 topology: SwarmTopology,
65 last_keyframe: StateTime,
67 sequence: u64,
69 pub stats: SwarmStats,
71}
72
73#[derive(Debug)]
75enum SwarmTopology {
76 Star(StarTopology),
77 Tree(TreeTopology),
78}
79
80impl LivestreamSwarm {
81 pub fn new(stream_id: u64, broadcaster: NodeId, config: SwarmConfig) -> Self {
83 Self {
84 stream_id,
85 config: config.clone(),
86 state: SwarmState::Initializing,
87 authority: LivestreamAuthority::new(broadcaster, stream_id),
88 interest: LivestreamInterest::new(stream_id),
89 topology: SwarmTopology::Star(StarTopology::new(broadcaster)),
90 last_keyframe: StateTime::from_millis(0),
91 sequence: 0,
92 stats: SwarmStats::new(),
93 }
94 }
95
96 pub fn start(&mut self) {
98 self.state = SwarmState::Active;
99 }
100
101 pub fn pause(&mut self) {
103 self.state = SwarmState::Paused;
104 }
105
106 pub fn resume(&mut self) {
108 self.state = SwarmState::Active;
109 }
110
111 pub fn end(&mut self) {
113 self.state = SwarmState::Ended;
114 }
115
116 pub fn add_viewer(&mut self, viewer: NodeId) {
118 self.interest.add_viewer(viewer);
119
120 match &mut self.topology {
121 SwarmTopology::Star(star) => {
122 star.add_leaf(viewer);
123
124 if star.leaf_count() > self.config.star_to_tree_threshold {
126 self.switch_to_tree();
127 }
128 }
129 SwarmTopology::Tree(tree) => {
130 tree.add_node(viewer);
131 }
132 }
133
134 self.stats.peak_viewers = self.stats.peak_viewers.max(self.viewer_count() as u32);
135 }
136
137 pub fn remove_viewer(&mut self, viewer: NodeId) {
139 self.interest.remove_viewer(viewer);
140
141 match &mut self.topology {
142 SwarmTopology::Star(star) => {
143 star.remove_leaf(viewer);
144 }
145 SwarmTopology::Tree(tree) => {
146 tree.remove_node(viewer);
147 }
148 }
149 }
150
151 fn switch_to_tree(&mut self) {
153 if let SwarmTopology::Star(star) = &self.topology {
154 let mut tree = TreeTopology::new(star.center, self.config.tree_fanout);
155
156 for &leaf in &star.leaves {
158 tree.add_node(leaf);
159 }
160
161 self.topology = SwarmTopology::Tree(tree);
162 }
163 }
164
165 pub fn viewer_count(&self) -> usize {
167 self.interest.viewer_count()
168 }
169
170 pub fn broadcaster(&self) -> NodeId {
172 self.authority.broadcaster
173 }
174
175 pub fn can_broadcast(&self, node: NodeId) -> bool {
177 self.authority.can_mutate_visual(node)
178 }
179
180 pub fn create_update(
182 &mut self,
183 timestamp: StateTime,
184 size: usize,
185 is_keyframe: bool,
186 ) -> StateUpdate {
187 self.sequence += 1;
188
189 let mut update =
190 StateUpdate::new(self.stream_id, self.broadcaster(), self.sequence, timestamp)
191 .with_size(size);
192
193 if is_keyframe {
194 update = update.keyframe();
195 self.last_keyframe = timestamp;
196 }
197
198 update
199 }
200
201 pub fn needs_keyframe(&self, current_time: StateTime) -> bool {
203 let elapsed = current_time.as_millis() - self.last_keyframe.as_millis();
204 elapsed >= self.config.keyframe_interval_ms as i64
205 }
206
207 pub fn get_targets(&self) -> Vec<NodeId> {
209 match &self.topology {
210 SwarmTopology::Star(star) => star.leaves.iter().copied().collect(),
211 SwarmTopology::Tree(tree) => tree.topology.downstream(self.broadcaster()),
212 }
213 }
214}
215
216#[derive(Debug, Clone, Default)]
218pub struct SwarmStats {
219 pub updates_sent: u64,
221 pub bytes_sent: u64,
223 pub peak_viewers: u32,
225 pub total_viewers: u32,
227 pub duration_seconds: u32,
229}
230
231impl SwarmStats {
232 pub fn new() -> Self {
233 Self::default()
234 }
235}
236
237#[derive(Debug)]
239pub struct GroupSwarm {
240 pub group_id: u64,
242 participants: HashMap<NodeId, ParticipantState>,
244 interests: InterestMap,
246 topology: PropagationTopology,
248 pub max_participants: usize,
250}
251
252#[derive(Debug, Clone)]
254pub struct ParticipantState {
255 pub node: NodeId,
257 pub video_enabled: bool,
259 pub audio_enabled: bool,
261 pub screen_sharing: bool,
263 pub joined_at: StateTime,
265}
266
267impl GroupSwarm {
268 pub fn new(group_id: u64, max_participants: usize) -> Self {
270 Self {
271 group_id,
272 participants: HashMap::new(),
273 interests: InterestMap::new(),
274 topology: PropagationTopology::new(),
275 max_participants,
276 }
277 }
278
279 pub fn add_participant(&mut self, node: NodeId, joined_at: StateTime) -> bool {
281 if self.participants.len() >= self.max_participants {
282 return false;
283 }
284
285 let state = ParticipantState {
286 node,
287 video_enabled: true,
288 audio_enabled: true,
289 screen_sharing: false,
290 joined_at,
291 };
292
293 for &existing in self.participants.keys() {
295 self.topology
296 .add_edge(crate::PropagationEdge::new(existing, node));
297 self.topology
298 .add_edge(crate::PropagationEdge::new(node, existing));
299 }
300
301 for &existing in self.participants.keys() {
303 self.interests.register(InterestDeclaration::new(
304 node,
305 existing.0,
306 InterestLevel::High,
307 ));
308 self.interests.register(InterestDeclaration::new(
309 existing,
310 node.0,
311 InterestLevel::High,
312 ));
313 }
314
315 self.participants.insert(node, state);
316 self.topology.add_node(node);
317
318 true
319 }
320
321 pub fn remove_participant(&mut self, node: NodeId) {
323 self.participants.remove(&node);
324 self.topology.remove_node(node);
325 self.interests.remove_node(node);
326 }
327
328 pub fn participant_count(&self) -> usize {
330 self.participants.len()
331 }
332
333 pub fn toggle_video(&mut self, node: NodeId, enabled: bool) {
335 if let Some(p) = self.participants.get_mut(&node) {
336 p.video_enabled = enabled;
337 }
338 }
339
340 pub fn toggle_audio(&mut self, node: NodeId, enabled: bool) {
342 if let Some(p) = self.participants.get_mut(&node) {
343 p.audio_enabled = enabled;
344 }
345 }
346
347 pub fn start_screen_share(&mut self, node: NodeId) -> bool {
349 if self.participants.values().any(|p| p.screen_sharing) {
351 return false;
352 }
353
354 if let Some(p) = self.participants.get_mut(&node) {
355 p.screen_sharing = true;
356 return true;
357 }
358
359 false
360 }
361
362 pub fn stop_screen_share(&mut self, node: NodeId) {
364 if let Some(p) = self.participants.get_mut(&node) {
365 p.screen_sharing = false;
366 }
367 }
368
369 pub fn participants(&self) -> Vec<&ParticipantState> {
371 self.participants.values().collect()
372 }
373}
374
375#[cfg(test)]
376mod tests {
377 use super::*;
378
379 #[test]
380 fn test_livestream_swarm() {
381 let broadcaster = NodeId::new(1);
382 let mut swarm = LivestreamSwarm::new(1000, broadcaster, SwarmConfig::default());
383
384 swarm.start();
385 assert_eq!(swarm.state, SwarmState::Active);
386
387 for i in 2..=10 {
389 swarm.add_viewer(NodeId::new(i));
390 }
391
392 assert_eq!(swarm.viewer_count(), 9);
393 assert!(swarm.can_broadcast(broadcaster));
394 assert!(!swarm.can_broadcast(NodeId::new(2)));
395 }
396
397 #[test]
398 fn test_livestream_topology_switch() {
399 let broadcaster = NodeId::new(1);
400 let config = SwarmConfig {
401 star_to_tree_threshold: 5,
402 ..Default::default()
403 };
404 let mut swarm = LivestreamSwarm::new(1000, broadcaster, config);
405
406 for i in 2..=10 {
408 swarm.add_viewer(NodeId::new(i));
409 }
410
411 assert!(matches!(swarm.topology, SwarmTopology::Tree(_)));
413 }
414
415 #[test]
416 fn test_group_swarm() {
417 let mut group = GroupSwarm::new(2000, 10);
418
419 let time = StateTime::from_millis(0);
420
421 assert!(group.add_participant(NodeId::new(1), time));
422 assert!(group.add_participant(NodeId::new(2), time));
423 assert!(group.add_participant(NodeId::new(3), time));
424
425 assert_eq!(group.participant_count(), 3);
426
427 assert!(group.start_screen_share(NodeId::new(1)));
429 assert!(!group.start_screen_share(NodeId::new(2))); group.stop_screen_share(NodeId::new(1));
432 assert!(group.start_screen_share(NodeId::new(2))); }
434}