Skip to main content

phago_distributed/shard/
mod.rs

//! Sharded colony implementation.
//!
//! This module provides the `ShardedColony` type, which wraps a local `Colony`
//! and adds distributed coordination capabilities:
//!
//! - Document routing via a consistent hash ring
//! - Ghost node cache for cross-shard references
//! - Tick phase coordination with the coordinator
//!
//! # Architecture
//!
//! Each shard in the cluster runs a `ShardedColony` that manages:
//! - A local `Colony` instance with its agents and substrate
//! - A `GhostNodeCache` for caching references to nodes on other shards
//! - A reference to the `ConsistentHashRing` for document routing
//!
//! # Example
//!
//! ```ignore
//! use std::sync::Arc;
//! use tokio::sync::RwLock;
//!
//! use phago_distributed::shard::ShardedColony;
//! use phago_distributed::hashing::ConsistentHashRing;
//! use phago_runtime::colony::ColonyConfig;
//!
//! let hash_ring = Arc::new(RwLock::new(ConsistentHashRing::new(3)));
//! let mut shard = ShardedColony::new(
//!     ShardId::new(0),
//!     ColonyConfig::default(),
//!     hash_ring,
//! );
//!
//! // Check if this shard owns a document
//! if shard.owns_document(&doc_id).await {
//!     shard.ingest_document("title", "content", position).await?;
//! }
//!
//! // Execute tick phases
//! let result = shard.tick_phase(TickPhase::Sense);
//! ```
39
40mod edge_resolver;
41mod ghost_cache;
42
43pub use edge_resolver::{CrossShardEdgeManager, CrossShardEdgeStats};
44pub use ghost_cache::{GhostCacheStats, GhostNodeCache};
45
46use crate::hashing::ConsistentHashRing;
47use crate::types::*;
48use phago_core::substrate::Substrate;
49use phago_core::topology::TopologyGraph;
50use phago_core::types::{DocumentId, NodeData, NodeId, Position, Tick};
51use phago_runtime::colony::{Colony, ColonyConfig, ColonyStats};
52use std::collections::HashMap;
53use std::sync::Arc;
54use tokio::sync::RwLock;
55
56/// A sharded colony that participates in distributed coordination.
57///
58/// ShardedColony wraps a local Colony instance and adds the machinery
59/// needed for distributed operation:
60///
61/// - Consistent hashing for document routing
62/// - Ghost node cache for cross-shard edge references
63/// - Cross-shard edge management for edges spanning shards
64/// - Peer shard address tracking
65/// - Cross-shard edge collection during tick phases
66pub struct ShardedColony {
67    /// This shard's ID.
68    shard_id: ShardId,
69    /// The local colony instance.
70    local: Colony,
71    /// Cache of ghost nodes from other shards.
72    ghost_cache: GhostNodeCache,
73    /// Manager for cross-shard edges.
74    edge_manager: CrossShardEdgeManager,
75    /// Hash ring for document routing.
76    hash_ring: Arc<RwLock<ConsistentHashRing>>,
77    /// Addresses of peer shards.
78    peers: HashMap<ShardId, String>,
79    /// Pending cross-shard edges to resolve.
80    pending_cross_edges: Vec<CrossShardEdge>,
81}
82
83impl ShardedColony {
84    /// Create a new sharded colony.
85    ///
86    /// # Arguments
87    ///
88    /// * `shard_id` - Unique identifier for this shard
89    /// * `config` - Colony configuration parameters
90    /// * `hash_ring` - Shared consistent hash ring for routing
91    ///
92    /// # Example
93    ///
94    /// ```ignore
95    /// let shard = ShardedColony::new(
96    ///     ShardId::new(0),
97    ///     ColonyConfig::default(),
98    ///     Arc::new(RwLock::new(ConsistentHashRing::new(3))),
99    /// );
100    /// ```
101    pub fn new(
102        shard_id: ShardId,
103        config: ColonyConfig,
104        hash_ring: Arc<RwLock<ConsistentHashRing>>,
105    ) -> Self {
106        Self {
107            shard_id,
108            local: Colony::from_config(config),
109            ghost_cache: GhostNodeCache::new(1000), // Cache up to 1000 ghost nodes
110            edge_manager: CrossShardEdgeManager::new(),
111            hash_ring,
112            peers: HashMap::new(),
113            pending_cross_edges: Vec::new(),
114        }
115    }
116
117    /// Create a new sharded colony with a custom ghost cache size.
118    ///
119    /// # Arguments
120    ///
121    /// * `shard_id` - Unique identifier for this shard
122    /// * `config` - Colony configuration parameters
123    /// * `hash_ring` - Shared consistent hash ring for routing
124    /// * `ghost_cache_size` - Maximum number of ghost nodes to cache
125    pub fn with_ghost_cache_size(
126        shard_id: ShardId,
127        config: ColonyConfig,
128        hash_ring: Arc<RwLock<ConsistentHashRing>>,
129        ghost_cache_size: usize,
130    ) -> Self {
131        Self {
132            shard_id,
133            local: Colony::from_config(config),
134            ghost_cache: GhostNodeCache::new(ghost_cache_size),
135            edge_manager: CrossShardEdgeManager::new(),
136            hash_ring,
137            peers: HashMap::new(),
138            pending_cross_edges: Vec::new(),
139        }
140    }
141
142    /// Get this shard's ID.
143    pub fn shard_id(&self) -> ShardId {
144        self.shard_id
145    }
146
147    /// Check if a document belongs to this shard.
148    ///
149    /// Uses the consistent hash ring to determine ownership.
150    ///
151    /// # Arguments
152    ///
153    /// * `doc_id` - The document ID to check
154    pub async fn owns_document(&self, doc_id: &DocumentId) -> bool {
155        let ring = self.hash_ring.read().await;
156        ring.get_shard(doc_id) == self.shard_id
157    }
158
159    /// Check ownership synchronously (for non-async contexts).
160    ///
161    /// This is a blocking operation and should only be used when
162    /// async is not available.
163    pub fn owns_document_sync(&self, doc_id: &DocumentId, ring: &ConsistentHashRing) -> bool {
164        ring.get_shard(doc_id) == self.shard_id
165    }
166
167    /// Ingest a document (only if this shard owns it).
168    ///
169    /// Returns an error if the document should be routed to a different shard.
170    ///
171    /// # Arguments
172    ///
173    /// * `title` - Document title
174    /// * `content` - Document content
175    /// * `position` - Spatial position in the substrate
176    ///
177    /// # Returns
178    ///
179    /// The document ID if successful, or a routing error if this shard
180    /// doesn't own the document.
181    pub async fn ingest_document(
182        &mut self,
183        title: &str,
184        content: &str,
185        position: Position,
186    ) -> DistributedResult<DocumentId> {
187        // Create document first to get ID
188        let doc_id = DocumentId::new();
189
190        // Check if we own this document
191        if !self.owns_document(&doc_id).await {
192            return Err(DistributedError::RoutingFailed(doc_id));
193        }
194
195        // Ingest into local colony
196        let actual_id = self.local.ingest_document(title, content, position);
197        Ok(actual_id)
198    }
199
200    /// Ingest a document with a pre-determined ID.
201    ///
202    /// This is useful when routing a document from the coordinator,
203    /// where the ID has already been assigned.
204    ///
205    /// # Arguments
206    ///
207    /// * `title` - Document title
208    /// * `content` - Document content
209    /// * `position` - Spatial position in the substrate
210    pub fn ingest_document_direct(
211        &mut self,
212        title: &str,
213        content: &str,
214        position: Position,
215    ) -> DocumentId {
216        self.local.ingest_document(title, content, position)
217    }
218
219    /// Execute a tick phase.
220    ///
221    /// Each tick is divided into phases that must be synchronized across
222    /// all shards. The coordinator ensures all shards complete each phase
223    /// before moving to the next.
224    ///
225    /// # Arguments
226    ///
227    /// * `phase` - The phase to execute
228    ///
229    /// # Returns
230    ///
231    /// A `PhaseResult` containing statistics and any cross-shard edges
232    /// that need resolution.
233    pub fn tick_phase(&mut self, phase: TickPhase) -> PhaseResult {
234        match phase {
235            TickPhase::Sense => {
236                // In distributed mode, sense phase prepares agent decisions
237                // but doesn't execute them yet. This allows the coordinator
238                // to synchronize before the Act phase.
239                PhaseResult {
240                    shard_id: self.shard_id,
241                    phase,
242                    tick: self.local.substrate().current_tick(),
243                    cross_shard_edges: Vec::new(),
244                    node_count: self.local.stats().graph_nodes,
245                    edge_count: self.local.stats().graph_edges,
246                }
247            }
248            TickPhase::Act | TickPhase::Decay => {
249                // Run a full local tick (Colony.tick() handles both agent
250                // actions and decay in one pass)
251                let _events = self.local.tick();
252
253                // Collect any cross-shard edges from this tick
254                let cross_edges = std::mem::take(&mut self.pending_cross_edges);
255
256                PhaseResult {
257                    shard_id: self.shard_id,
258                    phase,
259                    tick: self.local.substrate().current_tick(),
260                    cross_shard_edges: cross_edges,
261                    node_count: self.local.stats().graph_nodes,
262                    edge_count: self.local.stats().graph_edges,
263                }
264            }
265            TickPhase::Advance => {
266                // Advance tick is handled by coordinator - we just report status
267                PhaseResult {
268                    shard_id: self.shard_id,
269                    phase,
270                    tick: self.local.substrate().current_tick(),
271                    cross_shard_edges: Vec::new(),
272                    node_count: self.local.stats().graph_nodes,
273                    edge_count: self.local.stats().graph_edges,
274                }
275            }
276        }
277    }
278
279    /// Get local term frequencies for TF-IDF computation.
280    ///
281    /// This is called by the coordinator to aggregate document frequencies
282    /// across all shards for proper TF-IDF scoring.
283    ///
284    /// # Arguments
285    ///
286    /// * `terms` - The terms to count
287    ///
288    /// # Returns
289    ///
290    /// A map of term to document frequency on this shard.
291    pub fn get_term_frequencies(&self, terms: &[String]) -> HashMap<String, u64> {
292        let mut freqs = HashMap::new();
293        let graph = self.local.substrate().graph();
294
295        for term in terms {
296            let count = graph.find_nodes_by_label(term).len();
297            if count > 0 {
298                freqs.insert(term.clone(), count as u64);
299            }
300        }
301
302        freqs
303    }
304
305    /// Get a node's data from the local graph.
306    ///
307    /// # Arguments
308    ///
309    /// * `id` - The node ID to look up
310    ///
311    /// # Returns
312    ///
313    /// The node data if found on this shard.
314    pub fn get_node(&self, id: &NodeId) -> Option<NodeData> {
315        self.local.substrate().graph().get_node(id).cloned()
316    }
317
318    /// Add a peer shard address.
319    ///
320    /// # Arguments
321    ///
322    /// * `shard_id` - The peer's shard ID
323    /// * `address` - The peer's network address (e.g., "127.0.0.1:8081")
324    pub fn add_peer(&mut self, shard_id: ShardId, address: String) {
325        self.peers.insert(shard_id, address);
326    }
327
328    /// Remove a peer shard.
329    ///
330    /// Also invalidates any ghost nodes from that shard.
331    ///
332    /// # Arguments
333    ///
334    /// * `shard_id` - The peer's shard ID
335    pub fn remove_peer(&mut self, shard_id: ShardId) {
336        self.peers.remove(&shard_id);
337        self.ghost_cache.invalidate_shard(shard_id);
338    }
339
340    /// Get peer addresses.
341    pub fn peers(&self) -> &HashMap<ShardId, String> {
342        &self.peers
343    }
344
345    /// Get a peer's address.
346    pub fn peer_address(&self, shard_id: &ShardId) -> Option<&String> {
347        self.peers.get(shard_id)
348    }
349
350    /// Get the local colony reference.
351    pub fn local(&self) -> &Colony {
352        &self.local
353    }
354
355    /// Get mutable local colony reference.
356    pub fn local_mut(&mut self) -> &mut Colony {
357        &mut self.local
358    }
359
360    /// Get ghost node cache.
361    pub fn ghost_cache(&self) -> &GhostNodeCache {
362        &self.ghost_cache
363    }
364
365    /// Get mutable ghost node cache.
366    pub fn ghost_cache_mut(&mut self) -> &mut GhostNodeCache {
367        &mut self.ghost_cache
368    }
369
370    /// Get the cross-shard edge manager.
371    pub fn edge_manager(&self) -> &CrossShardEdgeManager {
372        &self.edge_manager
373    }
374
375    /// Get mutable cross-shard edge manager.
376    pub fn edge_manager_mut(&mut self) -> &mut CrossShardEdgeManager {
377        &mut self.edge_manager
378    }
379
380    /// Register an outgoing cross-shard edge with the edge manager.
381    ///
382    /// This registers the edge for tracking and ghost node resolution.
383    /// The edge is also added to pending_cross_edges for phase synchronization.
384    ///
385    /// # Arguments
386    ///
387    /// * `edge` - The cross-shard edge to register
388    pub fn register_cross_shard_edge(&mut self, edge: CrossShardEdge) {
389        self.edge_manager.add_outgoing_edge(edge.clone());
390        self.pending_cross_edges.push(edge);
391    }
392
393    /// Handle edges when a shard goes offline.
394    ///
395    /// Removes all edges to/from the specified shard and invalidates
396    /// corresponding ghost nodes.
397    ///
398    /// # Arguments
399    ///
400    /// * `shard_id` - The shard that went offline
401    ///
402    /// # Returns
403    ///
404    /// A tuple of (edges_removed, ghosts_invalidated).
405    pub fn handle_shard_offline(&mut self, shard_id: ShardId) -> (usize, usize) {
406        let edges_removed = self.edge_manager.remove_shard_edges(shard_id);
407        let ghosts_invalidated = self.ghost_cache.invalidate_shard(shard_id);
408        (edges_removed, ghosts_invalidated)
409    }
410
411    /// Decay all cross-shard edges and prune weak ones.
412    ///
413    /// # Arguments
414    ///
415    /// * `rate` - Decay rate (0.0 to 1.0)
416    /// * `threshold` - Minimum weight threshold
417    ///
418    /// # Returns
419    ///
420    /// Vector of pruned edges.
421    pub fn decay_cross_shard_edges(&mut self, rate: f64, threshold: f64) -> Vec<CrossShardEdge> {
422        self.edge_manager.decay_edges(rate, threshold)
423    }
424
425    /// Get statistics about cross-shard edges.
426    pub fn cross_shard_edge_stats(&self) -> CrossShardEdgeStats {
427        self.edge_manager.stats()
428    }
429
430    /// Get all shards this shard has edges to.
431    pub fn connected_shards(&self) -> Vec<ShardId> {
432        self.edge_manager.connected_shards()
433    }
434
435    /// Resolve pending edges by fetching ghost nodes.
436    ///
437    /// Takes the pending edges from the manager and returns them for
438    /// processing. After ghost nodes are fetched, the caller should
439    /// insert them into the ghost cache.
440    ///
441    /// # Returns
442    ///
443    /// Pending edges grouped by target shard for efficient batching.
444    pub fn take_pending_for_resolution(&mut self) -> HashMap<ShardId, Vec<CrossShardEdge>> {
445        let pending = self.edge_manager.take_pending();
446        let mut by_shard: HashMap<ShardId, Vec<CrossShardEdge>> = HashMap::new();
447        for edge in pending {
448            by_shard.entry(edge.to_shard).or_default().push(edge);
449        }
450        by_shard
451    }
452
453    /// Add a cross-shard edge to be synchronized.
454    ///
455    /// Cross-shard edges are collected during the Act phase and sent
456    /// to the coordinator for resolution during the Exchange phase.
457    ///
458    /// # Arguments
459    ///
460    /// * `edge` - The cross-shard edge to add
461    pub fn add_pending_cross_edge(&mut self, edge: CrossShardEdge) {
462        self.pending_cross_edges.push(edge);
463    }
464
465    /// Get pending cross-shard edges.
466    pub fn pending_cross_edges(&self) -> &[CrossShardEdge] {
467        &self.pending_cross_edges
468    }
469
470    /// Clear pending cross-shard edges (after they've been processed).
471    pub fn clear_pending_cross_edges(&mut self) {
472        self.pending_cross_edges.clear();
473    }
474
475    /// Get the hash ring reference.
476    pub fn hash_ring(&self) -> &Arc<RwLock<ConsistentHashRing>> {
477        &self.hash_ring
478    }
479
480    /// Get shard health information.
481    pub fn health(&self) -> ShardHealth {
482        let stats = self.local.stats();
483        ShardHealth {
484            shard_id: self.shard_id,
485            healthy: true,
486            load: stats.agents_alive as f64 / 100.0, // Rough load estimate
487            memory_usage_mb: 0,                      // Would need actual measurement
488            pending_operations: self.pending_cross_edges.len(),
489        }
490    }
491
492    /// Get detailed colony statistics.
493    pub fn stats(&self) -> ColonyStats {
494        self.local.stats()
495    }
496
497    /// Get the current tick number.
498    pub fn current_tick(&self) -> Tick {
499        self.local.substrate().current_tick()
500    }
501
502    /// Get the total number of nodes in this shard's graph.
503    pub fn node_count(&self) -> usize {
504        self.local.substrate().node_count()
505    }
506
507    /// Get the total number of documents in this shard.
508    pub fn document_count(&self) -> usize {
509        self.local.substrate().all_documents().len()
510    }
511
512    /// Run a single tick on the local colony.
513    pub fn tick(&mut self) {
514        self.local.tick();
515    }
516
517    /// Run multiple ticks on the local colony.
518    pub fn run(&mut self, ticks: u64) {
519        self.local.run(ticks);
520    }
521
522    /// Get shard info for registration with coordinator.
523    pub fn shard_info(&self, address: String) -> ShardInfo {
524        let stats = self.local.stats();
525        ShardInfo {
526            id: self.shard_id,
527            address,
528            node_count: stats.graph_nodes,
529            edge_count: stats.graph_edges,
530            document_count: stats.documents_total,
531            last_heartbeat: 0, // Set by coordinator
532        }
533    }
534
535    /// Execute a local query and return scored results.
536    ///
537    /// # Arguments
538    ///
539    /// * `request` - The query request from the coordinator
540    ///
541    /// # Returns
542    ///
543    /// Local query results including scored nodes and term frequencies.
544    pub fn execute_local_query(&self, request: &LocalQueryRequest) -> LocalQueryResult {
545        let graph = self.local.substrate().graph();
546        let mut results = Vec::new();
547
548        // Simple term matching (a real implementation would use TF-IDF)
549        for term in &request.query_terms {
550            let matching_nodes = graph.find_nodes_by_label(term);
551            for node_id in matching_nodes {
552                if let Some(node) = graph.get_node(&node_id) {
553                    // Score based on access count (simple relevance proxy)
554                    let score = node.access_count as f64 * 0.1;
555                    results.push(ScoredNode {
556                        node_id,
557                        label: node.label.clone(),
558                        score,
559                        shard_id: self.shard_id,
560                    });
561                }
562            }
563        }
564
565        // Sort by score descending and limit results
566        results.sort_by(|a, b| {
567            b.score
568                .partial_cmp(&a.score)
569                .unwrap_or(std::cmp::Ordering::Equal)
570        });
571        results.truncate(request.max_results);
572
573        // Get term frequencies for global DF computation
574        let term_frequencies = self.get_term_frequencies(&request.query_terms);
575
576        LocalQueryResult {
577            shard_id: self.shard_id,
578            results,
579            term_frequencies,
580        }
581    }
582}
583
#[cfg(test)]
mod tests {
    use super::*;

