1use super::{
5 index::*,
6 merger::{Merger, PartitionSource, SizeBasedMerger},
7 InvertedIndexParams,
8};
9use crate::scalar::inverted::json::JsonTextStream;
10use crate::scalar::inverted::lance_tokenizer::DocType;
11use crate::scalar::inverted::tokenizer::lance_tokenizer::LanceTokenizer;
12use crate::scalar::lance_format::LanceIndexStore;
13use crate::scalar::IndexStore;
14use crate::vector::graph::OrderedFloat;
15use arrow::datatypes;
16use arrow::{array::AsArray, compute::concat_batches};
17use arrow_array::{Array, RecordBatch, UInt64Array};
18use arrow_schema::{DataType, Field, Schema, SchemaRef};
19use bitpacking::{BitPacker, BitPacker4x};
20use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream};
21use deepsize::DeepSizeOf;
22use futures::{stream, Stream, StreamExt, TryStreamExt};
23use lance_arrow::json::JSON_EXT_NAME;
24use lance_arrow::{iter_str_array, ARROW_EXT_NAME_KEY};
25use lance_core::utils::tokio::get_num_compute_intensive_cpus;
26use lance_core::{cache::LanceCache, utils::tokio::spawn_cpu};
27use lance_core::{error::LanceOptionExt, utils::tempfile::TempDir};
28use lance_core::{Error, Result, ROW_ID, ROW_ID_FIELD};
29use lance_io::object_store::ObjectStore;
30use object_store::path::Path;
31use smallvec::SmallVec;
32use snafu::location;
33use std::collections::HashMap;
34use std::pin::Pin;
35use std::str::FromStr;
36use std::sync::Arc;
37use std::sync::LazyLock;
38use std::task::{Context, Poll};
39use std::{fmt::Debug, sync::atomic::AtomicU64};
40use tracing::instrument;
41
/// Number of values per bit-packed compression block, fixed by `BitPacker4x`.
pub const BLOCK_SIZE: usize = BitPacker4x::BLOCK_LEN;
46
/// Buffered-batch flush threshold in MiB (shifted `<< 20` at the use site)
/// when writing posting lists; overridable via `LANCE_FTS_FLUSH_SIZE`.
/// Panics at first use if the env var is set but not a valid integer.
static LANCE_FTS_FLUSH_SIZE: LazyLock<usize> = LazyLock::new(|| {
    std::env::var("LANCE_FTS_FLUSH_SIZE")
        .unwrap_or_else(|_| "16".to_string())
        .parse()
        .expect("failed to parse LANCE_FTS_FLUSH_SIZE")
});
/// Number of parallel indexing workers; defaults to the number of
/// compute-intensive CPUs, overridable via `LANCE_FTS_NUM_SHARDS`.
pub static LANCE_FTS_NUM_SHARDS: LazyLock<usize> = LazyLock::new(|| {
    std::env::var("LANCE_FTS_NUM_SHARDS")
        .unwrap_or_else(|_| get_num_compute_intensive_cpus().to_string())
        .parse()
        .expect("failed to parse LANCE_FTS_NUM_SHARDS")
});
/// In-memory partition size limit in MiB (shifted `<< 20` at the use site);
/// an `IndexWorker` flushes its partition when it grows past this.
/// Overridable via `LANCE_FTS_PARTITION_SIZE`.
pub static LANCE_FTS_PARTITION_SIZE: LazyLock<u64> = LazyLock::new(|| {
    std::env::var("LANCE_FTS_PARTITION_SIZE")
        .unwrap_or_else(|_| "256".to_string())
        .parse()
        .expect("failed to parse LANCE_FTS_PARTITION_SIZE")
});
/// Target merged-partition size in MiB passed to `SizeBasedMerger`
/// (shifted `<< 20` at the use site); overridable via `LANCE_FTS_TARGET_SIZE`.
pub static LANCE_FTS_TARGET_SIZE: LazyLock<u64> = LazyLock::new(|| {
    std::env::var("LANCE_FTS_TARGET_SIZE")
        .unwrap_or_else(|_| "4096".to_string())
        .parse()
        .expect("failed to parse LANCE_FTS_TARGET_SIZE")
});
82
/// Builds an inverted (full-text search) index, optionally starting from an
/// existing set of index partitions.
#[derive(Debug)]
pub struct InvertedIndexBuilder {
    params: InvertedIndexParams,
    // Ids of partitions that already exist in `src_store`.
    pub(crate) partitions: Vec<u64>,
    // Ids of partitions produced by the current update; they live in
    // `local_store` until `write` copies or merges them to the destination.
    new_partitions: Vec<u64>,
    // When set, OR-ed into every newly allocated partition id and switches
    // metadata output to per-partition files (see `write_part_metadata`).
    fragment_mask: Option<u64>,
    token_set_format: TokenSetFormat,
    // Backs `local_store`; kept alive for the builder's lifetime.
    _tmpdir: TempDir,
    // Scratch store on local disk for freshly built partitions.
    local_store: Arc<dyn IndexStore>,
    // Store holding the pre-existing partitions (defaults to `local_store`).
    src_store: Arc<dyn IndexStore>,
}
94
impl InvertedIndexBuilder {
    /// Creates a builder for a brand-new index with the given parameters.
    pub fn new(params: InvertedIndexParams) -> Self {
        Self::new_with_fragment_mask(params, None)
    }

    /// Like [`Self::new`], but newly allocated partition ids are OR-ed with
    /// `fragment_mask` and metadata is written per partition, which allows
    /// several fragment-level builds to run without id collisions.
    pub fn new_with_fragment_mask(params: InvertedIndexParams, fragment_mask: Option<u64>) -> Self {
        Self::from_existing_index(
            params,
            None,
            Vec::new(),
            TokenSetFormat::default(),
            fragment_mask,
        )
    }

