Skip to main content

elara_diffusion/
topology.rs

1//! Topology - The shape of state propagation network
2//!
3//! How state flows through the swarm. This is NOT a CDN topology.
4//! It's a dynamic, interest-driven propagation graph.
5
6use elara_core::NodeId;
7use std::collections::{HashMap, HashSet};
8
9/// Edge in the propagation graph
10#[derive(Debug, Clone, Copy)]
11pub struct PropagationEdge {
12    /// Source node
13    pub from: NodeId,
14    /// Destination node
15    pub to: NodeId,
16    /// Estimated latency in milliseconds
17    pub latency_ms: u32,
18    /// Bandwidth capacity (relative, 0-100)
19    pub bandwidth: u8,
20    /// Is this edge active?
21    pub active: bool,
22}
23
24impl PropagationEdge {
25    pub fn new(from: NodeId, to: NodeId) -> Self {
26        Self {
27            from,
28            to,
29            latency_ms: 50,
30            bandwidth: 100,
31            active: true,
32        }
33    }
34
35    pub fn with_latency(mut self, latency_ms: u32) -> Self {
36        self.latency_ms = latency_ms;
37        self
38    }
39
40    pub fn with_bandwidth(mut self, bandwidth: u8) -> Self {
41        self.bandwidth = bandwidth;
42        self
43    }
44}
45
46/// Propagation topology
47#[derive(Debug, Clone, Default)]
48pub struct PropagationTopology {
49    /// All nodes in the topology
50    nodes: HashSet<NodeId>,
51
52    /// Edges: from_node -> list of edges
53    edges: HashMap<NodeId, Vec<PropagationEdge>>,
54
55    /// Reverse edges: to_node -> list of from_nodes
56    reverse_edges: HashMap<NodeId, HashSet<NodeId>>,
57}
58
59impl PropagationTopology {
60    /// Create a new empty topology
61    pub fn new() -> Self {
62        Self::default()
63    }
64
65    /// Add a node
66    pub fn add_node(&mut self, node: NodeId) {
67        self.nodes.insert(node);
68    }
69
70    /// Remove a node and all its edges
71    pub fn remove_node(&mut self, node: NodeId) {
72        self.nodes.remove(&node);
73        self.edges.remove(&node);
74
75        // Remove edges pointing to this node
76        for edges in self.edges.values_mut() {
77            edges.retain(|e| e.to != node);
78        }
79
80        self.reverse_edges.remove(&node);
81        for sources in self.reverse_edges.values_mut() {
82            sources.remove(&node);
83        }
84    }
85
86    /// Add an edge
87    pub fn add_edge(&mut self, edge: PropagationEdge) {
88        self.nodes.insert(edge.from);
89        self.nodes.insert(edge.to);
90
91        self.edges.entry(edge.from).or_default().push(edge);
92        self.reverse_edges
93            .entry(edge.to)
94            .or_default()
95            .insert(edge.from);
96    }
97
98    /// Get all edges from a node
99    pub fn edges_from(&self, node: NodeId) -> &[PropagationEdge] {
100        self.edges.get(&node).map(|v| v.as_slice()).unwrap_or(&[])
101    }
102
103    /// Get all nodes that can receive from a node
104    pub fn downstream(&self, node: NodeId) -> Vec<NodeId> {
105        self.edges_from(node).iter().map(|e| e.to).collect()
106    }
107
108    /// Get all nodes that can send to a node
109    pub fn upstream(&self, node: NodeId) -> Vec<NodeId> {
110        self.reverse_edges
111            .get(&node)
112            .map(|s| s.iter().copied().collect())
113            .unwrap_or_default()
114    }
115
116    /// Get node count
117    pub fn node_count(&self) -> usize {
118        self.nodes.len()
119    }
120
121    /// Check if a node exists
122    pub fn has_node(&self, node: NodeId) -> bool {
123        self.nodes.contains(&node)
124    }
125}
126
127/// Star topology - one central node (broadcaster) to all viewers
128#[derive(Debug, Clone)]
129pub struct StarTopology {
130    /// Central node (broadcaster)
131    pub center: NodeId,
132    /// Leaf nodes (viewers)
133    pub leaves: HashSet<NodeId>,
134    /// The underlying topology
135    pub topology: PropagationTopology,
136}
137
138impl StarTopology {
139    /// Create a new star topology
140    pub fn new(center: NodeId) -> Self {
141        let mut topology = PropagationTopology::new();
142        topology.add_node(center);
143
144        Self {
145            center,
146            leaves: HashSet::new(),
147            topology,
148        }
149    }
150
151    /// Add a leaf (viewer)
152    pub fn add_leaf(&mut self, leaf: NodeId) {
153        self.leaves.insert(leaf);
154        self.topology
155            .add_edge(PropagationEdge::new(self.center, leaf));
156    }
157
158    /// Remove a leaf
159    pub fn remove_leaf(&mut self, leaf: NodeId) {
160        self.leaves.remove(&leaf);
161        self.topology.remove_node(leaf);
162    }
163
164    /// Get leaf count
165    pub fn leaf_count(&self) -> usize {
166        self.leaves.len()
167    }
168}
169
170/// Tree topology - hierarchical relay (P2P CDN-like)
171#[derive(Debug, Clone)]
172pub struct TreeTopology {
173    /// Root node (broadcaster)
174    pub root: NodeId,
175    /// Parent of each node
176    parents: HashMap<NodeId, NodeId>,
177    /// Children of each node
178    children: HashMap<NodeId, HashSet<NodeId>>,
179    /// Maximum children per node (fan-out)
180    pub max_fanout: usize,
181    /// The underlying topology
182    pub topology: PropagationTopology,
183}
184
185impl TreeTopology {
186    /// Create a new tree topology
187    pub fn new(root: NodeId, max_fanout: usize) -> Self {
188        let mut topology = PropagationTopology::new();
189        topology.add_node(root);
190
191        Self {
192            root,
193            parents: HashMap::new(),
194            children: HashMap::new(),
195            max_fanout,
196            topology,
197        }
198    }
199
200    /// Add a node to the tree (finds best parent)
201    pub fn add_node(&mut self, node: NodeId) -> NodeId {
202        // Find a parent with room
203        let parent = self.find_parent();
204
205        self.parents.insert(node, parent);
206        self.children.entry(parent).or_default().insert(node);
207        self.topology.add_edge(PropagationEdge::new(parent, node));
208
209        parent
210    }
211
212    /// Find a parent with room for more children
213    fn find_parent(&self) -> NodeId {
214        // Start with root
215        let root_children = self.children.get(&self.root).map(|c| c.len()).unwrap_or(0);
216        if root_children < self.max_fanout {
217            return self.root;
218        }
219
220        // BFS to find a node with room
221        let mut queue: Vec<NodeId> = self
222            .children
223            .get(&self.root)
224            .map(|c| c.iter().copied().collect())
225            .unwrap_or_default();
226
227        while let Some(node) = queue.pop() {
228            let child_count = self.children.get(&node).map(|c| c.len()).unwrap_or(0);
229            if child_count < self.max_fanout {
230                return node;
231            }
232
233            if let Some(children) = self.children.get(&node) {
234                queue.extend(children.iter().copied());
235            }
236        }
237
238        // Fallback to root
239        self.root
240    }
241
242    /// Remove a node (and reassign its children)
243    pub fn remove_node(&mut self, node: NodeId) {
244        if node == self.root {
245            return; // Can't remove root
246        }
247
248        // Get parent and children
249        let parent = self.parents.remove(&node);
250        let children = self.children.remove(&node).unwrap_or_default();
251
252        // Remove from parent's children
253        if let Some(p) = parent {
254            if let Some(siblings) = self.children.get_mut(&p) {
255                siblings.remove(&node);
256            }
257        }
258
259        // Reassign children to parent or find new parents
260        for child in children {
261            if let Some(p) = parent {
262                self.parents.insert(child, p);
263                self.children.entry(p).or_default().insert(child);
264                self.topology.add_edge(PropagationEdge::new(p, child));
265            }
266        }
267
268        self.topology.remove_node(node);
269    }
270
271    /// Get depth of a node
272    pub fn depth(&self, node: NodeId) -> usize {
273        let mut depth = 0;
274        let mut current = node;
275
276        while let Some(&parent) = self.parents.get(&current) {
277            depth += 1;
278            current = parent;
279        }
280
281        depth
282    }
283
284    /// Get total node count
285    pub fn node_count(&self) -> usize {
286        self.topology.node_count()
287    }
288}
289
290/// Mesh topology - fully connected (for small groups)
291#[derive(Debug, Clone)]
292pub struct MeshTopology {
293    /// All nodes
294    nodes: HashSet<NodeId>,
295    /// The underlying topology
296    pub topology: PropagationTopology,
297}
298
299impl MeshTopology {
300    /// Create a new mesh topology
301    pub fn new() -> Self {
302        Self {
303            nodes: HashSet::new(),
304            topology: PropagationTopology::new(),
305        }
306    }
307
308    /// Add a node (connects to all existing nodes)
309    pub fn add_node(&mut self, node: NodeId) {
310        // Add edges to/from all existing nodes
311        for &existing in &self.nodes {
312            self.topology.add_edge(PropagationEdge::new(existing, node));
313            self.topology.add_edge(PropagationEdge::new(node, existing));
314        }
315
316        self.nodes.insert(node);
317        self.topology.add_node(node);
318    }
319
320    /// Remove a node
321    pub fn remove_node(&mut self, node: NodeId) {
322        self.nodes.remove(&node);
323        self.topology.remove_node(node);
324    }
325
326    /// Get node count
327    pub fn node_count(&self) -> usize {
328        self.nodes.len()
329    }
330}
331
332impl Default for MeshTopology {
333    fn default() -> Self {
334        Self::new()
335    }
336}
337
#[cfg(test)]
mod tests {
    use super::*;

