Skip to main content

selene_graph/
text_index.rs

1//! Reusable BM25 postings indexes over graph node string properties.
2//!
3//! `TextIndex` is an in-memory maintained artifact for one `(label, property)`
4//! pair. Durable state stores only the registration; postings are derived from
5//! primary node rows at registration, recovery, compaction, and commit-time
6//! maintenance. The exact scorer in [`crate::text_search`] remains the
7//! correctness oracle; this module reuses the same tokenizer, IDF formula, and
8//! top-k ordering.
9
10use std::mem::size_of;
11use std::sync::Arc;
12
13use roaring::RoaringBitmap;
14use rustc_hash::FxHashMap;
15use smallvec::SmallVec;
16
17use selene_core::{CancellationChecker, DbString, NodeId, Value};
18
19use crate::error::{GraphError, GraphResult};
20use crate::graph::SeleneGraph;
21use crate::shared::SharedGraph;
22use crate::store::RowIndex;
23use crate::text_search::{
24    DocumentStats, TextSearchError, TextSearchHit, TextTopK, bm25_score, tokenize_borrowed,
25    unique_query_terms,
26};
27
28#[path = "text_index/builder.rs"]
29mod builder;
30#[path = "text_index/candidate.rs"]
31mod candidate;
32#[path = "text_index/maintenance.rs"]
33mod maintenance;
34use builder::TextIndexBuilder;
35
// Per-query scratch buffers used by `TextIndex::search_checked`. Both are
// indexed by query-term position and keep up to four entries inline.

