1use std::mem::size_of;
11use std::sync::Arc;
12
13use roaring::RoaringBitmap;
14use rustc_hash::FxHashMap;
15use smallvec::SmallVec;
16
17use selene_core::{CancellationChecker, DbString, NodeId, Value};
18
19use crate::error::{GraphError, GraphResult};
20use crate::graph::SeleneGraph;
21use crate::shared::SharedGraph;
22use crate::store::RowIndex;
23use crate::text_search::{
24 DocumentStats, TextSearchError, TextSearchHit, TextTopK, bm25_score, tokenize_borrowed,
25 unique_query_terms,
26};
27
28#[path = "text_index/builder.rs"]
29mod builder;
30#[path = "text_index/candidate.rs"]
31mod candidate;
32#[path = "text_index/maintenance.rs"]
33mod maintenance;
34use builder::TextIndexBuilder;
35
36type QueryDocumentFrequencies = SmallVec<[u32; 4]>;
37type QueryPostings<'a> = SmallVec<[Option<&'a [TextPosting]>; 4]>;
38
39pub(crate) use maintenance::{
40 apply_node_create, apply_node_delete, apply_node_update, rebuild_text_indexes,
41};
42
/// Inverted full-text index over one string property of the nodes carrying one label.
///
/// Documents are keyed by [`NodeId`]. Posting lists live behind `Arc`, so a cloned index
/// shares them; `Arc::make_mut` copies a list only when a shared one is mutated.
#[derive(Clone, Debug)]
pub struct TextIndex {
    /// Label whose nodes are indexed.
    label: DbString,
    /// Property whose string value is tokenized.
    property: DbString,
    /// Store rows that currently have an indexed document.
    rows: RoaringBitmap,
    /// Token count of each indexed document, used as its BM25 document length.
    document_lengths: FxHashMap<NodeId, u32>,
    /// Distinct terms of each indexed document, used to locate its postings on removal.
    document_terms: FxHashMap<NodeId, Arc<[String]>>,
    /// Term -> posting list; insert/remove keep each list sorted by node id so they can
    /// binary-search it.
    postings: FxHashMap<String, Arc<Vec<TextPosting>>>,
    /// Sum of all document lengths, used for the BM25 average document length.
    total_document_len: u64,
    /// Total number of postings across all posting lists.
    posting_count: usize,
}
55
56impl TextIndex {
57 pub fn build(graph: &SeleneGraph, label: DbString, property: DbString) -> GraphResult<Self> {
68 let Some(label_rows) = graph.nodes_with_label(&label) else {
69 return Ok(TextIndexBuilder::empty(label, property).finish());
70 };
71 let label_row_capacity = usize::try_from(label_rows.len()).unwrap_or(usize::MAX);
72 let mut index = TextIndexBuilder::with_document_capacity(
73 label.clone(),
74 property.clone(),
75 label_row_capacity,
76 );
77
78 for raw_row in label_rows.iter() {
79 if !graph.node_store.is_alive(raw_row) {
80 continue;
81 }
82 let row = RowIndex::new(raw_row);
83 let node_id = graph
84 .node_id_for_row(row)
85 .ok_or_else(|| GraphError::Inconsistent {
86 reason: format!(
87 "label index row {raw_row} for {} has no node id",
88 label.as_str()
89 ),
90 })?;
91 let properties = graph
92 .node_store
93 .properties
94 .get(raw_row as usize)
95 .ok_or_else(|| GraphError::Inconsistent {
96 reason: format!(
97 "text index row {raw_row} for {} has no property row",
98 label.as_str()
99 ),
100 })?;
101 let Some(Value::String(text)) = properties.get(&property) else {
102 continue;
103 };
104 index.insert_document(raw_row, node_id, text.as_str());
105 }
106 Ok(index.finish())
107 }
108
109 #[must_use]
111 pub fn empty(label: DbString, property: DbString) -> Self {
112 Self {
113 label,
114 property,
115 rows: RoaringBitmap::new(),
116 document_lengths: FxHashMap::default(),
117 document_terms: FxHashMap::default(),
118 postings: FxHashMap::default(),
119 total_document_len: 0,
120 posting_count: 0,
121 }
122 }
123
124 #[must_use]
126 pub const fn label(&self) -> &DbString {
127 &self.label
128 }
129
130 #[must_use]
132 pub const fn property(&self) -> &DbString {
133 &self.property
134 }
135
136 #[must_use]
138 pub const fn rows(&self) -> &RoaringBitmap {
139 &self.rows
140 }
141
142 #[must_use]
144 pub fn document_count(&self) -> usize {
145 self.document_lengths.len()
146 }
147
148 #[must_use]
150 pub fn term_count(&self) -> usize {
151 self.postings.len()
152 }
153
154 #[must_use]
156 pub const fn posting_count(&self) -> usize {
157 self.posting_count
158 }
159
160 #[must_use]
162 pub fn stats(&self) -> TextIndexStats {
163 TextIndexStats {
164 indexed_rows: self.rows.len(),
165 documents: self.document_count(),
166 distinct_terms: self.term_count(),
167 postings: self.posting_count,
168 total_document_len: self.total_document_len,
169 }
170 }
171
172 #[must_use]
174 pub fn memory_usage(&self) -> TextIndexMemoryUsage {
175 let row_bitmap_bytes = roaring_heap_bytes(&self.rows);
176 let row_bitmap_serialized_bytes = self.rows.serialized_size();
177 let document_length_bytes = self
178 .document_lengths
179 .capacity()
180 .saturating_mul(size_of::<(NodeId, u32)>());
181 let mut document_term_bytes = self
182 .document_terms
183 .capacity()
184 .saturating_mul(size_of::<(NodeId, Arc<[String]>)>());
185 for terms in self.document_terms.values() {
186 document_term_bytes =
187 document_term_bytes.saturating_add(terms.len().saturating_mul(size_of::<String>()));
188 for term in terms.iter() {
189 document_term_bytes = document_term_bytes.saturating_add(term.capacity());
190 }
191 }
192 let mut posting_bytes = 0usize;
193 let mut term_bytes = 0usize;
194 for (term, postings) in &self.postings {
195 term_bytes = term_bytes.saturating_add(term.capacity());
196 posting_bytes = posting_bytes
197 .saturating_add(postings.capacity().saturating_mul(size_of::<TextPosting>()));
198 }
199 let terms_table_bytes = self
200 .postings
201 .capacity()
202 .saturating_mul(size_of::<(String, Arc<Vec<TextPosting>>)>());
203 let estimated_index_bytes = size_of::<Self>()
204 .saturating_add(row_bitmap_bytes)
205 .saturating_add(document_length_bytes)
206 .saturating_add(document_term_bytes)
207 .saturating_add(terms_table_bytes)
208 .saturating_add(term_bytes)
209 .saturating_add(posting_bytes);
210 TextIndexMemoryUsage {
211 indexed_rows: self.rows.len(),
212 documents: self.document_count(),
213 distinct_terms: self.term_count(),
214 postings: self.posting_count,
215 row_bitmap_bytes,
216 row_bitmap_serialized_bytes,
217 document_length_bytes,
218 document_term_bytes,
219 terms_table_bytes,
220 term_bytes,
221 posting_bytes,
222 estimated_index_bytes,
223 }
224 }
225
226 #[must_use]
228 pub fn search(&self, query: &str, k: usize) -> Vec<TextSearchHit> {
229 self.search_checked(query, k, CancellationChecker::disabled())
230 .expect("disabled text-index checker cannot fail")
231 }
232
233 pub fn search_checked(
244 &self,
245 query: &str,
246 k: usize,
247 checker: CancellationChecker<'_>,
248 ) -> Result<Vec<TextSearchHit>, TextSearchError> {
249 checker.check()?;
250 if k == 0 || self.document_lengths.is_empty() {
251 return Ok(Vec::new());
252 }
253 let query_terms = unique_query_terms(query);
254 if query_terms.is_empty() {
255 return Ok(Vec::new());
256 }
257
258 let mut document_frequencies = QueryDocumentFrequencies::with_capacity(query_terms.len());
259 let mut postings_by_term = QueryPostings::with_capacity(query_terms.len());
260 let mut candidate_capacity = 0usize;
261 for term in &query_terms {
262 match self.postings.get(term) {
263 Some(postings) => {
264 candidate_capacity = candidate_capacity.saturating_add(postings.len());
265 document_frequencies.push(u32::try_from(postings.len()).unwrap_or(u32::MAX));
266 postings_by_term.push(Some(postings.as_slice()));
267 }
268 None => {
269 document_frequencies.push(0);
270 postings_by_term.push(None);
271 }
272 }
273 }
274 let candidate_capacity = candidate_capacity.min(self.document_lengths.len());
275 if candidate_capacity == 0 {
276 return Ok(Vec::new());
277 }
278
279 let mut candidates: FxHashMap<NodeId, DocumentStats> = FxHashMap::default();
280 candidates.reserve(candidate_capacity);
281 let mut postings_since_check = 0usize;
282
283 for (term_index, postings) in postings_by_term.into_iter().enumerate() {
284 let Some(postings) = postings else {
285 continue;
286 };
287 for posting in postings {
288 postings_since_check += 1;
289 if postings_since_check >= crate::text_search::TEXT_SEARCH_CANCEL_STRIDE {
290 checker.check()?;
291 postings_since_check = 0;
292 }
293 let len = *self
294 .document_lengths
295 .get(&posting.node_id)
296 .expect("posting node must have document length");
297 let doc = candidates.entry(posting.node_id).or_insert_with(|| {
298 DocumentStats::zero(posting.node_id, len, query_terms.len())
299 });
300 doc.term_counts[term_index] = posting.term_count;
301 }
302 }
303
304 if candidates.is_empty() {
305 return Ok(Vec::new());
306 }
307 let corpus_len = self.document_lengths.len() as f64;
308 let average_document_len = self.total_document_len as f64 / corpus_len;
309 let mut top_k = TextTopK::new(k);
310 let mut docs_since_check = 0usize;
311 for doc in candidates.into_values() {
312 docs_since_check += 1;
313 if docs_since_check >= crate::text_search::TEXT_SEARCH_CANCEL_STRIDE {
314 checker.note_nodes_scanned(docs_since_check)?;
315 docs_since_check = 0;
316 }
317 let score = bm25_score(
318 &doc,
319 &document_frequencies,
320 corpus_len,
321 average_document_len,
322 );
323 if score > 0.0 {
324 top_k.push(doc.node_id, score);
325 }
326 }
327 if docs_since_check > 0 {
328 checker.note_nodes_scanned(docs_since_check)?;
329 }
330 Ok(top_k.into_hits())
331 }
332
333 pub(crate) fn insert_document(&mut self, row: u32, node_id: NodeId, text: &str) {
334 self.remove_document(row, node_id);
335 let mut counts: FxHashMap<String, u32> = FxHashMap::default();
336 let mut len = 0_u32;
337 for token in tokenize_borrowed(text) {
338 len = len.saturating_add(1);
339 let count = counts.entry(token.into_owned()).or_insert(0);
340 *count = count.saturating_add(1);
341 }
342 if len == 0 {
343 return;
344 }
345
346 self.rows.insert(row);
347 self.document_lengths.insert(node_id, len);
348 self.total_document_len = self.total_document_len.saturating_add(u64::from(len));
349 let mut terms = Vec::with_capacity(counts.len());
350 for (term, term_count) in counts {
351 let postings = self
352 .postings
353 .entry(term.clone())
354 .or_insert_with(|| Arc::new(Vec::new()));
355 let postings = Arc::make_mut(postings);
356 match postings.binary_search_by_key(&node_id, |posting| posting.node_id) {
357 Ok(index) => {
358 postings[index].term_count = term_count;
359 }
360 Err(index) => {
361 postings.insert(
362 index,
363 TextPosting {
364 node_id,
365 term_count,
366 },
367 );
368 self.posting_count = self.posting_count.saturating_add(1);
369 }
370 }
371 terms.push(term);
372 }
373 self.document_terms.insert(node_id, Arc::from(terms));
374 }
375
376 pub(crate) fn remove_document(&mut self, row: u32, node_id: NodeId) {
377 self.rows.remove(row);
378 let Some(length) = self.document_lengths.remove(&node_id) else {
379 return;
380 };
381 self.total_document_len = self.total_document_len.saturating_sub(u64::from(length));
382 let Some(terms) = self.document_terms.remove(&node_id) else {
383 return;
384 };
385 for term in terms.iter() {
386 let remove_term = if let Some(postings) = self.postings.get_mut(term.as_str()) {
387 let postings = Arc::make_mut(postings);
388 if let Ok(index) =
389 postings.binary_search_by_key(&node_id, |posting| posting.node_id)
390 {
391 postings.remove(index);
392 self.posting_count = self.posting_count.saturating_sub(1);
393 }
394 postings.is_empty()
395 } else {
396 false
397 };
398 if remove_term {
399 self.postings.remove(term.as_str());
400 }
401 }
402 }
403
404 pub(crate) fn rows_eq(&self, reference: &Self) -> bool {
405 self.rows == reference.rows
406 && self.document_lengths == reference.document_lengths
407 && self.total_document_len == reference.total_document_len
408 && self.posting_count == reference.posting_count
409 && self.postings == reference.postings
410 }
411}
412
/// Summary counters for a [`TextIndex`], as produced by [`TextIndex::stats`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TextIndexStats {
    /// Number of store rows with an indexed document.
    pub indexed_rows: u64,
    /// Number of indexed documents.
    pub documents: usize,
    /// Number of distinct terms that have a posting list.
    pub distinct_terms: usize,
    /// Total number of postings across all terms.
    pub postings: usize,
    /// Sum of the token counts of all indexed documents.
    pub total_document_len: u64,
}
427
/// Approximate memory breakdown of a [`TextIndex`], as produced by
/// [`TextIndex::memory_usage`]. Byte counts saturate at `usize::MAX`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TextIndexMemoryUsage {
    /// Number of store rows with an indexed document.
    pub indexed_rows: u64,
    /// Number of indexed documents.
    pub documents: usize,
    /// Number of distinct terms that have a posting list.
    pub distinct_terms: usize,
    /// Total number of postings across all terms.
    pub postings: usize,
    /// Bytes held by the row bitmap's array, run and bitset containers.
    pub row_bitmap_bytes: usize,
    /// Serialized size of the row bitmap (reported only; not part of the estimate).
    pub row_bitmap_serialized_bytes: usize,
    /// Document-length table: `capacity()` times the entry size.
    pub document_length_bytes: usize,
    /// Document-term table entries, plus each document's term slice and string buffers.
    pub document_term_bytes: usize,
    /// Term -> posting-list table: `capacity()` times the entry size.
    pub terms_table_bytes: usize,
    /// String capacity of the posting table's term keys.
    pub term_bytes: usize,
    /// Capacity of all posting lists times the posting size.
    pub posting_bytes: usize,
    /// `size_of::<TextIndex>()` plus every byte count above except the serialized size.
    pub estimated_index_bytes: usize,
}
456
457impl SeleneGraph {
458 pub fn build_text_index(
469 &self,
470 label: &DbString,
471 property: &DbString,
472 ) -> GraphResult<TextIndex> {
473 TextIndex::build(self, label.clone(), property.clone())
474 }
475
476 pub fn indexed_text_search_nodes(
487 &self,
488 label: &DbString,
489 property: &DbString,
490 query: &str,
491 k: usize,
492 ) -> GraphResult<Vec<TextSearchHit>> {
493 Ok(self.build_text_index(label, property)?.search(query, k))
494 }
495}
496
497impl SharedGraph {
498 pub fn build_text_index(
505 &self,
506 label: &DbString,
507 property: &DbString,
508 ) -> GraphResult<TextIndex> {
509 self.read().build_text_index(label, property)
510 }
511
512 pub fn indexed_text_search_nodes(
519 &self,
520 label: &DbString,
521 property: &DbString,
522 query: &str,
523 k: usize,
524 ) -> GraphResult<Vec<TextSearchHit>> {
525 self.read()
526 .indexed_text_search_nodes(label, property, query, k)
527 }
528}
529
/// One entry of a term's posting list: a document containing the term and the number of
/// times the term occurs in it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct TextPosting {
    /// Node whose document contains the term; posting lists are ordered by this field.
    node_id: NodeId,
    /// Occurrences of the term in that document.
    term_count: u32,
}
535
536fn roaring_heap_bytes(rows: &RoaringBitmap) -> usize {
537 let statistics = rows.statistics();
538 usize::try_from(
539 statistics
540 .n_bytes_array_containers
541 .saturating_add(statistics.n_bytes_run_containers)
542 .saturating_add(statistics.n_bytes_bitset_containers),
543 )
544 .unwrap_or(usize::MAX)
545}
546
547#[cfg(test)]
548#[path = "text_index/tests.rs"]
549mod tests;