1#![allow(dead_code)]
7
8use crate::model::{NamedNode, Object, Predicate, Subject, Triple};
9use crate::store::IndexedGraph;
10use anyhow::{anyhow, Result};
11use dashmap::DashMap;
12use parking_lot::{Mutex, RwLock};
13use serde::{Deserialize, Serialize};
14use std::collections::hash_map::DefaultHasher;
15use std::collections::{HashMap, VecDeque};
16use std::hash::{Hash, Hasher};
17use std::sync::Arc;
18
/// Tuning knobs for [`ShardManager`].
#[derive(Debug, Clone)]
pub struct ShardingConfig {
    /// Number of shards. Shard ids are `0..shard_count`, and hash-based
    /// placement reduces hashes modulo this value.
    pub shard_count: usize,

    /// Desired number of copies of each shard.
    /// NOTE(review): not read anywhere in this module — confirm where it is enforced.
    pub replication_factor: usize,

    /// Whether semantic partitioning is requested.
    /// NOTE(review): not read in this module; placement is chosen by the
    /// `ShardingStrategy` passed to `ShardManager::new`.
    pub semantic_partitioning: bool,

    /// Triple count above which a shard counts as overloaded. The hybrid
    /// strategy uses this to fall back to its secondary strategy.
    pub max_shard_size: usize,

    /// When `false`, `ShardManager::needs_rebalancing` always reports `false`.
    pub enable_rebalancing: bool,

    /// Imbalance tolerance for the rebalancing checks, expressed as a
    /// fraction of the average shard size (e.g. `0.2` = 20%).
    pub rebalancing_threshold: f64,
}
40
41impl Default for ShardingConfig {
42 fn default() -> Self {
43 Self {
44 shard_count: 16,
45 replication_factor: 3,
46 semantic_partitioning: true,
47 max_shard_size: 10_000_000,
48 enable_rebalancing: true,
49 rebalancing_threshold: 0.2,
50 }
51 }
52}
53
/// Identifier of a shard. `ShardManager::new` creates shards `0..shard_count`.
pub type ShardId = u32;

/// Identifier of a cluster node; used for shard primary and replica placement.
pub type NodeId = u64;
59
/// How triples are assigned to shards.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ShardingStrategy {
    /// Hash of the subject, predicate and object strings together.
    Hash,

    /// Hash of the subject string, so all triples of a subject share a shard.
    Subject,

    /// Hash of the predicate string, so all triples of a predicate share a shard.
    Predicate,

    /// Graph-based placement; currently every triple goes to shard 0.
    Graph,

    /// Rule-based placement driven by a [`SemanticStrategy`].
    Semantic(SemanticStrategy),

    /// Use `primary`, falling back to `secondary` when the primary choice is
    /// an overloaded shard (more than `max_shard_size` triples).
    Hybrid {
        primary: Box<ShardingStrategy>,
        secondary: Box<ShardingStrategy>,
    },
}
84
/// Rules for [`ShardingStrategy::Semantic`] placement of a subject.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SemanticStrategy {
    /// Entity-type name -> property-name fragments. A predicate whose string
    /// contains the type name or any fragment places its subject on the shard
    /// derived from hashing the type name.
    pub entity_groups: HashMap<String, Vec<String>>,

    /// Predicate IRIs whose subject is co-located with the object, when the
    /// object's shard is already known.
    pub relationship_predicates: Vec<String>,

    /// Subject-string prefix -> fixed shard for every subject with that prefix.
    pub namespace_groups: HashMap<String, ShardId>,

    /// Class -> parent class mapping.
    /// NOTE(review): not consulted by placement in this module.
    pub class_hierarchy: HashMap<String, String>,
}
100
101impl Default for SemanticStrategy {
102 fn default() -> Self {
103 let mut entity_groups = HashMap::new();
104 entity_groups.insert(
105 "Person".to_string(),
106 vec![
107 "name".to_string(),
108 "email".to_string(),
109 "address".to_string(),
110 ],
111 );
112 entity_groups.insert(
113 "Organization".to_string(),
114 vec![
115 "name".to_string(),
116 "location".to_string(),
117 "employees".to_string(),
118 ],
119 );
120
121 let relationship_predicates = vec![
122 "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(),
123 "http://www.w3.org/2000/01/rdf-schema#subClassOf".to_string(),
124 "http://www.w3.org/2002/07/owl#sameAs".to_string(),
125 ];
126
127 Self {
128 entity_groups,
129 relationship_predicates,
130 namespace_groups: HashMap::new(),
131 class_hierarchy: HashMap::new(),
132 }
133 }
134}
135
/// Routes triples to shards and tracks per-shard metadata and statistics.
pub struct ShardManager {
    /// Sharding configuration supplied at construction.
    config: ShardingConfig,