    /// Creates a builder seeded with existing `partitions` stored in `store`.
    /// When `store` is `None`, the temp-dir backed local store is used as the
    /// source as well.
    pub fn from_existing_index(
        params: InvertedIndexParams,
        store: Option<Arc<dyn IndexStore>>,
        partitions: Vec<u64>,
        token_set_format: TokenSetFormat,
        fragment_mask: Option<u64>,
    ) -> Self {
        let tmpdir = TempDir::default();
        let local_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            tmpdir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));
        let src_store = store.unwrap_or_else(|| local_store.clone());
        Self {
            params,
            partitions,
            new_partitions: Vec::new(),
            _tmpdir: tmpdir,
            local_store,
            src_store,
            token_set_format,
            fragment_mask,
        }
    }

    /// Indexes `new_data` (documents in column 0, plus a `ROW_ID` column) and
    /// writes the resulting index to `dest_store`.
    ///
    /// If no tokenizer was configured, one is derived from the document
    /// column's field via `DocType::try_from`.
    pub async fn update(
        &mut self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()> {
        let schema = new_data.schema();
        let doc_col = schema.field(0).name();

        if self.params.lance_tokenizer.is_none() {
            let schema = new_data.schema();
            let field = schema.column_with_name(doc_col).expect_ok()?.1;
            let doc_type = DocType::try_from(field)?;
            self.params.lance_tokenizer = Some(doc_type.as_ref().to_string());
        }

        // Normalize the input: flatten list-of-strings / JSON columns into a
        // plain string document stream.
        let new_data = document_input(new_data, doc_col)?;

        self.update_index(new_data).await?;
        self.write(dest_store).await?;
        Ok(())
    }

    /// Fans the input stream out to `LANCE_FTS_NUM_SHARDS` worker tasks over a
    /// bounded channel; each worker tokenizes batches and flushes partitions
    /// to the local store. Collects the produced partition ids.
    #[instrument(level = "debug", skip_all)]
    async fn update_index(&mut self, stream: SendableRecordBatchStream) -> Result<()> {
        let num_workers = *LANCE_FTS_NUM_SHARDS;
        let tokenizer = self.params.build()?;
        let with_position = self.params.with_position;
        // Start allocating ids after the largest existing partition id.
        let next_id = self.partitions.iter().map(|id| id + 1).max().unwrap_or(0);
        let id_alloc = Arc::new(AtomicU64::new(next_id));
        let (sender, receiver) = async_channel::bounded(num_workers);
        let mut index_tasks = Vec::with_capacity(num_workers);
        for _ in 0..num_workers {
            let store = self.local_store.clone();
            let tokenizer = tokenizer.clone();
            let receiver = receiver.clone();
            let id_alloc = id_alloc.clone();
            let fragment_mask = self.fragment_mask;
            let token_set_format = self.token_set_format;
            let task = tokio::task::spawn(async move {
                let mut worker = IndexWorker::new(
                    store,
                    tokenizer,
                    with_position,
                    id_alloc,
                    fragment_mask,
                    token_set_format,
                )
                .await?;
                // Runs until the channel is closed (all senders dropped).
                while let Ok(batch) = receiver.recv().await {
                    worker.process_batch(batch).await?;
                }
                let partitions = worker.finish().await?;
                Result::Ok(partitions)
            });
            index_tasks.push(task);
        }

        let sender = Arc::new(sender);

        let mut stream = Box::pin(stream.then({
            |batch_result| {
                let sender = sender.clone();
                async move {
                    let sender = sender.clone();
                    let batch = batch_result?;
                    let num_rows = batch.num_rows();
                    sender.send(batch).await.expect("failed to send batch");
                    Result::Ok(num_rows)
                }
            }
        }));
        log::info!("indexing FTS with {} workers", num_workers);

        // Drain the stream, logging progress roughly every million rows.
        let mut last_num_rows = 0;
        let mut total_num_rows = 0;
        let start = std::time::Instant::now();
        while let Some(num_rows) = stream.try_next().await? {
            total_num_rows += num_rows;
            if total_num_rows >= last_num_rows + 1_000_000 {
                log::debug!(
                    "indexed {} documents, elapsed: {:?}, speed: {}rows/s",
                    total_num_rows,
                    start.elapsed(),
                    total_num_rows as f32 / start.elapsed().as_secs_f32()
                );
                last_num_rows = total_num_rows;
            }
        }
        // Drop the stream (and the sender clones it captured), then the last
        // sender, so the workers' `recv` loops terminate.
        drop(stream);
        debug_assert_eq!(sender.sender_count(), 1);
        drop(sender);
        log::info!("dispatching elapsed: {:?}", start.elapsed());

        let start = std::time::Instant::now();
        for index_task in index_tasks {
            self.new_partitions.extend(index_task.await??);
        }
        log::info!("wait workers indexing elapsed: {:?}", start.elapsed());
        Ok(())
    }

    /// Rewrites row ids in every existing partition according to `mapping`
    /// (`None` target means the row was deleted) and writes the remapped
    /// partitions plus metadata to `dest_store`.
    pub async fn remap(
        &mut self,
        mapping: &HashMap<u64, Option<u64>>,
        src_store: Arc<dyn IndexStore>,
        dest_store: &dyn IndexStore,
    ) -> Result<()> {
        for part in self.partitions.iter() {
            let part = InvertedPartition::load(
                src_store.clone(),
                *part,
                None,
                &LanceCache::no_cache(),
                self.token_set_format,
            )
            .await?;
            let mut builder = part.into_builder().await?;
            builder.remap(mapping).await?;
            builder.write(dest_store).await?;
        }
        if self.fragment_mask.is_none() {
            self.write_metadata(dest_store, &self.partitions).await?;
        } else {
            // Fragment-level build: one metadata file per partition, merged
            // later by `merge_index_files`.
            for &partition_id in &self.partitions {
                self.write_part_metadata(dest_store, partition_id).await?;
            }
        }
        Ok(())
    }

    /// Writes the global metadata file: partition list, index params and
    /// token-set format, stored as schema metadata of an empty-schema file.
    async fn write_metadata(&self, dest_store: &dyn IndexStore, partitions: &[u64]) -> Result<()> {
        let metadata = HashMap::from_iter(vec![
            ("partitions".to_owned(), serde_json::to_string(&partitions)?),
            ("params".to_owned(), serde_json::to_string(&self.params)?),
            (
                TOKEN_SET_FORMAT_KEY.to_owned(),
                self.token_set_format.to_string(),
            ),
        ]);
        let mut writer = dest_store
            .new_index_file(METADATA_FILE, Arc::new(Schema::empty()))
            .await?;
        writer.finish_with_metadata(metadata).await?;
        Ok(())
    }

    /// Writes a metadata file for a single partition (same layout as
    /// `write_metadata` but listing only `partition`), used by
    /// fragment-masked builds.
    pub(crate) async fn write_part_metadata(
        &self,
        dest_store: &dyn IndexStore,
        partition: u64,
    ) -> Result<()> {
        let partitions = vec![partition];
        let metadata = HashMap::from_iter(vec![
            ("partitions".to_owned(), serde_json::to_string(&partitions)?),
            ("params".to_owned(), serde_json::to_string(&self.params)?),
            (
                TOKEN_SET_FORMAT_KEY.to_owned(),
                self.token_set_format.to_string(),
            ),
        ]);
        let file_name = part_metadata_file_path(partition);
        let mut writer = dest_store
            .new_index_file(&file_name, Arc::new(Schema::empty()))
            .await?;
        writer.finish_with_metadata(metadata).await?;
        Ok(())
    }

    /// Publishes the index to `dest_store`: with `skip_merge` the partition
    /// files are copied verbatim, otherwise existing and new partitions are
    /// merged up to `LANCE_FTS_TARGET_SIZE` MiB each; then metadata is
    /// written (globally, or per partition under a fragment mask).
    async fn write(&self, dest_store: &dyn IndexStore) -> Result<()> {
        if self.params.skip_merge {
            let mut partitions =
                Vec::with_capacity(self.partitions.len() + self.new_partitions.len());
            partitions.extend_from_slice(&self.partitions);
            partitions.extend_from_slice(&self.new_partitions);
            partitions.sort_unstable();

            // Existing partitions come from the source store...
            for part in self.partitions.iter() {
                self.src_store
                    .copy_index_file(&token_file_path(*part), dest_store)
                    .await?;
                self.src_store
                    .copy_index_file(&posting_file_path(*part), dest_store)
                    .await?;
                self.src_store
                    .copy_index_file(&doc_file_path(*part), dest_store)
                    .await?;
            }
            // ...new ones from the local scratch store.
            for part in self.new_partitions.iter() {
                self.local_store
                    .copy_index_file(&token_file_path(*part), dest_store)
                    .await?;
                self.local_store
                    .copy_index_file(&posting_file_path(*part), dest_store)
                    .await?;
                self.local_store
                    .copy_index_file(&doc_file_path(*part), dest_store)
                    .await?;
            }

            if self.fragment_mask.is_none() {
                self.write_metadata(dest_store, &partitions).await?;
            } else {
                for &partition_id in &partitions {
                    self.write_part_metadata(dest_store, partition_id).await?;
                }
            }
            return Ok(());
        }

        let partitions = self
            .partitions
            .iter()
            .map(|part| PartitionSource::new(self.src_store.clone(), *part))
            .chain(
                self.new_partitions
                    .iter()
                    .map(|part| PartitionSource::new(self.local_store.clone(), *part)),
            )
            .collect::<Vec<_>>();
        let mut merger = SizeBasedMerger::new(
            dest_store,
            partitions,
            *LANCE_FTS_TARGET_SIZE << 20,
            self.token_set_format,
        );
        let partitions = merger.merge().await?;

        if self.fragment_mask.is_none() {
            self.write_metadata(dest_store, &partitions).await?;
        } else {
            for &partition_id in &partitions {
                self.write_part_metadata(dest_store, partition_id).await?;
            }
        }
        Ok(())
    }
}
388
389impl Default for InvertedIndexBuilder {
390 fn default() -> Self {
391 let params = InvertedIndexParams::default();
392 Self::new(params)
393 }
394}
395
/// In-memory accumulator for a single index partition: the token dictionary,
/// one posting list per token, and the document set.
#[derive(Debug)]
pub struct InnerBuilder {
    // Partition id; may carry a fragment mask OR-ed into it by the caller.
    id: u64,
    with_position: bool,
    token_set_format: TokenSetFormat,
    pub(crate) tokens: TokenSet,
    // Indexed by token id; callers keep its length in sync with `tokens`.
    pub(crate) posting_lists: Vec<PostingListBuilder>,
    pub(crate) docs: DocSet,
}
406
impl InnerBuilder {
    /// Creates an empty partition builder with the given id.
    pub fn new(id: u64, with_position: bool, token_set_format: TokenSetFormat) -> Self {
        Self {
            id,
            with_position,
            token_set_format,
            tokens: TokenSet::default(),
            posting_lists: Vec::new(),
            docs: DocSet::default(),
        }
    }