    /// Build shard 0 on a fresh three-shard ring and return both.
    fn create_test_shard() -> (ShardedColony, Arc<RwLock<ConsistentHashRing>>) {
        let ring = Arc::new(RwLock::new(ConsistentHashRing::new(3)));
        let colony = ShardedColony::new(ShardId::new(0), ColonyConfig::default(), ring.clone());
        (colony, ring)
    }

    #[test]
    fn test_new_sharded_colony() {
        let (colony, _ring) = create_test_shard();

        assert_eq!(colony.shard_id(), ShardId::new(0));
        assert!(colony.pending_cross_edges().is_empty());
        assert_eq!(colony.ghost_cache().len(), 0);
    }

    #[test]
    fn test_add_peer() {
        let (mut colony, _ring) = create_test_shard();
        colony.add_peer(ShardId::new(1), String::from("127.0.0.1:8081"));
        colony.add_peer(ShardId::new(2), String::from("127.0.0.1:8082"));

        assert_eq!(colony.peers().len(), 2);

        let expected = String::from("127.0.0.1:8081");
        assert_eq!(colony.peer_address(&ShardId::new(1)), Some(&expected));
    }

    #[test]
    fn test_remove_peer_invalidates_ghosts() {
        let (mut colony, _ring) = create_test_shard();
        let peer = ShardId::new(1);
        colony.add_peer(peer, String::from("127.0.0.1:8081"));

        // Seed the cache with one ghost that belongs to the peer.
        colony.ghost_cache_mut().insert(GhostNode::new(
            NodeId::from_seed(1),
            ShardId::new(1),
            String::from("test"),
        ));
        assert_eq!(colony.ghost_cache().len(), 1);

        // Dropping the peer must drop its ghosts as well.
        colony.remove_peer(ShardId::new(1));
        assert_eq!(colony.ghost_cache().len(), 0);
    }