    /// Placement strategy used to route triples.
    strategy: ShardingStrategy,

    /// Per-shard bookkeeping (counts, sizes, versions), keyed by shard id.
    shard_metadata: Arc<RwLock<HashMap<ShardId, ShardMetadata>>>,

    /// Nodes assigned to each shard; created empty for every shard.
    shard_assignments: Arc<RwLock<HashMap<ShardId, Vec<NodeId>>>>,

    /// Subject string -> shard cache used by semantic placement, so later
    /// triples of an already-placed subject follow it.
    entity_shard_map: Arc<DashMap<String, ShardId>>,

    /// Per-shard access statistics.
    shard_stats: Arc<DashMap<ShardId, ShardStatistics>>,

    /// Queue of migrations awaiting execution.
    /// NOTE(review): nothing in this module enqueues or drains it.
    pending_migrations: Arc<Mutex<VecDeque<Migration>>>,

    /// Shards whose data is stored in this process.
    local_shards: Arc<DashMap<ShardId, IndexedGraph>>,
}
162
/// Bookkeeping for one shard, maintained by [`ShardManager`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShardMetadata {
    /// Shard this metadata describes.
    pub id: ShardId,
    /// Incremented on every metadata update; starts at 0.
    pub version: u64,
    /// Number of triples currently accounted to the shard.
    pub triple_count: usize,
    /// Size of the shard's data in bytes, as reported through metadata updates.
    pub size_bytes: usize,
    /// When the shard's metadata was created.
    pub created_at: std::time::SystemTime,
    /// When the metadata was last updated.
    pub last_modified: std::time::SystemTime,
    /// Node holding the primary copy (0 until assigned).
    pub primary_node: NodeId,
    /// Nodes holding replicas (empty until assigned).
    pub replica_nodes: Vec<NodeId>,
}
175
/// Access counters for one shard.
#[derive(Debug, Clone, Default)]
pub struct ShardStatistics {
    /// Number of recorded reads (queries served by the shard).
    pub read_count: u64,
    /// Number of recorded writes (triples inserted into the shard).
    pub write_count: u64,
    /// Query latency in milliseconds.
    /// NOTE(review): not updated anywhere in this module.
    pub query_latency_ms: f64,
    /// Frequently accessed entities.
    /// NOTE(review): not populated in this module.
    pub hot_entities: Vec<String>,
    /// Summary of how the shard is accessed.
    pub access_pattern: AccessPattern,
}
185
/// Coarse description of a shard's workload.
/// NOTE(review): none of these fields are computed in this module; the
/// locality values are presumably scores in `0.0..=1.0` — confirm with producers.
#[derive(Debug, Clone, Default)]
pub struct AccessPattern {
    /// Reads dominate the workload.
    pub read_heavy: bool,
    /// Writes dominate the workload.
    pub write_heavy: bool,
    /// How strongly recent accesses predict upcoming ones.
    pub temporal_locality: f64,
    /// How strongly accesses cluster around related data.
    pub spatial_locality: f64,
}
194
/// A request to move data belonging to one shard between nodes.
#[derive(Debug, Clone)]
pub struct Migration {
    /// Shard whose data is being moved; its triple count is reduced when the
    /// migration is executed.
    pub shard_id: ShardId,
    /// Node the data leaves.
    pub from_node: NodeId,
    /// Node the data goes to.
    pub to_node: NodeId,
    /// Triples being moved.
    pub triples: Vec<Triple>,
    /// Why the migration was scheduled.
    pub reason: MigrationReason,
}
204
/// Why a [`Migration`] was scheduled.
#[derive(Debug, Clone)]
pub enum MigrationReason {
    /// Even out shard sizes; produced by `ShardManager::plan_rebalancing`.
    LoadBalance,

    /// Triggered by a node failure.
    NodeFailure,

    /// Requested manually.
    Manual,