    /// The partition id this builder writes under.
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Applies a row-id `mapping` (target `None` = deleted) to the partition:
    /// remaps/removes docs, prunes doc entries from every posting list, drops
    /// posting lists that became empty, and remaps the token set accordingly.
    pub async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
        // `removed` holds the doc ids deleted from the doc set.
        let removed = self.docs.remap(mapping);

        // Walk posting lists in token-id order, collecting the ids of lists
        // that end up empty so the token set can drop those tokens too.
        let mut token_id = 0;
        let mut removed_token_ids = Vec::new();
        self.posting_lists.retain_mut(|posting_list| {
            posting_list.remap(&removed);
            let keep = !posting_list.is_empty();
            if !keep {
                removed_token_ids.push(token_id as u32);
            }
            token_id += 1;
            keep
        });

        self.tokens.remap(&removed_token_ids);

        Ok(())
    }

    /// Writes the partition to `store` as three files: posting lists, tokens,
    /// and docs. The doc set is taken out first because the posting-list
    /// writer needs it to compute block-max scores.
    pub async fn write(&mut self, store: &dyn IndexStore) -> Result<()> {
        let docs = Arc::new(std::mem::take(&mut self.docs));
        self.write_posting_lists(store, docs.clone()).await?;
        self.write_tokens(store).await?;
        self.write_docs(store, docs).await?;
        Ok(())
    }

    /// Converts each posting list to a record batch (block-max scores are
    /// computed from `docs` on the CPU pool) and writes them, buffering up to
    /// `LANCE_FTS_FLUSH_SIZE` MiB between writes.
    #[instrument(level = "debug", skip_all)]
    async fn write_posting_lists(
        &mut self,
        store: &dyn IndexStore,
        docs: Arc<DocSet>,
    ) -> Result<()> {
        let id = self.id;
        let mut writer = store
            .new_index_file(
                &posting_file_path(self.id),
                inverted_list_schema(self.with_position),
            )
            .await?;
        let posting_lists = std::mem::take(&mut self.posting_lists);

        log::info!(
            "writing {} posting lists of partition {}, with position {}",
            posting_lists.len(),
            id,
            self.with_position
        );
        let schema = inverted_list_schema(self.with_position);

        let mut batches = stream::iter(posting_lists)
            .map(|posting_list| {
                let block_max_scores = docs.calculate_block_max_scores(
                    posting_list.doc_ids.iter(),
                    posting_list.frequencies.iter(),
                );
                spawn_cpu(move || posting_list.to_batch(block_max_scores))
            })
            .buffered(get_num_compute_intensive_cpus());

        let mut write_duration = std::time::Duration::ZERO;
        let mut num_posting_lists = 0;
        let mut buffer = Vec::new();
        let mut size_sum = 0;
        while let Some(batch) = batches.try_next().await? {
            num_posting_lists += 1;
            size_sum += batch.get_array_memory_size();
            buffer.push(batch);
            // Flush the buffer once it crosses the configured MiB threshold.
            if size_sum >= *LANCE_FTS_FLUSH_SIZE << 20 {
                let batch = concat_batches(&schema, buffer.iter())?;
                buffer.clear();
                size_sum = 0;
                let start = std::time::Instant::now();
                writer.write_record_batch(batch).await?;
                write_duration += start.elapsed();
            }

            if num_posting_lists % 500_000 == 0 {
                log::info!(
                    "wrote {} posting lists of partition {}, writing elapsed: {:?}",
                    num_posting_lists,
                    id,
                    write_duration,
                );
            }
        }
        // Flush whatever is left below the threshold.
        if !buffer.is_empty() {
            let batch = concat_batches(&schema, buffer.iter())?;
            writer.write_record_batch(batch).await?;
        }

        writer.finish().await?;
        Ok(())
    }

    /// Writes the token dictionary (serialized per `token_set_format`).
    #[instrument(level = "debug", skip_all)]
    async fn write_tokens(&mut self, store: &dyn IndexStore) -> Result<()> {
        log::info!("writing tokens of partition {}", self.id);
        let tokens = std::mem::take(&mut self.tokens);
        let batch = tokens.to_batch(self.token_set_format)?;
        let mut writer = store
            .new_index_file(&token_file_path(self.id), batch.schema())
            .await?;
        writer.write_record_batch(batch).await?;
        writer.finish().await?;
        Ok(())
    }

    /// Writes the document set.
    #[instrument(level = "debug", skip_all)]
    async fn write_docs(&mut self, store: &dyn IndexStore, docs: Arc<DocSet>) -> Result<()> {
        log::info!("writing docs of partition {}", self.id);
        let batch = docs.to_batch()?;
        let mut writer = store
            .new_index_file(&doc_file_path(self.id), batch.schema())
            .await?;
        writer.write_record_batch(batch).await?;
        writer.finish().await?;
        Ok(())
    }
}
550
/// One indexing worker: tokenizes incoming batches into an `InnerBuilder` and
/// flushes it to `store` as a new partition whenever it grows too large.
struct IndexWorker {
    store: Arc<dyn IndexStore>,
    tokenizer: Box<dyn LanceTokenizer>,
    // Shared partition-id allocator across all workers.
    id_alloc: Arc<AtomicU64>,
    builder: InnerBuilder,
    // Ids of the partitions this worker has flushed so far.
    partitions: Vec<u64>,
    // Posting-list schema; contains POSITION_COL iff positions are recorded.
    schema: SchemaRef,
    // Estimated size (bytes) of the current builder's posting lists.
    estimated_size: u64,
    total_doc_length: usize,
    fragment_mask: Option<u64>,
    token_set_format: TokenSetFormat,
    // Scratch (with positions): token id -> positions in the current doc.
    token_occurrences: HashMap<u32, PositionRecorder>,
    // Scratch (without positions): token ids of the current doc.
    token_ids: Vec<u32>,
    // Sizes of the previous doc's scratch, used to pre-reserve capacity.
    last_token_count: usize,
    last_unique_token_count: usize,
}
567
impl IndexWorker {
    /// Creates a worker writing flushed partitions to `store`. The first
    /// partition id is taken from the shared `id_alloc` and OR-ed with
    /// `fragment_mask` (when present) to keep ids of concurrent
    /// fragment-level builds disjoint.
    async fn new(
        store: Arc<dyn IndexStore>,
        tokenizer: Box<dyn LanceTokenizer>,
        with_position: bool,
        id_alloc: Arc<AtomicU64>,
        fragment_mask: Option<u64>,
        token_set_format: TokenSetFormat,
    ) -> Result<Self> {
        let schema = inverted_list_schema(with_position);

        Ok(Self {
            store,
            tokenizer,
            builder: InnerBuilder::new(
                id_alloc.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                    | fragment_mask.unwrap_or(0),
                with_position,
                token_set_format,
            ),
            partitions: Vec::new(),
            id_alloc,
            schema,
            estimated_size: 0,
            total_doc_length: 0,
            fragment_mask,
            token_set_format,
            token_occurrences: HashMap::new(),
            token_ids: Vec::new(),
            last_token_count: 0,
            last_unique_token_count: 0,
        })
    }

