Skip to main content

oxirs_core/distributed/
sharding.rs

1//! Semantic-aware sharding for distributed RDF storage
2//!
3//! This module implements intelligent sharding that keeps semantically related
4//! RDF data together to minimize cross-shard queries and improve performance.
5
6#![allow(dead_code)]
7
8use crate::model::{NamedNode, Object, Predicate, Subject, Triple};
9use crate::store::IndexedGraph;
10use anyhow::{anyhow, Result};
11use dashmap::DashMap;
12use parking_lot::{Mutex, RwLock};
13use serde::{Deserialize, Serialize};
14use std::collections::hash_map::DefaultHasher;
15use std::collections::{HashMap, VecDeque};
16use std::hash::{Hash, Hasher};
17use std::sync::Arc;
18
/// Sharding configuration
///
/// Tunable parameters consumed by `ShardManager`. `ShardingConfig::default()`
/// provides a complete set of starting values.
#[derive(Debug, Clone)]
pub struct ShardingConfig {
    /// Number of shards. `ShardManager::new` creates metadata for shard ids
    /// `0..shard_count`, and the hash-based routing functions reduce hashes
    /// modulo this value.
    pub shard_count: usize,

    /// Replication factor (not read by any code visible in this file).
    pub replication_factor: usize,

    /// Enable semantic-aware partitioning (not read by any code visible in
    /// this file; the strategy is chosen via `ShardingStrategy`).
    pub semantic_partitioning: bool,

    /// Maximum shard size (number of triples). A shard whose recorded triple
    /// count exceeds this value is treated as overloaded.
    pub max_shard_size: usize,

    /// Enable dynamic rebalancing. When `false`, `needs_rebalancing` always
    /// reports `false`.
    pub enable_rebalancing: bool,

    /// Rebalancing threshold (imbalance ratio): rebalancing is requested when
    /// `(largest - smallest) / average` shard size exceeds this value.
    pub rebalancing_threshold: f64,
}
40
41impl Default for ShardingConfig {
42    fn default() -> Self {
43        Self {
44            shard_count: 16,
45            replication_factor: 3,
46            semantic_partitioning: true,
47            max_shard_size: 10_000_000,
48            enable_rebalancing: true,
49            rebalancing_threshold: 0.2,
50        }
51    }
52}
53
/// Shard identifier
///
/// `ShardManager::new` numbers shards `0..shard_count`, cast to this type.
pub type ShardId = u32;
56
/// Node identifier
///
/// Identifies a cluster node hosting shards. The value `0` is used as a
/// placeholder in this file wherever no real node has been assigned yet.
pub type NodeId = u64;
59
/// Sharding strategy
///
/// Selects how `ShardManager` maps a triple to a shard.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ShardingStrategy {
    /// Hash-based sharding (simple but no semantic awareness).
    /// Hashes the string forms of subject, predicate and object together.
    Hash,

    /// Subject-based sharding (keeps all triples with same subject together).
    /// Hashes only the subject's string form.
    Subject,

    /// Predicate-based sharding (groups by predicate type).
    /// Hashes only the predicate's string form.
    Predicate,

    /// Graph-based sharding (for named graphs).
    /// The current implementation in this file always selects shard 0.
    Graph,

    /// Semantic sharding (intelligent grouping based on relationships),
    /// driven by the rules in the contained `SemanticStrategy`.
    Semantic(SemanticStrategy),

    /// Hybrid strategy combining multiple approaches: the shard chosen by
    /// `primary` is used unless it is overloaded, in which case the shard
    /// chosen by `secondary` is used instead.
    Hybrid {
        /// Strategy consulted first.
        primary: Box<ShardingStrategy>,
        /// Strategy used when the primary shard is overloaded.
        secondary: Box<ShardingStrategy>,
    },
}
84
/// Semantic sharding strategy
///
/// Rule set consulted by semantic sharding to decide which shard a subject
/// (entity) should live on.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SemanticStrategy {
    /// Entity types to keep together. The key is an entity type name and the
    /// value lists property names; both are matched as substrings of the
    /// predicate's string form.
    pub entity_groups: HashMap<String, Vec<String>>,

    /// Predicates that indicate strong relationships. Compared for exact
    /// equality against the predicate's string form.
    pub relationship_predicates: Vec<String>,

    /// Namespace-based grouping: a subject whose string form starts with the
    /// key is assigned to the mapped shard.
    pub namespace_groups: HashMap<String, ShardId>,

    /// Class hierarchy for grouping (not read by any code visible in this
    /// file).
    pub class_hierarchy: HashMap<String, String>,
}
100
101impl Default for SemanticStrategy {
102    fn default() -> Self {
103        let mut entity_groups = HashMap::new();
104        entity_groups.insert(
105            "Person".to_string(),
106            vec![
107                "name".to_string(),
108                "email".to_string(),
109                "address".to_string(),
110            ],
111        );
112        entity_groups.insert(
113            "Organization".to_string(),
114            vec![
115                "name".to_string(),
116                "location".to_string(),
117                "employees".to_string(),
118            ],
119        );
120
121        let relationship_predicates = vec![
122            "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(),
123            "http://www.w3.org/2000/01/rdf-schema#subClassOf".to_string(),
124            "http://www.w3.org/2002/07/owl#sameAs".to_string(),
125        ];
126
127        Self {
128            entity_groups,
129            relationship_predicates,
130            namespace_groups: HashMap::new(),
131            class_hierarchy: HashMap::new(),
132        }
133    }
134}
135
/// Shard manager for distributed RDF storage
///
/// Owns the routing strategy plus all per-shard bookkeeping. Every mutable
/// piece of state sits behind an `Arc` with interior locking (`RwLock`,
/// `Mutex` or `DashMap`), so the manager's methods take `&self`.
pub struct ShardManager {
    /// Configuration
    config: ShardingConfig,