    #[test]
    fn test_tick_phase_sense() {
        let (mut colony, _ring) = create_test_shard();

        let outcome = colony.tick_phase(TickPhase::Sense);

        assert_eq!(outcome.shard_id, ShardId::new(0));
        assert_eq!(outcome.phase, TickPhase::Sense);
        assert!(outcome.cross_shard_edges.is_empty());
    }

    #[test]
    fn test_tick_phase_act() {
        let (mut colony, _ring) = create_test_shard();

        let outcome = colony.tick_phase(TickPhase::Act);

        assert_eq!(outcome.shard_id, ShardId::new(0));
        assert_eq!(outcome.phase, TickPhase::Act);
    }

    #[test]
    fn test_health() {
        let (colony, _ring) = create_test_shard();

        let report = colony.health();

        assert_eq!(report.shard_id, ShardId::new(0));
        assert!(report.healthy);
        assert_eq!(report.pending_operations, 0);
    }

    #[test]
    fn test_add_pending_cross_edge() {
        let (mut colony, _ring) = create_test_shard();

        colony.add_pending_cross_edge(CrossShardEdge {
            from_node: NodeId::from_seed(1),
            to_node: NodeId::from_seed(2),
            to_shard: ShardId::new(1),
            weight: 0.5,
        });
        assert_eq!(colony.pending_cross_edges().len(), 1);

        colony.clear_pending_cross_edges();
        assert!(colony.pending_cross_edges().is_empty());
    }