    /// Three viewers on a star give three leaves and three links out of the center.
    #[test]
    fn test_star_topology() {
        let center = NodeId::new(1);
        let mut star = StarTopology::new(center);

        for viewer in 2..=4 {
            star.add_leaf(NodeId::new(viewer));
        }

        assert_eq!(star.leaf_count(), 3);

        let receivers = star.topology.downstream(center);
        assert_eq!(receivers.len(), 3);
    }

    /// Six nodes under a fan-out of two: seven nodes in total, two directly under the root.
    #[test]
    fn test_tree_topology() {
        let top = NodeId::new(1);
        let mut tree = TreeTopology::new(top, 2);

        (2..=7).for_each(|id| {
            tree.add_node(NodeId::new(id));
        });

        assert_eq!(tree.node_count(), 7);

        // The root is filled up to the fan-out limit and no further.
        let direct_children = tree.topology.downstream(top);
        assert_eq!(direct_children.len(), 2);
    }

    /// In a three-member mesh every member links to the two others.
    #[test]
    fn test_mesh_topology() {
        let mut mesh = MeshTopology::new();

        for member in 1..=3 {
            mesh.add_node(NodeId::new(member));
        }

        assert_eq!(mesh.node_count(), 3);

        let peers_of_first = mesh.topology.downstream(NodeId::new(1));
        assert_eq!(peers_of_first.len(), 2);
    }
}