    /// Improve co-location of semantically related data.
    SemanticOptimization,
}
217
218impl ShardManager {
219 pub fn new(config: ShardingConfig, strategy: ShardingStrategy) -> Self {
221 let mut shard_metadata = HashMap::new();
222 let mut shard_assignments = HashMap::new();
223
224 for shard_id in 0..config.shard_count {
226 let metadata = ShardMetadata {
227 id: shard_id as ShardId,
228 version: 0,
229 triple_count: 0,
230 size_bytes: 0,
231 created_at: std::time::SystemTime::now(),
232 last_modified: std::time::SystemTime::now(),
233 primary_node: 0, replica_nodes: vec![],
235 };
236 shard_metadata.insert(shard_id as ShardId, metadata);
237 shard_assignments.insert(shard_id as ShardId, vec![]);
238 }
239
240 Self {
241 config,
242 strategy,
243 shard_metadata: Arc::new(RwLock::new(shard_metadata)),
244 shard_assignments: Arc::new(RwLock::new(shard_assignments)),
245 entity_shard_map: Arc::new(DashMap::new()),
246 shard_stats: Arc::new(DashMap::new()),
247 pending_migrations: Arc::new(Mutex::new(VecDeque::new())),
248 local_shards: Arc::new(DashMap::new()),
249 }
250 }
251
252 pub fn get_shard_for_triple(&self, triple: &Triple) -> ShardId {
254 match &self.strategy {
255 ShardingStrategy::Hash => self.hash_shard(triple),
256 ShardingStrategy::Subject => self.subject_shard(triple),
257 ShardingStrategy::Predicate => self.predicate_shard(triple),
258 ShardingStrategy::Graph => self.graph_shard(triple),
259 ShardingStrategy::Semantic(strategy) => self.semantic_shard(triple, strategy),
260 ShardingStrategy::Hybrid { primary, secondary } => {
261 let primary_shard = self.get_shard_with_strategy(triple, primary);
263 if self.is_shard_overloaded(primary_shard) {
264 self.get_shard_with_strategy(triple, secondary)
265 } else {
266 primary_shard
267 }
268 }
269 }
270 }
271
272 fn get_shard_with_strategy(&self, triple: &Triple, strategy: &ShardingStrategy) -> ShardId {
274 match strategy {
275 ShardingStrategy::Hash => self.hash_shard(triple),
276 ShardingStrategy::Subject => self.subject_shard(triple),
277 ShardingStrategy::Predicate => self.predicate_shard(triple),
278 ShardingStrategy::Graph => self.graph_shard(triple),
279 ShardingStrategy::Semantic(s) => self.semantic_shard(triple, s),
280 ShardingStrategy::Hybrid { primary, .. } => {
281 self.get_shard_with_strategy(triple, primary)
282 }
283 }
284 }
285
286 fn hash_shard(&self, triple: &Triple) -> ShardId {
288 let mut hasher = DefaultHasher::new();
289 triple.subject().to_string().hash(&mut hasher);
290 triple.predicate().to_string().hash(&mut hasher);
291 triple.object().to_string().hash(&mut hasher);
292 (hasher.finish() % self.config.shard_count as u64) as ShardId
293 }
294
295 fn subject_shard(&self, triple: &Triple) -> ShardId {
297 let mut hasher = DefaultHasher::new();
298 triple.subject().to_string().hash(&mut hasher);
299 (hasher.finish() % self.config.shard_count as u64) as ShardId
300 }
301
302 fn predicate_shard(&self, triple: &Triple) -> ShardId {
304 let mut hasher = DefaultHasher::new();
305 triple.predicate().to_string().hash(&mut hasher);
306 (hasher.finish() % self.config.shard_count as u64) as ShardId
307 }
308
309 fn graph_shard(&self, _triple: &Triple) -> ShardId {
311 0
313 }
314
315 fn semantic_shard(&self, triple: &Triple, strategy: &SemanticStrategy) -> ShardId {
317 let subject_str = triple.subject().to_string();
318
319 if let Some(shard) = self.entity_shard_map.get(&subject_str) {
321 return *shard;
322 }
323
324 for (namespace, shard_id) in &strategy.namespace_groups {
326 if subject_str.starts_with(namespace) {
327 self.entity_shard_map.insert(subject_str.clone(), *shard_id);
328 return *shard_id;
329 }
330 }
331
332 let predicate_str = triple.predicate().to_string();
334 if strategy.relationship_predicates.contains(&predicate_str) {
335 if let Some(object_shard) = self.get_object_shard(triple.object()) {
337 self.entity_shard_map
338 .insert(subject_str.clone(), object_shard);
339 return object_shard;
340 }
341 }
342
343 for (entity_type, properties) in &strategy.entity_groups {
345 if predicate_str.contains(entity_type)
346 || properties.iter().any(|p| predicate_str.contains(p))
347 {
348 let mut hasher = DefaultHasher::new();
350 entity_type.hash(&mut hasher);
351 let shard = (hasher.finish() % self.config.shard_count as u64) as ShardId;
352 self.entity_shard_map.insert(subject_str.clone(), shard);
353 return shard;
354 }
355 }
356
357 let shard = self.hash_shard(triple);
359 self.entity_shard_map.insert(subject_str, shard);
360 shard
361 }
362
363 fn get_object_shard(&self, object: &Object) -> Option<ShardId> {
365 match object {
366 Object::NamedNode(node) => {
367 let node_str = node.as_str().to_string();
368 self.entity_shard_map.get(&node_str).map(|s| *s)
369 }
370 Object::BlankNode(node) => {
371 let node_str = node.as_str().to_string();
372 self.entity_shard_map.get(&node_str).map(|s| *s)
373 }
374 _ => None,
375 }
376 }
377
378 fn is_shard_overloaded(&self, shard_id: ShardId) -> bool {
380 if let Some(metadata) = self.shard_metadata.read().get(&shard_id) {
381 metadata.triple_count > self.config.max_shard_size
382 } else {
383 false
384 }
385 }
386
387 pub fn insert_triple(&self, triple: Triple) -> Result<()> {
389 let shard_id = self.get_shard_for_triple(&triple);
390
391 match self.local_shards.get_mut(&shard_id) {
393 Some(shard) => {
394 shard.insert(&triple);
395
396 if let Some(mut stats) = self.shard_stats.get_mut(&shard_id) {
398 stats.write_count += 1;
399 }
400
401 self.update_shard_metadata(shard_id, 1, 0);
403 }
404 _ => {
405 return Err(anyhow!("Shard {} not available locally", shard_id));
408 }
409 }
410
411 Ok(())
412 }
413
414 pub fn query_triples(
416 &self,
417 subject: Option<&Subject>,
418 predicate: Option<&Predicate>,
419 object: Option<&Object>,
420 ) -> Result<Vec<Triple>> {
421 let mut results = Vec::new();
422
423 let shards_to_query = self.get_shards_for_query(subject, predicate, object);
425
426 for shard_id in shards_to_query {
428 if let Some(shard) = self.local_shards.get(&shard_id) {
429 let shard_results = shard.match_pattern(subject, predicate, object);
430 results.extend(shard_results);
431
432 if let Some(mut stats) = self.shard_stats.get_mut(&shard_id) {
434 stats.read_count += 1;
435 }
436 }
437 }
438
439 Ok(results)
440 }
441
442 fn get_shards_for_query(
444 &self,
445 subject: Option<&Subject>,
446 predicate: Option<&Predicate>,
447 _object: Option<&Object>,
448 ) -> Vec<ShardId> {
449 if let Some(subj) = subject {
451 let triple = Triple::new(
452 subj.clone(),
453 predicate.cloned().unwrap_or_else(|| {
454 Predicate::NamedNode(
455 NamedNode::new("http://example.org/dummy").expect("dummy IRI is valid"),
456 )
457 }),
458 Object::NamedNode(
459 NamedNode::new("http://example.org/dummy").expect("dummy IRI is valid"),
460 ),
461 );
462 vec![self.get_shard_for_triple(&triple)]
463 } else if let Some(pred) = predicate {
464 match &self.strategy {
466 ShardingStrategy::Predicate => {
467 let mut hasher = DefaultHasher::new();
469 pred.to_string().hash(&mut hasher);
470 vec![(hasher.finish() % self.config.shard_count as u64) as ShardId]
471 }
472 _ => {
473 (0..self.config.shard_count).map(|i| i as ShardId).collect()
475 }
476 }
477 } else {
478 (0..self.config.shard_count).map(|i| i as ShardId).collect()
480 }
481 }
482
483 fn update_shard_metadata(&self, shard_id: ShardId, triple_delta: i64, size_delta: i64) {
485 let mut metadata = self.shard_metadata.write();
486 if let Some(shard_meta) = metadata.get_mut(&shard_id) {
487 if triple_delta > 0 {
488 shard_meta.triple_count += triple_delta as usize;
489 } else {
490 shard_meta.triple_count = shard_meta
491 .triple_count
492 .saturating_sub((-triple_delta) as usize);
493 }
494
495 if size_delta > 0 {
496 shard_meta.size_bytes += size_delta as usize;
497 } else {
498 shard_meta.size_bytes =
499 shard_meta.size_bytes.saturating_sub((-size_delta) as usize);
500 }
501
502 shard_meta.last_modified = std::time::SystemTime::now();
503 shard_meta.version += 1;
504 }
505 }
506
507 pub fn needs_rebalancing(&self) -> bool {
509 if !self.config.enable_rebalancing {
510 return false;
511 }
512
513 let metadata = self.shard_metadata.read();
514 if metadata.is_empty() {
515 return false;
516 }
517
518 let sizes: Vec<usize> = metadata.values().map(|m| m.triple_count).collect();
519 let avg_size = sizes.iter().sum::<usize>() / sizes.len();
520 let max_size = sizes.iter().max().copied().unwrap_or(0);
521 let min_size = sizes.iter().min().copied().unwrap_or(0);
522
523 if avg_size > 0 {
525 let imbalance = (max_size as f64 - min_size as f64) / avg_size as f64;
526 imbalance > self.config.rebalancing_threshold
527 } else {
528 false
529 }
530 }
531
532 pub fn plan_rebalancing(&self) -> Vec<Migration> {
534 let mut migrations = Vec::new();
535
536 let metadata = self.shard_metadata.read();
537 let mut shard_sizes: Vec<(ShardId, usize)> = metadata
538 .iter()
539 .map(|(id, meta)| (*id, meta.triple_count))
540 .collect();
541
542 shard_sizes.sort_by_key(|&(_, size)| size);
544
545 let avg_size = shard_sizes.iter().map(|(_, size)| size).sum::<usize>() / shard_sizes.len();
546
547 let overloaded: Vec<_> = shard_sizes
549 .iter()
550 .filter(|(_, size)| {
551 *size > avg_size + (avg_size as f64 * self.config.rebalancing_threshold) as usize
552 })
553 .collect();
554
555 let underloaded: Vec<_> = shard_sizes
556 .iter()
557 .filter(|(_, size)| {
558 *size < avg_size - (avg_size as f64 * self.config.rebalancing_threshold) as usize
559 })
560 .collect();
561
562 for (over_shard, over_size) in overloaded {
564 for (_under_shard, under_size) in &underloaded {
565 let to_move = (*over_size - avg_size).min(avg_size - *under_size);
566 if to_move > 0 {
567 let migration = Migration {
569 shard_id: *over_shard,
570 from_node: 0, to_node: 0, triples: vec![], reason: MigrationReason::LoadBalance,
574 };
575 migrations.push(migration);
576 }
577 }
578 }
579
580 migrations
581 }
582
583 pub async fn execute_migration(&self, migration: &Migration) -> Result<()> {
585 self.update_shard_metadata(migration.shard_id, -(migration.triples.len() as i64), 0);
595
596 Ok(())
597 }
598
599 pub fn get_shard_statistics(&self) -> HashMap<ShardId, ShardStatistics> {
601 self.shard_stats
602 .iter()
603 .map(|entry| (*entry.key(), entry.value().clone()))
604 .collect()
605 }
606
607 pub fn get_load_distribution(&self) -> HashMap<ShardId, f64> {
609 let total_ops: u64 = self
610 .shard_stats
611 .iter()
612 .map(|entry| entry.value().read_count + entry.value().write_count)
613 .sum();
614
615 if total_ops == 0 {
616 return HashMap::new();
617 }
618
619 self.shard_stats
620 .iter()
621 .map(|entry| {
622 let shard_ops = entry.value().read_count + entry.value().write_count;
623 (*entry.key(), shard_ops as f64 / total_ops as f64)
624 })
625 .collect()
626 }
627}
628
/// Plans distributed query execution over the shards of a [`ShardManager`].
pub struct ShardRouter {
    /// Shared manager whose shard layout is used for routing.
    manager: Arc<ShardManager>,
}
633
634impl ShardRouter {
635 pub fn new(manager: Arc<ShardManager>) -> Self {
637 Self { manager }
638 }
639
640 pub fn route_query(&self, _query: &str) -> Result<Vec<ShardId>> {
642 Ok((0..self.manager.config.shard_count)
645 .map(|i| i as ShardId)
646 .collect())
647 }
648
649 pub fn optimize_distributed_query(&self, query: &str) -> Result<DistributedQueryPlan> {
651 let shards = self.route_query(query)?;
653
654 Ok(DistributedQueryPlan {
655 query: query.to_string(),
656 shard_operations: shards
657 .into_iter()
658 .map(|shard| ShardOperation {
659 shard_id: shard,
660 operation: query.to_string(),
661 estimated_cost: 1.0,
662 })
663 .collect(),
664 merge_strategy: MergeStrategy::Union,
665 })
666 }
667}
668
/// Execution plan for a query spread across several shards.
#[derive(Debug, Clone)]
pub struct DistributedQueryPlan {
    /// The original query text.
    pub query: String,
    /// Work to run on each participating shard.
    pub shard_operations: Vec<ShardOperation>,
    /// How per-shard results are combined.
    pub merge_strategy: MergeStrategy,
}
676
/// One shard's share of a [`DistributedQueryPlan`].
#[derive(Debug, Clone)]
pub struct ShardOperation {
    /// Shard the operation runs on.
    pub shard_id: ShardId,
    /// Query text to execute on that shard.
    pub operation: String,
    /// Relative cost estimate used for planning.
    pub estimated_cost: f64,
}
684
/// How per-shard results of a distributed query are combined.
/// NOTE(review): only `Union` is produced in this module; the semantics of
/// the other variants are inferred from their names — confirm with the executor.
#[derive(Debug, Clone)]
pub enum MergeStrategy {
    /// Combine all shard results.
    Union,