    #[test]
    fn test_ingest_document_direct() {
        let (mut colony, _ring) = create_test_shard();

        let origin = Position::new(0.0, 0.0);
        let assigned = colony.ingest_document_direct("Test", "Content", origin);

        assert_eq!(colony.stats().documents_total, 1);
        assert!(!assigned.0.is_nil());
    }

    #[test]
    fn test_get_term_frequencies() {
        let (colony, _ring) = create_test_shard();

        // Nothing has been ingested, so no term can match.
        let terms = [String::from("test")];
        assert!(colony.get_term_frequencies(&terms).is_empty());
    }

    #[test]
    fn test_execute_local_query() {
        let (colony, _ring) = create_test_shard();

        let outcome = colony.execute_local_query(&LocalQueryRequest {
            query_terms: vec![String::from("test")],
            max_results: 10,
            global_df: HashMap::new(),
        });

        assert_eq!(outcome.shard_id, ShardId::new(0));
        // The graph is still empty, so there is nothing to score.
        assert!(outcome.results.is_empty());
    }

    #[test]
    fn test_shard_info() {
        let (colony, _ring) = create_test_shard();

        let info = colony.shard_info(String::from("127.0.0.1:8080"));

        assert_eq!(info.id, ShardId::new(0));
        assert_eq!(info.address, "127.0.0.1:8080");
        assert_eq!(info.node_count, 0);
        assert_eq!(info.edge_count, 0);
    }