    /// Whether positions are recorded (the schema carries `POSITION_COL`).
    fn has_position(&self) -> bool {
        self.schema.column_with_name(POSITION_COL).is_some()
    }

    /// Tokenizes every (doc, row_id) pair of `batch` into the current builder
    /// and flushes a partition when the size/doc-id limits are hit.
    ///
    /// Expects the document column at index 0 plus a `ROW_ID` column; null
    /// documents are skipped.
    async fn process_batch(&mut self, batch: RecordBatch) -> Result<()> {
        let doc_col = batch.column(0);
        let doc_iter = iter_str_array(doc_col);
        let row_id_col = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
        let docs = doc_iter
            .zip(row_id_col.values().iter())
            .filter_map(|(doc, row_id)| doc.map(|doc| (doc, *row_id)));

        let with_position = self.has_position();
        for (doc, row_id) in docs {
            let mut token_num: u32 = 0;
            if with_position {
                // Pre-size the scratch map from the previous doc's stats.
                if self.token_occurrences.capacity() < self.last_unique_token_count {
                    self.token_occurrences
                        .reserve(self.last_unique_token_count - self.token_occurrences.capacity());
                }
                self.token_occurrences.clear();

                let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
                while token_stream.advance() {
                    let token = token_stream.token_mut();
                    // Move the text out of the token to avoid copying it.
                    let token_text = std::mem::take(&mut token.text);
                    let token_id = self.builder.tokens.add(token_text);
                    self.token_occurrences
                        .entry(token_id)
                        .or_insert_with(|| PositionRecorder::new(true))
                        .push(token.position as u32);
                    token_num += 1;
                }
            } else {
                if self.token_ids.capacity() < self.last_token_count {
                    self.token_ids
                        .reserve(self.last_token_count - self.token_ids.capacity());
                }
                self.token_ids.clear();

                let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
                while token_stream.advance() {
                    let token = token_stream.token_mut();
                    let token_text = std::mem::take(&mut token.text);
                    let token_id = self.builder.tokens.add(token_text);
                    self.token_ids.push(token_id);
                    token_num += 1;
                }
            }
            // New tokens may have been added: grow the posting-list table so
            // it stays indexable by token id.
            self.builder
                .posting_lists
                .resize_with(self.builder.tokens.len(), || {
                    PostingListBuilder::new(with_position)
                });
            let doc_id = self.builder.docs.append(row_id, token_num);
            self.total_doc_length += doc.len();

            if with_position {
                let unique_tokens = self.token_occurrences.len();
                for (token_id, term_positions) in self.token_occurrences.drain() {
                    let posting_list = &mut self.builder.posting_lists[token_id as usize];

                    // Track growth for the flush-size estimate.
                    let old_size = posting_list.size();
                    posting_list.add(doc_id, term_positions);
                    let new_size = posting_list.size();
                    self.estimated_size += new_size - old_size;
                }
                self.last_unique_token_count = unique_tokens;
            } else if token_num > 0 {
                // Sort then run-length encode the token ids so each distinct
                // token gets one posting entry carrying its in-doc frequency.
                self.token_ids.sort_unstable();
                let mut iter = self.token_ids.iter();
                let mut current = *iter.next().unwrap();
                let mut count = 1u32;
                for &token_id in iter {
                    if token_id == current {
                        count += 1;
                        continue;
                    }

                    let posting_list = &mut self.builder.posting_lists[current as usize];
                    let old_size = posting_list.size();
                    posting_list.add(doc_id, PositionRecorder::Count(count));
                    let new_size = posting_list.size();
                    self.estimated_size += new_size - old_size;

                    current = token_id;
                    count = 1;
                }
                // Emit the final run.
                let posting_list = &mut self.builder.posting_lists[current as usize];
                let old_size = posting_list.size();
                posting_list.add(doc_id, PositionRecorder::Count(count));
                let new_size = posting_list.size();
                self.estimated_size += new_size - old_size;
            }
            self.last_token_count = token_num as usize;

            // Flush when doc ids would exhaust the u32 range or the partition
            // reached the configured MiB limit.
            // NOTE(review): the equality check fires only at exactly u32::MAX
            // docs; a `>=` comparison on the untruncated length would be
            // safer against any path that overshoots — confirm intent.
            if self.builder.docs.len() as u32 == u32::MAX
                || self.estimated_size >= *LANCE_FTS_PARTITION_SIZE << 20
            {
                self.flush().await?;
            }
        }

        Ok(())
    }