// Document frequency of each query term (0 when the term is not indexed).
type QueryDocumentFrequencies = SmallVec<[u32; 4]>;
// Posting list of each query term, or `None` when the term is not indexed.
type QueryPostings<'a> = SmallVec<[Option<&'a [TextPosting]>; 4]>;
38
39pub(crate) use maintenance::{
40    apply_node_create, apply_node_delete, apply_node_update, rebuild_text_indexes,
41};
42
43/// In-memory BM25 postings index for one node `(label, property)` pair.
44#[derive(Clone, Debug)]
45pub struct TextIndex {
46    label: DbString,
47    property: DbString,
48    rows: RoaringBitmap,
49    document_lengths: FxHashMap<NodeId, u32>,
50    document_terms: FxHashMap<NodeId, Arc<[String]>>,
51    postings: FxHashMap<String, Arc<Vec<TextPosting>>>,
52    total_document_len: u64,
53    posting_count: usize,
54}
55
56impl TextIndex {
57    /// Build a postings index from the current graph snapshot.
58    ///
59    /// Only alive nodes that carry `label` and whose `property` value is a
60    /// non-empty string document are indexed. Non-string values are ignored, which
61    /// matches the exact BM25 scan path.
62    ///
63    /// # Errors
64    ///
65    /// Returns [`GraphError::Inconsistent`] if the label index references a row
66    /// without a resolvable node id or property row.
67    pub fn build(graph: &SeleneGraph, label: DbString, property: DbString) -> GraphResult<Self> {
68        let Some(label_rows) = graph.nodes_with_label(&label) else {
69            return Ok(TextIndexBuilder::empty(label, property).finish());
70        };
71        let label_row_capacity = usize::try_from(label_rows.len()).unwrap_or(usize::MAX);
72        let mut index = TextIndexBuilder::with_document_capacity(
73            label.clone(),
74            property.clone(),
75            label_row_capacity,
76        );
77
78        for raw_row in label_rows.iter() {
79            if !graph.node_store.is_alive(raw_row) {
80                continue;
81            }
82            let row = RowIndex::new(raw_row);
83            let node_id = graph
84                .node_id_for_row(row)
85                .ok_or_else(|| GraphError::Inconsistent {
86                    reason: format!(
87                        "label index row {raw_row} for {} has no node id",
88                        label.as_str()
89                    ),
90                })?;
91            let properties = graph
92                .node_store
93                .properties
94                .get(raw_row as usize)
95                .ok_or_else(|| GraphError::Inconsistent {
96                    reason: format!(
97                        "text index row {raw_row} for {} has no property row",
98                        label.as_str()
99                    ),
100                })?;
101            let Some(Value::String(text)) = properties.get(&property) else {
102                continue;
103            };
104            index.insert_document(raw_row, node_id, text.as_str());
105        }
106        Ok(index.finish())
107    }
108
109    /// Construct an empty postings index for `label.property`.
110    #[must_use]
111    pub fn empty(label: DbString, property: DbString) -> Self {
112        Self {
113            label,
114            property,
115            rows: RoaringBitmap::new(),
116            document_lengths: FxHashMap::default(),
117            document_terms: FxHashMap::default(),
118            postings: FxHashMap::default(),
119            total_document_len: 0,
120            posting_count: 0,
121        }
122    }
123
124    /// Return the indexed node label.
125    #[must_use]
126    pub const fn label(&self) -> &DbString {
127        &self.label
128    }
129
130    /// Return the indexed node property.
131    #[must_use]
132    pub const fn property(&self) -> &DbString {
133        &self.property
134    }
135
136    /// Return the indexed row bitmap.
137    #[must_use]
138    pub const fn rows(&self) -> &RoaringBitmap {
139        &self.rows
140    }
141
142    /// Return the number of indexed string documents.
143    #[must_use]
144    pub fn document_count(&self) -> usize {
145        self.document_lengths.len()
146    }
147
148    /// Return the number of distinct indexed terms.
149    #[must_use]
150    pub fn term_count(&self) -> usize {
151        self.postings.len()
152    }
153
154    /// Return the total number of term-document postings.
155    #[must_use]
156    pub const fn posting_count(&self) -> usize {
157        self.posting_count
158    }
159
160    /// Return aggregate index counters.
161    #[must_use]
162    pub fn stats(&self) -> TextIndexStats {
163        TextIndexStats {
164            indexed_rows: self.rows.len(),
165            documents: self.document_count(),
166            distinct_terms: self.term_count(),
167            postings: self.posting_count,
168            total_document_len: self.total_document_len,
169        }
170    }
171
172    /// Return an estimated memory usage snapshot for this index.
173    #[must_use]
174    pub fn memory_usage(&self) -> TextIndexMemoryUsage {
175        let row_bitmap_bytes = roaring_heap_bytes(&self.rows);
176        let row_bitmap_serialized_bytes = self.rows.serialized_size();
177        let document_length_bytes = self
178            .document_lengths
179            .capacity()
180            .saturating_mul(size_of::<(NodeId, u32)>());
181        let mut document_term_bytes = self
182            .document_terms
183            .capacity()
184            .saturating_mul(size_of::<(NodeId, Arc<[String]>)>());
185        for terms in self.document_terms.values() {
186            document_term_bytes =
187                document_term_bytes.saturating_add(terms.len().saturating_mul(size_of::<String>()));
188            for term in terms.iter() {
189                document_term_bytes = document_term_bytes.saturating_add(term.capacity());
190            }
191        }
192        let mut posting_bytes = 0usize;
193        let mut term_bytes = 0usize;
194        for (term, postings) in &self.postings {
195            term_bytes = term_bytes.saturating_add(term.capacity());
196            posting_bytes = posting_bytes
197                .saturating_add(postings.capacity().saturating_mul(size_of::<TextPosting>()));
198        }
199        let terms_table_bytes = self
200            .postings
201            .capacity()
202            .saturating_mul(size_of::<(String, Arc<Vec<TextPosting>>)>());
203        let estimated_index_bytes = size_of::<Self>()
204            .saturating_add(row_bitmap_bytes)
205            .saturating_add(document_length_bytes)
206            .saturating_add(document_term_bytes)
207            .saturating_add(terms_table_bytes)
208            .saturating_add(term_bytes)
209            .saturating_add(posting_bytes);
210        TextIndexMemoryUsage {
211            indexed_rows: self.rows.len(),
212            documents: self.document_count(),
213            distinct_terms: self.term_count(),
214            postings: self.posting_count,
215            row_bitmap_bytes,
216            row_bitmap_serialized_bytes,
217            document_length_bytes,
218            document_term_bytes,
219            terms_table_bytes,
220            term_bytes,
221            posting_bytes,
222            estimated_index_bytes,
223        }
224    }
225
226    /// Rank indexed documents for `query` using BM25.
227    #[must_use]
228    pub fn search(&self, query: &str, k: usize) -> Vec<TextSearchHit> {
229        self.search_checked(query, k, CancellationChecker::disabled())
230            .expect("disabled text-index checker cannot fail")
231    }
232
233    /// Rank indexed documents for `query` with cooperative cancellation checks.
234    ///
235    /// The query path visits only postings for query terms, not every indexed
236    /// document. Scores and tie ordering match the exact scan oracle.
237    ///
238    /// # Errors
239    ///
240    /// Returns [`TextSearchError::Cancelled`], [`TextSearchError::Timeout`], or
241    /// [`TextSearchError::NodeScanBudgetExceeded`] when the supplied checker
242    /// trips while collecting postings or scoring candidate documents.
243    pub fn search_checked(
244        &self,
245        query: &str,
246        k: usize,
247        checker: CancellationChecker<'_>,
248    ) -> Result<Vec<TextSearchHit>, TextSearchError> {
249        checker.check()?;
250        if k == 0 || self.document_lengths.is_empty() {
251            return Ok(Vec::new());
252        }
253        let query_terms = unique_query_terms(query);
254        if query_terms.is_empty() {
255            return Ok(Vec::new());
256        }
257
258        let mut document_frequencies = QueryDocumentFrequencies::with_capacity(query_terms.len());
259        let mut postings_by_term = QueryPostings::with_capacity(query_terms.len());
260        let mut candidate_capacity = 0usize;
261        for term in &query_terms {
262            match self.postings.get(term) {
263                Some(postings) => {
264                    candidate_capacity = candidate_capacity.saturating_add(postings.len());
265                    document_frequencies.push(u32::try_from(postings.len()).unwrap_or(u32::MAX));
266                    postings_by_term.push(Some(postings.as_slice()));
267                }
268                None => {
269                    document_frequencies.push(0);
270                    postings_by_term.push(None);
271                }
272            }
273        }
274        let candidate_capacity = candidate_capacity.min(self.document_lengths.len());
275        if candidate_capacity == 0 {
276            return Ok(Vec::new());
277        }
278
279        let mut candidates: FxHashMap<NodeId, DocumentStats> = FxHashMap::default();
280        candidates.reserve(candidate_capacity);
281        let mut postings_since_check = 0usize;
282
283        for (term_index, postings) in postings_by_term.into_iter().enumerate() {
284            let Some(postings) = postings else {
285                continue;
286            };
287            for posting in postings {
288                postings_since_check += 1;
289                if postings_since_check >= crate::text_search::TEXT_SEARCH_CANCEL_STRIDE {
290                    checker.check()?;
291                    postings_since_check = 0;
292                }
293                let len = *self
294                    .document_lengths
295                    .get(&posting.node_id)
296                    .expect("posting node must have document length");
297                let doc = candidates.entry(posting.node_id).or_insert_with(|| {
298                    DocumentStats::zero(posting.node_id, len, query_terms.len())
299                });
300                doc.term_counts[term_index] = posting.term_count;
301            }
302        }
303
304        if candidates.is_empty() {
305            return Ok(Vec::new());
306        }
307        let corpus_len = self.document_lengths.len() as f64;
308        let average_document_len = self.total_document_len as f64 / corpus_len;
309        let mut top_k = TextTopK::new(k);
310        let mut docs_since_check = 0usize;
311        for doc in candidates.into_values() {
312            docs_since_check += 1;
313            if docs_since_check >= crate::text_search::TEXT_SEARCH_CANCEL_STRIDE {
314                checker.note_nodes_scanned(docs_since_check)?;
315                docs_since_check = 0;
316            }
317            let score = bm25_score(
318                &doc,
319                &document_frequencies,
320                corpus_len,
321                average_document_len,
322            );
323            if score > 0.0 {
324                top_k.push(doc.node_id, score);
325            }
326        }
327        if docs_since_check > 0 {
328            checker.note_nodes_scanned(docs_since_check)?;
329        }
330        Ok(top_k.into_hits())
331    }
332
333    pub(crate) fn insert_document(&mut self, row: u32, node_id: NodeId, text: &str) {
334        self.remove_document(row, node_id);
335        let mut counts: FxHashMap<String, u32> = FxHashMap::default();
336        let mut len = 0_u32;
337        for token in tokenize_borrowed(text) {
338            len = len.saturating_add(1);
339            let count = counts.entry(token.into_owned()).or_insert(0);
340            *count = count.saturating_add(1);
341        }
342        if len == 0 {
343            return;
344        }
345
346        self.rows.insert(row);
347        self.document_lengths.insert(node_id, len);
348        self.total_document_len = self.total_document_len.saturating_add(u64::from(len));
349        let mut terms = Vec::with_capacity(counts.len());
350        for (term, term_count) in counts {
351            let postings = self
352                .postings
353                .entry(term.clone())
354                .or_insert_with(|| Arc::new(Vec::new()));
355            let postings = Arc::make_mut(postings);
356            match postings.binary_search_by_key(&node_id, |posting| posting.node_id) {
357                Ok(index) => {
358                    postings[index].term_count = term_count;
359                }
360                Err(index) => {
361                    postings.insert(
362                        index,
363                        TextPosting {
364                            node_id,
365                            term_count,
366                        },
367                    );
368                    self.posting_count = self.posting_count.saturating_add(1);
369                }
370            }
371            terms.push(term);
372        }
373        self.document_terms.insert(node_id, Arc::from(terms));
374    }
375
376    pub(crate) fn remove_document(&mut self, row: u32, node_id: NodeId) {
377        self.rows.remove(row);
378        let Some(length) = self.document_lengths.remove(&node_id) else {
379            return;
380        };
381        self.total_document_len = self.total_document_len.saturating_sub(u64::from(length));
382        let Some(terms) = self.document_terms.remove(&node_id) else {
383            return;
384        };
385        for term in terms.iter() {
386            let remove_term = if let Some(postings) = self.postings.get_mut(term.as_str()) {
387                let postings = Arc::make_mut(postings);
388                if let Ok(index) =
389                    postings.binary_search_by_key(&node_id, |posting| posting.node_id)
390                {
391                    postings.remove(index);
392                    self.posting_count = self.posting_count.saturating_sub(1);
393                }
394                postings.is_empty()
395            } else {
396                false
397            };
398            if remove_term {
399                self.postings.remove(term.as_str());
400            }
401        }
402    }
403
404    pub(crate) fn rows_eq(&self, reference: &Self) -> bool {
405        self.rows == reference.rows
406            && self.document_lengths == reference.document_lengths
407            && self.total_document_len == reference.total_document_len
408            && self.posting_count == reference.posting_count
409            && self.postings == reference.postings
410    }
411}
412
/// Counter snapshot describing the contents of a [`TextIndex`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TextIndexStats {
    /// How many graph rows are present in the index's row bitmap.
    pub indexed_rows: u64,
    /// How many string documents (with at least one token) are indexed.
    pub documents: usize,
    /// How many distinct terms the vocabulary contains.
    pub distinct_terms: usize,
    /// How many term-document postings exist.
    pub postings: usize,
    /// Total token count summed over all indexed documents.
    pub total_document_len: u64,
}
427
/// Memory estimate for a single [`TextIndex`], broken down by component.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TextIndexMemoryUsage {
    /// How many graph rows are present in the index's row bitmap.
    pub indexed_rows: u64,
    /// How many string documents (with at least one token) are indexed.
    pub documents: usize,
    /// How many distinct terms the vocabulary contains.
    pub distinct_terms: usize,
    /// How many term-document postings exist.
    pub postings: usize,
    /// Estimated heap bytes held by the row bitmap's containers.
    pub row_bitmap_bytes: usize,
    /// Size in bytes of the row bitmap when serialized.
    pub row_bitmap_serialized_bytes: usize,
    /// Estimated bytes taken by the document-length map.
    pub document_length_bytes: usize,
    /// Estimated bytes taken by the per-document term lists that commit
    /// maintenance relies on.
    pub document_term_bytes: usize,
    /// Estimated bytes taken by the postings hash table itself.
    pub terms_table_bytes: usize,
    /// Estimated bytes taken by term string buffers.
    pub term_bytes: usize,
    /// Estimated bytes taken by posting vectors.
    pub posting_bytes: usize,
    /// Estimated total bytes reachable from the index object.
    pub estimated_index_bytes: usize,
}
456
457impl SeleneGraph {
458    /// Build a reusable BM25 postings index for `label.property`.
459    ///
460    /// The returned index is tied to this graph snapshot. Mutations committed
461    /// after the snapshot is read require rebuilding or durable registration in a
462    /// later maintained-index layer.
463    ///
464    /// # Errors
465    ///
466    /// Returns [`GraphError::Inconsistent`] if graph label/property columns are
467    /// internally inconsistent while the snapshot is scanned.
468    pub fn build_text_index(
469        &self,
470        label: &DbString,
471        property: &DbString,
472    ) -> GraphResult<TextIndex> {
473        TextIndex::build(self, label.clone(), property.clone())
474    }
475
476    /// Rank string-valued node properties through a transient postings index.
477    ///
478    /// This is primarily useful for tests and benchmark comparisons. Repeated
479    /// production queries should build a [`TextIndex`] once and call
480    /// [`TextIndex::search`] directly.
481    ///
482    /// # Errors
483    ///
484    /// Returns [`GraphError::Inconsistent`] if index construction observes corrupt
485    /// graph columns.
486    pub fn indexed_text_search_nodes(
487        &self,
488        label: &DbString,
489        property: &DbString,
490        query: &str,
491        k: usize,
492    ) -> GraphResult<Vec<TextSearchHit>> {
493        Ok(self.build_text_index(label, property)?.search(query, k))
494    }
495}
496
497impl SharedGraph {
498    /// Build a reusable BM25 postings index from the current shared snapshot.
499    ///
500    /// # Errors
501    ///
502    /// Returns [`GraphError::Inconsistent`] if index construction observes corrupt
503    /// graph columns.
504    pub fn build_text_index(
505        &self,
506        label: &DbString,
507        property: &DbString,
508    ) -> GraphResult<TextIndex> {
509        self.read().build_text_index(label, property)
510    }
511
512    /// Rank string-valued node properties through a transient postings index.
513    ///
514    /// # Errors
515    ///
516    /// Returns [`GraphError::Inconsistent`] if index construction observes corrupt
517    /// graph columns.
518    pub fn indexed_text_search_nodes(
519        &self,
520        label: &DbString,
521        property: &DbString,
522        query: &str,
523        k: usize,
524    ) -> GraphResult<Vec<TextSearchHit>> {
525        self.read()
526            .indexed_text_search_nodes(label, property, query, k)
527    }
528}
529
530#[derive(Clone, Copy, Debug, Eq, PartialEq)]
531struct TextPosting {
532    node_id: NodeId,
533    term_count: u32,
534}
535
536fn roaring_heap_bytes(rows: &RoaringBitmap) -> usize {
537    let statistics = rows.statistics();
538    usize::try_from(
539        statistics
540            .n_bytes_array_containers
541            .saturating_add(statistics.n_bytes_run_containers)
542            .saturating_add(statistics.n_bytes_bitset_containers),
543    )
544    .unwrap_or(usize::MAX)
545}
546
547#[cfg(test)]
548#[path = "text_index/tests.rs"]
549mod tests;