Skip to main content

oxirs_core/store/
indexed_graph.rs

1//! High-performance multi-index graph implementation
2//!
3//! This module provides an indexed graph structure with SPO, POS, and OSP indexes
4//! for efficient triple pattern matching. All terms are interned for memory efficiency.
5
6use crate::model::{Object, Predicate, Subject, Triple};
7use crate::store::term_interner::TermInterner;
8use std::collections::{BTreeMap, BTreeSet, HashMap};
9use std::sync::{Arc, RwLock};
10
11/// Type alias for complex index structures
12type TripleIndex = Arc<RwLock<BTreeMap<u32, BTreeMap<u32, BTreeSet<u32>>>>>;
13
14/// Interned triple representation using term IDs
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
16pub struct InternedTriple {
17    pub subject_id: u32,
18    pub predicate_id: u32,
19    pub object_id: u32,
20}
21
22/// Index type for selecting the best index for a query pattern
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub enum IndexType {
25    SPO, // Subject -> Predicate -> Object
26    POS, // Predicate -> Object -> Subject
27    OSP, // Object -> Subject -> Predicate
28}
29
30/// Multi-indexed graph with term interning
31#[derive(Debug)]
32pub struct IndexedGraph {
33    /// Term interner for string deduplication
34    interner: Arc<TermInterner>,
35    /// SPO index: Subject -> Predicate -> Object
36    spo_index: TripleIndex,
37    /// POS index: Predicate -> Object -> Subject
38    pos_index: TripleIndex,
39    /// OSP index: Object -> Subject -> Predicate
40    osp_index: TripleIndex,
41    /// Total number of triples
42    triple_count: Arc<RwLock<usize>>,
43    /// Index statistics for query optimization
44    stats: Arc<RwLock<IndexStats>>,
45}
46
47/// Statistics for index usage and performance
48#[derive(Debug, Clone, Default)]
49pub struct IndexStats {
50    pub spo_lookups: usize,
51    pub pos_lookups: usize,
52    pub osp_lookups: usize,
53    pub total_insertions: usize,
54    pub total_deletions: usize,
55    pub batch_insertions: usize,
56}
57
58impl IndexStats {
59    /// Get the most frequently used index
60    pub fn most_used_index(&self) -> IndexType {
61        if self.spo_lookups >= self.pos_lookups && self.spo_lookups >= self.osp_lookups {
62            IndexType::SPO
63        } else if self.pos_lookups >= self.osp_lookups {
64            IndexType::POS
65        } else {
66            IndexType::OSP
67        }
68    }
69}
70
71impl IndexedGraph {
72    /// Create a new indexed graph
73    pub fn new() -> Self {
74        IndexedGraph {
75            interner: Arc::new(TermInterner::new()),
76            spo_index: Arc::new(RwLock::new(BTreeMap::new())),
77            pos_index: Arc::new(RwLock::new(BTreeMap::new())),
78            osp_index: Arc::new(RwLock::new(BTreeMap::new())),
79            triple_count: Arc::new(RwLock::new(0)),
80            stats: Arc::new(RwLock::new(IndexStats::default())),
81        }
82    }
83
84    /// Create a new indexed graph with a custom interner
85    pub fn with_interner(interner: Arc<TermInterner>) -> Self {
86        IndexedGraph {
87            interner,
88            spo_index: Arc::new(RwLock::new(BTreeMap::new())),
89            pos_index: Arc::new(RwLock::new(BTreeMap::new())),
90            osp_index: Arc::new(RwLock::new(BTreeMap::new())),
91            triple_count: Arc::new(RwLock::new(0)),
92            stats: Arc::new(RwLock::new(IndexStats::default())),
93        }
94    }
95
96    /// Insert a triple into the graph
97    pub fn insert(&self, triple: &Triple) -> bool {
98        // Intern the terms
99        let s_id = self.interner.intern_subject(triple.subject());
100        let p_id = self.interner.intern_predicate(triple.predicate());
101        let o_id = self.interner.intern_object(triple.object());
102
103        let interned = InternedTriple {
104            subject_id: s_id,
105            predicate_id: p_id,
106            object_id: o_id,
107        };
108
109        self.insert_interned(interned)
110    }
111
112    /// Insert an already interned triple
113    fn insert_interned(&self, triple: InternedTriple) -> bool {
114        let mut spo = self.spo_index.write().expect("spo_index lock poisoned");
115        let mut pos = self.pos_index.write().expect("pos_index lock poisoned");
116        let mut osp = self.osp_index.write().expect("osp_index lock poisoned");
117
118        // Check if triple already exists in SPO index
119        if let Some(po_map) = spo.get(&triple.subject_id) {
120            if let Some(o_set) = po_map.get(&triple.predicate_id) {
121                if o_set.contains(&triple.object_id) {
122                    return false; // Triple already exists
123                }
124            }
125        }
126
127        // Insert into SPO index
128        spo.entry(triple.subject_id)
129            .or_default()
130            .entry(triple.predicate_id)
131            .or_default()
132            .insert(triple.object_id);
133
134        // Insert into POS index
135        pos.entry(triple.predicate_id)
136            .or_default()
137            .entry(triple.object_id)
138            .or_default()
139            .insert(triple.subject_id);
140
141        // Insert into OSP index
142        osp.entry(triple.object_id)
143            .or_default()
144            .entry(triple.subject_id)
145            .or_default()
146            .insert(triple.predicate_id);
147
148        // Update counts
149        *self
150            .triple_count
151            .write()
152            .expect("triple_count lock poisoned") += 1;
153        self.stats
154            .write()
155            .expect("stats lock poisoned")
156            .total_insertions += 1;
157
158        true
159    }
160
161    /// Batch insert multiple triples for better performance
162    pub fn batch_insert(&self, triples: &[Triple]) -> Vec<bool> {
163        // Pre-intern all terms to minimize lock contention
164        let interned_triples: Vec<InternedTriple> = triples
165            .iter()
166            .map(|t| InternedTriple {
167                subject_id: self.interner.intern_subject(t.subject()),
168                predicate_id: self.interner.intern_predicate(t.predicate()),
169                object_id: self.interner.intern_object(t.object()),
170            })
171            .collect();
172
173        // Batch insert with single lock acquisition
174        let mut spo = self.spo_index.write().expect("spo_index lock poisoned");
175        let mut pos = self.pos_index.write().expect("pos_index lock poisoned");
176        let mut osp = self.osp_index.write().expect("osp_index lock poisoned");
177        let mut count = self
178            .triple_count
179            .write()
180            .expect("triple_count lock poisoned");
181        let mut stats = self.stats.write().expect("stats lock poisoned");
182
183        let mut results = Vec::with_capacity(triples.len());
184        let mut inserted_count = 0;
185
186        for triple in interned_triples {
187            // Check existence
188            let exists = spo
189                .get(&triple.subject_id)
190                .and_then(|po| po.get(&triple.predicate_id))
191                .is_some_and(|o_set| o_set.contains(&triple.object_id));
192
193            if !exists {
194                // Insert into all indexes
195                spo.entry(triple.subject_id)
196                    .or_default()
197                    .entry(triple.predicate_id)
198                    .or_default()
199                    .insert(triple.object_id);
200
201                pos.entry(triple.predicate_id)
202                    .or_default()
203                    .entry(triple.object_id)
204                    .or_default()
205                    .insert(triple.subject_id);
206
207                osp.entry(triple.object_id)
208                    .or_default()
209                    .entry(triple.subject_id)
210                    .or_default()
211                    .insert(triple.predicate_id);
212
213                inserted_count += 1;
214                results.push(true);
215            } else {
216                results.push(false);
217            }
218        }
219
220        *count += inserted_count;
221        stats.total_insertions += inserted_count;
222        stats.batch_insertions += 1;
223
224        results
225    }
226
227    /// Remove a triple from the graph
228    pub fn remove(&self, triple: &Triple) -> bool {
229        let s_id = match self.interner.get_subject_id(triple.subject()) {
230            Some(id) => id,
231            None => return false, // Subject not in graph
232        };
233        let p_id = match self.interner.get_predicate_id(triple.predicate()) {
234            Some(id) => id,
235            None => return false, // Predicate not in graph
236        };
237        let o_id = match self.interner.get_object_id(triple.object()) {
238            Some(id) => id,
239            None => return false, // Object not in graph
240        };
241
242        self.remove_interned(s_id, p_id, o_id)
243    }
244
245    /// Remove an interned triple
246    fn remove_interned(&self, s_id: u32, p_id: u32, o_id: u32) -> bool {
247        let mut spo = self.spo_index.write().expect("spo_index lock poisoned");
248        let mut pos = self.pos_index.write().expect("pos_index lock poisoned");
249        let mut osp = self.osp_index.write().expect("osp_index lock poisoned");
250
251        let mut removed = false;
252
253        // Remove from SPO index
254        if let Some(po_map) = spo.get_mut(&s_id) {
255            if let Some(o_set) = po_map.get_mut(&p_id) {
256                removed = o_set.remove(&o_id);
257                if o_set.is_empty() {
258                    po_map.remove(&p_id);
259                    if po_map.is_empty() {
260                        spo.remove(&s_id);
261                    }
262                }
263            }
264        }
265
266        if removed {
267            // Remove from POS index
268            if let Some(os_map) = pos.get_mut(&p_id) {
269                if let Some(s_set) = os_map.get_mut(&o_id) {
270                    s_set.remove(&s_id);
271                    if s_set.is_empty() {
272                        os_map.remove(&o_id);
273                        if os_map.is_empty() {
274                            pos.remove(&p_id);
275                        }
276                    }
277                }
278            }
279
280            // Remove from OSP index
281            if let Some(sp_map) = osp.get_mut(&o_id) {
282                if let Some(p_set) = sp_map.get_mut(&s_id) {
283                    p_set.remove(&p_id);
284                    if p_set.is_empty() {
285                        sp_map.remove(&s_id);
286                        if sp_map.is_empty() {
287                            osp.remove(&o_id);
288                        }
289                    }
290                }
291            }
292
293            *self
294                .triple_count
295                .write()
296                .expect("triple_count lock poisoned") -= 1;
297            self.stats
298                .write()
299                .expect("stats lock poisoned")
300                .total_deletions += 1;
301        }
302
303        removed
304    }
305
306    /// Query triples matching a pattern using the most efficient index
307    pub fn query(
308        &self,
309        subject: Option<&Subject>,
310        predicate: Option<&Predicate>,
311        object: Option<&Object>,
312    ) -> Vec<Triple> {
313        // Determine the best index to use
314        let index_type =
315            self.select_index(subject.is_some(), predicate.is_some(), object.is_some());
316
317        // Update stats
318        match index_type {
319            IndexType::SPO => self.stats.write().expect("stats lock poisoned").spo_lookups += 1,
320            IndexType::POS => self.stats.write().expect("stats lock poisoned").pos_lookups += 1,
321            IndexType::OSP => self.stats.write().expect("stats lock poisoned").osp_lookups += 1,
322        }
323
324        // Convert terms to IDs if provided
325        let s_id = subject.and_then(|s| self.interner.get_subject_id(s));
326        let p_id = predicate.and_then(|p| self.interner.get_predicate_id(p));
327        let o_id = object.and_then(|o| self.interner.get_object_id(o));
328
329        // If any required term is not found, return empty result
330        if (subject.is_some() && s_id.is_none())
331            || (predicate.is_some() && p_id.is_none())
332            || (object.is_some() && o_id.is_none())
333        {
334            return Vec::new();
335        }
336
337        // Query using the selected index
338        let interned_results = match index_type {
339            IndexType::SPO => self.query_spo(s_id, p_id, o_id),
340            IndexType::POS => self.query_pos(p_id, o_id, s_id),
341            IndexType::OSP => self.query_osp(o_id, s_id, p_id),
342        };
343
344        // Convert back to triples
345        interned_results
346            .into_iter()
347            .filter_map(|it| self.interned_to_triple(it))
348            .collect()
349    }
350
351    /// Select the best index based on the query pattern
352    fn select_index(&self, has_s: bool, has_p: bool, has_o: bool) -> IndexType {
353        match (has_s, has_p, has_o) {
354            (true, true, _) => IndexType::SPO,      // S+P given, use SPO
355            (true, false, true) => IndexType::OSP,  // S+O given, use OSP
356            (false, true, true) => IndexType::POS,  // P+O given, use POS
357            (true, false, false) => IndexType::SPO, // Only S given
358            (false, true, false) => IndexType::POS, // Only P given
359            (false, false, true) => IndexType::OSP, // Only O given
360            _ => IndexType::SPO,                    // No constraint or all given, default to SPO
361        }
362    }
363
364    /// Query using SPO index
365    fn query_spo(
366        &self,
367        s_id: Option<u32>,
368        p_id: Option<u32>,
369        o_id: Option<u32>,
370    ) -> Vec<InternedTriple> {
371        let spo = self.spo_index.read().expect("spo_index lock poisoned");
372        let mut results = Vec::new();
373
374        match (s_id, p_id, o_id) {
375            (Some(s), Some(p), Some(o)) => {
376                // Exact match
377                if let Some(po_map) = spo.get(&s) {
378                    if let Some(o_set) = po_map.get(&p) {
379                        if o_set.contains(&o) {
380                            results.push(InternedTriple {
381                                subject_id: s,
382                                predicate_id: p,
383                                object_id: o,
384                            });
385                        }
386                    }
387                }
388            }
389            (Some(s), Some(p), None) => {
390                // S+P given
391                if let Some(po_map) = spo.get(&s) {
392                    if let Some(o_set) = po_map.get(&p) {
393                        for &o in o_set {
394                            results.push(InternedTriple {
395                                subject_id: s,
396                                predicate_id: p,
397                                object_id: o,
398                            });
399                        }
400                    }
401                }
402            }
403            (Some(s), None, None) => {
404                // Only S given
405                if let Some(po_map) = spo.get(&s) {
406                    for (&p, o_set) in po_map {
407                        for &o in o_set {
408                            results.push(InternedTriple {
409                                subject_id: s,
410                                predicate_id: p,
411                                object_id: o,
412                            });
413                        }
414                    }
415                }
416            }
417            (None, None, None) => {
418                // All triples
419                for (&s, po_map) in spo.iter() {
420                    for (&p, o_set) in po_map {
421                        for &o in o_set {
422                            results.push(InternedTriple {
423                                subject_id: s,
424                                predicate_id: p,
425                                object_id: o,
426                            });
427                        }
428                    }
429                }
430            }
431            _ => {
432                // Other patterns better served by different indexes
433                // But we can still handle them, just less efficiently
434                for (&s, po_map) in spo.iter() {
435                    if s_id.map_or(true, |id| id == s) {
436                        for (&p, o_set) in po_map {
437                            if p_id.map_or(true, |id| id == p) {
438                                for &o in o_set {
439                                    if o_id.map_or(true, |id| id == o) {
440                                        results.push(InternedTriple {
441                                            subject_id: s,
442                                            predicate_id: p,
443                                            object_id: o,
444                                        });
445                                    }
446                                }
447                            }
448                        }
449                    }
450                }
451            }
452        }
453
454        results
455    }
456
457    /// Query using POS index
458    fn query_pos(
459        &self,
460        p_id: Option<u32>,
461        o_id: Option<u32>,
462        s_id: Option<u32>,
463    ) -> Vec<InternedTriple> {
464        let pos = self.pos_index.read().expect("pos_index lock poisoned");
465        let mut results = Vec::new();
466
467        match (p_id, o_id) {
468            (Some(p), Some(o)) => {
469                // P+O given
470                if let Some(os_map) = pos.get(&p) {
471                    if let Some(s_set) = os_map.get(&o) {
472                        for &s in s_set {
473                            if s_id.map_or(true, |id| id == s) {
474                                results.push(InternedTriple {
475                                    subject_id: s,
476                                    predicate_id: p,
477                                    object_id: o,
478                                });
479                            }
480                        }
481                    }
482                }
483            }
484            (Some(p), None) => {
485                // Only P given
486                if let Some(os_map) = pos.get(&p) {
487                    for (&o, s_set) in os_map {
488                        for &s in s_set {
489                            if s_id.map_or(true, |id| id == s) {
490                                results.push(InternedTriple {
491                                    subject_id: s,
492                                    predicate_id: p,
493                                    object_id: o,
494                                });
495                            }
496                        }
497                    }
498                }
499            }
500            _ => {
501                // Fallback to SPO for other patterns
502                return self.query_spo(s_id, p_id, o_id);
503            }
504        }
505
506        results
507    }
508
509    /// Query using OSP index
510    fn query_osp(
511        &self,
512        o_id: Option<u32>,
513        s_id: Option<u32>,
514        p_id: Option<u32>,
515    ) -> Vec<InternedTriple> {
516        let osp = self.osp_index.read().expect("osp_index lock poisoned");
517        let mut results = Vec::new();
518
519        match (o_id, s_id) {
520            (Some(o), Some(s)) => {
521                // O+S given
522                if let Some(sp_map) = osp.get(&o) {
523                    if let Some(p_set) = sp_map.get(&s) {
524                        for &p in p_set {
525                            if p_id.map_or(true, |id| id == p) {
526                                results.push(InternedTriple {
527                                    subject_id: s,
528                                    predicate_id: p,
529                                    object_id: o,
530                                });
531                            }
532                        }
533                    }
534                }
535            }
536            (Some(o), None) => {
537                // Only O given
538                if let Some(sp_map) = osp.get(&o) {
539                    for (&s, p_set) in sp_map {
540                        for &p in p_set {
541                            if p_id.map_or(true, |id| id == p) {
542                                results.push(InternedTriple {
543                                    subject_id: s,
544                                    predicate_id: p,
545                                    object_id: o,
546                                });
547                            }
548                        }
549                    }
550                }
551            }
552            _ => {
553                // Fallback to SPO for other patterns
554                return self.query_spo(s_id, p_id, o_id);
555            }
556        }
557
558        results
559    }
560
561    /// Convert an interned triple back to a regular triple
562    fn interned_to_triple(&self, interned: InternedTriple) -> Option<Triple> {
563        let subject = self.interner.get_subject(interned.subject_id)?;
564        let predicate = self.interner.get_predicate(interned.predicate_id)?;
565        let object = self.interner.get_object(interned.object_id)?;
566        Some(Triple::new(subject, predicate, object))
567    }
568
569    /// Get the number of triples in the graph
570    pub fn len(&self) -> usize {
571        *self
572            .triple_count
573            .read()
574            .expect("triple_count lock poisoned")
575    }
576
577    /// Check if the graph is empty
578    pub fn is_empty(&self) -> bool {
579        self.len() == 0
580    }
581
582    /// Get memory usage statistics
583    pub fn memory_usage(&self) -> MemoryUsage {
584        let spo = self.spo_index.read().expect("spo_index lock poisoned");
585        let pos = self.pos_index.read().expect("pos_index lock poisoned");
586        let osp = self.osp_index.read().expect("osp_index lock poisoned");
587
588        let spo_entries = count_index_entries(&spo);
589        let pos_entries = count_index_entries(&pos);
590        let osp_entries = count_index_entries(&osp);
591
592        MemoryUsage {
593            term_interner_bytes: self.interner.memory_usage(),
594            spo_index_bytes: estimate_index_memory(&spo),
595            pos_index_bytes: estimate_index_memory(&pos),
596            osp_index_bytes: estimate_index_memory(&osp),
597            total_triple_count: self.len(),
598            index_entry_count: spo_entries + pos_entries + osp_entries,
599        }
600    }
601
602    /// Get index statistics
603    pub fn index_stats(&self) -> IndexStats {
604        self.stats.read().expect("stats lock poisoned").clone()
605    }
606
607    /// Clear all data from the graph
608    pub fn clear(&self) {
609        self.spo_index
610            .write()
611            .expect("spo_index lock poisoned")
612            .clear();
613        self.pos_index
614            .write()
615            .expect("pos_index lock poisoned")
616            .clear();
617        self.osp_index
618            .write()
619            .expect("osp_index lock poisoned")
620            .clear();
621        *self
622            .triple_count
623            .write()
624            .expect("triple_count lock poisoned") = 0;
625        self.interner.clear();
626    }
627
628    /// Get a reference to the term interner
629    pub fn interner(&self) -> &Arc<TermInterner> {
630        &self.interner
631    }
632
633    /// Parallel insert of multiple triples using rayon
634    #[cfg(feature = "parallel")]
635    pub fn par_insert_batch(&self, triples: Vec<Triple>) -> Vec<bool> {
636        use rayon::prelude::*;
637
638        // First pass: intern all terms in parallel
639        let interned_triples: Vec<_> = triples
640            .par_iter()
641            .map(|triple| InternedTriple {
642                subject_id: self.interner.intern_subject(triple.subject()),
643                predicate_id: self.interner.intern_predicate(triple.predicate()),
644                object_id: self.interner.intern_object(triple.object()),
645            })
646            .collect();
647
648        // Group by subject for better lock granularity
649        let mut grouped: HashMap<u32, Vec<InternedTriple>> = HashMap::new();
650        for interned in interned_triples {
651            grouped
652                .entry(interned.subject_id)
653                .or_default()
654                .push(interned);
655        }
656
657        // Process groups in parallel
658        let results: Vec<_> = grouped
659            .into_par_iter()
660            .flat_map(|(_, group)| {
661                let mut group_results = Vec::new();
662                for interned in group {
663                    let inserted = self.insert_interned(interned);
664                    group_results.push(inserted);
665                }
666                group_results
667            })
668            .collect();
669
670        results
671    }
672
673    /// Parallel remove of multiple triples
674    #[cfg(feature = "parallel")]
675    pub fn par_remove_batch(&self, triples: &[Triple]) -> Vec<bool> {
676        use rayon::prelude::*;
677
678        triples
679            .par_iter()
680            .map(|triple| self.remove(triple))
681            .collect()
682    }
683
684    /// Parallel query with multiple patterns
685    #[cfg(feature = "parallel")]
686    pub fn par_query_batch(
687        &self,
688        patterns: Vec<(Option<Subject>, Option<Predicate>, Option<Object>)>,
689    ) -> Vec<Vec<Triple>> {
690        use rayon::prelude::*;
691
692        patterns
693            .into_par_iter()
694            .map(|(s, p, o)| self.query(s.as_ref(), p.as_ref(), o.as_ref()))
695            .collect()
696    }
697
698    /// Apply a transformation to all triples in parallel
699    #[cfg(feature = "parallel")]
700    pub fn par_transform<F>(&self, transform: F) -> Vec<Triple>
701    where
702        F: Fn(&Triple) -> Option<Triple> + Send + Sync,
703    {
704        use rayon::prelude::*;
705
706        // Get all triples
707        let all_triples = self.query(None, None, None);
708
709        // Transform in parallel
710        all_triples
711            .into_par_iter()
712            .filter_map(|triple| transform(&triple))
713            .collect()
714    }
715
716    /// Parallel map over all triples
717    #[cfg(feature = "parallel")]
718    pub fn par_map<F, R>(&self, mapper: F) -> Vec<R>
719    where
720        F: Fn(&Triple) -> R + Send + Sync,
721        R: Send,
722    {
723        use rayon::prelude::*;
724
725        let all_triples = self.query(None, None, None);
726        all_triples
727            .into_par_iter()
728            .map(|triple| mapper(&triple))
729            .collect()
730    }
731
732    /// Parallel filter triples
733    #[cfg(feature = "parallel")]
734    pub fn par_filter<F>(&self, predicate: F) -> Vec<Triple>
735    where
736        F: Fn(&Triple) -> bool + Send + Sync,
737    {
738        use rayon::prelude::*;
739
740        let all_triples = self.query(None, None, None);
741        all_triples
742            .into_par_iter()
743            .filter(|triple| predicate(triple))
744            .collect()
745    }
746
747    /// Parallel fold operation
748    #[cfg(feature = "parallel")]
749    pub fn par_fold<F, R>(&self, init: R, fold_fn: F) -> R
750    where
751        F: Fn(R, &Triple) -> R + Send + Sync,
752        R: Send + Sync + Clone + 'static + std::ops::Add<Output = R>,
753    {
754        use rayon::prelude::*;
755
756        let all_triples = self.query(None, None, None);
757        all_triples
758            .into_par_iter()
759            .fold(|| init.clone(), |acc, triple| fold_fn(acc, &triple))
760            .reduce(|| init.clone(), |acc1, acc2| acc1 + acc2)
761    }
762
763    /// Iterator over all triples in the graph
764    pub fn iter(&self) -> impl Iterator<Item = Triple> + use<> {
765        self.query(None, None, None).into_iter()
766    }
767
768    /// Match a pattern and return matching triples
769    pub fn match_pattern(
770        &self,
771        subject: Option<&Subject>,
772        predicate: Option<&Predicate>,
773        object: Option<&Object>,
774    ) -> Vec<Triple> {
775        self.query(subject, predicate, object)
776    }
777}
778
779impl Default for IndexedGraph {
780    fn default() -> Self {
781        Self::new()
782    }
783}
784
785/// Memory usage statistics
786#[derive(Debug, Clone)]
787pub struct MemoryUsage {
788    pub term_interner_bytes: usize,
789    pub spo_index_bytes: usize,
790    pub pos_index_bytes: usize,
791    pub osp_index_bytes: usize,
792    pub total_triple_count: usize,
793    pub index_entry_count: usize,
794}
795
796impl MemoryUsage {
797    /// Get total memory usage in bytes
798    pub fn total_bytes(&self) -> usize {
799        self.term_interner_bytes
800            + self.spo_index_bytes
801            + self.pos_index_bytes
802            + self.osp_index_bytes
803    }
804
805    /// Get average bytes per triple
806    pub fn bytes_per_triple(&self) -> f64 {
807        if self.total_triple_count == 0 {
808            0.0
809        } else {
810            self.total_bytes() as f64 / self.total_triple_count as f64
811        }
812    }
813}
814
815/// Count entries in an index
816fn count_index_entries(index: &BTreeMap<u32, BTreeMap<u32, BTreeSet<u32>>>) -> usize {
817    index.values().map(|inner| inner.len()).sum()
818}
819
820/// Estimate memory usage of an index
821fn estimate_index_memory(index: &BTreeMap<u32, BTreeMap<u32, BTreeSet<u32>>>) -> usize {
822    let mut total = 0;
823    for inner in index.values() {
824        total += 4; // Key size
825        total += 24; // BTreeMap overhead
826        for set in inner.values() {
827            total += 4; // Key size
828            total += 24; // BTreeSet overhead
829            total += set.len() * 4; // Values
830        }
831    }
832    total
833}
834
835#[cfg(test)]
836mod tests {
837    use super::*;
838    use crate::model::{Literal, NamedNode};
839
840    fn create_test_triple(s: &str, p: &str, o: &str) -> Triple {
841        Triple::new(
842            NamedNode::new(s).expect("valid IRI"),
843            NamedNode::new(p).expect("valid IRI"),
844            Literal::new(o),
845        )
846    }
847
848    #[test]
849    fn test_basic_operations() {
850        let graph = IndexedGraph::new();
851
852        let triple =
853            create_test_triple("http://example.org/s1", "http://example.org/p1", "object1");
854
855        // Test insertion
856        assert!(graph.insert(&triple));
857        assert!(!graph.insert(&triple)); // Duplicate
858        assert_eq!(graph.len(), 1);
859
860        // Test query
861        let results = graph.query(Some(triple.subject()), None, None);
862        assert_eq!(results.len(), 1);
863        assert_eq!(results[0], triple);
864
865        // Test removal
866        assert!(graph.remove(&triple));
867        assert!(!graph.remove(&triple)); // Already removed
868        assert_eq!(graph.len(), 0);
869    }
870
871    #[test]
872    fn test_batch_insert() {
873        let graph = IndexedGraph::new();
874
875        let triples = vec![
876            create_test_triple("http://example.org/s1", "http://example.org/p1", "o1"),
877            create_test_triple("http://example.org/s1", "http://example.org/p2", "o2"),
878            create_test_triple("http://example.org/s2", "http://example.org/p1", "o3"),
879            create_test_triple("http://example.org/s1", "http://example.org/p1", "o1"), // Duplicate
880        ];
881
882        let results = graph.batch_insert(&triples);
883        assert_eq!(results, vec![true, true, true, false]);
884        assert_eq!(graph.len(), 3);
885    }
886
887    #[test]
888    fn test_query_patterns() {
889        let graph = IndexedGraph::new();
890
891        // Insert test data
892        let triples = vec![
893            create_test_triple("http://example.org/s1", "http://example.org/p1", "o1"),
894            create_test_triple("http://example.org/s1", "http://example.org/p2", "o2"),
895            create_test_triple("http://example.org/s2", "http://example.org/p1", "o3"),
896            create_test_triple("http://example.org/s2", "http://example.org/p2", "o4"),
897        ];
898
899        for triple in &triples {
900            graph.insert(triple);
901        }
902
903        // Test S pattern
904        let s1 = Subject::NamedNode(NamedNode::new("http://example.org/s1").expect("valid IRI"));
905        let results = graph.query(Some(&s1), None, None);
906        assert_eq!(results.len(), 2);
907
908        // Test P pattern
909        let p1 = Predicate::NamedNode(NamedNode::new("http://example.org/p1").expect("valid IRI"));
910        let results = graph.query(None, Some(&p1), None);
911        assert_eq!(results.len(), 2);
912
913        // Test O pattern
914        let o1 = Object::Literal(Literal::new("o1"));
915        let results = graph.query(None, None, Some(&o1));
916        assert_eq!(results.len(), 1);
917
918        // Test SP pattern
919        let results = graph.query(Some(&s1), Some(&p1), None);
920        assert_eq!(results.len(), 1);
921
922        // Test PO pattern
923        let results = graph.query(None, Some(&p1), Some(&o1));
924        assert_eq!(results.len(), 1);
925
926        // Test SO pattern
927        let o2 = Object::Literal(Literal::new("o2"));
928        let results = graph.query(Some(&s1), None, Some(&o2));
929        assert_eq!(results.len(), 1);
930
931        // Test SPO pattern (exact match)
932        let results = graph.query(Some(&s1), Some(&p1), Some(&o1));
933        assert_eq!(results.len(), 1);
934
935        // Test all triples
936        let results = graph.query(None, None, None);
937        assert_eq!(results.len(), 4);
938    }
939
940    #[test]
941    fn test_index_selection() {
942        let graph = IndexedGraph::new();
943
944        // SPO patterns
945        assert_eq!(graph.select_index(true, true, true), IndexType::SPO);
946        assert_eq!(graph.select_index(true, true, false), IndexType::SPO);
947        assert_eq!(graph.select_index(true, false, false), IndexType::SPO);
948
949        // POS patterns
950        assert_eq!(graph.select_index(false, true, true), IndexType::POS);
951        assert_eq!(graph.select_index(false, true, false), IndexType::POS);
952
953        // OSP patterns
954        assert_eq!(graph.select_index(true, false, true), IndexType::OSP);
955        assert_eq!(graph.select_index(false, false, true), IndexType::OSP);
956    }
957
958    #[test]
959    fn test_memory_usage() {
960        let graph = IndexedGraph::new();
961
962        // Insert some triples
963        for i in 0..10 {
964            let triple = create_test_triple(
965                &format!("http://example.org/s{i}"),
966                "http://example.org/p1",
967                &format!("object{i}"),
968            );
969            graph.insert(&triple);
970        }
971
972        let usage = graph.memory_usage();
973        assert_eq!(usage.total_triple_count, 10);
974        assert!(usage.total_bytes() > 0);
975        assert!(usage.bytes_per_triple() > 0.0);
976    }
977
978    #[test]
979    fn test_clear() {
980        let graph = IndexedGraph::new();
981
982        // Insert and then clear
983        let triple = create_test_triple("http://example.org/s1", "http://example.org/p1", "o1");
984        graph.insert(&triple);
985        assert_eq!(graph.len(), 1);
986
987        graph.clear();
988        assert_eq!(graph.len(), 0);
989        assert!(graph.is_empty());
990
991        // Should be able to insert again after clear
992        assert!(graph.insert(&triple));
993        assert_eq!(graph.len(), 1);
994    }
995
996    #[test]
997    fn test_concurrent_access() {
998        use std::sync::Arc;
999        use std::thread;
1000
1001        let graph = Arc::new(IndexedGraph::new());
1002        let mut handles = vec![];
1003
1004        // Concurrent insertions
1005        for i in 0..10 {
1006            let graph_clone = Arc::clone(&graph);
1007            let handle = thread::spawn(move || {
1008                let triple = create_test_triple(
1009                    &format!("http://example.org/s{i}"),
1010                    "http://example.org/p1",
1011                    &format!("o{i}"),
1012                );
1013                graph_clone.insert(&triple)
1014            });
1015            handles.push(handle);
1016        }
1017
1018        for handle in handles {
1019            handle.join().expect("thread should not panic");
1020        }
1021
1022        assert_eq!(graph.len(), 10);
1023
1024        // Concurrent queries
1025        let mut handles = vec![];
1026        for _ in 0..10 {
1027            let graph_clone = Arc::clone(&graph);
1028            let handle = thread::spawn(move || graph_clone.query(None, None, None).len());
1029            handles.push(handle);
1030        }
1031
1032        for handle in handles {
1033            let count = handle.join().expect("thread should not panic");
1034            assert_eq!(count, 10);
1035        }
1036    }
1037}