    #[test]
    fn test_node_count_and_document_count() {
        let (mut colony, _ring) = create_test_shard();

        assert_eq!(colony.node_count(), 0);
        assert_eq!(colony.document_count(), 0);

        colony.ingest_document_direct("Test", "Content", Position::new(0.0, 0.0));

        assert_eq!(colony.document_count(), 1);
    }

    #[test]
    fn test_tick_and_run() {
        let (mut colony, _ring) = create_test_shard();

        colony.tick();
        assert_eq!(colony.current_tick(), 1);

        colony.run(5);
        assert_eq!(colony.current_tick(), 6);
    }

    #[tokio::test]
    async fn test_owns_document() {
        let (colony, _ring) = create_test_shard();

        // Probe a range of seeded IDs and count how many land on shard 0.
        let mut owned = 0;
        for seed in 0..100 {
            if colony.owns_document(&DocumentId::from_seed(seed)).await {
                owned += 1;
            }
        }

        // With 3 shards, shard 0 should own roughly a third of the IDs.
        assert!(owned > 20 && owned < 50);
    }

    #[test]
    fn test_with_ghost_cache_size() {
        let ring = Arc::new(RwLock::new(ConsistentHashRing::new(3)));

        let colony = ShardedColony::with_ghost_cache_size(
            ShardId::new(0),
            ColonyConfig::default(),
            ring,
            500,
        );

        assert_eq!(colony.ghost_cache().capacity(), 500);
    }
}