1use std::collections::BTreeSet;
11use std::mem::size_of;
12use std::sync::Arc;
13
14use roaring::RoaringBitmap;
15use rustc_hash::FxHashMap;
16
17use selene_core::{CancellationChecker, DbString, LabelSet, NodeId, PropertyMap, Value};
18
19use crate::error::{GraphError, GraphResult};
20use crate::graph::{SeleneGraph, TextIndexEntry};
21use crate::shared::SharedGraph;
22use crate::store::RowIndex;
23use crate::text_search::{
24 DocumentStats, TextSearchError, TextSearchHit, TextTopK, bm25_score, tokenize_borrowed,
25 unique_query_terms,
26};
27
28#[path = "text_index/builder.rs"]
29mod builder;
30#[path = "text_index/candidate.rs"]
31mod candidate;
32use builder::TextIndexBuilder;
33
/// Inverted full-text index over the string values of one `(label, property)` pair.
///
/// Every indexed node contributes one document: the tokenized text of its `property`.
/// Posting lists are wrapped in [`Arc`], so cloning the index shares them. Mutation goes
/// through `Arc::make_mut`, which copies a list only when it is currently shared.
#[derive(Clone, Debug)]
pub struct TextIndex {
    /// Label whose nodes are indexed.
    label: DbString,
    /// Property whose string value is tokenized into a document.
    property: DbString,
    /// Rows that currently have an indexed document.
    rows: RoaringBitmap,
    /// Document length (token count) per indexed node.
    document_lengths: FxHashMap<NodeId, u32>,
    /// Distinct terms of each indexed node; used to find its postings on removal.
    document_terms: FxHashMap<NodeId, Arc<[String]>>,
    /// Term -> postings, kept ordered by node id (insert/remove use `binary_search_by_key`).
    postings: FxHashMap<String, Arc<Vec<TextPosting>>>,
    /// Saturating sum of all document lengths; numerator of the BM25 average length.
    total_document_len: u64,
    /// Total number of postings across all terms.
    posting_count: usize,
}
46
47impl TextIndex {
48 pub fn build(graph: &SeleneGraph, label: DbString, property: DbString) -> GraphResult<Self> {
59 let mut index = TextIndexBuilder::empty(label.clone(), property.clone());
60 let Some(label_rows) = graph.nodes_with_label(&label) else {
61 return Ok(index.finish());
62 };
63
64 for raw_row in label_rows.iter() {
65 if !graph.node_store.is_alive(raw_row) {
66 continue;
67 }
68 let row = RowIndex::new(raw_row);
69 let node_id = graph
70 .node_id_for_row(row)
71 .ok_or_else(|| GraphError::Inconsistent {
72 reason: format!(
73 "label index row {raw_row} for {} has no node id",
74 label.as_str()
75 ),
76 })?;
77 let properties = graph
78 .node_store
79 .properties
80 .get(raw_row as usize)
81 .ok_or_else(|| GraphError::Inconsistent {
82 reason: format!(
83 "text index row {raw_row} for {} has no property row",
84 label.as_str()
85 ),
86 })?;
87 let Some(Value::String(text)) = properties.get(&property) else {
88 continue;
89 };
90 index.insert_document(raw_row, node_id, text.as_str());
91 }
92 Ok(index.finish())
93 }
94
95 #[must_use]
97 pub fn empty(label: DbString, property: DbString) -> Self {
98 Self {
99 label,
100 property,
101 rows: RoaringBitmap::new(),
102 document_lengths: FxHashMap::default(),
103 document_terms: FxHashMap::default(),
104 postings: FxHashMap::default(),
105 total_document_len: 0,
106 posting_count: 0,
107 }
108 }
109
110 #[must_use]
112 pub const fn label(&self) -> &DbString {
113 &self.label
114 }
115
116 #[must_use]
118 pub const fn property(&self) -> &DbString {
119 &self.property
120 }
121
122 #[must_use]
124 pub const fn rows(&self) -> &RoaringBitmap {
125 &self.rows
126 }
127
128 #[must_use]
130 pub fn document_count(&self) -> usize {
131 self.document_lengths.len()
132 }
133
134 #[must_use]
136 pub fn term_count(&self) -> usize {
137 self.postings.len()
138 }
139
140 #[must_use]
142 pub const fn posting_count(&self) -> usize {
143 self.posting_count
144 }
145
146 #[must_use]
148 pub fn stats(&self) -> TextIndexStats {
149 TextIndexStats {
150 indexed_rows: self.rows.len(),
151 documents: self.document_count(),
152 distinct_terms: self.term_count(),
153 postings: self.posting_count,
154 total_document_len: self.total_document_len,
155 }
156 }
157
158 #[must_use]
160 pub fn memory_usage(&self) -> TextIndexMemoryUsage {
161 let row_bitmap_bytes = roaring_heap_bytes(&self.rows);
162 let row_bitmap_serialized_bytes = self.rows.serialized_size();
163 let document_length_bytes = self
164 .document_lengths
165 .capacity()
166 .saturating_mul(size_of::<(NodeId, u32)>());
167 let mut document_term_bytes = self
168 .document_terms
169 .capacity()
170 .saturating_mul(size_of::<(NodeId, Arc<[String]>)>());
171 for terms in self.document_terms.values() {
172 document_term_bytes =
173 document_term_bytes.saturating_add(terms.len().saturating_mul(size_of::<String>()));
174 for term in terms.iter() {
175 document_term_bytes = document_term_bytes.saturating_add(term.capacity());
176 }
177 }
178 let mut posting_bytes = 0usize;
179 let mut term_bytes = 0usize;
180 for (term, postings) in &self.postings {
181 term_bytes = term_bytes.saturating_add(term.capacity());
182 posting_bytes = posting_bytes
183 .saturating_add(postings.capacity().saturating_mul(size_of::<TextPosting>()));
184 }
185 let terms_table_bytes = self
186 .postings
187 .capacity()
188 .saturating_mul(size_of::<(String, Arc<Vec<TextPosting>>)>());
189 let estimated_index_bytes = size_of::<Self>()
190 .saturating_add(row_bitmap_bytes)
191 .saturating_add(document_length_bytes)
192 .saturating_add(document_term_bytes)
193 .saturating_add(terms_table_bytes)
194 .saturating_add(term_bytes)
195 .saturating_add(posting_bytes);
196 TextIndexMemoryUsage {
197 indexed_rows: self.rows.len(),
198 documents: self.document_count(),
199 distinct_terms: self.term_count(),
200 postings: self.posting_count,
201 row_bitmap_bytes,
202 row_bitmap_serialized_bytes,
203 document_length_bytes,
204 document_term_bytes,
205 terms_table_bytes,
206 term_bytes,
207 posting_bytes,
208 estimated_index_bytes,
209 }
210 }
211
212 #[must_use]
214 pub fn search(&self, query: &str, k: usize) -> Vec<TextSearchHit> {
215 self.search_checked(query, k, CancellationChecker::disabled())
216 .expect("disabled text-index checker cannot fail")
217 }
218
219 pub fn search_checked(
229 &self,
230 query: &str,
231 k: usize,
232 checker: CancellationChecker<'_>,
233 ) -> Result<Vec<TextSearchHit>, TextSearchError> {
234 checker.check()?;
235 if k == 0 || self.document_lengths.is_empty() {
236 return Ok(Vec::new());
237 }
238 let query_terms = unique_query_terms(query);
239 if query_terms.is_empty() {
240 return Ok(Vec::new());
241 }
242
243 let mut document_frequencies = vec![0_u32; query_terms.len()];
244 let mut candidates: FxHashMap<NodeId, DocumentStats> = FxHashMap::default();
245 let mut postings_since_check = 0usize;
246
247 for (term_index, term) in query_terms.iter().enumerate() {
248 let Some(postings) = self.postings.get(term) else {
249 continue;
250 };
251 document_frequencies[term_index] = u32::try_from(postings.len()).unwrap_or(u32::MAX);
252 for posting in postings.iter() {
253 postings_since_check += 1;
254 if postings_since_check >= crate::text_search::TEXT_SEARCH_CANCEL_STRIDE {
255 checker.check()?;
256 postings_since_check = 0;
257 }
258 let len = *self
259 .document_lengths
260 .get(&posting.node_id)
261 .expect("posting node must have document length");
262 let doc = candidates.entry(posting.node_id).or_insert_with(|| {
263 DocumentStats::zero(posting.node_id, len, query_terms.len())
264 });
265 doc.term_counts[term_index] = posting.term_count;
266 }
267 }
268
269 if candidates.is_empty() {
270 return Ok(Vec::new());
271 }
272 let corpus_len = self.document_lengths.len() as f64;
273 let average_document_len = self.total_document_len as f64 / corpus_len;
274 let mut top_k = TextTopK::new(k);
275 for doc in candidates.into_values() {
276 let score = bm25_score(
277 &doc,
278 &document_frequencies,
279 corpus_len,
280 average_document_len,
281 );
282 if score > 0.0 {
283 top_k.push(doc.node_id, score);
284 }
285 }
286 Ok(top_k.into_hits())
287 }
288
289 pub(crate) fn insert_document(&mut self, row: u32, node_id: NodeId, text: &str) {
290 self.remove_document(row, node_id);
291 let mut counts: FxHashMap<String, u32> = FxHashMap::default();
292 let mut len = 0_u32;
293 for token in tokenize_borrowed(text) {
294 len = len.saturating_add(1);
295 let count = counts.entry(token.into_owned()).or_insert(0);
296 *count = count.saturating_add(1);
297 }
298 if len == 0 {
299 return;
300 }
301
302 self.rows.insert(row);
303 self.document_lengths.insert(node_id, len);
304 self.total_document_len = self.total_document_len.saturating_add(u64::from(len));
305 let mut terms = Vec::with_capacity(counts.len());
306 for (term, term_count) in counts {
307 let postings = self
308 .postings
309 .entry(term.clone())
310 .or_insert_with(|| Arc::new(Vec::new()));
311 let postings = Arc::make_mut(postings);
312 match postings.binary_search_by_key(&node_id, |posting| posting.node_id) {
313 Ok(index) => {
314 postings[index].term_count = term_count;
315 }
316 Err(index) => {
317 postings.insert(
318 index,
319 TextPosting {
320 node_id,
321 term_count,
322 },
323 );
324 self.posting_count = self.posting_count.saturating_add(1);
325 }
326 }
327 terms.push(term);
328 }
329 self.document_terms.insert(node_id, Arc::from(terms));
330 }
331
332 pub(crate) fn remove_document(&mut self, row: u32, node_id: NodeId) {
333 self.rows.remove(row);
334 let Some(length) = self.document_lengths.remove(&node_id) else {
335 return;
336 };
337 self.total_document_len = self.total_document_len.saturating_sub(u64::from(length));
338 let Some(terms) = self.document_terms.remove(&node_id) else {
339 return;
340 };
341 for term in terms.iter() {
342 let remove_term = if let Some(postings) = self.postings.get_mut(term.as_str()) {
343 let postings = Arc::make_mut(postings);
344 if let Ok(index) =
345 postings.binary_search_by_key(&node_id, |posting| posting.node_id)
346 {
347 postings.remove(index);
348 self.posting_count = self.posting_count.saturating_sub(1);
349 }
350 postings.is_empty()
351 } else {
352 false
353 };
354 if remove_term {
355 self.postings.remove(term.as_str());
356 }
357 }
358 }
359
360 pub(crate) fn rows_eq(&self, reference: &Self) -> bool {
361 self.rows == reference.rows
362 && self.document_lengths == reference.document_lengths
363 && self.total_document_len == reference.total_document_len
364 && self.posting_count == reference.posting_count
365 && self.postings == reference.postings
366 }
367}
368
/// Summary counters for a [`TextIndex`], as returned by `TextIndex::stats`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TextIndexStats {
    /// Number of rows in the index's row bitmap.
    pub indexed_rows: u64,
    /// Number of indexed documents (nodes with a recorded document length).
    pub documents: usize,
    /// Number of distinct terms that have a posting list.
    pub distinct_terms: usize,
    /// Total number of postings across all terms.
    pub postings: usize,
    /// Sum of all document lengths, in tokens.
    pub total_document_len: u64,
}
383
/// Approximate memory footprint of a [`TextIndex`], as returned by `TextIndex::memory_usage`.
///
/// Byte counts are estimates summed with saturating arithmetic.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TextIndexMemoryUsage {
    /// Number of rows in the index's row bitmap.
    pub indexed_rows: u64,
    /// Number of indexed documents.
    pub documents: usize,
    /// Number of distinct terms.
    pub distinct_terms: usize,
    /// Total number of postings across all terms.
    pub postings: usize,
    /// Heap bytes held by the row bitmap's array, run, and bitset containers.
    pub row_bitmap_bytes: usize,
    /// Size of the row bitmap in its serialized form.
    pub row_bitmap_serialized_bytes: usize,
    /// Document-length table, estimated as capacity × entry size.
    pub document_length_bytes: usize,
    /// Document-term table slots plus each document's term slice and term string capacities.
    pub document_term_bytes: usize,
    /// Postings table, estimated as capacity × entry size (key string + `Arc` pointer).
    pub terms_table_bytes: usize,
    /// Sum of the string capacities of all posting-list keys.
    pub term_bytes: usize,
    /// Sum over posting lists of capacity × posting size.
    pub posting_bytes: usize,
    /// `size_of::<TextIndex>()` plus every byte count above except
    /// `row_bitmap_serialized_bytes`.
    pub estimated_index_bytes: usize,
}
412
413impl SeleneGraph {
414 pub fn build_text_index(
425 &self,
426 label: &DbString,
427 property: &DbString,
428 ) -> GraphResult<TextIndex> {
429 TextIndex::build(self, label.clone(), property.clone())
430 }
431
432 pub fn indexed_text_search_nodes(
443 &self,
444 label: &DbString,
445 property: &DbString,
446 query: &str,
447 k: usize,
448 ) -> GraphResult<Vec<TextSearchHit>> {
449 Ok(self.build_text_index(label, property)?.search(query, k))
450 }
451}
452
453impl SharedGraph {
454 pub fn build_text_index(
461 &self,
462 label: &DbString,
463 property: &DbString,
464 ) -> GraphResult<TextIndex> {
465 self.read().build_text_index(label, property)
466 }
467
468 pub fn indexed_text_search_nodes(
475 &self,
476 label: &DbString,
477 property: &DbString,
478 query: &str,
479 k: usize,
480 ) -> GraphResult<Vec<TextSearchHit>> {
481 self.read()
482 .indexed_text_search_nodes(label, property, query, k)
483 }
484}
485
/// Registered text indexes keyed by `(label, property)`.
type TextIndexMap = FxHashMap<(DbString, DbString), TextIndexEntry>;
487
488pub(crate) fn apply_node_create(
489 indexes: &mut TextIndexMap,
490 labels: &LabelSet,
491 props: &PropertyMap,
492 row: u32,
493 node_id: NodeId,
494) {
495 for label in labels.iter() {
496 for (property, value) in props.iter() {
497 insert_commit(
498 indexes,
499 label.clone(),
500 property.clone(),
501 value,
502 row,
503 node_id,
504 );
505 }
506 }
507}
508
509pub(crate) fn apply_node_delete(
510 indexes: &mut TextIndexMap,
511 labels: &LabelSet,
512 props: &PropertyMap,
513 row: u32,
514 node_id: NodeId,
515) {
516 for label in labels.iter() {
517 for (property, value) in props.iter() {
518 remove_commit(
519 indexes,
520 label.clone(),
521 property.clone(),
522 value,
523 row,
524 node_id,
525 );
526 }
527 }
528}
529
530pub(crate) fn apply_node_update(
531 indexes: &mut TextIndexMap,
532 old_labels: &LabelSet,
533 old_props: &PropertyMap,
534 new_labels: &LabelSet,
535 new_props: &PropertyMap,
536 row: u32,
537 node_id: NodeId,
538) {
539 let candidates = candidate_keys(indexes, old_labels, old_props, new_labels, new_props);
540 for (label, property) in candidates {
541 match (
542 indexable_text(old_labels, old_props, &label, &property),
543 indexable_text(new_labels, new_props, &label, &property),
544 ) {
545 (Some(old_text), Some(new_text)) if old_text == new_text => {}
546 (Some(_), Some(new_text)) => {
547 insert_commit(
548 indexes,
549 label.clone(),
550 property.clone(),
551 new_text,
552 row,
553 node_id,
554 );
555 }
556 (Some(old_text), None) => {
557 remove_commit(
558 indexes,
559 label.clone(),
560 property.clone(),
561 old_text,
562 row,
563 node_id,
564 );
565 }
566 (None, Some(new_text)) => {
567 insert_commit(
568 indexes,
569 label.clone(),
570 property.clone(),
571 new_text,
572 row,
573 node_id,
574 );
575 }
576 (None, None) => {}
577 }
578 }
579}
580
581pub(crate) fn rebuild_text_indexes(graph: &mut SeleneGraph) -> GraphResult<()> {
582 let registrations: Vec<((DbString, DbString), Option<DbString>)> = graph
583 .text_index
584 .iter()
585 .map(|(key, entry)| (key.clone(), entry.name.clone()))
586 .collect();
587 graph.text_index.clear();
588 for ((label, property), name) in registrations {
589 let index = TextIndex::build(graph, label.clone(), property.clone())?;
590 graph
591 .text_index
592 .insert((label, property), TextIndexEntry::new(index, name));
593 }
594 Ok(())
595}
596
597fn candidate_keys(
598 indexes: &TextIndexMap,
599 old_labels: &LabelSet,
600 old_props: &PropertyMap,
601 new_labels: &LabelSet,
602 new_props: &PropertyMap,
603) -> BTreeSet<(DbString, DbString)> {
604 if indexes.is_empty() {
605 return BTreeSet::new();
606 }
607 let mut labels: BTreeSet<DbString> = BTreeSet::new();
608 labels.extend(old_labels.iter().cloned());
609 labels.extend(new_labels.iter().cloned());
610
611 let mut properties: BTreeSet<DbString> = BTreeSet::new();
612 properties.extend(old_props.keys().cloned());
613 properties.extend(new_props.keys().cloned());
614
615 let mut candidates = BTreeSet::new();
616 for label in &labels {
617 for property in &properties {
618 let key = (label.clone(), property.clone());
619 if indexes.contains_key(&key) {
620 candidates.insert(key);
621 }
622 }
623 }
624 candidates
625}
626
627fn indexable_text<'a>(
628 labels: &LabelSet,
629 props: &'a PropertyMap,
630 label: &DbString,
631 property: &DbString,
632) -> Option<&'a str> {
633 if !labels.contains(label) {
634 return None;
635 }
636 match props.get(property) {
637 Some(Value::String(text)) => Some(text.as_str()),
638 _ => None,
639 }
640}
641
642fn insert_commit(
643 indexes: &mut TextIndexMap,
644 label: DbString,
645 property: DbString,
646 value: impl TextValue,
647 row: u32,
648 node_id: NodeId,
649) {
650 let Some(text) = value.text() else {
651 return;
652 };
653 if let Some(entry) = indexes.get_mut(&(label, property)) {
654 std::sync::Arc::make_mut(&mut entry.index).insert_document(row, node_id, text);
655 }
656}
657
658fn remove_commit(
659 indexes: &mut TextIndexMap,
660 label: DbString,
661 property: DbString,
662 value: impl TextValue,
663 row: u32,
664 node_id: NodeId,
665) {
666 if value.text().is_none() {
667 return;
668 }
669 if let Some(entry) = indexes.get_mut(&(label, property)) {
670 std::sync::Arc::make_mut(&mut entry.index).remove_document(row, node_id);
671 }
672}
673
/// Source of indexable text for the commit helpers.
///
/// Implemented for `&Value` (only `Value::String` yields text) and for `&str` (always
/// text). This lets `insert_commit`/`remove_commit` accept either a raw property value
/// or already-extracted text.
trait TextValue {
    /// Returns the text to index, or `None` when the value is not indexable.
    fn text(&self) -> Option<&str>;
}
677
678impl TextValue for &Value {
679 fn text(&self) -> Option<&str> {
680 match self {
681 Value::String(text) => Some(text.as_str()),
682 _ => None,
683 }
684 }
685}
686
687impl TextValue for &str {
688 fn text(&self) -> Option<&str> {
689 Some(self)
690 }
691}
692
/// One entry of a term's posting list: a document (node) containing the term.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct TextPosting {
    /// Node whose indexed text contains the term.
    node_id: NodeId,
    /// Occurrences of the term in that node's text (saturating count).
    term_count: u32,
}
698
699fn roaring_heap_bytes(rows: &RoaringBitmap) -> usize {
700 let statistics = rows.statistics();
701 usize::try_from(
702 statistics
703 .n_bytes_array_containers
704 .saturating_add(statistics.n_bytes_run_containers)
705 .saturating_add(statistics.n_bytes_bitset_containers),
706 )
707 .unwrap_or(usize::MAX)
708}
709
710#[cfg(test)]
711#[path = "text_index/tests.rs"]
712mod tests;