1use crate::types::{CrossShardEdge, ShardId};
45use phago_core::types::NodeId;
46use std::collections::HashMap;
47
48pub struct CrossShardEdgeManager {
59 outgoing_edges: HashMap<NodeId, Vec<CrossShardEdge>>,
61 incoming_edges: HashMap<NodeId, Vec<CrossShardEdge>>,
63 pending_resolution: Vec<CrossShardEdge>,
65}
66
67impl CrossShardEdgeManager {
68 pub fn new() -> Self {
77 Self {
78 outgoing_edges: HashMap::new(),
79 incoming_edges: HashMap::new(),
80 pending_resolution: Vec::new(),
81 }
82 }
83
84 pub fn with_capacity(capacity: usize) -> Self {
90 Self {
91 outgoing_edges: HashMap::with_capacity(capacity),
92 incoming_edges: HashMap::with_capacity(capacity),
93 pending_resolution: Vec::with_capacity(capacity),
94 }
95 }
96
97 pub fn add_outgoing_edge(&mut self, edge: CrossShardEdge) {
118 self.outgoing_edges
119 .entry(edge.from_node)
120 .or_insert_with(Vec::new)
121 .push(edge.clone());
122 self.pending_resolution.push(edge);
123 }
124
125 pub fn add_outgoing_edges(&mut self, edges: impl IntoIterator<Item = CrossShardEdge>) {
133 for edge in edges {
134 self.add_outgoing_edge(edge);
135 }
136 }
137
138 pub fn add_incoming_edge(&mut self, edge: CrossShardEdge) {
147 self.incoming_edges
148 .entry(edge.to_node)
149 .or_insert_with(Vec::new)
150 .push(edge);
151 }
152
153 pub fn pending_edges(&self) -> &[CrossShardEdge] {
162 &self.pending_resolution
163 }
164
165 pub fn pending_count(&self) -> usize {
167 self.pending_resolution.len()
168 }
169
170 pub fn has_pending(&self) -> bool {
172 !self.pending_resolution.is_empty()
173 }
174
175 pub fn clear_pending(&mut self) {
180 self.pending_resolution.clear();
181 }
182
183 pub fn take_pending(&mut self) -> Vec<CrossShardEdge> {
192 std::mem::take(&mut self.pending_resolution)
193 }
194
195 pub fn get_outgoing(&self, node_id: &NodeId) -> Option<&Vec<CrossShardEdge>> {
205 self.outgoing_edges.get(node_id)
206 }
207
208 pub fn get_incoming(&self, node_id: &NodeId) -> Option<&Vec<CrossShardEdge>> {
218 self.incoming_edges.get(node_id)
219 }
220
221 pub fn has_outgoing(&self, node_id: &NodeId) -> bool {
223 self.outgoing_edges
224 .get(node_id)
225 .map_or(false, |v| !v.is_empty())
226 }
227
228 pub fn has_incoming(&self, node_id: &NodeId) -> bool {
230 self.incoming_edges
231 .get(node_id)
232 .map_or(false, |v| !v.is_empty())
233 }
234
235 pub fn remove_shard_edges(&mut self, shard_id: ShardId) -> usize {
248 let mut removed = 0;
249
250 for edges in self.outgoing_edges.values_mut() {
251 let before = edges.len();
252 edges.retain(|e| e.to_shard != shard_id);
253 removed += before - edges.len();
254 }
255
256 for edges in self.incoming_edges.values_mut() {
257 let before = edges.len();
258 edges.retain(|e| e.to_shard != shard_id);
259 removed += before - edges.len();
260 }
261
262 self.pending_resolution.retain(|e| e.to_shard != shard_id);
263
264 removed
265 }
266
267 pub fn remove_node_edges(&mut self, node_id: &NodeId) -> (usize, usize) {
279 let outgoing = self.outgoing_edges.remove(node_id).map_or(0, |v| v.len());
280 let incoming = self.incoming_edges.remove(node_id).map_or(0, |v| v.len());
281
282 self.pending_resolution.retain(|e| e.from_node != *node_id);
283
284 (outgoing, incoming)
285 }
286
287 pub fn decay_edges(&mut self, rate: f64, threshold: f64) -> Vec<CrossShardEdge> {
301 let mut pruned = Vec::new();
302
303 for edges in self.outgoing_edges.values_mut() {
304 let mut i = 0;
305 while i < edges.len() {
306 let new_weight = edges[i].weight * (1.0 - rate);
307 if new_weight < threshold {
308 pruned.push(edges.swap_remove(i));
309 } else {
310 edges[i].weight = new_weight;
311 i += 1;
312 }
313 }
314 }
315
316 for edges in self.incoming_edges.values_mut() {
318 edges.retain_mut(|e| {
319 e.weight *= 1.0 - rate;
320 e.weight >= threshold
321 });
322 }
323
324 pruned
325 }
326
327 pub fn strengthen_edge(
339 &mut self,
340 from_node: &NodeId,
341 to_node: &NodeId,
342 amount: f64,
343 ) -> Option<f64> {
344 if let Some(edges) = self.outgoing_edges.get_mut(from_node) {
345 for edge in edges.iter_mut() {
346 if edge.to_node == *to_node {
347 edge.weight = (edge.weight + amount).min(1.0);
348 return Some(edge.weight);
349 }
350 }
351 }
352 None
353 }
354
355 pub fn connected_shards(&self) -> Vec<ShardId> {
361 let mut shards: Vec<ShardId> = self
362 .outgoing_edges
363 .values()
364 .flat_map(|edges| edges.iter().map(|e| e.to_shard))
365 .collect();
366 shards.sort();
367 shards.dedup();
368 shards
369 }
370
371 pub fn edges_by_shard(&self) -> HashMap<ShardId, Vec<&CrossShardEdge>> {
379 let mut by_shard: HashMap<ShardId, Vec<&CrossShardEdge>> = HashMap::new();
380 for edges in self.outgoing_edges.values() {
381 for edge in edges {
382 by_shard.entry(edge.to_shard).or_default().push(edge);
383 }
384 }
385 by_shard
386 }
387
388 pub fn pending_by_shard(&self) -> HashMap<ShardId, Vec<&CrossShardEdge>> {
392 let mut by_shard: HashMap<ShardId, Vec<&CrossShardEdge>> = HashMap::new();
393 for edge in &self.pending_resolution {
394 by_shard.entry(edge.to_shard).or_default().push(edge);
395 }
396 by_shard
397 }
398
399 pub fn edge_count(&self) -> usize {
401 self.outgoing_edges.values().map(|v| v.len()).sum::<usize>()
402 + self.incoming_edges.values().map(|v| v.len()).sum::<usize>()
403 }
404
405 pub fn outgoing_count(&self) -> usize {
407 self.outgoing_edges.values().map(|v| v.len()).sum()
408 }
409
410 pub fn incoming_count(&self) -> usize {
412 self.incoming_edges.values().map(|v| v.len()).sum()
413 }
414
415 pub fn nodes_with_outgoing(&self) -> usize {
417 self.outgoing_edges
418 .iter()
419 .filter(|(_, v)| !v.is_empty())
420 .count()
421 }
422
423 pub fn nodes_with_incoming(&self) -> usize {
425 self.incoming_edges
426 .iter()
427 .filter(|(_, v)| !v.is_empty())
428 .count()
429 }
430
431 pub fn clear(&mut self) {
433 self.outgoing_edges.clear();
434 self.incoming_edges.clear();
435 self.pending_resolution.clear();
436 }
437
438 pub fn is_empty(&self) -> bool {
440 self.outgoing_edges.values().all(|v| v.is_empty())
441 && self.incoming_edges.values().all(|v| v.is_empty())
442 }
443
444 pub fn stats(&self) -> CrossShardEdgeStats {
446 let mut edges_by_shard: HashMap<ShardId, usize> = HashMap::new();
447 let mut total_weight = 0.0;
448 let mut edge_count = 0;
449
450 for edges in self.outgoing_edges.values() {
451 for edge in edges {
452 *edges_by_shard.entry(edge.to_shard).or_insert(0) += 1;
453 total_weight += edge.weight;
454 edge_count += 1;
455 }
456 }
457
458 CrossShardEdgeStats {
459 outgoing_edges: self.outgoing_count(),
460 incoming_edges: self.incoming_count(),
461 pending_resolution: self.pending_resolution.len(),
462 connected_shards: self.connected_shards().len(),
463 edges_by_shard,
464 average_weight: if edge_count > 0 {
465 total_weight / edge_count as f64
466 } else {
467 0.0
468 },
469 }
470 }
471}
472
473impl Default for CrossShardEdgeManager {
474 fn default() -> Self {
475 Self::new()
476 }
477}
478
479#[derive(Debug, Clone)]
481pub struct CrossShardEdgeStats {
482 pub outgoing_edges: usize,
484 pub incoming_edges: usize,
486 pub pending_resolution: usize,
488 pub connected_shards: usize,
490 pub edges_by_shard: HashMap<ShardId, usize>,
492 pub average_weight: f64,
494}
495
496#[cfg(test)]
497mod tests {
498 use super::*;
499
500 fn make_edge(from: u64, to: u64, shard: u32) -> CrossShardEdge {
501 CrossShardEdge {
502 from_node: NodeId::from_seed(from),
503 to_node: NodeId::from_seed(to),
504 to_shard: ShardId::new(shard),
505 weight: 0.5,
506 }
507 }
508
509 fn make_edge_with_weight(from: u64, to: u64, shard: u32, weight: f64) -> CrossShardEdge {
510 CrossShardEdge {
511 from_node: NodeId::from_seed(from),
512 to_node: NodeId::from_seed(to),
513 to_shard: ShardId::new(shard),
514 weight,
515 }
516 }
517
518 #[test]
519 fn test_new() {
520 let manager = CrossShardEdgeManager::new();
521 assert_eq!(manager.edge_count(), 0);
522 assert!(manager.is_empty());
523 assert!(!manager.has_pending());
524 }
525
526 #[test]
527 fn test_with_capacity() {
528 let manager = CrossShardEdgeManager::with_capacity(100);
529 assert_eq!(manager.edge_count(), 0);
530 }
531
532 #[test]
533 fn test_add_and_get_outgoing_edges() {
534 let mut manager = CrossShardEdgeManager::new();
535 let edge = make_edge(1, 2, 1);
536
537 manager.add_outgoing_edge(edge.clone());
538
539 assert_eq!(manager.edge_count(), 1);
540 assert_eq!(manager.outgoing_count(), 1);
541 assert!(manager.has_outgoing(&NodeId::from_seed(1)));
542 assert!(!manager.has_outgoing(&NodeId::from_seed(2)));
543
544 let outgoing = manager.get_outgoing(&NodeId::from_seed(1)).unwrap();
545 assert_eq!(outgoing.len(), 1);
546 assert_eq!(outgoing[0].to_shard, ShardId::new(1));
547 }
548
549 #[test]
550 fn test_add_incoming_edge() {
551 let mut manager = CrossShardEdgeManager::new();
552 let edge = make_edge(1, 2, 1);
553
554 manager.add_incoming_edge(edge);
555
556 assert_eq!(manager.incoming_count(), 1);
557 assert!(manager.has_incoming(&NodeId::from_seed(2)));
558 }
559
560 #[test]
561 fn test_pending_edges() {
562 let mut manager = CrossShardEdgeManager::new();
563
564 assert!(!manager.has_pending());
565 assert_eq!(manager.pending_count(), 0);
566
567 manager.add_outgoing_edge(make_edge(1, 2, 1));
568
569 assert!(manager.has_pending());
570 assert_eq!(manager.pending_count(), 1);
571 assert_eq!(manager.pending_edges().len(), 1);
572
573 manager.clear_pending();
574
575 assert!(!manager.has_pending());
576 assert_eq!(manager.pending_count(), 0);
577 }
578
579 #[test]
580 fn test_take_pending() {
581 let mut manager = CrossShardEdgeManager::new();
582 manager.add_outgoing_edge(make_edge(1, 2, 1));
583 manager.add_outgoing_edge(make_edge(3, 4, 2));
584
585 let pending = manager.take_pending();
586
587 assert_eq!(pending.len(), 2);
588 assert!(!manager.has_pending());
589 }
590
591 #[test]
592 fn test_remove_shard_edges() {
593 let mut manager = CrossShardEdgeManager::new();
594 manager.add_outgoing_edge(make_edge(1, 2, 1));
595 manager.add_outgoing_edge(make_edge(3, 4, 2));
596 manager.add_outgoing_edge(make_edge(5, 6, 1));
597
598 let removed = manager.remove_shard_edges(ShardId::new(1));
599
600 assert_eq!(removed, 2);
601 assert_eq!(manager.outgoing_count(), 1);
602 assert!(manager
603 .get_outgoing(&NodeId::from_seed(1))
604 .unwrap()
605 .is_empty());
606 assert!(!manager
607 .get_outgoing(&NodeId::from_seed(3))
608 .unwrap()
609 .is_empty());
610 }
611
612 #[test]
613 fn test_remove_node_edges() {
614 let mut manager = CrossShardEdgeManager::new();
615 manager.add_outgoing_edge(make_edge(1, 2, 1));
616 manager.add_outgoing_edge(make_edge(1, 3, 2));
617 manager.add_incoming_edge(make_edge(5, 1, 0));
618
619 let (outgoing, incoming) = manager.remove_node_edges(&NodeId::from_seed(1));
620
621 assert_eq!(outgoing, 2);
622 assert_eq!(incoming, 1);
623 assert!(manager.get_outgoing(&NodeId::from_seed(1)).is_none());
624 }
625
626 #[test]
627 fn test_decay_edges() {
628 let mut manager = CrossShardEdgeManager::new();
629 manager.add_outgoing_edge(make_edge_with_weight(1, 2, 1, 0.5));
630 manager.add_outgoing_edge(make_edge_with_weight(3, 4, 2, 0.1));
631
632 manager.clear_pending();
634
635 let pruned = manager.decay_edges(0.5, 0.1);
637
638 assert_eq!(pruned.len(), 1);
639 assert_eq!(pruned[0].from_node, NodeId::from_seed(3));
640
641 let remaining = manager.get_outgoing(&NodeId::from_seed(1)).unwrap();
643 assert!((remaining[0].weight - 0.25).abs() < 0.001);
644 }
645
646 #[test]
647 fn test_strengthen_edge() {
648 let mut manager = CrossShardEdgeManager::new();
649 manager.add_outgoing_edge(make_edge_with_weight(1, 2, 1, 0.3));
650
651 let new_weight = manager.strengthen_edge(&NodeId::from_seed(1), &NodeId::from_seed(2), 0.2);
652
653 assert_eq!(new_weight, Some(0.5));
654
655 let clamped = manager.strengthen_edge(&NodeId::from_seed(1), &NodeId::from_seed(2), 0.8);
657
658 assert_eq!(clamped, Some(1.0));
659 }
660
661 #[test]
662 fn test_connected_shards() {
663 let mut manager = CrossShardEdgeManager::new();
664 manager.add_outgoing_edge(make_edge(1, 2, 1));
665 manager.add_outgoing_edge(make_edge(3, 4, 2));
666 manager.add_outgoing_edge(make_edge(5, 6, 1));
667
668 let shards = manager.connected_shards();
669
670 assert_eq!(shards.len(), 2);
671 assert!(shards.contains(&ShardId::new(1)));
672 assert!(shards.contains(&ShardId::new(2)));
673 }
674
675 #[test]
676 fn test_edges_by_shard() {
677 let mut manager = CrossShardEdgeManager::new();
678 manager.add_outgoing_edge(make_edge(1, 2, 1));
679 manager.add_outgoing_edge(make_edge(3, 4, 2));
680 manager.add_outgoing_edge(make_edge(5, 6, 1));
681
682 let by_shard = manager.edges_by_shard();
683
684 assert_eq!(by_shard.get(&ShardId::new(1)).unwrap().len(), 2);
685 assert_eq!(by_shard.get(&ShardId::new(2)).unwrap().len(), 1);
686 }
687
688 #[test]
689 fn test_pending_by_shard() {
690 let mut manager = CrossShardEdgeManager::new();
691 manager.add_outgoing_edge(make_edge(1, 2, 1));
692 manager.add_outgoing_edge(make_edge(3, 4, 2));
693 manager.add_outgoing_edge(make_edge(5, 6, 1));
694
695 let by_shard = manager.pending_by_shard();
696
697 assert_eq!(by_shard.get(&ShardId::new(1)).unwrap().len(), 2);
698 assert_eq!(by_shard.get(&ShardId::new(2)).unwrap().len(), 1);
699 }
700
701 #[test]
702 fn test_edge_counts() {
703 let mut manager = CrossShardEdgeManager::new();
704 manager.add_outgoing_edge(make_edge(1, 2, 1));
705 manager.add_outgoing_edge(make_edge(1, 3, 2));
706 manager.add_incoming_edge(make_edge(4, 5, 0));
707
708 assert_eq!(manager.outgoing_count(), 2);
709 assert_eq!(manager.incoming_count(), 1);
710 assert_eq!(manager.edge_count(), 3);
711 assert_eq!(manager.nodes_with_outgoing(), 1);
712 assert_eq!(manager.nodes_with_incoming(), 1);
713 }
714
715 #[test]
716 fn test_clear() {
717 let mut manager = CrossShardEdgeManager::new();
718 manager.add_outgoing_edge(make_edge(1, 2, 1));
719 manager.add_incoming_edge(make_edge(3, 4, 0));
720
721 manager.clear();
722
723 assert!(manager.is_empty());
724 assert_eq!(manager.edge_count(), 0);
725 assert!(!manager.has_pending());
726 }
727
728 #[test]
729 fn test_stats() {
730 let mut manager = CrossShardEdgeManager::new();
731 manager.add_outgoing_edge(make_edge_with_weight(1, 2, 1, 0.4));
732 manager.add_outgoing_edge(make_edge_with_weight(3, 4, 2, 0.6));
733 manager.add_incoming_edge(make_edge(5, 6, 0));
734
735 let stats = manager.stats();
736
737 assert_eq!(stats.outgoing_edges, 2);
738 assert_eq!(stats.incoming_edges, 1);
739 assert_eq!(stats.pending_resolution, 2);
740 assert_eq!(stats.connected_shards, 2);
741 assert!((stats.average_weight - 0.5).abs() < 0.001);
742 }
743
744 #[test]
745 fn test_add_outgoing_edges_batch() {
746 let mut manager = CrossShardEdgeManager::new();
747 let edges = vec![make_edge(1, 2, 1), make_edge(3, 4, 2), make_edge(5, 6, 3)];
748
749 manager.add_outgoing_edges(edges);
750
751 assert_eq!(manager.outgoing_count(), 3);
752 assert_eq!(manager.pending_count(), 3);
753 }
754
755 #[test]
756 fn test_default() {
757 let manager = CrossShardEdgeManager::default();
758 assert!(manager.is_empty());
759 }
760
761 #[test]
762 fn test_multiple_edges_same_source() {
763 let mut manager = CrossShardEdgeManager::new();
764 manager.add_outgoing_edge(make_edge(1, 2, 1));
765 manager.add_outgoing_edge(make_edge(1, 3, 2));
766 manager.add_outgoing_edge(make_edge(1, 4, 3));
767
768 let outgoing = manager.get_outgoing(&NodeId::from_seed(1)).unwrap();
769 assert_eq!(outgoing.len(), 3);
770 assert_eq!(manager.nodes_with_outgoing(), 1);
771 }
772}