    /// Keep only results present in every shard's output.
    Intersection,

    /// Join shard results on `join_key`.
    Join { join_key: String },

    /// Aggregate shard results grouped by the listed keys.
    Aggregate { group_by: Vec<String> },
}
697
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode, Triple};

    /// Parses an IRI used as test data; every IRI in these tests is valid.
    fn iri(value: &str) -> NamedNode {
        NamedNode::new(value).expect("valid IRI")
    }

    /// Hash placement must stay inside the default 16-shard range.
    #[test]
    fn test_hash_sharding() {
        let manager = ShardManager::new(ShardingConfig::default(), ShardingStrategy::Hash);

        let samples = [
            ("http://example.org/s1", "value1"),
            ("http://example.org/s2", "value2"),
        ];
        for &(subject, value) in samples.iter() {
            let triple = Triple::new(iri(subject), iri("http://example.org/p"), Literal::new(value));
            let shard = manager.get_shard_for_triple(&triple);
            assert!(shard < 16);
        }
    }

    /// Subject placement ignores the predicate and object.
    #[test]
    fn test_subject_sharding() {
        let manager = ShardManager::new(ShardingConfig::default(), ShardingStrategy::Subject);
        let subject = iri("http://example.org/entity1");

        let first = manager.get_shard_for_triple(&Triple::new(
            subject.clone(),
            iri("http://example.org/p1"),
            Literal::new("value1"),
        ));
        let second = manager.get_shard_for_triple(&Triple::new(
            subject.clone(),
            iri("http://example.org/p2"),
            Literal::new("value2"),
        ));

        assert_eq!(first, second);
    }

    /// Once a subject is placed semantically, its later triples follow it.
    #[test]
    fn test_semantic_sharding() {
        let manager = ShardManager::new(
            ShardingConfig::default(),
            ShardingStrategy::Semantic(SemanticStrategy::default()),
        );
        let person = iri("http://example.org/person1");

        let name_shard = manager.get_shard_for_triple(&Triple::new(
            person.clone(),
            iri("http://example.org/name"),
            Literal::new("John"),
        ));
        let email_shard = manager.get_shard_for_triple(&Triple::new(
            person.clone(),
            iri("http://example.org/email"),
            Literal::new("john@example.org"),
        ));

        assert_eq!(name_shard, email_shard);
    }

    /// Empty shards are balanced; heavily skewed counts are not.
    #[test]
    fn test_rebalancing_detection() {
        let manager = ShardManager::new(
            ShardingConfig {
                shard_count: 4,
                rebalancing_threshold: 0.2,
                ..Default::default()
            },
            ShardingStrategy::Hash,
        );

        assert!(!manager.needs_rebalancing());

        manager.update_shard_metadata(0, 1000, 0);
        manager.update_shard_metadata(1, 100, 0);

        assert!(manager.needs_rebalancing());
    }

    /// A query with no bound terms must be sent to every configured shard.
    #[test]
    fn test_query_routing() {
        let manager = ShardManager::new(ShardingConfig::default(), ShardingStrategy::Subject);
        let router = ShardRouter::new(Arc::new(manager));

        let shards = router
            .route_query("SELECT ?s ?p ?o WHERE { ?s ?p ?o }")
            .expect("operation should succeed");

        assert_eq!(shards.len(), 16);
    }
}