    /// Writes the current builder to the store as a finished partition and
    /// starts a fresh one under a newly allocated id. No-op when empty.
    #[instrument(level = "debug", skip_all)]
    async fn flush(&mut self) -> Result<()> {
        if self.builder.tokens.is_empty() {
            return Ok(());
        }

        log::info!(
            "flushing posting lists, estimated size: {} MiB",
            self.estimated_size / (1024 * 1024)
        );
        self.estimated_size = 0;
        let with_position = self.has_position();
        // Swap in a fresh builder, then persist the old one.
        let mut builder = std::mem::replace(
            &mut self.builder,
            InnerBuilder::new(
                self.id_alloc
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                    | self.fragment_mask.unwrap_or(0),
                with_position,
                self.token_set_format,
            ),
        );
        builder.write(self.store.as_ref()).await?;
        self.partitions.push(builder.id());
        Ok(())
    }

    /// Flushes any remaining data and returns the ids of all partitions this
    /// worker produced.
    async fn finish(mut self) -> Result<Vec<u64>> {
        if !self.builder.tokens.is_empty() {
            self.flush().await?;
        }
        Ok(self.partitions)
    }
}
742
/// Records a token's occurrences within one document: either the full list of
/// positions, or just an occurrence count when positions are not needed.
#[derive(Debug, Clone)]
pub enum PositionRecorder {
    Position(SmallVec<[u32; 4]>),
    Count(u32),
}
748
749impl PositionRecorder {
750 fn new(with_position: bool) -> Self {
751 if with_position {
752 Self::Position(SmallVec::new())
753 } else {
754 Self::Count(0)
755 }
756 }
757
758 fn push(&mut self, position: u32) {
759 match self {
760 Self::Position(positions) => positions.push(position),
761 Self::Count(count) => *count += 1,
762 }
763 }
764
765 pub fn len(&self) -> u32 {
766 match self {
767 Self::Position(positions) => positions.len() as u32,
768 Self::Count(count) => *count,
769 }
770 }
771
772 pub fn is_empty(&self) -> bool {
773 self.len() == 0
774 }
775
776 pub fn into_vec(self) -> Vec<u32> {
777 match self {
778 Self::Position(positions) => positions.into_vec(),
779 Self::Count(_) => vec![0],
780 }
781 }
782}
783
/// A row id paired with its relevance score; ordered by score (see the `Ord`
/// impl below).
#[derive(Debug, Eq, PartialEq, Clone, DeepSizeOf)]
pub struct ScoredDoc {
    pub row_id: u64,
    pub score: OrderedFloat,
}
789
790impl ScoredDoc {
791 pub fn new(row_id: u64, score: f32) -> Self {
792 Self {
793 row_id,
794 score: OrderedFloat(score),
795 }
796 }
797}
798
impl PartialOrd for ScoredDoc {
    // Delegates to `Ord` so the two orderings can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
804
impl Ord for ScoredDoc {
    /// Orders by score only.
    // NOTE(review): the derived `PartialEq` also compares `row_id`, so
    // `a == b` and `a.cmp(&b) == Equal` can disagree for equal-score docs —
    // presumably fine for heap/top-k use, but verify no sorted-map usage.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.score.cmp(&other.score)
    }
}
810
811pub fn legacy_inverted_list_schema(with_position: bool) -> SchemaRef {
812 let mut fields = vec![
813 arrow_schema::Field::new(ROW_ID, arrow_schema::DataType::UInt64, false),
814 arrow_schema::Field::new(FREQUENCY_COL, arrow_schema::DataType::Float32, false),
815 ];
816 if with_position {
817 fields.push(arrow_schema::Field::new(
818 POSITION_COL,
819 arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
820 "item",
821 arrow_schema::DataType::Int32,
822 true,
823 ))),
824 false,
825 ));
826 }
827 Arc::new(arrow_schema::Schema::new(fields))
828}
829
830pub fn inverted_list_schema(with_position: bool) -> SchemaRef {
831 let mut fields = vec![
832 arrow_schema::Field::new(
835 POSTING_COL,
836 datatypes::DataType::List(Arc::new(Field::new(
837 "item",
838 datatypes::DataType::LargeBinary,
839 true,
840 ))),
841 false,
842 ),
843 arrow_schema::Field::new(MAX_SCORE_COL, datatypes::DataType::Float32, false),
844 arrow_schema::Field::new(LENGTH_COL, datatypes::DataType::UInt32, false),
845 ];
846 if with_position {
847 fields.push(arrow_schema::Field::new(
848 POSITION_COL,
849 arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
850 "item",
851 arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
852 "item",
853 arrow_schema::DataType::LargeBinary,
854 true,
855 ))),
856 true,
857 ))),
858 false,
859 ));
860 }
861 Arc::new(arrow_schema::Schema::new(fields))
862}
863
/// Adapts a stream whose first column is a list-of-strings into a stream of
/// flat string batches, repeating each row id once per list element.
pub struct FlattenStream {
    inner: SendableRecordBatchStream,
    // Original first-column type (List or LargeList); selects the offset width.
    field_type: DataType,
    // Flattened element type (Utf8 or LargeUtf8) exposed by `schema()`.
    data_type: DataType,
}
873
874impl FlattenStream {
875 pub fn new(input: SendableRecordBatchStream) -> Self {
876 let schema = input.schema();
877 let field = schema.field(0);
878 let data_type = match field.data_type() {
879 DataType::List(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
880 DataType::List(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
881 DataType::LargeUtf8
882 }
883 DataType::LargeList(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
884 DataType::LargeList(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
885 DataType::LargeUtf8
886 }
887 _ => panic!(
888 "expect data type List(Utf8) or List(LargeUtf8) but got {:?}",
889 field.data_type()
890 ),
891 };
892 Self {
893 inner: input,
894 field_type: field.data_type().clone(),
895 data_type,
896 }
897 }
898}
899
900impl Stream for FlattenStream {
901 type Item = datafusion_common::Result<RecordBatch>;
902
903 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
904 match Pin::new(&mut self.inner).poll_next(cx) {
905 Poll::Ready(Some(Ok(batch))) => {
906 let doc_col = batch.column(0);
907 let batch = match self.field_type {
908 DataType::List(_) => flatten_string_list::<i32>(&batch, doc_col).map_err(|e| {
909 datafusion_common::error::DataFusionError::Execution(format!(
910 "flatten string list error: {}",
911 e
912 ))
913 }),
914 DataType::LargeList(_) => {
915 flatten_string_list::<i64>(&batch, doc_col).map_err(|e| {
916 datafusion_common::error::DataFusionError::Execution(format!(
917 "flatten string list error: {}",
918 e
919 ))
920 })
921 }
922 _ => unreachable!(
923 "expect data type List or LargeList but got {:?}",
924 self.field_type
925 ),
926 };
927 Poll::Ready(Some(batch))
928 }
929 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
930 Poll::Ready(None) => Poll::Ready(None),
931 Poll::Pending => Poll::Pending,
932 }
933 }
934}
935
936impl RecordBatchStream for FlattenStream {
937 fn schema(&self) -> SchemaRef {
938 let schema = Schema::new(vec![
939 Field::new(
940 self.inner.schema().field(0).name(),
941 self.data_type.clone(),
942 true,
943 ),
944 ROW_ID_FIELD.clone(),
945 ]);
946
947 Arc::new(schema)
948 }
949}
950
/// Flattens a list-of-strings column into one row per string element,
/// repeating each row id once per element of its list.
fn flatten_string_list<Offset: arrow::array::OffsetSizeTrait>(
    batch: &RecordBatch,
    doc_col: &Arc<dyn Array>,
) -> Result<RecordBatch> {
    let docs = doc_col.as_list::<Offset>();
    let row_ids = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();

    // Repeat each row id once per list element; null lists contribute zero.
    let row_ids = row_ids
        .values()
        .iter()
        .zip(docs.iter())
        .flat_map(|(row_id, doc)| std::iter::repeat_n(*row_id, doc.map(|d| d.len()).unwrap_or(0)));

    let row_ids = Arc::new(UInt64Array::from_iter_values(row_ids));
    // NOTE(review): `docs.values()` is the raw child array; this assumes
    // null/sliced lists leave no extra child values behind, otherwise the
    // child values and the repeated row ids could misalign — verify upstream
    // guarantees on the input arrays.
    let docs = match docs.value_type() {
        datatypes::DataType::Utf8 | datatypes::DataType::LargeUtf8 => docs.values().clone(),
        _ => {
            return Err(Error::Index {
                message: format!(
                    "expect data type String or LargeString but got {}",
                    docs.value_type()
                ),
                location: location!(),
            });
        }
    };

    let schema = Schema::new(vec![
        Field::new(
            batch.schema().field(0).name(),
            docs.data_type().clone(),
            true,
        ),
        ROW_ID_FIELD.clone(),
    ]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![docs, row_ids])?;
    Ok(batch)
}
989
990pub(crate) fn token_file_path(partition_id: u64) -> String {
991 format!("part_{}_{}", partition_id, TOKENS_FILE)
992}
993
994pub(crate) fn posting_file_path(partition_id: u64) -> String {
995 format!("part_{}_{}", partition_id, INVERT_LIST_FILE)
996}
997
998pub(crate) fn doc_file_path(partition_id: u64) -> String {
999 format!("part_{}_{}", partition_id, DOCS_FILE)
1000}
1001
1002pub(crate) fn part_metadata_file_path(partition_id: u64) -> String {
1003 format!("part_{}_{}", partition_id, METADATA_FILE)
1004}
1005
1006pub async fn merge_index_files(
1007 object_store: &ObjectStore,
1008 index_dir: &Path,
1009 store: Arc<dyn IndexStore>,
1010) -> Result<()> {
1011 let part_metadata_files = list_metadata_files(object_store, index_dir).await?;
1013
1014 merge_metadata_files(store, &part_metadata_files).await
1016}
1017
1018async fn list_metadata_files(object_store: &ObjectStore, index_dir: &Path) -> Result<Vec<String>> {
1021 let mut part_metadata_files = Vec::new();
1023 let mut list_stream = object_store.list(Some(index_dir.clone()));
1024
1025 while let Some(item) = list_stream.next().await {
1026 match item {
1027 Ok(meta) => {
1028 let file_name = meta.location.filename().unwrap_or_default();
1029 if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
1031 part_metadata_files.push(file_name.to_string());
1032 }
1033 }
1034 Err(_) => continue,
1035 }
1036 }
1037
1038 if part_metadata_files.is_empty() {
1039 return Err(Error::InvalidInput {
1040 source: format!(
1041 "No partition metadata files found in index directory: {}",
1042 index_dir
1043 )
1044 .into(),
1045 location: location!(),
1046 });
1047 }
1048
1049 Ok(part_metadata_files)
1050}
1051
1052async fn merge_metadata_files(
1054 store: Arc<dyn IndexStore>,
1055 part_metadata_files: &[String],
1056) -> Result<()> {
1057 let mut all_partitions = Vec::new();
1059 let mut params = None;
1060 let mut token_set_format = None;
1061
1062 for file_name in part_metadata_files {
1063 let reader = store.open_index_file(file_name).await?;
1064 let metadata = &reader.schema().metadata;
1065
1066 let partitions_str = metadata.get("partitions").ok_or(Error::Index {
1067 message: format!("partitions not found in {}", file_name),
1068 location: location!(),
1069 })?;
1070
1071 let partition_ids: Vec<u64> =
1072 serde_json::from_str(partitions_str).map_err(|e| Error::Index {
1073 message: format!("Failed to parse partitions: {}", e),
1074 location: location!(),
1075 })?;
1076
1077 all_partitions.extend(partition_ids);
1078
1079 if params.is_none() {
1080 let params_str = metadata.get("params").ok_or(Error::Index {
1081 message: format!("params not found in {}", file_name),
1082 location: location!(),
1083 })?;
1084 params = Some(
1085 serde_json::from_str::<InvertedIndexParams>(params_str).map_err(|e| {
1086 Error::Index {
1087 message: format!("Failed to parse params: {}", e),
1088 location: location!(),
1089 }
1090 })?,
1091 );
1092 }
1093
1094 if token_set_format.is_none() {
1095 if let Some(name) = metadata.get(TOKEN_SET_FORMAT_KEY) {
1096 token_set_format = Some(TokenSetFormat::from_str(name)?);
1097 }
1098 }
1099 }
1100
1101 let mut sorted_ids = all_partitions.clone();
1103 sorted_ids.sort();
1104 sorted_ids.dedup();
1105
1106 let id_mapping: HashMap<u64, u64> = sorted_ids
1107 .iter()
1108 .enumerate()
1109 .map(|(new_id, &old_id)| (old_id, new_id as u64))
1110 .collect();
1111
1112 let timestamp = std::time::SystemTime::now()
1114 .duration_since(std::time::UNIX_EPOCH)
1115 .unwrap()
1116 .as_secs();
1117
1118 let mut temp_files: Vec<(String, String, String)> = Vec::new(); for (&old_id, &new_id) in &id_mapping {
1122 if old_id != new_id {
1123 for suffix in [TOKENS_FILE, INVERT_LIST_FILE, DOCS_FILE] {
1124 let old_path = format!("part_{}_{}", old_id, suffix);
1125 let new_path = format!("part_{}_{}", new_id, suffix);
1126 let temp_path = format!("temp_{}_{}", timestamp, old_path);
1127
1128 if let Err(e) = store.rename_index_file(&old_path, &temp_path).await {
1130 for (temp_name, old_name, _) in temp_files.iter().rev() {
1132 let _ = store.rename_index_file(temp_name, old_name).await;
1133 }
1134 return Err(Error::Index {
1135 message: format!(
1136 "Failed to move {} to temp {}: {}",
1137 old_path, temp_path, e
1138 ),
1139 location: location!(),
1140 });
1141 }
1142 temp_files.push((temp_path, old_path, new_path));
1143 }
1144 }
1145 }
1146
1147 let mut completed_renames: Vec<(String, String)> = Vec::new(); for (temp_path, _old_path, final_path) in &temp_files {
1151 if let Err(e) = store.rename_index_file(temp_path, final_path).await {
1152 for (final_name, temp_name) in completed_renames.iter().rev() {
1154 let _ = store.rename_index_file(final_name, temp_name).await;
1155 }
1156 for (temp_name, orig_name, _) in temp_files.iter() {
1158 if !completed_renames.iter().any(|(_, t)| t == temp_name) {
1159 let _ = store.rename_index_file(temp_name, orig_name).await;
1160 }
1161 }
1162 return Err(Error::Index {
1163 message: format!("Failed to rename {} to {}: {}", temp_path, final_path, e),
1164 location: location!(),
1165 });
1166 }
1167 completed_renames.push((final_path.clone(), temp_path.clone()));
1168 }
1169
1170 let remapped_partitions: Vec<u64> = (0..id_mapping.len() as u64).collect();
1172 let params = params.unwrap_or_default();
1173 let token_set_format = token_set_format.unwrap_or(TokenSetFormat::Arrow);
1174 let builder = InvertedIndexBuilder::from_existing_index(
1175 params,
1176 None,
1177 remapped_partitions.clone(),
1178 token_set_format,
1179 None,
1180 );
1181 builder
1182 .write_metadata(&*store, &remapped_partitions)
1183 .await?;
1184
1185 for file_name in part_metadata_files {
1187 if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
1188 let _ = store.delete_index_file(file_name).await;
1189 }
1190 }
1191
1192 Ok(())
1193}
1194
1195pub fn document_input(
1202 input: SendableRecordBatchStream,
1203 column: &str,
1204) -> Result<SendableRecordBatchStream> {
1205 let schema = input.schema();
1206 let field = schema.column_with_name(column).expect_ok()?.1;
1207 match field.data_type() {
1208 DataType::Utf8 | DataType::LargeUtf8 => Ok(input),
1209 DataType::List(field) | DataType::LargeList(field)
1210 if matches!(field.data_type(), DataType::Utf8 | DataType::LargeUtf8) =>
1211 {
1212 Ok(Box::pin(FlattenStream::new(input)))
1213 }
1214 DataType::LargeBinary => match field.metadata().get(ARROW_EXT_NAME_KEY) {
1215 Some(name) if name.as_str() == JSON_EXT_NAME => {
1216 Ok(Box::pin(JsonTextStream::new(input, column.to_string())))
1217 }
1218 _ => Err(Error::InvalidInput {
1219 source: format!("column {} is not json", column).into(),
1220 location: location!(),
1221 }),
1222 },
1223 _ => Err(Error::InvalidInput {
1224 source: format!(
1225 "column {} has type {}, is not utf8, large utf8 type/list, or large binary",
1226 column,
1227 field.data_type()
1228 )
1229 .into(),
1230 location: location!(),
1231 }),
1232 }
1233}
1234
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::NoOpMetricsCollector;
    use arrow_array::{RecordBatch, StringArray, UInt64Array};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
    use futures::stream;
    use lance_core::cache::LanceCache;
    use lance_core::utils::tempfile::TempDir;
    use lance_core::ROW_ID;
    use std::sync::atomic::AtomicU64;

    /// Builds a single-row batch with a nullable utf8 "doc" column and a
    /// `ROW_ID` u64 column, the shape the index workers ingest.
    fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("doc", DataType::Utf8, true),
            Field::new(ROW_ID, DataType::UInt64, false),
        ]));
        let docs = Arc::new(StringArray::from(vec![Some(doc)]));
        let row_ids = Arc::new(UInt64Array::from(vec![row_id]));
        RecordBatch::try_new(schema, vec![docs, row_ids]).unwrap()
    }

    /// With `skip_merge(true)`, `InvertedIndexBuilder::write` must carry the
    /// source partitions over to the destination store unchanged: the metadata
    /// lists the original partition ids and the per-partition token, posting,
    /// and doc files all exist at the destination.
    #[tokio::test]
    async fn test_skip_merge_writes_partitions_as_is() -> Result<()> {
        // Two independent local stores: source holds the built partitions,
        // dest receives the skip-merge write.
        let src_dir = TempDir::default();
        let dest_dir = TempDir::default();
        let src_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            src_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));
        let dest_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            dest_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let params = InvertedIndexParams::default();
        let tokenizer = params.build()?;
        let token_set_format = TokenSetFormat::default();
        // Shared allocator so the two workers get distinct partition ids.
        let id_alloc = Arc::new(AtomicU64::new(0));

        // Build two separate partitions, one single-document batch each.
        let mut worker1 = IndexWorker::new(
            src_store.clone(),
            tokenizer.clone(),
            params.with_position,
            id_alloc.clone(),
            None,
            token_set_format,
        )
        .await?;
        worker1
            .process_batch(make_doc_batch("hello world", 0))
            .await?;
        let mut partitions = worker1.finish().await?;

        let mut worker2 = IndexWorker::new(
            src_store.clone(),
            tokenizer.clone(),
            params.with_position,
            id_alloc.clone(),
            None,
            token_set_format,
        )
        .await?;
        worker2
            .process_batch(make_doc_batch("goodbye world", 1))
            .await?;
        partitions.extend(worker2.finish().await?);
        partitions.sort_unstable();
        // Sanity check: two distinct partitions were produced.
        assert_eq!(partitions.len(), 2);
        assert_ne!(partitions[0], partitions[1]);

        // skip_merge(true): write the existing partitions through unmerged.
        let builder = InvertedIndexBuilder::from_existing_index(
            InvertedIndexParams::default().skip_merge(true),
            Some(src_store.clone()),
            partitions.clone(),
            token_set_format,
            None,
        );
        builder.write(dest_store.as_ref()).await?;

        // The written metadata must list exactly the original partition ids.
        let metadata_reader = dest_store.open_index_file(METADATA_FILE).await?;
        let metadata = &metadata_reader.schema().metadata;
        let partitions_str = metadata
            .get("partitions")
            .expect("partitions missing from metadata");
        let written_partitions: Vec<u64> = serde_json::from_str(partitions_str).unwrap();
        assert_eq!(written_partitions, partitions);

        // Every partition's token/posting/doc files must open at the dest.
        for id in &partitions {
            dest_store.open_index_file(&token_file_path(*id)).await?;
            dest_store.open_index_file(&posting_file_path(*id)).await?;
            dest_store.open_index_file(&doc_file_path(*id)).await?;
        }

        Ok(())
    }

    /// Building with positions disabled must still record per-document term
    /// frequency: "hello" occurs twice in the one document, so its posting
    /// entry carries freq == 2 while positions stay `None`.
    #[tokio::test]
    async fn test_inverted_index_without_positions_tracks_frequency() -> Result<()> {
        let index_dir = TempDir::default();
        let store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            index_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        // One document containing the token "hello" twice.
        let schema = Arc::new(Schema::new(vec![
            Field::new("doc", DataType::Utf8, true),
            Field::new(ROW_ID, DataType::UInt64, false),
        ]));
        let docs = Arc::new(StringArray::from(vec![Some("hello hello world")]));
        let row_ids = Arc::new(UInt64Array::from(vec![0u64]));
        let batch = RecordBatch::try_new(schema.clone(), vec![docs, row_ids])?;
        let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)]));
        let stream = Box::pin(stream);

        // Plain whitespace tokenization with positions, stop-word removal,
        // stemming, and token-length limits all disabled, so counts are exact.
        let params = InvertedIndexParams::new(
            "whitespace".to_string(),
            tantivy::tokenizer::Language::English,
        )
        .with_position(false)
        .remove_stop_words(false)
        .stem(false)
        .max_token_length(None);

        let mut builder = InvertedIndexBuilder::new(params);
        builder.update(stream, store.as_ref()).await?;

        // Reload the index and inspect the posting list for "hello".
        let index = InvertedIndex::load(store, None, &LanceCache::no_cache()).await?;
        assert_eq!(index.partitions.len(), 1);
        let partition = &index.partitions[0];
        let token_id = partition.tokens.get("hello").unwrap();
        let posting = partition
            .inverted_list
            .posting_list(token_id, false, &NoOpMetricsCollector)
            .await?;

        // Single entry: doc 0 with frequency 2 and no position data.
        let mut iter = posting.iter();
        let (doc_id, freq, positions) = iter.next().unwrap();
        assert_eq!(doc_id, 0);
        assert_eq!(freq, 2);
        assert!(positions.is_none());
        assert!(iter.next().is_none());

        Ok(())
    }
}