1mod edge_resolver;
41mod ghost_cache;
42
43pub use edge_resolver::{CrossShardEdgeManager, CrossShardEdgeStats};
44pub use ghost_cache::{GhostCacheStats, GhostNodeCache};
45
46use crate::hashing::ConsistentHashRing;
47use crate::types::*;
48use phago_core::substrate::Substrate;
49use phago_core::topology::TopologyGraph;
50use phago_core::types::{DocumentId, NodeData, NodeId, Position, Tick};
51use phago_runtime::colony::{Colony, ColonyConfig, ColonyStats};
52use std::collections::HashMap;
53use std::sync::Arc;
54use tokio::sync::RwLock;
55
56pub struct ShardedColony {
67 shard_id: ShardId,
69 local: Colony,
71 ghost_cache: GhostNodeCache,
73 edge_manager: CrossShardEdgeManager,
75 hash_ring: Arc<RwLock<ConsistentHashRing>>,
77 peers: HashMap<ShardId, String>,
79 pending_cross_edges: Vec<CrossShardEdge>,
81}
82
83impl ShardedColony {
84 pub fn new(
102 shard_id: ShardId,
103 config: ColonyConfig,
104 hash_ring: Arc<RwLock<ConsistentHashRing>>,
105 ) -> Self {
106 Self {
107 shard_id,
108 local: Colony::from_config(config),
109 ghost_cache: GhostNodeCache::new(1000), edge_manager: CrossShardEdgeManager::new(),
111 hash_ring,
112 peers: HashMap::new(),
113 pending_cross_edges: Vec::new(),
114 }
115 }
116
117 pub fn with_ghost_cache_size(
126 shard_id: ShardId,
127 config: ColonyConfig,
128 hash_ring: Arc<RwLock<ConsistentHashRing>>,
129 ghost_cache_size: usize,
130 ) -> Self {
131 Self {
132 shard_id,
133 local: Colony::from_config(config),
134 ghost_cache: GhostNodeCache::new(ghost_cache_size),
135 edge_manager: CrossShardEdgeManager::new(),
136 hash_ring,
137 peers: HashMap::new(),
138 pending_cross_edges: Vec::new(),
139 }
140 }
141
142 pub fn shard_id(&self) -> ShardId {
144 self.shard_id
145 }
146
147 pub async fn owns_document(&self, doc_id: &DocumentId) -> bool {
155 let ring = self.hash_ring.read().await;
156 ring.get_shard(doc_id) == self.shard_id
157 }
158
159 pub fn owns_document_sync(&self, doc_id: &DocumentId, ring: &ConsistentHashRing) -> bool {
164 ring.get_shard(doc_id) == self.shard_id
165 }
166
167 pub async fn ingest_document(
182 &mut self,
183 title: &str,
184 content: &str,
185 position: Position,
186 ) -> DistributedResult<DocumentId> {
187 let doc_id = DocumentId::new();
189
190 if !self.owns_document(&doc_id).await {
192 return Err(DistributedError::RoutingFailed(doc_id));
193 }
194
195 let actual_id = self.local.ingest_document(title, content, position);
197 Ok(actual_id)
198 }
199
200 pub fn ingest_document_direct(
211 &mut self,
212 title: &str,
213 content: &str,
214 position: Position,
215 ) -> DocumentId {
216 self.local.ingest_document(title, content, position)
217 }
218
219 pub fn tick_phase(&mut self, phase: TickPhase) -> PhaseResult {
234 match phase {
235 TickPhase::Sense => {
236 PhaseResult {
240 shard_id: self.shard_id,
241 phase,
242 tick: self.local.substrate().current_tick(),
243 cross_shard_edges: Vec::new(),
244 node_count: self.local.stats().graph_nodes,
245 edge_count: self.local.stats().graph_edges,
246 }
247 }
248 TickPhase::Act | TickPhase::Decay => {
249 let _events = self.local.tick();
252
253 let cross_edges = std::mem::take(&mut self.pending_cross_edges);
255
256 PhaseResult {
257 shard_id: self.shard_id,
258 phase,
259 tick: self.local.substrate().current_tick(),
260 cross_shard_edges: cross_edges,
261 node_count: self.local.stats().graph_nodes,
262 edge_count: self.local.stats().graph_edges,
263 }
264 }
265 TickPhase::Advance => {
266 PhaseResult {
268 shard_id: self.shard_id,
269 phase,
270 tick: self.local.substrate().current_tick(),
271 cross_shard_edges: Vec::new(),
272 node_count: self.local.stats().graph_nodes,
273 edge_count: self.local.stats().graph_edges,
274 }
275 }
276 }
277 }
278
279 pub fn get_term_frequencies(&self, terms: &[String]) -> HashMap<String, u64> {
292 let mut freqs = HashMap::new();
293 let graph = self.local.substrate().graph();
294
295 for term in terms {
296 let count = graph.find_nodes_by_label(term).len();
297 if count > 0 {
298 freqs.insert(term.clone(), count as u64);
299 }
300 }
301
302 freqs
303 }
304
305 pub fn get_node(&self, id: &NodeId) -> Option<NodeData> {
315 self.local.substrate().graph().get_node(id).cloned()
316 }
317
318 pub fn add_peer(&mut self, shard_id: ShardId, address: String) {
325 self.peers.insert(shard_id, address);
326 }
327
328 pub fn remove_peer(&mut self, shard_id: ShardId) {
336 self.peers.remove(&shard_id);
337 self.ghost_cache.invalidate_shard(shard_id);
338 }
339
340 pub fn peers(&self) -> &HashMap<ShardId, String> {
342 &self.peers
343 }
344
345 pub fn peer_address(&self, shard_id: &ShardId) -> Option<&String> {
347 self.peers.get(shard_id)
348 }
349
350 pub fn local(&self) -> &Colony {
352 &self.local
353 }
354
355 pub fn local_mut(&mut self) -> &mut Colony {
357 &mut self.local
358 }
359
360 pub fn ghost_cache(&self) -> &GhostNodeCache {
362 &self.ghost_cache
363 }
364
365 pub fn ghost_cache_mut(&mut self) -> &mut GhostNodeCache {
367 &mut self.ghost_cache
368 }
369
370 pub fn edge_manager(&self) -> &CrossShardEdgeManager {
372 &self.edge_manager
373 }
374
375 pub fn edge_manager_mut(&mut self) -> &mut CrossShardEdgeManager {
377 &mut self.edge_manager
378 }
379
380 pub fn register_cross_shard_edge(&mut self, edge: CrossShardEdge) {
389 self.edge_manager.add_outgoing_edge(edge.clone());
390 self.pending_cross_edges.push(edge);
391 }
392
393 pub fn handle_shard_offline(&mut self, shard_id: ShardId) -> (usize, usize) {
406 let edges_removed = self.edge_manager.remove_shard_edges(shard_id);
407 let ghosts_invalidated = self.ghost_cache.invalidate_shard(shard_id);
408 (edges_removed, ghosts_invalidated)
409 }
410
411 pub fn decay_cross_shard_edges(&mut self, rate: f64, threshold: f64) -> Vec<CrossShardEdge> {
422 self.edge_manager.decay_edges(rate, threshold)
423 }
424
425 pub fn cross_shard_edge_stats(&self) -> CrossShardEdgeStats {
427 self.edge_manager.stats()
428 }
429
430 pub fn connected_shards(&self) -> Vec<ShardId> {
432 self.edge_manager.connected_shards()
433 }
434
435 pub fn take_pending_for_resolution(&mut self) -> HashMap<ShardId, Vec<CrossShardEdge>> {
445 let pending = self.edge_manager.take_pending();
446 let mut by_shard: HashMap<ShardId, Vec<CrossShardEdge>> = HashMap::new();
447 for edge in pending {
448 by_shard.entry(edge.to_shard).or_default().push(edge);
449 }
450 by_shard
451 }
452
453 pub fn add_pending_cross_edge(&mut self, edge: CrossShardEdge) {
462 self.pending_cross_edges.push(edge);
463 }
464
465 pub fn pending_cross_edges(&self) -> &[CrossShardEdge] {
467 &self.pending_cross_edges
468 }
469
470 pub fn clear_pending_cross_edges(&mut self) {
472 self.pending_cross_edges.clear();
473 }
474
475 pub fn hash_ring(&self) -> &Arc<RwLock<ConsistentHashRing>> {
477 &self.hash_ring
478 }
479
480 pub fn health(&self) -> ShardHealth {
482 let stats = self.local.stats();
483 ShardHealth {
484 shard_id: self.shard_id,
485 healthy: true,
486 load: stats.agents_alive as f64 / 100.0, memory_usage_mb: 0, pending_operations: self.pending_cross_edges.len(),
489 }
490 }
491
492 pub fn stats(&self) -> ColonyStats {
494 self.local.stats()
495 }
496
497 pub fn current_tick(&self) -> Tick {
499 self.local.substrate().current_tick()
500 }
501
502 pub fn node_count(&self) -> usize {
504 self.local.substrate().node_count()
505 }
506
507 pub fn document_count(&self) -> usize {
509 self.local.substrate().all_documents().len()
510 }
511
512 pub fn tick(&mut self) {
514 self.local.tick();
515 }
516
517 pub fn run(&mut self, ticks: u64) {
519 self.local.run(ticks);
520 }
521
522 pub fn shard_info(&self, address: String) -> ShardInfo {
524 let stats = self.local.stats();
525 ShardInfo {
526 id: self.shard_id,
527 address,
528 node_count: stats.graph_nodes,
529 edge_count: stats.graph_edges,
530 document_count: stats.documents_total,
531 last_heartbeat: 0, }
533 }
534
535 pub fn execute_local_query(&self, request: &LocalQueryRequest) -> LocalQueryResult {
545 let graph = self.local.substrate().graph();
546 let mut results = Vec::new();
547
548 for term in &request.query_terms {
550 let matching_nodes = graph.find_nodes_by_label(term);
551 for node_id in matching_nodes {
552 if let Some(node) = graph.get_node(&node_id) {
553 let score = node.access_count as f64 * 0.1;
555 results.push(ScoredNode {
556 node_id,
557 label: node.label.clone(),
558 score,
559 shard_id: self.shard_id,
560 });
561 }
562 }
563 }
564
565 results.sort_by(|a, b| {
567 b.score
568 .partial_cmp(&a.score)
569 .unwrap_or(std::cmp::Ordering::Equal)
570 });
571 results.truncate(request.max_results);
572
573 let term_frequencies = self.get_term_frequencies(&request.query_terms);
575
576 LocalQueryResult {
577 shard_id: self.shard_id,
578 results,
579 term_frequencies,
580 }
581 }
582}
583
584#[cfg(test)]
585mod tests {
586 use super::*;
587
588 fn create_test_shard() -> (ShardedColony, Arc<RwLock<ConsistentHashRing>>) {
589 let hash_ring = Arc::new(RwLock::new(ConsistentHashRing::new(3)));
590 let shard = ShardedColony::new(ShardId::new(0), ColonyConfig::default(), hash_ring.clone());
591 (shard, hash_ring)
592 }
593
594 #[test]
595 fn test_new_sharded_colony() {
596 let (shard, _) = create_test_shard();
597 assert_eq!(shard.shard_id(), ShardId::new(0));
598 assert!(shard.pending_cross_edges().is_empty());
599 assert_eq!(shard.ghost_cache().len(), 0);
600 }
601
602 #[test]
603 fn test_add_peer() {
604 let (mut shard, _) = create_test_shard();
605 shard.add_peer(ShardId::new(1), "127.0.0.1:8081".to_string());
606 shard.add_peer(ShardId::new(2), "127.0.0.1:8082".to_string());
607
608 assert_eq!(shard.peers().len(), 2);
609 assert_eq!(
610 shard.peer_address(&ShardId::new(1)),
611 Some(&"127.0.0.1:8081".to_string())
612 );
613 }
614
615 #[test]
616 fn test_remove_peer_invalidates_ghosts() {
617 let (mut shard, _) = create_test_shard();
618 shard.add_peer(ShardId::new(1), "127.0.0.1:8081".to_string());
619
620 let ghost = GhostNode::new(NodeId::from_seed(1), ShardId::new(1), "test".to_string());
622 shard.ghost_cache_mut().insert(ghost);
623 assert_eq!(shard.ghost_cache().len(), 1);
624
625 shard.remove_peer(ShardId::new(1));
627 assert_eq!(shard.ghost_cache().len(), 0);
628 }
629
630 #[test]
631 fn test_tick_phase_sense() {
632 let (mut shard, _) = create_test_shard();
633 let result = shard.tick_phase(TickPhase::Sense);
634
635 assert_eq!(result.shard_id, ShardId::new(0));
636 assert_eq!(result.phase, TickPhase::Sense);
637 assert!(result.cross_shard_edges.is_empty());
638 }
639
640 #[test]
641 fn test_tick_phase_act() {
642 let (mut shard, _) = create_test_shard();
643 let result = shard.tick_phase(TickPhase::Act);
644
645 assert_eq!(result.shard_id, ShardId::new(0));
646 assert_eq!(result.phase, TickPhase::Act);
647 }
648
649 #[test]
650 fn test_health() {
651 let (shard, _) = create_test_shard();
652 let health = shard.health();
653
654 assert_eq!(health.shard_id, ShardId::new(0));
655 assert!(health.healthy);
656 assert_eq!(health.pending_operations, 0);
657 }
658
659 #[test]
660 fn test_add_pending_cross_edge() {
661 let (mut shard, _) = create_test_shard();
662
663 let edge = CrossShardEdge {
664 from_node: NodeId::from_seed(1),
665 to_node: NodeId::from_seed(2),
666 to_shard: ShardId::new(1),
667 weight: 0.5,
668 };
669
670 shard.add_pending_cross_edge(edge);
671 assert_eq!(shard.pending_cross_edges().len(), 1);
672
673 shard.clear_pending_cross_edges();
674 assert!(shard.pending_cross_edges().is_empty());
675 }
676
677 #[test]
678 fn test_ingest_document_direct() {
679 let (mut shard, _) = create_test_shard();
680
681 let doc_id = shard.ingest_document_direct("Test", "Content", Position::new(0.0, 0.0));
682
683 let stats = shard.stats();
684 assert_eq!(stats.documents_total, 1);
685 assert!(!doc_id.0.is_nil());
686 }
687
688 #[test]
689 fn test_get_term_frequencies() {
690 let (shard, _) = create_test_shard();
691
692 let freqs = shard.get_term_frequencies(&["test".to_string()]);
694 assert!(freqs.is_empty());
695 }
696
697 #[test]
698 fn test_execute_local_query() {
699 let (shard, _) = create_test_shard();
700
701 let request = LocalQueryRequest {
702 query_terms: vec!["test".to_string()],
703 max_results: 10,
704 global_df: HashMap::new(),
705 };
706
707 let result = shard.execute_local_query(&request);
708 assert_eq!(result.shard_id, ShardId::new(0));
709 assert!(result.results.is_empty()); }
711
712 #[test]
713 fn test_shard_info() {
714 let (shard, _) = create_test_shard();
715 let info = shard.shard_info("127.0.0.1:8080".to_string());
716
717 assert_eq!(info.id, ShardId::new(0));
718 assert_eq!(info.address, "127.0.0.1:8080");
719 assert_eq!(info.node_count, 0);
720 assert_eq!(info.edge_count, 0);
721 }
722
723 #[test]
724 fn test_node_count_and_document_count() {
725 let (mut shard, _) = create_test_shard();
726
727 assert_eq!(shard.node_count(), 0);
728 assert_eq!(shard.document_count(), 0);
729
730 shard.ingest_document_direct("Test", "Content", Position::new(0.0, 0.0));
731 assert_eq!(shard.document_count(), 1);
732 }
733
734 #[test]
735 fn test_tick_and_run() {
736 let (mut shard, _) = create_test_shard();
737
738 shard.tick();
739 assert_eq!(shard.current_tick(), 1);
740
741 shard.run(5);
742 assert_eq!(shard.current_tick(), 6);
743 }
744
745 #[tokio::test]
746 async fn test_owns_document() {
747 let (shard, _hash_ring) = create_test_shard();
748
749 let mut owned_count = 0;
751 for i in 0..100 {
752 let doc_id = DocumentId::from_seed(i);
753 if shard.owns_document(&doc_id).await {
754 owned_count += 1;
755 }
756 }
757
758 assert!(owned_count > 20 && owned_count < 50);
760 }
761
762 #[test]
763 fn test_with_ghost_cache_size() {
764 let hash_ring = Arc::new(RwLock::new(ConsistentHashRing::new(3)));
765 let shard = ShardedColony::with_ghost_cache_size(
766 ShardId::new(0),
767 ColonyConfig::default(),
768 hash_ring,
769 500,
770 );
771
772 assert_eq!(shard.ghost_cache().capacity(), 500);
773 }
774}