// File: phago_distributed/shard/edge_resolver.rs

//! Cross-shard edge resolution and management.
//!
//! This module provides the `CrossShardEdgeManager` for handling edges that
//! span multiple shards. When a local node has an edge to a node on another
//! shard, this manager:
//!
//! 1. Tracks the cross-shard edges, indexed by the local node they touch
//! 2. Queues new outgoing edges so the caller can resolve their ghost nodes
//! 3. Applies weight decay to the cross-shard edges it holds
//!
//! # Architecture
//!
//! The manager maintains separate maps for outgoing and incoming edges:
//! - Outgoing edges: local node -> edges to remote shards
//! - Incoming edges: local node (target) -> edges from remote shards
//!
//! This separation enables efficient lookups for both traversal directions
//! and supports garbage collection when shards go offline.
//!
//! # Example
//!
//! ```ignore
//! use phago_distributed::shard::CrossShardEdgeManager;
//! use phago_distributed::types::{CrossShardEdge, ShardId};
//!
//! let mut manager = CrossShardEdgeManager::new();
//!
//! // Register an outgoing edge to another shard
//! let edge = CrossShardEdge {
//!     from_node: local_node_id,
//!     to_node: remote_node_id,
//!     to_shard: ShardId::new(1),
//!     weight: 0.5,
//! };
//! manager.add_outgoing_edge(edge);
//!
//! // Later, resolve all pending edges
//! for pending in manager.pending_edges() {
//!     // Fetch ghost node data from remote shard
//! }
//! manager.clear_pending();
//! ```
43
44use crate::types::{CrossShardEdge, ShardId};
45use phago_core::types::NodeId;
46use std::collections::HashMap;
47
48/// Manages edges that cross shard boundaries.
49///
50/// When a local node has an edge to a node on another shard, this manager:
51/// 1. Tracks the pending cross-shard edges
52/// 2. Coordinates resolution of ghost nodes
53/// 3. Handles edge decay synchronization across shards
54///
55/// # Thread Safety
56///
57/// This type is not thread-safe. Wrap in a mutex if concurrent access is needed.
58pub struct CrossShardEdgeManager {
59    /// Map of local node ID -> list of cross-shard edges (outgoing).
60    outgoing_edges: HashMap<NodeId, Vec<CrossShardEdge>>,
61    /// Incoming edges from other shards (where we own the target).
62    incoming_edges: HashMap<NodeId, Vec<CrossShardEdge>>,
63    /// Edges pending resolution (need ghost node data fetched).
64    pending_resolution: Vec<CrossShardEdge>,
65}
66
67impl CrossShardEdgeManager {
68    /// Create a new cross-shard edge manager.
69    ///
70    /// # Example
71    ///
72    /// ```ignore
73    /// let manager = CrossShardEdgeManager::new();
74    /// assert_eq!(manager.edge_count(), 0);
75    /// ```
76    pub fn new() -> Self {
77        Self {
78            outgoing_edges: HashMap::new(),
79            incoming_edges: HashMap::new(),
80            pending_resolution: Vec::new(),
81        }
82    }
83
84    /// Create a new manager with pre-allocated capacity.
85    ///
86    /// # Arguments
87    ///
88    /// * `capacity` - Initial capacity for edge maps
89    pub fn with_capacity(capacity: usize) -> Self {
90        Self {
91            outgoing_edges: HashMap::with_capacity(capacity),
92            incoming_edges: HashMap::with_capacity(capacity),
93            pending_resolution: Vec::with_capacity(capacity),
94        }
95    }
96
97    /// Register an outgoing cross-shard edge.
98    ///
99    /// The edge is added to both the outgoing edges map and the pending
100    /// resolution queue. Call `clear_pending()` after resolving ghost nodes.
101    ///
102    /// # Arguments
103    ///
104    /// * `edge` - The cross-shard edge to register
105    ///
106    /// # Example
107    ///
108    /// ```ignore
109    /// let edge = CrossShardEdge {
110    ///     from_node: local_id,
111    ///     to_node: remote_id,
112    ///     to_shard: ShardId::new(1),
113    ///     weight: 0.5,
114    /// };
115    /// manager.add_outgoing_edge(edge);
116    /// ```
117    pub fn add_outgoing_edge(&mut self, edge: CrossShardEdge) {
118        self.outgoing_edges
119            .entry(edge.from_node)
120            .or_insert_with(Vec::new)
121            .push(edge.clone());
122        self.pending_resolution.push(edge);
123    }
124
125    /// Register multiple outgoing edges at once.
126    ///
127    /// More efficient than calling `add_outgoing_edge` in a loop.
128    ///
129    /// # Arguments
130    ///
131    /// * `edges` - Iterator of edges to register
132    pub fn add_outgoing_edges(&mut self, edges: impl IntoIterator<Item = CrossShardEdge>) {
133        for edge in edges {
134            self.add_outgoing_edge(edge);
135        }
136    }
137
138    /// Register an incoming cross-shard edge.
139    ///
140    /// Incoming edges are from nodes on other shards that point to a node
141    /// we own locally.
142    ///
143    /// # Arguments
144    ///
145    /// * `edge` - The cross-shard edge to register
146    pub fn add_incoming_edge(&mut self, edge: CrossShardEdge) {
147        self.incoming_edges
148            .entry(edge.to_node)
149            .or_insert_with(Vec::new)
150            .push(edge);
151    }
152
153    /// Get all pending edges that need ghost node resolution.
154    ///
155    /// These are edges that have been registered but whose target nodes
156    /// have not yet been fetched from their remote shards.
157    ///
158    /// # Returns
159    ///
160    /// A slice of pending cross-shard edges.
161    pub fn pending_edges(&self) -> &[CrossShardEdge] {
162        &self.pending_resolution
163    }
164
165    /// Get the number of pending edges.
166    pub fn pending_count(&self) -> usize {
167        self.pending_resolution.len()
168    }
169
170    /// Check if there are pending edges.
171    pub fn has_pending(&self) -> bool {
172        !self.pending_resolution.is_empty()
173    }
174
175    /// Clear pending edges after resolution.
176    ///
177    /// Call this after successfully fetching ghost node data for all
178    /// pending edges.
179    pub fn clear_pending(&mut self) {
180        self.pending_resolution.clear();
181    }
182
183    /// Take ownership of pending edges and clear the queue.
184    ///
185    /// This is useful when you need to process the edges and don't
186    /// want to clone them.
187    ///
188    /// # Returns
189    ///
190    /// The vector of pending edges.
191    pub fn take_pending(&mut self) -> Vec<CrossShardEdge> {
192        std::mem::take(&mut self.pending_resolution)
193    }
194
195    /// Get outgoing edges for a node.
196    ///
197    /// # Arguments
198    ///
199    /// * `node_id` - The local node ID to look up
200    ///
201    /// # Returns
202    ///
203    /// The list of cross-shard edges from this node, if any.
204    pub fn get_outgoing(&self, node_id: &NodeId) -> Option<&Vec<CrossShardEdge>> {
205        self.outgoing_edges.get(node_id)
206    }
207
208    /// Get incoming edges for a node.
209    ///
210    /// # Arguments
211    ///
212    /// * `node_id` - The local node ID to look up
213    ///
214    /// # Returns
215    ///
216    /// The list of cross-shard edges to this node, if any.
217    pub fn get_incoming(&self, node_id: &NodeId) -> Option<&Vec<CrossShardEdge>> {
218        self.incoming_edges.get(node_id)
219    }
220
221    /// Check if a node has outgoing cross-shard edges.
222    pub fn has_outgoing(&self, node_id: &NodeId) -> bool {
223        self.outgoing_edges
224            .get(node_id)
225            .map_or(false, |v| !v.is_empty())
226    }
227
228    /// Check if a node has incoming cross-shard edges.
229    pub fn has_incoming(&self, node_id: &NodeId) -> bool {
230        self.incoming_edges
231            .get(node_id)
232            .map_or(false, |v| !v.is_empty())
233    }
234
235    /// Remove edges to/from a specific shard.
236    ///
237    /// This is useful when a shard goes offline and all edges to/from
238    /// it should be invalidated.
239    ///
240    /// # Arguments
241    ///
242    /// * `shard_id` - The shard whose edges should be removed
243    ///
244    /// # Returns
245    ///
246    /// The number of edges that were removed.
247    pub fn remove_shard_edges(&mut self, shard_id: ShardId) -> usize {
248        let mut removed = 0;
249
250        for edges in self.outgoing_edges.values_mut() {
251            let before = edges.len();
252            edges.retain(|e| e.to_shard != shard_id);
253            removed += before - edges.len();
254        }
255
256        for edges in self.incoming_edges.values_mut() {
257            let before = edges.len();
258            edges.retain(|e| e.to_shard != shard_id);
259            removed += before - edges.len();
260        }
261
262        self.pending_resolution.retain(|e| e.to_shard != shard_id);
263
264        removed
265    }
266
267    /// Remove all edges for a specific local node.
268    ///
269    /// Call this when a local node is deleted.
270    ///
271    /// # Arguments
272    ///
273    /// * `node_id` - The node whose edges should be removed
274    ///
275    /// # Returns
276    ///
277    /// A tuple of (outgoing_removed, incoming_removed).
278    pub fn remove_node_edges(&mut self, node_id: &NodeId) -> (usize, usize) {
279        let outgoing = self.outgoing_edges.remove(node_id).map_or(0, |v| v.len());
280        let incoming = self.incoming_edges.remove(node_id).map_or(0, |v| v.len());
281
282        self.pending_resolution.retain(|e| e.from_node != *node_id);
283
284        (outgoing, incoming)
285    }
286
287    /// Decay cross-shard edge weights.
288    ///
289    /// Applies exponential decay to all edge weights and removes edges
290    /// that fall below the threshold.
291    ///
292    /// # Arguments
293    ///
294    /// * `rate` - Decay rate (0.0 to 1.0), e.g., 0.1 means 10% decay
295    /// * `threshold` - Minimum weight threshold; edges below this are pruned
296    ///
297    /// # Returns
298    ///
299    /// Vector of edges that were pruned due to low weight.
300    pub fn decay_edges(&mut self, rate: f64, threshold: f64) -> Vec<CrossShardEdge> {
301        let mut pruned = Vec::new();
302
303        for edges in self.outgoing_edges.values_mut() {
304            let mut i = 0;
305            while i < edges.len() {
306                let new_weight = edges[i].weight * (1.0 - rate);
307                if new_weight < threshold {
308                    pruned.push(edges.swap_remove(i));
309                } else {
310                    edges[i].weight = new_weight;
311                    i += 1;
312                }
313            }
314        }
315
316        // Also decay incoming edges
317        for edges in self.incoming_edges.values_mut() {
318            edges.retain_mut(|e| {
319                e.weight *= 1.0 - rate;
320                e.weight >= threshold
321            });
322        }
323
324        pruned
325    }
326
327    /// Strengthen an edge weight.
328    ///
329    /// # Arguments
330    ///
331    /// * `from_node` - Source node ID
332    /// * `to_node` - Target node ID
333    /// * `amount` - Amount to add to the weight
334    ///
335    /// # Returns
336    ///
337    /// The new weight if the edge was found, None otherwise.
338    pub fn strengthen_edge(
339        &mut self,
340        from_node: &NodeId,
341        to_node: &NodeId,
342        amount: f64,
343    ) -> Option<f64> {
344        if let Some(edges) = self.outgoing_edges.get_mut(from_node) {
345            for edge in edges.iter_mut() {
346                if edge.to_node == *to_node {
347                    edge.weight = (edge.weight + amount).min(1.0);
348                    return Some(edge.weight);
349                }
350            }
351        }
352        None
353    }
354
355    /// Get all unique remote shards that have edges.
356    ///
357    /// # Returns
358    ///
359    /// A sorted, deduplicated vector of shard IDs.
360    pub fn connected_shards(&self) -> Vec<ShardId> {
361        let mut shards: Vec<ShardId> = self
362            .outgoing_edges
363            .values()
364            .flat_map(|edges| edges.iter().map(|e| e.to_shard))
365            .collect();
366        shards.sort();
367        shards.dedup();
368        shards
369    }
370
371    /// Get edges grouped by target shard.
372    ///
373    /// Useful for batching requests to remote shards.
374    ///
375    /// # Returns
376    ///
377    /// A map of shard ID to edges targeting that shard.
378    pub fn edges_by_shard(&self) -> HashMap<ShardId, Vec<&CrossShardEdge>> {
379        let mut by_shard: HashMap<ShardId, Vec<&CrossShardEdge>> = HashMap::new();
380        for edges in self.outgoing_edges.values() {
381            for edge in edges {
382                by_shard.entry(edge.to_shard).or_default().push(edge);
383            }
384        }
385        by_shard
386    }
387
388    /// Get pending edges grouped by target shard.
389    ///
390    /// Useful for batching ghost node resolution requests.
391    pub fn pending_by_shard(&self) -> HashMap<ShardId, Vec<&CrossShardEdge>> {
392        let mut by_shard: HashMap<ShardId, Vec<&CrossShardEdge>> = HashMap::new();
393        for edge in &self.pending_resolution {
394            by_shard.entry(edge.to_shard).or_default().push(edge);
395        }
396        by_shard
397    }
398
399    /// Total number of cross-shard edges (outgoing + incoming).
400    pub fn edge_count(&self) -> usize {
401        self.outgoing_edges.values().map(|v| v.len()).sum::<usize>()
402            + self.incoming_edges.values().map(|v| v.len()).sum::<usize>()
403    }
404
405    /// Number of outgoing cross-shard edges.
406    pub fn outgoing_count(&self) -> usize {
407        self.outgoing_edges.values().map(|v| v.len()).sum()
408    }
409
410    /// Number of incoming cross-shard edges.
411    pub fn incoming_count(&self) -> usize {
412        self.incoming_edges.values().map(|v| v.len()).sum()
413    }
414
415    /// Number of unique local nodes with outgoing edges.
416    pub fn nodes_with_outgoing(&self) -> usize {
417        self.outgoing_edges
418            .iter()
419            .filter(|(_, v)| !v.is_empty())
420            .count()
421    }
422
423    /// Number of unique local nodes with incoming edges.
424    pub fn nodes_with_incoming(&self) -> usize {
425        self.incoming_edges
426            .iter()
427            .filter(|(_, v)| !v.is_empty())
428            .count()
429    }
430
431    /// Clear all edges.
432    pub fn clear(&mut self) {
433        self.outgoing_edges.clear();
434        self.incoming_edges.clear();
435        self.pending_resolution.clear();
436    }
437
438    /// Check if the manager has any edges.
439    pub fn is_empty(&self) -> bool {
440        self.outgoing_edges.values().all(|v| v.is_empty())
441            && self.incoming_edges.values().all(|v| v.is_empty())
442    }
443
444    /// Get statistics about cross-shard edges.
445    pub fn stats(&self) -> CrossShardEdgeStats {
446        let mut edges_by_shard: HashMap<ShardId, usize> = HashMap::new();
447        let mut total_weight = 0.0;
448        let mut edge_count = 0;
449
450        for edges in self.outgoing_edges.values() {
451            for edge in edges {
452                *edges_by_shard.entry(edge.to_shard).or_insert(0) += 1;
453                total_weight += edge.weight;
454                edge_count += 1;
455            }
456        }
457
458        CrossShardEdgeStats {
459            outgoing_edges: self.outgoing_count(),
460            incoming_edges: self.incoming_count(),
461            pending_resolution: self.pending_resolution.len(),
462            connected_shards: self.connected_shards().len(),
463            edges_by_shard,
464            average_weight: if edge_count > 0 {
465                total_weight / edge_count as f64
466            } else {
467                0.0
468            },
469        }
470    }
471}
472
473impl Default for CrossShardEdgeManager {
474    fn default() -> Self {
475        Self::new()
476    }
477}
478
479/// Statistics about cross-shard edges.
480#[derive(Debug, Clone)]
481pub struct CrossShardEdgeStats {
482    /// Number of outgoing cross-shard edges.
483    pub outgoing_edges: usize,
484    /// Number of incoming cross-shard edges.
485    pub incoming_edges: usize,
486    /// Number of edges pending ghost node resolution.
487    pub pending_resolution: usize,
488    /// Number of unique remote shards connected.
489    pub connected_shards: usize,
490    /// Edges grouped by target shard.
491    pub edges_by_shard: HashMap<ShardId, usize>,
492    /// Average edge weight.
493    pub average_weight: f64,
494}
495
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance used when comparing computed edge weights.
    const EPSILON: f64 = 1e-9;

    /// Shorthand for the node ID derived from `seed`.
    fn node(seed: u64) -> NodeId {
        NodeId::from_seed(seed)
    }

    /// Build an edge `from -> to` targeting `shard` with an explicit weight.
    fn make_edge_with_weight(from: u64, to: u64, shard: u32, weight: f64) -> CrossShardEdge {
        CrossShardEdge {
            from_node: node(from),
            to_node: node(to),
            to_shard: ShardId::new(shard),
            weight,
        }
    }

    /// Build an edge `from -> to` targeting `shard` with the default 0.5 weight.
    fn make_edge(from: u64, to: u64, shard: u32) -> CrossShardEdge {
        make_edge_with_weight(from, to, shard, 0.5)
    }

    /// Assert two weights are equal within `EPSILON`.
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < EPSILON,
            "expected {expected}, got {actual}"
        );
    }

    #[test]
    fn test_new() {
        let manager = CrossShardEdgeManager::new();
        assert_eq!(manager.edge_count(), 0);
        assert!(manager.is_empty());
        assert!(!manager.has_pending());
    }

    #[test]
    fn test_with_capacity() {
        let manager = CrossShardEdgeManager::with_capacity(100);
        assert_eq!(manager.edge_count(), 0);
        assert!(manager.is_empty());
    }

    #[test]
    fn test_add_and_get_outgoing_edges() {
        let mut manager = CrossShardEdgeManager::new();

        manager.add_outgoing_edge(make_edge(1, 2, 1));

        assert_eq!(manager.edge_count(), 1);
        assert_eq!(manager.outgoing_count(), 1);
        assert!(manager.has_outgoing(&node(1)));
        assert!(!manager.has_outgoing(&node(2)));

        let outgoing = manager.get_outgoing(&node(1)).unwrap();
        assert_eq!(outgoing.len(), 1);
        assert_eq!(outgoing[0].to_shard, ShardId::new(1));
    }

    #[test]
    fn test_add_incoming_edge() {
        let mut manager = CrossShardEdgeManager::new();

        manager.add_incoming_edge(make_edge(1, 2, 1));

        assert_eq!(manager.incoming_count(), 1);
        assert!(manager.has_incoming(&node(2)));
        // Incoming edges are indexed by target and never queued.
        assert!(!manager.has_incoming(&node(1)));
        assert!(!manager.has_pending());
    }

    #[test]
    fn test_pending_edges() {
        let mut manager = CrossShardEdgeManager::new();

        assert!(!manager.has_pending());
        assert_eq!(manager.pending_count(), 0);

        manager.add_outgoing_edge(make_edge(1, 2, 1));

        assert!(manager.has_pending());
        assert_eq!(manager.pending_count(), 1);
        assert_eq!(manager.pending_edges().len(), 1);

        manager.clear_pending();

        assert!(!manager.has_pending());
        assert_eq!(manager.pending_count(), 0);
        // Clearing the queue must not drop the registered edge.
        assert_eq!(manager.outgoing_count(), 1);
    }

    #[test]
    fn test_take_pending() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge(1, 2, 1));
        manager.add_outgoing_edge(make_edge(3, 4, 2));

        let pending = manager.take_pending();

        assert_eq!(pending.len(), 2);
        assert!(!manager.has_pending());
    }

    #[test]
    fn test_remove_shard_edges() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge(1, 2, 1));
        manager.add_outgoing_edge(make_edge(3, 4, 2));
        manager.add_outgoing_edge(make_edge(5, 6, 1));

        let removed = manager.remove_shard_edges(ShardId::new(1));

        assert_eq!(removed, 2);
        assert_eq!(manager.outgoing_count(), 1);
        // The node entry survives with an empty edge list.
        assert!(manager.get_outgoing(&node(1)).unwrap().is_empty());
        assert!(!manager.get_outgoing(&node(3)).unwrap().is_empty());
        // Queued edges to the removed shard are purged as well.
        assert_eq!(manager.pending_count(), 1);
    }

    #[test]
    fn test_remove_node_edges() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge(1, 2, 1));
        manager.add_outgoing_edge(make_edge(1, 3, 2));
        manager.add_incoming_edge(make_edge(5, 1, 0));

        let (outgoing, incoming) = manager.remove_node_edges(&node(1));

        assert_eq!(outgoing, 2);
        assert_eq!(incoming, 1);
        assert!(manager.get_outgoing(&node(1)).is_none());
        assert!(manager.get_incoming(&node(1)).is_none());
        assert!(!manager.has_pending());
    }

    #[test]
    fn test_decay_edges() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge_with_weight(1, 2, 1, 0.5));
        manager.add_outgoing_edge(make_edge_with_weight(3, 4, 2, 0.1));

        // Clear pending to focus on decay test
        manager.clear_pending();

        // 50% decay with 0.1 threshold should prune the 0.1 edge
        let pruned = manager.decay_edges(0.5, 0.1);

        assert_eq!(pruned.len(), 1);
        assert_eq!(pruned[0].from_node, node(3));

        // The remaining edge should have decayed from 0.5 to 0.25
        let remaining = manager.get_outgoing(&node(1)).unwrap();
        assert_close(remaining[0].weight, 0.25);
    }

    #[test]
    fn test_decay_incoming_edges() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_incoming_edge(make_edge_with_weight(1, 2, 0, 0.5));
        manager.add_incoming_edge(make_edge_with_weight(3, 2, 0, 0.1));

        let pruned = manager.decay_edges(0.5, 0.1);

        // Only pruned outgoing edges are reported back.
        assert!(pruned.is_empty());
        let remaining = manager.get_incoming(&node(2)).unwrap();
        assert_eq!(remaining.len(), 1);
        assert_close(remaining[0].weight, 0.25);
    }

    #[test]
    fn test_strengthen_edge() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge_with_weight(1, 2, 1, 0.3));

        let new_weight = manager
            .strengthen_edge(&node(1), &node(2), 0.2)
            .expect("edge 1 -> 2 was registered");

        assert_close(new_weight, 0.5);

        // Test clamping to 1.0
        let clamped = manager.strengthen_edge(&node(1), &node(2), 0.8);

        assert_eq!(clamped, Some(1.0));
    }

    #[test]
    fn test_strengthen_edge_missing() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge(1, 2, 1));

        assert_eq!(manager.strengthen_edge(&node(1), &node(9), 0.1), None);
        assert_eq!(manager.strengthen_edge(&node(9), &node(2), 0.1), None);
    }

    #[test]
    fn test_connected_shards() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge(1, 2, 1));
        manager.add_outgoing_edge(make_edge(3, 4, 2));
        manager.add_outgoing_edge(make_edge(5, 6, 1));

        let shards = manager.connected_shards();

        assert_eq!(shards.len(), 2);
        assert!(shards.contains(&ShardId::new(1)));
        assert!(shards.contains(&ShardId::new(2)));
    }

    #[test]
    fn test_edges_by_shard() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge(1, 2, 1));
        manager.add_outgoing_edge(make_edge(3, 4, 2));
        manager.add_outgoing_edge(make_edge(5, 6, 1));

        let by_shard = manager.edges_by_shard();

        assert_eq!(by_shard.get(&ShardId::new(1)).unwrap().len(), 2);
        assert_eq!(by_shard.get(&ShardId::new(2)).unwrap().len(), 1);
    }

    #[test]
    fn test_pending_by_shard() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge(1, 2, 1));
        manager.add_outgoing_edge(make_edge(3, 4, 2));
        manager.add_outgoing_edge(make_edge(5, 6, 1));

        let by_shard = manager.pending_by_shard();

        assert_eq!(by_shard.get(&ShardId::new(1)).unwrap().len(), 2);
        assert_eq!(by_shard.get(&ShardId::new(2)).unwrap().len(), 1);
    }

    #[test]
    fn test_edge_counts() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge(1, 2, 1));
        manager.add_outgoing_edge(make_edge(1, 3, 2));
        manager.add_incoming_edge(make_edge(4, 5, 0));

        assert_eq!(manager.outgoing_count(), 2);
        assert_eq!(manager.incoming_count(), 1);
        assert_eq!(manager.edge_count(), 3);
        assert_eq!(manager.nodes_with_outgoing(), 1);
        assert_eq!(manager.nodes_with_incoming(), 1);
    }

    #[test]
    fn test_clear() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge(1, 2, 1));
        manager.add_incoming_edge(make_edge(3, 4, 0));

        manager.clear();

        assert!(manager.is_empty());
        assert_eq!(manager.edge_count(), 0);
        assert!(!manager.has_pending());
    }

    #[test]
    fn test_stats() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge_with_weight(1, 2, 1, 0.4));
        manager.add_outgoing_edge(make_edge_with_weight(3, 4, 2, 0.6));
        manager.add_incoming_edge(make_edge(5, 6, 0));

        let stats = manager.stats();

        assert_eq!(stats.outgoing_edges, 2);
        assert_eq!(stats.incoming_edges, 1);
        assert_eq!(stats.pending_resolution, 2);
        assert_eq!(stats.connected_shards, 2);
        assert_eq!(stats.edges_by_shard.get(&ShardId::new(1)), Some(&1));
        assert_eq!(stats.edges_by_shard.get(&ShardId::new(2)), Some(&1));
        assert_close(stats.average_weight, 0.5);
    }

    #[test]
    fn test_stats_empty() {
        let stats = CrossShardEdgeManager::new().stats();

        assert_eq!(stats.outgoing_edges, 0);
        assert_eq!(stats.connected_shards, 0);
        assert!(stats.edges_by_shard.is_empty());
        // No edges must yield 0.0 rather than NaN.
        assert_eq!(stats.average_weight, 0.0);
    }

    #[test]
    fn test_add_outgoing_edges_batch() {
        let mut manager = CrossShardEdgeManager::new();
        let edges = vec![make_edge(1, 2, 1), make_edge(3, 4, 2), make_edge(5, 6, 3)];

        manager.add_outgoing_edges(edges);

        assert_eq!(manager.outgoing_count(), 3);
        assert_eq!(manager.pending_count(), 3);
    }

    #[test]
    fn test_default() {
        let manager = CrossShardEdgeManager::default();
        assert!(manager.is_empty());
    }

    #[test]
    fn test_multiple_edges_same_source() {
        let mut manager = CrossShardEdgeManager::new();
        manager.add_outgoing_edge(make_edge(1, 2, 1));
        manager.add_outgoing_edge(make_edge(1, 3, 2));
        manager.add_outgoing_edge(make_edge(1, 4, 3));

        let outgoing = manager.get_outgoing(&node(1)).unwrap();
        assert_eq!(outgoing.len(), 3);
        assert_eq!(manager.nodes_with_outgoing(), 1);
    }
}