mod edge_resolver;
mod ghost_cache;
pub use edge_resolver::{CrossShardEdgeManager, CrossShardEdgeStats};
pub use ghost_cache::{GhostCacheStats, GhostNodeCache};
use crate::hashing::ConsistentHashRing;
use crate::types::*;
use phago_core::substrate::Substrate;
use phago_core::topology::TopologyGraph;
use phago_core::types::{DocumentId, NodeData, NodeId, Position, Tick};
use phago_runtime::colony::{Colony, ColonyConfig, ColonyStats};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// One shard of a distributed colony.
///
/// Wraps a local [`Colony`] together with the state needed to cooperate with
/// other shards: a cache of ghost nodes mirrored from peers, a manager for
/// edges that cross shard boundaries, the shared consistent-hash ring used for
/// ownership decisions, and the network addresses of known peers.
pub struct ShardedColony {
    /// Identifier of this shard within the ring.
    shard_id: ShardId,
    /// The colony that actually stores this shard's documents and graph.
    local: Colony,
    /// Cache of remote nodes (ghosts) keyed by the shard that owns them.
    ghost_cache: GhostNodeCache,
    /// Bookkeeping for edges whose endpoints live on different shards.
    edge_manager: CrossShardEdgeManager,
    /// Consistent-hash ring shared (via `Arc`) with whoever created this shard.
    hash_ring: Arc<RwLock<ConsistentHashRing>>,
    /// Known peer shards and their addresses.
    peers: HashMap<ShardId, String>,
    /// Cross-shard edges queued for the next Act/Decay phase result.
    pending_cross_edges: Vec<CrossShardEdge>,
}
impl ShardedColony {
pub fn new(
shard_id: ShardId,
config: ColonyConfig,
hash_ring: Arc<RwLock<ConsistentHashRing>>,
) -> Self {
Self {
shard_id,
local: Colony::from_config(config),
ghost_cache: GhostNodeCache::new(1000), edge_manager: CrossShardEdgeManager::new(),
hash_ring,
peers: HashMap::new(),
pending_cross_edges: Vec::new(),
}
}
pub fn with_ghost_cache_size(
shard_id: ShardId,
config: ColonyConfig,
hash_ring: Arc<RwLock<ConsistentHashRing>>,
ghost_cache_size: usize,
) -> Self {
Self {
shard_id,
local: Colony::from_config(config),
ghost_cache: GhostNodeCache::new(ghost_cache_size),
edge_manager: CrossShardEdgeManager::new(),
hash_ring,
peers: HashMap::new(),
pending_cross_edges: Vec::new(),
}
}
pub fn shard_id(&self) -> ShardId {
self.shard_id
}
pub async fn owns_document(&self, doc_id: &DocumentId) -> bool {
let ring = self.hash_ring.read().await;
ring.get_shard(doc_id) == self.shard_id
}
pub fn owns_document_sync(&self, doc_id: &DocumentId, ring: &ConsistentHashRing) -> bool {
ring.get_shard(doc_id) == self.shard_id
}
pub async fn ingest_document(
&mut self,
title: &str,
content: &str,
position: Position,
) -> DistributedResult<DocumentId> {
let doc_id = DocumentId::new();
if !self.owns_document(&doc_id).await {
return Err(DistributedError::RoutingFailed(doc_id));
}
let actual_id = self.local.ingest_document(title, content, position);
Ok(actual_id)
}
pub fn ingest_document_direct(
&mut self,
title: &str,
content: &str,
position: Position,
) -> DocumentId {
self.local.ingest_document(title, content, position)
}
pub fn tick_phase(&mut self, phase: TickPhase) -> PhaseResult {
match phase {
TickPhase::Sense => {
PhaseResult {
shard_id: self.shard_id,
phase,
tick: self.local.substrate().current_tick(),
cross_shard_edges: Vec::new(),
node_count: self.local.stats().graph_nodes,
edge_count: self.local.stats().graph_edges,
}
}
TickPhase::Act | TickPhase::Decay => {
let _events = self.local.tick();
let cross_edges = std::mem::take(&mut self.pending_cross_edges);
PhaseResult {
shard_id: self.shard_id,
phase,
tick: self.local.substrate().current_tick(),
cross_shard_edges: cross_edges,
node_count: self.local.stats().graph_nodes,
edge_count: self.local.stats().graph_edges,
}
}
TickPhase::Advance => {
PhaseResult {
shard_id: self.shard_id,
phase,
tick: self.local.substrate().current_tick(),
cross_shard_edges: Vec::new(),
node_count: self.local.stats().graph_nodes,
edge_count: self.local.stats().graph_edges,
}
}
}
}
pub fn get_term_frequencies(&self, terms: &[String]) -> HashMap<String, u64> {
let mut freqs = HashMap::new();
let graph = self.local.substrate().graph();
for term in terms {
let count = graph.find_nodes_by_label(term).len();
if count > 0 {
freqs.insert(term.clone(), count as u64);
}
}
freqs
}
pub fn get_node(&self, id: &NodeId) -> Option<NodeData> {
self.local.substrate().graph().get_node(id).cloned()
}
pub fn add_peer(&mut self, shard_id: ShardId, address: String) {
self.peers.insert(shard_id, address);
}
pub fn remove_peer(&mut self, shard_id: ShardId) {
self.peers.remove(&shard_id);
self.ghost_cache.invalidate_shard(shard_id);
}
pub fn peers(&self) -> &HashMap<ShardId, String> {
&self.peers
}
pub fn peer_address(&self, shard_id: &ShardId) -> Option<&String> {
self.peers.get(shard_id)
}
pub fn local(&self) -> &Colony {
&self.local
}
pub fn local_mut(&mut self) -> &mut Colony {
&mut self.local
}
pub fn ghost_cache(&self) -> &GhostNodeCache {
&self.ghost_cache
}
pub fn ghost_cache_mut(&mut self) -> &mut GhostNodeCache {
&mut self.ghost_cache
}
pub fn edge_manager(&self) -> &CrossShardEdgeManager {
&self.edge_manager
}
pub fn edge_manager_mut(&mut self) -> &mut CrossShardEdgeManager {
&mut self.edge_manager
}
pub fn register_cross_shard_edge(&mut self, edge: CrossShardEdge) {
self.edge_manager.add_outgoing_edge(edge.clone());
self.pending_cross_edges.push(edge);
}
pub fn handle_shard_offline(&mut self, shard_id: ShardId) -> (usize, usize) {
let edges_removed = self.edge_manager.remove_shard_edges(shard_id);
let ghosts_invalidated = self.ghost_cache.invalidate_shard(shard_id);
(edges_removed, ghosts_invalidated)
}
pub fn decay_cross_shard_edges(&mut self, rate: f64, threshold: f64) -> Vec<CrossShardEdge> {
self.edge_manager.decay_edges(rate, threshold)
}
pub fn cross_shard_edge_stats(&self) -> CrossShardEdgeStats {
self.edge_manager.stats()
}
pub fn connected_shards(&self) -> Vec<ShardId> {
self.edge_manager.connected_shards()
}
pub fn take_pending_for_resolution(&mut self) -> HashMap<ShardId, Vec<CrossShardEdge>> {
let pending = self.edge_manager.take_pending();
let mut by_shard: HashMap<ShardId, Vec<CrossShardEdge>> = HashMap::new();
for edge in pending {
by_shard.entry(edge.to_shard).or_default().push(edge);
}
by_shard
}
pub fn add_pending_cross_edge(&mut self, edge: CrossShardEdge) {
self.pending_cross_edges.push(edge);
}
pub fn pending_cross_edges(&self) -> &[CrossShardEdge] {
&self.pending_cross_edges
}
pub fn clear_pending_cross_edges(&mut self) {
self.pending_cross_edges.clear();
}
pub fn hash_ring(&self) -> &Arc<RwLock<ConsistentHashRing>> {
&self.hash_ring
}
pub fn health(&self) -> ShardHealth {
let stats = self.local.stats();
ShardHealth {
shard_id: self.shard_id,
healthy: true,
load: stats.agents_alive as f64 / 100.0, memory_usage_mb: 0, pending_operations: self.pending_cross_edges.len(),
}
}
pub fn stats(&self) -> ColonyStats {
self.local.stats()
}
pub fn current_tick(&self) -> Tick {
self.local.substrate().current_tick()
}
pub fn node_count(&self) -> usize {
self.local.substrate().node_count()
}
pub fn document_count(&self) -> usize {
self.local.substrate().all_documents().len()
}
pub fn tick(&mut self) {
self.local.tick();
}
pub fn run(&mut self, ticks: u64) {
self.local.run(ticks);
}
pub fn shard_info(&self, address: String) -> ShardInfo {
let stats = self.local.stats();
ShardInfo {
id: self.shard_id,
address,
node_count: stats.graph_nodes,
edge_count: stats.graph_edges,
document_count: stats.documents_total,
last_heartbeat: 0, }
}
pub fn execute_local_query(&self, request: &LocalQueryRequest) -> LocalQueryResult {
let graph = self.local.substrate().graph();
let mut results = Vec::new();
for term in &request.query_terms {
let matching_nodes = graph.find_nodes_by_label(term);
for node_id in matching_nodes {
if let Some(node) = graph.get_node(&node_id) {
let score = node.access_count as f64 * 0.1;
results.push(ScoredNode {
node_id,
label: node.label.clone(),
score,
shard_id: self.shard_id,
});
}
}
}
results.sort_by(|a, b| {
b.score
.partial_cmp(&a.score)
.unwrap_or(std::cmp::Ordering::Equal)
});
results.truncate(request.max_results);
let term_frequencies = self.get_term_frequencies(&request.query_terms);
LocalQueryResult {
shard_id: self.shard_id,
results,
term_frequencies,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds shard 0 on a fresh three-way ring and returns the ring handle too.
    fn create_test_shard() -> (ShardedColony, Arc<RwLock<ConsistentHashRing>>) {
        let hash_ring = Arc::new(RwLock::new(ConsistentHashRing::new(3)));
        let shard = ShardedColony::new(ShardId::new(0), ColonyConfig::default(), hash_ring.clone());
        (shard, hash_ring)
    }

    /// A cross-shard edge pointing at shard 1, used by several tests.
    fn sample_edge() -> CrossShardEdge {
        CrossShardEdge {
            from_node: NodeId::from_seed(1),
            to_node: NodeId::from_seed(2),
            to_shard: ShardId::new(1),
            weight: 0.5,
        }
    }

    #[test]
    fn test_new_sharded_colony() {
        let (shard, _) = create_test_shard();
        assert_eq!(shard.shard_id(), ShardId::new(0));
        assert!(shard.pending_cross_edges().is_empty());
        assert_eq!(shard.ghost_cache().len(), 0);
    }

    #[test]
    fn test_add_peer() {
        let (mut shard, _) = create_test_shard();
        shard.add_peer(ShardId::new(1), "127.0.0.1:8081".to_string());
        shard.add_peer(ShardId::new(2), "127.0.0.1:8082".to_string());
        assert_eq!(shard.peers().len(), 2);
        assert_eq!(
            shard.peer_address(&ShardId::new(1)),
            Some(&"127.0.0.1:8081".to_string())
        );
    }

    #[test]
    fn test_remove_peer_invalidates_ghosts() {
        let (mut shard, _) = create_test_shard();
        shard.add_peer(ShardId::new(1), "127.0.0.1:8081".to_string());
        let ghost = GhostNode::new(NodeId::from_seed(1), ShardId::new(1), "test".to_string());
        shard.ghost_cache_mut().insert(ghost);
        assert_eq!(shard.ghost_cache().len(), 1);
        shard.remove_peer(ShardId::new(1));
        assert_eq!(shard.ghost_cache().len(), 0);
    }

    #[test]
    fn test_handle_shard_offline_invalidates_ghosts() {
        let (mut shard, _) = create_test_shard();
        let ghost = GhostNode::new(NodeId::from_seed(1), ShardId::new(1), "test".to_string());
        shard.ghost_cache_mut().insert(ghost);
        shard.handle_shard_offline(ShardId::new(1));
        assert_eq!(shard.ghost_cache().len(), 0);
    }

    #[test]
    fn test_tick_phase_sense() {
        let (mut shard, _) = create_test_shard();
        let result = shard.tick_phase(TickPhase::Sense);
        assert_eq!(result.shard_id, ShardId::new(0));
        assert_eq!(result.phase, TickPhase::Sense);
        assert!(result.cross_shard_edges.is_empty());
    }

    /// Sense only snapshots: it neither ticks nor drains queued edges.
    #[test]
    fn test_tick_phase_sense_keeps_pending_edges() {
        let (mut shard, _) = create_test_shard();
        shard.add_pending_cross_edge(sample_edge());
        let result = shard.tick_phase(TickPhase::Sense);
        assert_eq!(result.tick, 0);
        assert!(result.cross_shard_edges.is_empty());
        assert_eq!(shard.pending_cross_edges().len(), 1);
    }

    #[test]
    fn test_tick_phase_act() {
        let (mut shard, _) = create_test_shard();
        let result = shard.tick_phase(TickPhase::Act);
        assert_eq!(result.shard_id, ShardId::new(0));
        assert_eq!(result.phase, TickPhase::Act);
    }

    /// Act runs one local tick and hands queued edges over in its result.
    #[test]
    fn test_tick_phase_act_drains_pending_edges() {
        let (mut shard, _) = create_test_shard();
        shard.add_pending_cross_edge(sample_edge());
        let result = shard.tick_phase(TickPhase::Act);
        assert_eq!(result.tick, 1);
        assert_eq!(result.cross_shard_edges.len(), 1);
        assert!(shard.pending_cross_edges().is_empty());
    }

    #[test]
    fn test_health() {
        let (shard, _) = create_test_shard();
        let health = shard.health();
        assert_eq!(health.shard_id, ShardId::new(0));
        assert!(health.healthy);
        assert_eq!(health.pending_operations, 0);
    }

    /// Registering an edge also queues it, which health reports as pending work.
    #[test]
    fn test_register_cross_shard_edge_queues_pending() {
        let (mut shard, _) = create_test_shard();
        shard.register_cross_shard_edge(sample_edge());
        assert_eq!(shard.pending_cross_edges().len(), 1);
        assert_eq!(shard.health().pending_operations, 1);
    }

    #[test]
    fn test_add_pending_cross_edge() {
        let (mut shard, _) = create_test_shard();
        shard.add_pending_cross_edge(sample_edge());
        assert_eq!(shard.pending_cross_edges().len(), 1);
        shard.clear_pending_cross_edges();
        assert!(shard.pending_cross_edges().is_empty());
    }

    #[test]
    fn test_ingest_document_direct() {
        let (mut shard, _) = create_test_shard();
        let doc_id = shard.ingest_document_direct("Test", "Content", Position::new(0.0, 0.0));
        let stats = shard.stats();
        assert_eq!(stats.documents_total, 1);
        assert!(!doc_id.0.is_nil());
    }

    #[test]
    fn test_get_term_frequencies() {
        let (shard, _) = create_test_shard();
        let freqs = shard.get_term_frequencies(&["test".to_string()]);
        assert!(freqs.is_empty());
    }

    #[test]
    fn test_execute_local_query() {
        let (shard, _) = create_test_shard();
        let request = LocalQueryRequest {
            query_terms: vec!["test".to_string()],
            max_results: 10,
            global_df: HashMap::new(),
        };
        let result = shard.execute_local_query(&request);
        assert_eq!(result.shard_id, ShardId::new(0));
        assert!(result.results.is_empty());
    }

    #[test]
    fn test_shard_info() {
        let (shard, _) = create_test_shard();
        let info = shard.shard_info("127.0.0.1:8080".to_string());
        assert_eq!(info.id, ShardId::new(0));
        assert_eq!(info.address, "127.0.0.1:8080");
        assert_eq!(info.node_count, 0);
        assert_eq!(info.edge_count, 0);
    }

    #[test]
    fn test_node_count_and_document_count() {
        let (mut shard, _) = create_test_shard();
        assert_eq!(shard.node_count(), 0);
        assert_eq!(shard.document_count(), 0);
        shard.ingest_document_direct("Test", "Content", Position::new(0.0, 0.0));
        assert_eq!(shard.document_count(), 1);
    }

    #[test]
    fn test_tick_and_run() {
        let (mut shard, _) = create_test_shard();
        shard.tick();
        assert_eq!(shard.current_tick(), 1);
        shard.run(5);
        assert_eq!(shard.current_tick(), 6);
    }

    /// With three ring members, shard 0 should own roughly a third of the ids.
    #[tokio::test]
    async fn test_owns_document() {
        let (shard, _hash_ring) = create_test_shard();
        let mut owned_count = 0;
        for i in 0..100 {
            let doc_id = DocumentId::from_seed(i);
            if shard.owns_document(&doc_id).await {
                owned_count += 1;
            }
        }
        assert!(owned_count > 20 && owned_count < 50);
    }

    #[test]
    fn test_with_ghost_cache_size() {
        let hash_ring = Arc::new(RwLock::new(ConsistentHashRing::new(3)));
        let shard = ShardedColony::with_ghost_cache_size(
            ShardId::new(0),
            ColonyConfig::default(),
            hash_ring,
            500,
        );
        assert_eq!(shard.ghost_cache().capacity(), 500);
    }
}