Skip to main content

selene_graph/
text_index.rs

1//! Reusable BM25 postings indexes over graph node string properties.
2//!
3//! `TextIndex` is an in-memory maintained artifact for one `(label, property)`
4//! pair. Durable state stores only the registration; postings are derived from
5//! primary node rows at registration, recovery, compaction, and commit-time
6//! maintenance. The exact scorer in [`crate::text_search`] remains the
7//! correctness oracle; this module reuses the same tokenizer, IDF formula, and
8//! top-k ordering.
9
10use std::collections::BTreeSet;
11use std::mem::size_of;
12use std::sync::Arc;
13
14use roaring::RoaringBitmap;
15use rustc_hash::FxHashMap;
16
17use selene_core::{CancellationChecker, DbString, LabelSet, NodeId, PropertyMap, Value};
18
19use crate::error::{GraphError, GraphResult};
20use crate::graph::{SeleneGraph, TextIndexEntry};
21use crate::shared::SharedGraph;
22use crate::store::RowIndex;
23use crate::text_search::{
24    DocumentStats, TextSearchError, TextSearchHit, TextTopK, bm25_score, tokenize_borrowed,
25    unique_query_terms,
26};
27
28#[path = "text_index/builder.rs"]
29mod builder;
30#[path = "text_index/candidate.rs"]
31mod candidate;
32use builder::TextIndexBuilder;
33
34/// In-memory BM25 postings index for one node `(label, property)` pair.
35#[derive(Clone, Debug)]
36pub struct TextIndex {
37    label: DbString,
38    property: DbString,
39    rows: RoaringBitmap,
40    document_lengths: FxHashMap<NodeId, u32>,
41    document_terms: FxHashMap<NodeId, Arc<[String]>>,
42    postings: FxHashMap<String, Arc<Vec<TextPosting>>>,
43    total_document_len: u64,
44    posting_count: usize,
45}
46
47impl TextIndex {
48    /// Build a postings index from the current graph snapshot.
49    ///
50    /// Only alive nodes that carry `label` and whose `property` value is a
51    /// non-empty string document are indexed. Non-string values are ignored, which
52    /// matches the exact BM25 scan path.
53    ///
54    /// # Errors
55    ///
56    /// Returns [`GraphError::Inconsistent`] if the label index references a row
57    /// without a resolvable node id or property row.
58    pub fn build(graph: &SeleneGraph, label: DbString, property: DbString) -> GraphResult<Self> {
59        let mut index = TextIndexBuilder::empty(label.clone(), property.clone());
60        let Some(label_rows) = graph.nodes_with_label(&label) else {
61            return Ok(index.finish());
62        };
63
64        for raw_row in label_rows.iter() {
65            if !graph.node_store.is_alive(raw_row) {
66                continue;
67            }
68            let row = RowIndex::new(raw_row);
69            let node_id = graph
70                .node_id_for_row(row)
71                .ok_or_else(|| GraphError::Inconsistent {
72                    reason: format!(
73                        "label index row {raw_row} for {} has no node id",
74                        label.as_str()
75                    ),
76                })?;
77            let properties = graph
78                .node_store
79                .properties
80                .get(raw_row as usize)
81                .ok_or_else(|| GraphError::Inconsistent {
82                    reason: format!(
83                        "text index row {raw_row} for {} has no property row",
84                        label.as_str()
85                    ),
86                })?;
87            let Some(Value::String(text)) = properties.get(&property) else {
88                continue;
89            };
90            index.insert_document(raw_row, node_id, text.as_str());
91        }
92        Ok(index.finish())
93    }
94
95    /// Construct an empty postings index for `label.property`.
96    #[must_use]
97    pub fn empty(label: DbString, property: DbString) -> Self {
98        Self {
99            label,
100            property,
101            rows: RoaringBitmap::new(),
102            document_lengths: FxHashMap::default(),
103            document_terms: FxHashMap::default(),
104            postings: FxHashMap::default(),
105            total_document_len: 0,
106            posting_count: 0,
107        }
108    }
109
110    /// Return the indexed node label.
111    #[must_use]
112    pub const fn label(&self) -> &DbString {
113        &self.label
114    }
115
116    /// Return the indexed node property.
117    #[must_use]
118    pub const fn property(&self) -> &DbString {
119        &self.property
120    }
121
122    /// Return the indexed row bitmap.
123    #[must_use]
124    pub const fn rows(&self) -> &RoaringBitmap {
125        &self.rows
126    }
127
128    /// Return the number of indexed string documents.
129    #[must_use]
130    pub fn document_count(&self) -> usize {
131        self.document_lengths.len()
132    }
133
134    /// Return the number of distinct indexed terms.
135    #[must_use]
136    pub fn term_count(&self) -> usize {
137        self.postings.len()
138    }
139
140    /// Return the total number of term-document postings.
141    #[must_use]
142    pub const fn posting_count(&self) -> usize {
143        self.posting_count
144    }
145
146    /// Return aggregate index counters.
147    #[must_use]
148    pub fn stats(&self) -> TextIndexStats {
149        TextIndexStats {
150            indexed_rows: self.rows.len(),
151            documents: self.document_count(),
152            distinct_terms: self.term_count(),
153            postings: self.posting_count,
154            total_document_len: self.total_document_len,
155        }
156    }
157
158    /// Return an estimated memory usage snapshot for this index.
159    #[must_use]
160    pub fn memory_usage(&self) -> TextIndexMemoryUsage {
161        let row_bitmap_bytes = roaring_heap_bytes(&self.rows);
162        let row_bitmap_serialized_bytes = self.rows.serialized_size();
163        let document_length_bytes = self
164            .document_lengths
165            .capacity()
166            .saturating_mul(size_of::<(NodeId, u32)>());
167        let mut document_term_bytes = self
168            .document_terms
169            .capacity()
170            .saturating_mul(size_of::<(NodeId, Arc<[String]>)>());
171        for terms in self.document_terms.values() {
172            document_term_bytes =
173                document_term_bytes.saturating_add(terms.len().saturating_mul(size_of::<String>()));
174            for term in terms.iter() {
175                document_term_bytes = document_term_bytes.saturating_add(term.capacity());
176            }
177        }
178        let mut posting_bytes = 0usize;
179        let mut term_bytes = 0usize;
180        for (term, postings) in &self.postings {
181            term_bytes = term_bytes.saturating_add(term.capacity());
182            posting_bytes = posting_bytes
183                .saturating_add(postings.capacity().saturating_mul(size_of::<TextPosting>()));
184        }
185        let terms_table_bytes = self
186            .postings
187            .capacity()
188            .saturating_mul(size_of::<(String, Arc<Vec<TextPosting>>)>());
189        let estimated_index_bytes = size_of::<Self>()
190            .saturating_add(row_bitmap_bytes)
191            .saturating_add(document_length_bytes)
192            .saturating_add(document_term_bytes)
193            .saturating_add(terms_table_bytes)
194            .saturating_add(term_bytes)
195            .saturating_add(posting_bytes);
196        TextIndexMemoryUsage {
197            indexed_rows: self.rows.len(),
198            documents: self.document_count(),
199            distinct_terms: self.term_count(),
200            postings: self.posting_count,
201            row_bitmap_bytes,
202            row_bitmap_serialized_bytes,
203            document_length_bytes,
204            document_term_bytes,
205            terms_table_bytes,
206            term_bytes,
207            posting_bytes,
208            estimated_index_bytes,
209        }
210    }
211
212    /// Rank indexed documents for `query` using BM25.
213    #[must_use]
214    pub fn search(&self, query: &str, k: usize) -> Vec<TextSearchHit> {
215        self.search_checked(query, k, CancellationChecker::disabled())
216            .expect("disabled text-index checker cannot fail")
217    }
218
219    /// Rank indexed documents for `query` with cooperative cancellation checks.
220    ///
221    /// The query path visits only postings for query terms, not every indexed
222    /// document. Scores and tie ordering match the exact scan oracle.
223    ///
224    /// # Errors
225    ///
226    /// Returns [`TextSearchError::Cancelled`] or [`TextSearchError::Timeout`] when
227    /// the supplied checker trips while collecting postings.
228    pub fn search_checked(
229        &self,
230        query: &str,
231        k: usize,
232        checker: CancellationChecker<'_>,
233    ) -> Result<Vec<TextSearchHit>, TextSearchError> {
234        checker.check()?;
235        if k == 0 || self.document_lengths.is_empty() {
236            return Ok(Vec::new());
237        }
238        let query_terms = unique_query_terms(query);
239        if query_terms.is_empty() {
240            return Ok(Vec::new());
241        }
242
243        let mut document_frequencies = vec![0_u32; query_terms.len()];
244        let mut candidates: FxHashMap<NodeId, DocumentStats> = FxHashMap::default();
245        let mut postings_since_check = 0usize;
246
247        for (term_index, term) in query_terms.iter().enumerate() {
248            let Some(postings) = self.postings.get(term) else {
249                continue;
250            };
251            document_frequencies[term_index] = u32::try_from(postings.len()).unwrap_or(u32::MAX);
252            for posting in postings.iter() {
253                postings_since_check += 1;
254                if postings_since_check >= crate::text_search::TEXT_SEARCH_CANCEL_STRIDE {
255                    checker.check()?;
256                    postings_since_check = 0;
257                }
258                let len = *self
259                    .document_lengths
260                    .get(&posting.node_id)
261                    .expect("posting node must have document length");
262                let doc = candidates.entry(posting.node_id).or_insert_with(|| {
263                    DocumentStats::zero(posting.node_id, len, query_terms.len())
264                });
265                doc.term_counts[term_index] = posting.term_count;
266            }
267        }
268
269        if candidates.is_empty() {
270            return Ok(Vec::new());
271        }
272        let corpus_len = self.document_lengths.len() as f64;
273        let average_document_len = self.total_document_len as f64 / corpus_len;
274        let mut top_k = TextTopK::new(k);
275        for doc in candidates.into_values() {
276            let score = bm25_score(
277                &doc,
278                &document_frequencies,
279                corpus_len,
280                average_document_len,
281            );
282            if score > 0.0 {
283                top_k.push(doc.node_id, score);
284            }
285        }
286        Ok(top_k.into_hits())
287    }
288
289    pub(crate) fn insert_document(&mut self, row: u32, node_id: NodeId, text: &str) {
290        self.remove_document(row, node_id);
291        let mut counts: FxHashMap<String, u32> = FxHashMap::default();
292        let mut len = 0_u32;
293        for token in tokenize_borrowed(text) {
294            len = len.saturating_add(1);
295            let count = counts.entry(token.into_owned()).or_insert(0);
296            *count = count.saturating_add(1);
297        }
298        if len == 0 {
299            return;
300        }
301
302        self.rows.insert(row);
303        self.document_lengths.insert(node_id, len);
304        self.total_document_len = self.total_document_len.saturating_add(u64::from(len));
305        let mut terms = Vec::with_capacity(counts.len());
306        for (term, term_count) in counts {
307            let postings = self
308                .postings
309                .entry(term.clone())
310                .or_insert_with(|| Arc::new(Vec::new()));
311            let postings = Arc::make_mut(postings);
312            match postings.binary_search_by_key(&node_id, |posting| posting.node_id) {
313                Ok(index) => {
314                    postings[index].term_count = term_count;
315                }
316                Err(index) => {
317                    postings.insert(
318                        index,
319                        TextPosting {
320                            node_id,
321                            term_count,
322                        },
323                    );
324                    self.posting_count = self.posting_count.saturating_add(1);
325                }
326            }
327            terms.push(term);
328        }
329        self.document_terms.insert(node_id, Arc::from(terms));
330    }
331
332    pub(crate) fn remove_document(&mut self, row: u32, node_id: NodeId) {
333        self.rows.remove(row);
334        let Some(length) = self.document_lengths.remove(&node_id) else {
335            return;
336        };
337        self.total_document_len = self.total_document_len.saturating_sub(u64::from(length));
338        let Some(terms) = self.document_terms.remove(&node_id) else {
339            return;
340        };
341        for term in terms.iter() {
342            let remove_term = if let Some(postings) = self.postings.get_mut(term.as_str()) {
343                let postings = Arc::make_mut(postings);
344                if let Ok(index) =
345                    postings.binary_search_by_key(&node_id, |posting| posting.node_id)
346                {
347                    postings.remove(index);
348                    self.posting_count = self.posting_count.saturating_sub(1);
349                }
350                postings.is_empty()
351            } else {
352                false
353            };
354            if remove_term {
355                self.postings.remove(term.as_str());
356            }
357        }
358    }
359
360    pub(crate) fn rows_eq(&self, reference: &Self) -> bool {
361        self.rows == reference.rows
362            && self.document_lengths == reference.document_lengths
363            && self.total_document_len == reference.total_document_len
364            && self.posting_count == reference.posting_count
365            && self.postings == reference.postings
366    }
367}
368
/// Counter snapshot produced by [`TextIndex::stats`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TextIndexStats {
    /// Cardinality of the indexed row bitmap.
    pub indexed_rows: u64,
    /// How many documents are indexed (each has at least one token).
    pub documents: usize,
    /// How many distinct terms own a posting list.
    pub distinct_terms: usize,
    /// How many term-document postings exist in total.
    pub postings: usize,
    /// Token counts summed over every indexed document.
    pub total_document_len: u64,
}
383
/// Memory estimate produced by [`TextIndex::memory_usage`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TextIndexMemoryUsage {
    /// Cardinality of the indexed row bitmap.
    pub indexed_rows: u64,
    /// How many documents are indexed (each has at least one token).
    pub documents: usize,
    /// How many distinct terms own a posting list.
    pub distinct_terms: usize,
    /// How many term-document postings exist in total.
    pub postings: usize,
    /// Bytes reported by the row bitmap's container statistics.
    pub row_bitmap_bytes: usize,
    /// Size of the row bitmap in its serialized form.
    pub row_bitmap_serialized_bytes: usize,
    /// Estimate for the document-length table.
    pub document_length_bytes: usize,
    /// Estimate for the per-document term lists, including string buffers.
    pub document_term_bytes: usize,
    /// Estimate for the slots of the term-to-postings table.
    pub terms_table_bytes: usize,
    /// Estimate for the term key string buffers.
    pub term_bytes: usize,
    /// Estimate for the posting vectors.
    pub posting_bytes: usize,
    /// Sum of the heap estimates above plus the size of the index struct itself.
    pub estimated_index_bytes: usize,
}
412
413impl SeleneGraph {
414    /// Build a reusable BM25 postings index for `label.property`.
415    ///
416    /// The returned index is tied to this graph snapshot. Mutations committed
417    /// after the snapshot is read require rebuilding or durable registration in a
418    /// later maintained-index layer.
419    ///
420    /// # Errors
421    ///
422    /// Returns [`GraphError::Inconsistent`] if graph label/property columns are
423    /// internally inconsistent while the snapshot is scanned.
424    pub fn build_text_index(
425        &self,
426        label: &DbString,
427        property: &DbString,
428    ) -> GraphResult<TextIndex> {
429        TextIndex::build(self, label.clone(), property.clone())
430    }
431
432    /// Rank string-valued node properties through a transient postings index.
433    ///
434    /// This is primarily useful for tests and benchmark comparisons. Repeated
435    /// production queries should build a [`TextIndex`] once and call
436    /// [`TextIndex::search`] directly.
437    ///
438    /// # Errors
439    ///
440    /// Returns [`GraphError::Inconsistent`] if index construction observes corrupt
441    /// graph columns.
442    pub fn indexed_text_search_nodes(
443        &self,
444        label: &DbString,
445        property: &DbString,
446        query: &str,
447        k: usize,
448    ) -> GraphResult<Vec<TextSearchHit>> {
449        Ok(self.build_text_index(label, property)?.search(query, k))
450    }
451}
452
453impl SharedGraph {
454    /// Build a reusable BM25 postings index from the current shared snapshot.
455    ///
456    /// # Errors
457    ///
458    /// Returns [`GraphError::Inconsistent`] if index construction observes corrupt
459    /// graph columns.
460    pub fn build_text_index(
461        &self,
462        label: &DbString,
463        property: &DbString,
464    ) -> GraphResult<TextIndex> {
465        self.read().build_text_index(label, property)
466    }
467
468    /// Rank string-valued node properties through a transient postings index.
469    ///
470    /// # Errors
471    ///
472    /// Returns [`GraphError::Inconsistent`] if index construction observes corrupt
473    /// graph columns.
474    pub fn indexed_text_search_nodes(
475        &self,
476        label: &DbString,
477        property: &DbString,
478        query: &str,
479        k: usize,
480    ) -> GraphResult<Vec<TextSearchHit>> {
481        self.read()
482            .indexed_text_search_nodes(label, property, query, k)
483    }
484}
485
486type TextIndexMap = FxHashMap<(DbString, DbString), TextIndexEntry>;
487
488pub(crate) fn apply_node_create(
489    indexes: &mut TextIndexMap,
490    labels: &LabelSet,
491    props: &PropertyMap,
492    row: u32,
493    node_id: NodeId,
494) {
495    for label in labels.iter() {
496        for (property, value) in props.iter() {
497            insert_commit(
498                indexes,
499                label.clone(),
500                property.clone(),
501                value,
502                row,
503                node_id,
504            );
505        }
506    }
507}
508
509pub(crate) fn apply_node_delete(
510    indexes: &mut TextIndexMap,
511    labels: &LabelSet,
512    props: &PropertyMap,
513    row: u32,
514    node_id: NodeId,
515) {
516    for label in labels.iter() {
517        for (property, value) in props.iter() {
518            remove_commit(
519                indexes,
520                label.clone(),
521                property.clone(),
522                value,
523                row,
524                node_id,
525            );
526        }
527    }
528}
529
530pub(crate) fn apply_node_update(
531    indexes: &mut TextIndexMap,
532    old_labels: &LabelSet,
533    old_props: &PropertyMap,
534    new_labels: &LabelSet,
535    new_props: &PropertyMap,
536    row: u32,
537    node_id: NodeId,
538) {
539    let candidates = candidate_keys(indexes, old_labels, old_props, new_labels, new_props);
540    for (label, property) in candidates {
541        match (
542            indexable_text(old_labels, old_props, &label, &property),
543            indexable_text(new_labels, new_props, &label, &property),
544        ) {
545            (Some(old_text), Some(new_text)) if old_text == new_text => {}
546            (Some(_), Some(new_text)) => {
547                insert_commit(
548                    indexes,
549                    label.clone(),
550                    property.clone(),
551                    new_text,
552                    row,
553                    node_id,
554                );
555            }
556            (Some(old_text), None) => {
557                remove_commit(
558                    indexes,
559                    label.clone(),
560                    property.clone(),
561                    old_text,
562                    row,
563                    node_id,
564                );
565            }
566            (None, Some(new_text)) => {
567                insert_commit(
568                    indexes,
569                    label.clone(),
570                    property.clone(),
571                    new_text,
572                    row,
573                    node_id,
574                );
575            }
576            (None, None) => {}
577        }
578    }
579}
580
581pub(crate) fn rebuild_text_indexes(graph: &mut SeleneGraph) -> GraphResult<()> {
582    let registrations: Vec<((DbString, DbString), Option<DbString>)> = graph
583        .text_index
584        .iter()
585        .map(|(key, entry)| (key.clone(), entry.name.clone()))
586        .collect();
587    graph.text_index.clear();
588    for ((label, property), name) in registrations {
589        let index = TextIndex::build(graph, label.clone(), property.clone())?;
590        graph
591            .text_index
592            .insert((label, property), TextIndexEntry::new(index, name));
593    }
594    Ok(())
595}
596
597fn candidate_keys(
598    indexes: &TextIndexMap,
599    old_labels: &LabelSet,
600    old_props: &PropertyMap,
601    new_labels: &LabelSet,
602    new_props: &PropertyMap,
603) -> BTreeSet<(DbString, DbString)> {
604    if indexes.is_empty() {
605        return BTreeSet::new();
606    }
607    let mut labels: BTreeSet<DbString> = BTreeSet::new();
608    labels.extend(old_labels.iter().cloned());
609    labels.extend(new_labels.iter().cloned());
610
611    let mut properties: BTreeSet<DbString> = BTreeSet::new();
612    properties.extend(old_props.keys().cloned());
613    properties.extend(new_props.keys().cloned());
614
615    let mut candidates = BTreeSet::new();
616    for label in &labels {
617        for property in &properties {
618            let key = (label.clone(), property.clone());
619            if indexes.contains_key(&key) {
620                candidates.insert(key);
621            }
622        }
623    }
624    candidates
625}
626
627fn indexable_text<'a>(
628    labels: &LabelSet,
629    props: &'a PropertyMap,
630    label: &DbString,
631    property: &DbString,
632) -> Option<&'a str> {
633    if !labels.contains(label) {
634        return None;
635    }
636    match props.get(property) {
637        Some(Value::String(text)) => Some(text.as_str()),
638        _ => None,
639    }
640}
641
642fn insert_commit(
643    indexes: &mut TextIndexMap,
644    label: DbString,
645    property: DbString,
646    value: impl TextValue,
647    row: u32,
648    node_id: NodeId,
649) {
650    let Some(text) = value.text() else {
651        return;
652    };
653    if let Some(entry) = indexes.get_mut(&(label, property)) {
654        std::sync::Arc::make_mut(&mut entry.index).insert_document(row, node_id, text);
655    }
656}
657
658fn remove_commit(
659    indexes: &mut TextIndexMap,
660    label: DbString,
661    property: DbString,
662    value: impl TextValue,
663    row: u32,
664    node_id: NodeId,
665) {
666    if value.text().is_none() {
667        return;
668    }
669    if let Some(entry) = indexes.get_mut(&(label, property)) {
670        std::sync::Arc::make_mut(&mut entry.index).remove_document(row, node_id);
671    }
672}
673
674trait TextValue {
675    fn text(&self) -> Option<&str>;
676}
677
678impl TextValue for &Value {
679    fn text(&self) -> Option<&str> {
680        match self {
681            Value::String(text) => Some(text.as_str()),
682            _ => None,
683        }
684    }
685}
686
687impl TextValue for &str {
688    fn text(&self) -> Option<&str> {
689        Some(self)
690    }
691}
692
693#[derive(Clone, Copy, Debug, Eq, PartialEq)]
694struct TextPosting {
695    node_id: NodeId,
696    term_count: u32,
697}
698
699fn roaring_heap_bytes(rows: &RoaringBitmap) -> usize {
700    let statistics = rows.statistics();
701    usize::try_from(
702        statistics
703            .n_bytes_array_containers
704            .saturating_add(statistics.n_bytes_run_containers)
705            .saturating_add(statistics.n_bytes_bitset_containers),
706    )
707    .unwrap_or(usize::MAX)
708}
709
710#[cfg(test)]
711#[path = "text_index/tests.rs"]
712mod tests;