    /// Sharding strategy used to route triples
    strategy: ShardingStrategy,

    /// Shard metadata (triple counts, sizes, versions), keyed by shard id
    shard_metadata: Arc<RwLock<HashMap<ShardId, ShardMetadata>>>,

    /// Shard assignments (which nodes host which shards)
    shard_assignments: Arc<RwLock<HashMap<ShardId, Vec<NodeId>>>>,

    /// Entity to shard mapping for semantic sharding; keyed by the string
    /// form of the entity
    entity_shard_map: Arc<DashMap<String, ShardId>>,

    /// Statistics for each shard
    shard_stats: Arc<DashMap<ShardId, ShardStatistics>>,

    /// Pending migrations
    pending_migrations: Arc<Mutex<VecDeque<Migration>>>,

    /// Local shards (shards hosted on this node); inserts and queries only
    /// touch shards present in this map
    local_shards: Arc<DashMap<ShardId, IndexedGraph>>,
}
162
/// Shard metadata
///
/// Bookkeeping record kept for each shard by `ShardManager`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardMetadata {
    /// Identifier of the shard this record describes.
    pub id: ShardId,
    /// Change counter; bumped each time the metadata is updated.
    pub version: u64,
    /// Recorded number of triples in the shard.
    pub triple_count: usize,
    /// Recorded size of the shard in bytes.
    pub size_bytes: usize,
    /// Time the record was created.
    pub created_at: std::time::SystemTime,
    /// Time of the most recent metadata update.
    pub last_modified: std::time::SystemTime,
    /// Node holding the primary copy (`0` until assigned).
    pub primary_node: NodeId,
    /// Nodes holding replicas.
    pub replica_nodes: Vec<NodeId>,
}
175
/// Shard statistics
///
/// Access counters and derived measurements for one shard. All fields start
/// at their `Default` values.
#[derive(Debug, Clone, Default)]
pub struct ShardStatistics {
    /// Number of read operations recorded against the shard.
    pub read_count: u64,
    /// Number of write operations recorded against the shard.
    pub write_count: u64,
    /// Query latency in milliseconds (not written by code visible in this file).
    pub query_latency_ms: f64,
    /// Frequently accessed entities (not written by code visible in this file).
    pub hot_entities: Vec<String>,
    /// Summary of how the shard is accessed.
    pub access_pattern: AccessPattern,
}
185
/// Access pattern for a shard
///
/// None of these fields is computed by code visible in this file; they hold
/// their `Default` values (`false` / `0.0`) unless set elsewhere.
#[derive(Debug, Clone, Default)]
pub struct AccessPattern {
    /// Whether reads dominate the shard's workload.
    pub read_heavy: bool,
    /// Whether writes dominate the shard's workload.
    pub write_heavy: bool,
    /// Temporal locality score — scale and range are not defined here.
    pub temporal_locality: f64,
    /// Spatial locality score — scale and range are not defined here.
    pub spatial_locality: f64,
}
194
/// Migration operation
///
/// Describes moving triples of one shard from one node to another.
#[derive(Debug, Clone)]
pub struct Migration {
    /// Shard the triples are taken from.
    pub shard_id: ShardId,
    /// Node currently holding the data.
    pub from_node: NodeId,
    /// Node that should receive the data.
    pub to_node: NodeId,
    /// Triples to move; `execute_migration` subtracts `triples.len()` from
    /// the shard's recorded triple count.
    pub triples: Vec<Triple>,
    /// Why the migration was planned.
    pub reason: MigrationReason,
}
204
/// Reason for migration
///
/// Only `LoadBalance` is produced by code visible in this file
/// (`plan_rebalancing`); the other variants are available to callers.
#[derive(Debug, Clone)]
pub enum MigrationReason {
    /// Load balancing
    LoadBalance,
    /// Node failure
    NodeFailure,
    /// Manual rebalancing
    Manual,
    /// Semantic regrouping
    SemanticOptimization,
}
217
218impl ShardManager {
219    /// Create a new shard manager
220    pub fn new(config: ShardingConfig, strategy: ShardingStrategy) -> Self {
221        let mut shard_metadata = HashMap::new();
222        let mut shard_assignments = HashMap::new();
223
224        // Initialize shards
225        for shard_id in 0..config.shard_count {
226            let metadata = ShardMetadata {
227                id: shard_id as ShardId,
228                version: 0,
229                triple_count: 0,
230                size_bytes: 0,
231                created_at: std::time::SystemTime::now(),
232                last_modified: std::time::SystemTime::now(),
233                primary_node: 0, // Will be assigned later
234                replica_nodes: vec![],
235            };
236            shard_metadata.insert(shard_id as ShardId, metadata);
237            shard_assignments.insert(shard_id as ShardId, vec![]);
238        }
239
240        Self {
241            config,
242            strategy,
243            shard_metadata: Arc::new(RwLock::new(shard_metadata)),
244            shard_assignments: Arc::new(RwLock::new(shard_assignments)),
245            entity_shard_map: Arc::new(DashMap::new()),
246            shard_stats: Arc::new(DashMap::new()),
247            pending_migrations: Arc::new(Mutex::new(VecDeque::new())),
248            local_shards: Arc::new(DashMap::new()),
249        }
250    }
251
252    /// Determine which shard a triple belongs to
253    pub fn get_shard_for_triple(&self, triple: &Triple) -> ShardId {
254        match &self.strategy {
255            ShardingStrategy::Hash => self.hash_shard(triple),
256            ShardingStrategy::Subject => self.subject_shard(triple),
257            ShardingStrategy::Predicate => self.predicate_shard(triple),
258            ShardingStrategy::Graph => self.graph_shard(triple),
259            ShardingStrategy::Semantic(strategy) => self.semantic_shard(triple, strategy),
260            ShardingStrategy::Hybrid { primary, secondary } => {
261                // Try primary strategy first, fallback to secondary
262                let primary_shard = self.get_shard_with_strategy(triple, primary);
263                if self.is_shard_overloaded(primary_shard) {
264                    self.get_shard_with_strategy(triple, secondary)
265                } else {
266                    primary_shard
267                }
268            }
269        }
270    }
271
272    /// Get shard using specific strategy
273    fn get_shard_with_strategy(&self, triple: &Triple, strategy: &ShardingStrategy) -> ShardId {
274        match strategy {
275            ShardingStrategy::Hash => self.hash_shard(triple),
276            ShardingStrategy::Subject => self.subject_shard(triple),
277            ShardingStrategy::Predicate => self.predicate_shard(triple),
278            ShardingStrategy::Graph => self.graph_shard(triple),
279            ShardingStrategy::Semantic(s) => self.semantic_shard(triple, s),
280            ShardingStrategy::Hybrid { primary, .. } => {
281                self.get_shard_with_strategy(triple, primary)
282            }
283        }
284    }
285
286    /// Hash-based sharding
287    fn hash_shard(&self, triple: &Triple) -> ShardId {
288        let mut hasher = DefaultHasher::new();
289        triple.subject().to_string().hash(&mut hasher);
290        triple.predicate().to_string().hash(&mut hasher);
291        triple.object().to_string().hash(&mut hasher);
292        (hasher.finish() % self.config.shard_count as u64) as ShardId
293    }
294
295    /// Subject-based sharding
296    fn subject_shard(&self, triple: &Triple) -> ShardId {
297        let mut hasher = DefaultHasher::new();
298        triple.subject().to_string().hash(&mut hasher);
299        (hasher.finish() % self.config.shard_count as u64) as ShardId
300    }
301
302    /// Predicate-based sharding
303    fn predicate_shard(&self, triple: &Triple) -> ShardId {
304        let mut hasher = DefaultHasher::new();
305        triple.predicate().to_string().hash(&mut hasher);
306        (hasher.finish() % self.config.shard_count as u64) as ShardId
307    }
308
309    /// Graph-based sharding (simplified for default graph)
310    fn graph_shard(&self, _triple: &Triple) -> ShardId {
311        // In a real implementation, would use quad's graph component
312        0
313    }
314
315    /// Semantic-aware sharding
316    fn semantic_shard(&self, triple: &Triple, strategy: &SemanticStrategy) -> ShardId {
317        let subject_str = triple.subject().to_string();
318
319        // Check if we already have a mapping for this entity
320        if let Some(shard) = self.entity_shard_map.get(&subject_str) {
321            return *shard;
322        }
323
324        // Check namespace-based grouping
325        for (namespace, shard_id) in &strategy.namespace_groups {
326            if subject_str.starts_with(namespace) {
327                self.entity_shard_map.insert(subject_str.clone(), *shard_id);
328                return *shard_id;
329            }
330        }
331
332        // Check if this is a relationship predicate
333        let predicate_str = triple.predicate().to_string();
334        if strategy.relationship_predicates.contains(&predicate_str) {
335            // Keep related entities on the same shard
336            if let Some(object_shard) = self.get_object_shard(triple.object()) {
337                self.entity_shard_map
338                    .insert(subject_str.clone(), object_shard);
339                return object_shard;
340            }
341        }
342
343        // Check entity groups
344        for (entity_type, properties) in &strategy.entity_groups {
345            if predicate_str.contains(entity_type)
346                || properties.iter().any(|p| predicate_str.contains(p))
347            {
348                // Group entities of the same type together
349                let mut hasher = DefaultHasher::new();
350                entity_type.hash(&mut hasher);
351                let shard = (hasher.finish() % self.config.shard_count as u64) as ShardId;
352                self.entity_shard_map.insert(subject_str.clone(), shard);
353                return shard;
354            }
355        }
356
357        // Fallback to hash-based sharding
358        let shard = self.hash_shard(triple);
359        self.entity_shard_map.insert(subject_str, shard);
360        shard
361    }
362
363    /// Get shard for an object if it's an entity
364    fn get_object_shard(&self, object: &Object) -> Option<ShardId> {
365        match object {
366            Object::NamedNode(node) => {
367                let node_str = node.as_str().to_string();
368                self.entity_shard_map.get(&node_str).map(|s| *s)
369            }
370            Object::BlankNode(node) => {
371                let node_str = node.as_str().to_string();
372                self.entity_shard_map.get(&node_str).map(|s| *s)
373            }
374            _ => None,
375        }
376    }
377
378    /// Check if a shard is overloaded
379    fn is_shard_overloaded(&self, shard_id: ShardId) -> bool {
380        if let Some(metadata) = self.shard_metadata.read().get(&shard_id) {
381            metadata.triple_count > self.config.max_shard_size
382        } else {
383            false
384        }
385    }
386
387    /// Insert a triple into the appropriate shard
388    pub fn insert_triple(&self, triple: Triple) -> Result<()> {
389        let shard_id = self.get_shard_for_triple(&triple);
390
391        // Update local shard if we have it
392        match self.local_shards.get_mut(&shard_id) {
393            Some(shard) => {
394                shard.insert(&triple);
395
396                // Update statistics
397                if let Some(mut stats) = self.shard_stats.get_mut(&shard_id) {
398                    stats.write_count += 1;
399                }
400
401                // Update metadata
402                self.update_shard_metadata(shard_id, 1, 0);
403            }
404            _ => {
405                // Forward to remote shard
406                // In a real implementation, would send to remote node
407                return Err(anyhow!("Shard {} not available locally", shard_id));
408            }
409        }
410
411        Ok(())
412    }
413
414    /// Query triples from shards
415    pub fn query_triples(
416        &self,
417        subject: Option<&Subject>,
418        predicate: Option<&Predicate>,
419        object: Option<&Object>,
420    ) -> Result<Vec<Triple>> {
421        let mut results = Vec::new();
422
423        // Determine which shards to query
424        let shards_to_query = self.get_shards_for_query(subject, predicate, object);
425
426        // Query each relevant shard
427        for shard_id in shards_to_query {
428            if let Some(shard) = self.local_shards.get(&shard_id) {
429                let shard_results = shard.match_pattern(subject, predicate, object);
430                results.extend(shard_results);
431
432                // Update statistics
433                if let Some(mut stats) = self.shard_stats.get_mut(&shard_id) {
434                    stats.read_count += 1;
435                }
436            }
437        }
438
439        Ok(results)
440    }
441
442    /// Determine which shards to query based on pattern
443    fn get_shards_for_query(
444        &self,
445        subject: Option<&Subject>,
446        predicate: Option<&Predicate>,
447        _object: Option<&Object>,
448    ) -> Vec<ShardId> {
449        // If we have a specific subject, we can route to specific shard
450        if let Some(subj) = subject {
451            let triple = Triple::new(
452                subj.clone(),
453                predicate.cloned().unwrap_or_else(|| {
454                    Predicate::NamedNode(
455                        NamedNode::new("http://example.org/dummy").expect("dummy IRI is valid"),
456                    )
457                }),
458                Object::NamedNode(
459                    NamedNode::new("http://example.org/dummy").expect("dummy IRI is valid"),
460                ),
461            );
462            vec![self.get_shard_for_triple(&triple)]
463        } else if let Some(pred) = predicate {
464            // For predicate queries, might need to query multiple shards
465            match &self.strategy {
466                ShardingStrategy::Predicate => {
467                    // Can route to specific shard
468                    let mut hasher = DefaultHasher::new();
469                    pred.to_string().hash(&mut hasher);
470                    vec![(hasher.finish() % self.config.shard_count as u64) as ShardId]
471                }
472                _ => {
473                    // Need to query all shards
474                    (0..self.config.shard_count).map(|i| i as ShardId).collect()
475                }
476            }
477        } else {
478            // Full scan - query all shards
479            (0..self.config.shard_count).map(|i| i as ShardId).collect()
480        }
481    }
482
483    /// Update shard metadata
484    fn update_shard_metadata(&self, shard_id: ShardId, triple_delta: i64, size_delta: i64) {
485        let mut metadata = self.shard_metadata.write();
486        if let Some(shard_meta) = metadata.get_mut(&shard_id) {
487            if triple_delta > 0 {
488                shard_meta.triple_count += triple_delta as usize;
489            } else {
490                shard_meta.triple_count = shard_meta
491                    .triple_count
492                    .saturating_sub((-triple_delta) as usize);
493            }
494
495            if size_delta > 0 {
496                shard_meta.size_bytes += size_delta as usize;
497            } else {
498                shard_meta.size_bytes =
499                    shard_meta.size_bytes.saturating_sub((-size_delta) as usize);
500            }
501
502            shard_meta.last_modified = std::time::SystemTime::now();
503            shard_meta.version += 1;
504        }
505    }
506
507    /// Check if rebalancing is needed
508    pub fn needs_rebalancing(&self) -> bool {
509        if !self.config.enable_rebalancing {
510            return false;
511        }
512
513        let metadata = self.shard_metadata.read();
514        if metadata.is_empty() {
515            return false;
516        }
517
518        let sizes: Vec<usize> = metadata.values().map(|m| m.triple_count).collect();
519        let avg_size = sizes.iter().sum::<usize>() / sizes.len();
520        let max_size = sizes.iter().max().copied().unwrap_or(0);
521        let min_size = sizes.iter().min().copied().unwrap_or(0);
522
523        // Check imbalance ratio
524        if avg_size > 0 {
525            let imbalance = (max_size as f64 - min_size as f64) / avg_size as f64;
526            imbalance > self.config.rebalancing_threshold
527        } else {
528            false
529        }
530    }
531
532    /// Plan rebalancing operations
533    pub fn plan_rebalancing(&self) -> Vec<Migration> {
534        let mut migrations = Vec::new();
535
536        let metadata = self.shard_metadata.read();
537        let mut shard_sizes: Vec<(ShardId, usize)> = metadata
538            .iter()
539            .map(|(id, meta)| (*id, meta.triple_count))
540            .collect();
541
542        // Sort by size
543        shard_sizes.sort_by_key(|&(_, size)| size);
544
545        let avg_size = shard_sizes.iter().map(|(_, size)| size).sum::<usize>() / shard_sizes.len();
546
547        // Find overloaded and underloaded shards
548        let overloaded: Vec<_> = shard_sizes
549            .iter()
550            .filter(|(_, size)| {
551                *size > avg_size + (avg_size as f64 * self.config.rebalancing_threshold) as usize
552            })
553            .collect();
554
555        let underloaded: Vec<_> = shard_sizes
556            .iter()
557            .filter(|(_, size)| {
558                *size < avg_size - (avg_size as f64 * self.config.rebalancing_threshold) as usize
559            })
560            .collect();
561
562        // Plan migrations from overloaded to underloaded
563        for (over_shard, over_size) in overloaded {
564            for (_under_shard, under_size) in &underloaded {
565                let to_move = (*over_size - avg_size).min(avg_size - *under_size);
566                if to_move > 0 {
567                    // In a real implementation, would select specific triples to move
568                    let migration = Migration {
569                        shard_id: *over_shard,
570                        from_node: 0,    // Would get from shard assignments
571                        to_node: 0,      // Would get from shard assignments
572                        triples: vec![], // Would select actual triples
573                        reason: MigrationReason::LoadBalance,
574                    };
575                    migrations.push(migration);
576                }
577            }
578        }
579
580        migrations
581    }
582
583    /// Execute a migration
584    pub async fn execute_migration(&self, migration: &Migration) -> Result<()> {
585        // In a real implementation, would:
586        // 1. Lock affected shards
587        // 2. Copy triples to destination
588        // 3. Verify successful copy
589        // 4. Remove from source
590        // 5. Update metadata and mappings
591        // 6. Unlock shards
592
593        // For now, just update metadata
594        self.update_shard_metadata(migration.shard_id, -(migration.triples.len() as i64), 0);
595
596        Ok(())
597    }
598
599    /// Get shard statistics
600    pub fn get_shard_statistics(&self) -> HashMap<ShardId, ShardStatistics> {
601        self.shard_stats
602            .iter()
603            .map(|entry| (*entry.key(), entry.value().clone()))
604            .collect()
605    }
606
607    /// Get load distribution across shards
608    pub fn get_load_distribution(&self) -> HashMap<ShardId, f64> {
609        let total_ops: u64 = self
610            .shard_stats
611            .iter()
612            .map(|entry| entry.value().read_count + entry.value().write_count)
613            .sum();
614
615        if total_ops == 0 {
616            return HashMap::new();
617        }
618
619        self.shard_stats
620            .iter()
621            .map(|entry| {
622                let shard_ops = entry.value().read_count + entry.value().write_count;
623                (*entry.key(), shard_ops as f64 / total_ops as f64)
624            })
625            .collect()
626    }
627}
628
/// Shard router for query optimization
///
/// Thin front end over a shared `ShardManager` that decides which shards a
/// query has to visit.
pub struct ShardRouter {
    /// Manager whose configuration determines the routing targets.
    manager: Arc<ShardManager>,
}
633
634impl ShardRouter {
635    /// Create a new shard router
636    pub fn new(manager: Arc<ShardManager>) -> Self {
637        Self { manager }
638    }
639
640    /// Route a SPARQL query to appropriate shards
641    pub fn route_query(&self, _query: &str) -> Result<Vec<ShardId>> {
642        // In a real implementation, would parse SPARQL and analyze patterns
643        // For now, return all shards for complex queries
644        Ok((0..self.manager.config.shard_count)
645            .map(|i| i as ShardId)
646            .collect())
647    }
648
649    /// Optimize query plan for distributed execution
650    pub fn optimize_distributed_query(&self, query: &str) -> Result<DistributedQueryPlan> {
651        // Simplified implementation
652        let shards = self.route_query(query)?;
653
654        Ok(DistributedQueryPlan {
655            query: query.to_string(),
656            shard_operations: shards
657                .into_iter()
658                .map(|shard| ShardOperation {
659                    shard_id: shard,
660                    operation: query.to_string(),
661                    estimated_cost: 1.0,
662                })
663                .collect(),
664            merge_strategy: MergeStrategy::Union,
665        })
666    }
667}
668
/// Distributed query plan
///
/// Result of `ShardRouter::optimize_distributed_query`: what to run on which
/// shard and how to combine the partial results.
#[derive(Debug, Clone)]
pub struct DistributedQueryPlan {
    /// The original query text.
    pub query: String,
    /// One operation per shard that takes part in the query.
    pub shard_operations: Vec<ShardOperation>,
    /// How the per-shard results are combined.
    pub merge_strategy: MergeStrategy,
}
676
/// Operation on a specific shard
#[derive(Debug, Clone)]
pub struct ShardOperation {
    /// Shard the operation runs on.
    pub shard_id: ShardId,
    /// Operation text; the router in this file stores the full query here.
    pub operation: String,
    /// Estimated cost; the router in this file always uses `1.0`, the unit
    /// is not defined here.
    pub estimated_cost: f64,
}
684
/// Strategy for merging results from multiple shards
///
/// Only `Union` is produced by code visible in this file.
#[derive(Debug, Clone)]
pub enum MergeStrategy {
    /// Simple union of results
    Union,
    /// Intersection of results
    Intersection,
    /// Join operation on the named key
    Join { join_key: String },
    /// Aggregation grouped by the listed keys
    Aggregate { group_by: Vec<String> },
}
697
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode, Triple};

    /// Builds a named node from an IRI that is known to be well formed.
    fn iri(value: &str) -> NamedNode {
        NamedNode::new(value).expect("valid IRI")
    }

    /// Hash sharding must always produce a shard id inside the configured range.
    #[test]
    fn test_hash_sharding() {
        let manager = ShardManager::new(ShardingConfig::default(), ShardingStrategy::Hash);

        let first = Triple::new(
            iri("http://example.org/s1"),
            iri("http://example.org/p"),
            Literal::new("value1"),
        );
        let second = Triple::new(
            iri("http://example.org/s2"),
            iri("http://example.org/p"),
            Literal::new("value2"),
        );

        // Distinct triples are not guaranteed to land on distinct shards, so
        // only the range of the result is checked.
        assert!(manager.get_shard_for_triple(&first) < 16);
        assert!(manager.get_shard_for_triple(&second) < 16);
    }

    /// Triples sharing a subject must be co-located under subject sharding.
    #[test]
    fn test_subject_sharding() {
        let manager = ShardManager::new(ShardingConfig::default(), ShardingStrategy::Subject);
        let entity = iri("http://example.org/entity1");

        let first = Triple::new(
            entity.clone(),
            iri("http://example.org/p1"),
            Literal::new("value1"),
        );
        let second = Triple::new(
            entity.clone(),
            iri("http://example.org/p2"),
            Literal::new("value2"),
        );

        assert_eq!(
            manager.get_shard_for_triple(&first),
            manager.get_shard_for_triple(&second)
        );
    }

    /// Once an entity has been placed, later triples about it stay on that shard.
    #[test]
    fn test_semantic_sharding() {
        let manager = ShardManager::new(
            ShardingConfig::default(),
            ShardingStrategy::Semantic(SemanticStrategy::default()),
        );
        let person = iri("http://example.org/person1");

        let name_triple = Triple::new(
            person.clone(),
            iri("http://example.org/name"),
            Literal::new("John"),
        );
        let email_triple = Triple::new(
            person.clone(),
            iri("http://example.org/email"),
            Literal::new("john@example.org"),
        );

        let name_shard = manager.get_shard_for_triple(&name_triple);
        let email_shard = manager.get_shard_for_triple(&email_triple);

        assert_eq!(name_shard, email_shard);
    }

    /// Empty shards are balanced; a heavily skewed load must be detected.
    #[test]
    fn test_rebalancing_detection() {
        let manager = ShardManager::new(
            ShardingConfig {
                shard_count: 4,
                rebalancing_threshold: 0.2,
                ..Default::default()
            },
            ShardingStrategy::Hash,
        );

        assert!(!manager.needs_rebalancing());

        // Two of the four shards receive very different loads.
        manager.update_shard_metadata(0, 1000, 0);
        manager.update_shard_metadata(1, 100, 0);

        assert!(manager.needs_rebalancing());
    }

    /// An unconstrained query has to visit every shard.
    #[test]
    fn test_query_routing() {
        let manager = Arc::new(ShardManager::new(
            ShardingConfig::default(),
            ShardingStrategy::Subject,
        ));
        let router = ShardRouter::new(manager);

        let shards = router
            .route_query("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")
            .expect("operation should succeed");

        assert_eq!(shards.len(), 16);
    }
}