1use super::{InvertedIndexParams, index::*};
5use crate::scalar::inverted::document_tokenizer::DocType;
6use crate::scalar::inverted::json::JsonTextStream;
7use crate::scalar::inverted::tokenizer::document_tokenizer::LanceTokenizer;
8#[cfg(test)]
9use crate::scalar::lance_format::LanceIndexStore;
10use crate::scalar::{IndexStore, OldIndexDataFilter};
11use crate::vector::graph::OrderedFloat;
12use crate::{progress::IndexBuildProgress, progress::noop_progress};
13use arrow::array::AsArray;
14use arrow::datatypes;
15use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array};
16use arrow_schema::{DataType, Field, Schema, SchemaRef};
17use bitpacking::{BitPacker, BitPacker4x};
18use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream};
19use deepsize::DeepSizeOf;
20use fst::Streamer;
21use futures::{Stream, StreamExt, TryStreamExt};
22use lance_arrow::json::JSON_EXT_NAME;
23use lance_arrow::{ARROW_EXT_NAME_KEY, iter_str_array};
24use lance_core::cache::LanceCache;
25use lance_core::error::LanceOptionExt;
26use lance_core::utils::tokio::{IO_CORE_RESERVATION, get_num_compute_intensive_cpus, spawn_cpu};
27use lance_core::{Error, ROW_ID, ROW_ID_FIELD, Result};
28use lance_io::object_store::ObjectStore;
29use object_store::path::Path;
30use roaring::RoaringBitmap;
31use smallvec::SmallVec;
32use std::collections::HashMap;
33use std::pin::Pin;
34use std::str::FromStr;
35use std::sync::Arc;
36use std::sync::LazyLock;
37use std::task::{Context, Poll};
38use std::{fmt::Debug, sync::atomic::AtomicU64};
39use tracing::instrument;
40
/// Number of integers per bit-packed block, taken from the block length of
/// the `BitPacker4x` bit-packer.
pub const BLOCK_SIZE: usize = BitPacker4x::BLOCK_LEN;
45
46pub static LANCE_FTS_NUM_SHARDS: LazyLock<usize> = LazyLock::new(|| {
50 std::env::var("LANCE_FTS_NUM_SHARDS")
51 .unwrap_or_else(|_| default_num_workers().to_string())
52 .parse()
53 .expect("failed to parse LANCE_FTS_NUM_SHARDS")
54});
pub static LANCE_FTS_PARTITION_SIZE: LazyLock<u64> = LazyLock::new(|| {
    // Default per-worker memory budget in MiB (it is shifted by 20 bits when
    // converted to bytes). Unset or non-unicode values fall back to 2048; a
    // set but unparsable value panics the first time this static is read.
    let raw = match std::env::var("LANCE_FTS_PARTITION_SIZE") {
        Ok(value) => value,
        Err(_) => String::from("2048"),
    };
    raw.parse()
        .expect("failed to parse LANCE_FTS_PARTITION_SIZE")
});
static LANCE_FTS_WRITE_QUEUE_SIZE: LazyLock<usize> = LazyLock::new(|| {
    // Capacity of the bounded channel between the posting-list encoder and
    // the file writer. Unset or non-unicode values fall back to 1; a set but
    // unparsable value panics the first time this static is read.
    let raw = match std::env::var("LANCE_FTS_WRITE_QUEUE_SIZE") {
        Ok(value) => value,
        Err(_) => String::from("1"),
    };
    raw.parse()
        .expect("failed to parse LANCE_FTS_WRITE_QUEUE_SIZE")
});
static LANCE_FTS_POSTING_BATCH_ROWS: LazyLock<usize> = LazyLock::new(|| {
    // Number of posting-list rows accumulated before a record batch is handed
    // to the writer. Unset or non-unicode values fall back to 256; a set but
    // unparsable value panics the first time this static is read.
    let raw = match std::env::var("LANCE_FTS_POSTING_BATCH_ROWS") {
        Ok(value) => value,
        Err(_) => String::from("256"),
    };
    raw.parse()
        .expect("failed to parse LANCE_FTS_POSTING_BATCH_ROWS")
});
/// Largest capacity the per-document scratch token-id buffer may keep between
/// documents; `IndexWorker::trim_temporary_buffers` reallocates it when the
/// capacity grows beyond this.
const MAX_RETAINED_TOKEN_IDS: usize = 8 * 1024;
75
76fn default_num_workers() -> usize {
77 let total_cpus = get_num_compute_intensive_cpus() + *IO_CORE_RESERVATION;
78 std::cmp::max(1, total_cpus / 2)
79}
80
81fn resolve_num_workers(params: &InvertedIndexParams) -> usize {
82 let max_workers = get_num_compute_intensive_cpus().max(1);
83 params
84 .num_workers
85 .unwrap_or(*LANCE_FTS_NUM_SHARDS)
86 .clamp(1, max_workers)
87}
88
89fn resolve_worker_memory_limit_bytes(params: &InvertedIndexParams, num_workers: usize) -> u64 {
90 let default_worker_memory_limit_bytes = *LANCE_FTS_PARTITION_SIZE << 20;
91 params
92 .memory_limit_mb
93 .map(|memory_limit_mb| (memory_limit_mb << 20) / num_workers as u64)
94 .unwrap_or(default_worker_memory_limit_bytes)
95}
96
97fn merge_all_tail_partitions(tails: Vec<TailPartition>) -> Result<Option<InnerBuilder>> {
98 if tails.is_empty() {
99 return Ok(None);
100 }
101 merge_tail_partition_group(tails).map(Some)
102}
103
104fn merge_tail_partition_group(group: Vec<TailPartition>) -> Result<InnerBuilder> {
105 let mut group = group.into_iter();
106 let mut merged = group
107 .next()
108 .ok_or_else(|| {
109 Error::invalid_input("cannot merge an empty tail partition group".to_owned())
110 })?
111 .builder;
112 for tail in group {
113 merged.merge_from(tail.builder)?;
114 }
115 Ok(merged)
116}
117
/// Builds, or extends, an inverted full-text-search index.
///
/// `update` tokenizes new documents into new partitions. `write` copies the
/// existing partitions from the source store and records every partition in
/// the index metadata.
#[derive(Debug)]
pub struct InvertedIndexBuilder {
    // Index parameters; `lance_tokenizer` is filled in by `update` when unset.
    params: InvertedIndexParams,
    // Ids of partitions that already exist in `src_store`.
    pub(crate) partitions: Vec<u64>,
    // Ids of partitions created during this build.
    new_partitions: Vec<u64>,
    // When set, OR-ed into newly allocated partition ids, and per-partition
    // metadata files are written instead of a single metadata file.
    fragment_mask: Option<u64>,
    // Token set format, recorded in the metadata.
    token_set_format: TokenSetFormat,
    // FTS on-disk format version used for new partitions.
    format_version: InvertedListFormatVersion,
    // Posting tail codec, recorded in the metadata.
    posting_tail_codec: PostingTailCodec,
    // Store that holds the existing `partitions`; required by `write` when
    // `partitions` is non-empty.
    src_store: Option<Arc<dyn IndexStore>>,
    // Receives stage start/progress/complete notifications.
    progress: Arc<dyn IndexBuildProgress>,
    // Fragments recorded as deleted; serialized into the metadata file.
    deleted_fragments: RoaringBitmap,
}
131
132impl InvertedIndexBuilder {
133 pub fn new(params: InvertedIndexParams) -> Self {
134 Self::new_with_fragment_mask(params, None)
135 }
136
137 pub fn new_with_fragment_mask(params: InvertedIndexParams, fragment_mask: Option<u64>) -> Self {
138 Self::from_existing_index(
139 params,
140 None,
141 Vec::new(),
142 TokenSetFormat::default(),
143 fragment_mask,
144 RoaringBitmap::new(),
145 )
146 }
147
148 pub fn from_existing_index(
155 params: InvertedIndexParams,
156 store: Option<Arc<dyn IndexStore>>,
157 partitions: Vec<u64>,
158 token_set_format: TokenSetFormat,
159 fragment_mask: Option<u64>,
160 deleted_fragments: RoaringBitmap,
161 ) -> Self {
162 Self {
163 params,
164 partitions,
165 new_partitions: Vec::new(),
166 src_store: store,
167 token_set_format,
168 fragment_mask,
169 format_version: current_fts_format_version(),
170 posting_tail_codec: current_fts_format_version().posting_tail_codec(),
171 progress: noop_progress(),
172 deleted_fragments,
173 }
174 }
175
176 pub fn with_posting_tail_codec(mut self, posting_tail_codec: PostingTailCodec) -> Self {
177 self.format_version =
178 InvertedListFormatVersion::from_posting_tail_codec(posting_tail_codec);
179 self.posting_tail_codec = posting_tail_codec;
180 self
181 }
182
183 pub fn with_format_version(mut self, format_version: InvertedListFormatVersion) -> Self {
184 self.format_version = format_version;
185 self.posting_tail_codec = format_version.posting_tail_codec();
186 self
187 }
188
189 pub fn with_progress(mut self, progress: Arc<dyn IndexBuildProgress>) -> Self {
190 self.progress = progress;
191 self
192 }
193
194 pub async fn update(
195 &mut self,
196 new_data: SendableRecordBatchStream,
197 dest_store: &dyn IndexStore,
198 old_data_filter: Option<crate::scalar::OldIndexDataFilter>,
199 ) -> Result<()> {
200 let schema = new_data.schema();
201 let doc_col = schema.field(0).name();
202
203 if self.params.lance_tokenizer.is_none() {
205 let schema = new_data.schema();
206 let field = schema.column_with_name(doc_col).expect_ok()?.1;
207 let doc_type = DocType::try_from(field)?;
208 self.params.lance_tokenizer = Some(doc_type.as_ref().to_string());
209 }
210
211 let new_data = document_input(new_data, doc_col)?;
212
213 self.progress
214 .stage_start("tokenize_docs", None, "rows")
215 .await?;
216 self.update_index(new_data, dest_store).await?;
217
218 if let Some(OldIndexDataFilter::Fragments { to_remove, .. }) = old_data_filter {
219 self.deleted_fragments.extend(to_remove);
220 }
221
222 self.progress.stage_complete("tokenize_docs").await?;
223 self.write(dest_store).await?;
224 Ok(())
225 }
226
227 #[instrument(level = "debug", skip_all)]
228 async fn update_index(
229 &mut self,
230 stream: SendableRecordBatchStream,
231 dest_store: &dyn IndexStore,
232 ) -> Result<()> {
233 let num_workers = resolve_num_workers(&self.params);
234 let tokenizer = self.params.build()?;
235 let with_position = self.params.with_position;
236 let worker_memory_limit_bytes =
237 resolve_worker_memory_limit_bytes(&self.params, num_workers);
238 let worker_config = IndexWorkerConfig {
239 with_position,
240 format_version: self.format_version,
241 fragment_mask: self.fragment_mask,
242 token_set_format: self.token_set_format,
243 worker_memory_limit_bytes,
244 };
245 let next_id = self.partitions.iter().map(|id| id + 1).max().unwrap_or(0);
246 let id_alloc = Arc::new(AtomicU64::new(next_id));
247 let tokenized_count = Arc::new(AtomicU64::new(0));
248 let (sender, receiver) = async_channel::bounded(num_workers);
249 let dest_store = dest_store.clone_arc();
250 let mut index_tasks = Vec::with_capacity(num_workers);
251 for _ in 0..num_workers {
252 let tokenizer = tokenizer.clone();
253 let receiver: async_channel::Receiver<RecordBatch> = receiver.clone();
254 let dest_store = dest_store.clone();
255 let id_alloc = id_alloc.clone();
256 let progress = self.progress.clone();
257 let tokenized_count = tokenized_count.clone();
258 index_tasks.push(tokio::task::spawn(async move {
259 let mut worker =
260 IndexWorker::new(tokenizer, dest_store, id_alloc, worker_config).await?;
261 while let Ok(batch) = receiver.recv().await {
262 let num_rows = batch.num_rows();
263 worker.process_batch(batch).await?;
264 let tokenized_count = tokenized_count
265 .fetch_add(num_rows as u64, std::sync::atomic::Ordering::Relaxed)
266 + num_rows as u64;
267 progress
268 .stage_progress("tokenize_docs", tokenized_count)
269 .await?;
270 }
271 worker.finish().await
272 }));
273 }
274
275 let index_build = async {
276 drop(receiver);
279
280 let mut stream = Box::pin(stream);
281 log::info!("indexing FTS with {} workers", num_workers);
282
283 let mut last_num_rows = 0;
284 let mut total_num_rows = 0;
285 let start = std::time::Instant::now();
286 while let Some(batch) = stream.try_next().await? {
287 let num_rows = batch.num_rows();
288
289 if sender.send(batch).await.is_err() {
290 break;
294 }
295
296 total_num_rows += num_rows;
297 if total_num_rows >= last_num_rows + 1_000_000 {
298 log::debug!(
299 "indexed {} documents, elapsed: {:?}, speed: {}rows/s",
300 total_num_rows,
301 start.elapsed(),
302 total_num_rows as f32 / start.elapsed().as_secs_f32()
303 );
304 last_num_rows = total_num_rows;
305 }
306 }
307 drop(stream);
309 drop(sender);
310 log::info!("dispatching elapsed: {:?}", start.elapsed());
311
312 let start = std::time::Instant::now();
314 let mut tail_partitions = Vec::new();
315 for index_task in index_tasks {
316 let output = index_task.await??;
317 self.new_partitions.extend(output.partitions);
318 if let Some(tail_partition) = output.tail_partition {
319 tail_partitions.push(tail_partition);
320 }
321 }
322 let merged_tail_partitions =
323 spawn_cpu(move || merge_all_tail_partitions(tail_partitions)).await?;
324 if let Some(builder) = merged_tail_partitions {
325 self.new_partitions.push(builder.id());
326 let mut builder = builder;
327 builder.write(dest_store.as_ref()).await?;
328 }
329 log::info!("wait workers indexing elapsed: {:?}", start.elapsed());
330 Result::Ok(())
331 };
332
333 index_build.await
334 }
335
336 pub async fn remap(
337 &mut self,
338 mapping: &HashMap<u64, Option<u64>>,
339 src_store: Arc<dyn IndexStore>,
340 dest_store: &dyn IndexStore,
341 ) -> Result<()> {
342 for part in self.partitions.iter() {
343 let part = InvertedPartition::load(
344 src_store.clone(),
345 *part,
346 None,
347 &LanceCache::no_cache(),
348 self.token_set_format,
349 )
350 .await?;
351 let mut builder = part.into_builder().await?;
352 builder.remap(mapping).await?;
353 builder.write(dest_store).await?;
354 }
355 if self.fragment_mask.is_none() {
356 self.write_metadata(dest_store, &self.partitions).await?;
357 } else {
358 for &partition_id in &self.partitions {
360 self.write_part_metadata(dest_store, partition_id).await?;
361 }
362 }
363 Ok(())
364 }
365
366 async fn write_metadata(&self, dest_store: &dyn IndexStore, partitions: &[u64]) -> Result<()> {
367 let mut serialized_deleted_fragments =
368 Vec::with_capacity(self.deleted_fragments.serialized_size());
369 self.deleted_fragments
370 .serialize_into(&mut serialized_deleted_fragments)?;
371
372 let mut metadata = HashMap::from_iter(vec![
373 ("partitions".to_owned(), serde_json::to_string(&partitions)?),
374 ("params".to_owned(), serde_json::to_string(&self.params)?),
375 (
376 TOKEN_SET_FORMAT_KEY.to_owned(),
377 self.token_set_format.to_string(),
378 ),
379 (
380 POSTING_TAIL_CODEC_KEY.to_owned(),
381 self.posting_tail_codec.as_str().to_owned(),
382 ),
383 ]);
384
385 if self.params.with_position && self.format_version.uses_shared_position_stream() {
386 metadata.insert(
387 POSITIONS_LAYOUT_KEY.to_owned(),
388 POSITIONS_LAYOUT_SHARED_STREAM_V2.to_owned(),
389 );
390 metadata.insert(
391 POSITIONS_CODEC_KEY.to_owned(),
392 self.format_version
393 .position_codec()
394 .expect("shared positions require a codec")
395 .as_str()
396 .to_owned(),
397 );
398 }
399
400 let metadata_file_schema = Arc::new(Schema::new(vec![Field::new(
401 DELETED_FRAGMENTS_COL,
402 DataType::Binary,
403 false,
404 )]));
405 let deleted_fragments_col = Arc::new(BinaryArray::from(vec![
406 serialized_deleted_fragments.as_slice(),
407 ])) as Arc<dyn Array>;
408 let record_batch =
409 RecordBatch::try_new(metadata_file_schema.clone(), vec![deleted_fragments_col])?;
410
411 let mut writer = dest_store
412 .new_index_file(METADATA_FILE, metadata_file_schema)
413 .await?;
414 writer.write_record_batch(record_batch).await?;
415 writer.finish_with_metadata(metadata).await?;
416 Ok(())
417 }
418
419 pub(crate) async fn write_part_metadata(
424 &self,
425 dest_store: &dyn IndexStore,
426 partition: u64, ) -> Result<()> {
428 let partitions = vec![partition];
429 let mut metadata = HashMap::from_iter(vec![
430 ("partitions".to_owned(), serde_json::to_string(&partitions)?),
431 ("params".to_owned(), serde_json::to_string(&self.params)?),
432 (
433 TOKEN_SET_FORMAT_KEY.to_owned(),
434 self.token_set_format.to_string(),
435 ),
436 (
437 POSTING_TAIL_CODEC_KEY.to_owned(),
438 self.posting_tail_codec.as_str().to_owned(),
439 ),
440 ]);
441 if self.params.with_position && self.format_version.uses_shared_position_stream() {
442 metadata.insert(
443 POSITIONS_LAYOUT_KEY.to_owned(),
444 POSITIONS_LAYOUT_SHARED_STREAM_V2.to_owned(),
445 );
446 metadata.insert(
447 POSITIONS_CODEC_KEY.to_owned(),
448 self.format_version
449 .position_codec()
450 .expect("shared positions require a codec")
451 .as_str()
452 .to_owned(),
453 );
454 }
455 let file_name = part_metadata_file_path(partition);
457 let mut writer = dest_store
458 .new_index_file(&file_name, Arc::new(Schema::empty()))
459 .await?;
460 writer.finish_with_metadata(metadata).await?;
461 Ok(())
462 }
463
464 async fn write_metadata_with_progress(
465 &self,
466 dest_store: &dyn IndexStore,
467 partitions: &[u64],
468 ) -> Result<()> {
469 let total = if self.fragment_mask.is_none() {
470 Some(1)
471 } else {
472 Some(partitions.len() as u64)
473 };
474 self.progress
475 .stage_start("write_metadata", total, "files")
476 .await?;
477 if self.fragment_mask.is_none() {
478 self.write_metadata(dest_store, partitions).await?;
479 self.progress.stage_progress("write_metadata", 1).await?;
480 } else {
481 let mut completed = 0;
482 for &partition_id in partitions {
483 self.write_part_metadata(dest_store, partition_id).await?;
484 completed += 1;
485 self.progress
486 .stage_progress("write_metadata", completed)
487 .await?;
488 }
489 }
490 self.progress.stage_complete("write_metadata").await?;
491 Ok(())
492 }
493
494 async fn write(&self, dest_store: &dyn IndexStore) -> Result<()> {
495 let mut partitions = Vec::with_capacity(self.partitions.len() + self.new_partitions.len());
496 partitions.extend_from_slice(&self.partitions);
497 partitions.extend_from_slice(&self.new_partitions);
498 partitions.sort_unstable();
499
500 self.progress
501 .stage_start(
502 "copy_partitions",
503 Some(partitions.len() as u64),
504 "partitions",
505 )
506 .await?;
507 let mut copied = 0;
508 for part in self.partitions.iter() {
509 self.src_store
510 .as_ref()
511 .expect("existing partitions require a source store")
512 .copy_index_file(&token_file_path(*part), dest_store)
513 .await?;
514 self.src_store
515 .as_ref()
516 .expect("existing partitions require a source store")
517 .copy_index_file(&posting_file_path(*part), dest_store)
518 .await?;
519 self.src_store
520 .as_ref()
521 .expect("existing partitions require a source store")
522 .copy_index_file(&doc_file_path(*part), dest_store)
523 .await?;
524 copied += 1;
525 self.progress
526 .stage_progress("copy_partitions", copied)
527 .await?;
528 }
529 for _part in self.new_partitions.iter() {
530 copied += 1;
531 self.progress
532 .stage_progress("copy_partitions", copied)
533 .await?;
534 }
535 self.progress.stage_complete("copy_partitions").await?;
536
537 self.write_metadata_with_progress(dest_store, &partitions)
538 .await?;
539 Ok(())
540 }
541}
542
543impl Default for InvertedIndexBuilder {
544 fn default() -> Self {
545 let params = InvertedIndexParams::default();
546 Self::new(params)
547 }
548}
549
/// In-memory builder for a single index partition.
///
/// Holds the token dictionary, one posting list per token id, and the doc set
/// until `write` flushes them to an index store.
#[derive(Debug)]
pub struct InnerBuilder {
    // Partition id; also used to derive this partition's file names.
    id: u64,
    // Whether posting lists record token positions.
    with_position: bool,
    // Format used to serialize the token dictionary.
    token_set_format: TokenSetFormat,
    // FTS format version that selects the posting-list schema.
    format_version: InvertedListFormatVersion,
    // Codec used for posting-list tails.
    posting_tail_codec: PostingTailCodec,
    // Token text -> token id dictionary.
    pub(crate) tokens: TokenSet,
    // Posting lists, indexed by token id.
    pub(crate) posting_lists: Vec<PostingListBuilder>,
    // Per-document row ids and token counts, indexed by doc id.
    pub(crate) docs: DocSet,
}
562
563impl InnerBuilder {
564 pub fn new(id: u64, with_position: bool, token_set_format: TokenSetFormat) -> Self {
565 Self::new_with_format_version(
566 id,
567 with_position,
568 token_set_format,
569 current_fts_format_version(),
570 )
571 }
572
573 pub fn new_with_format_version(
574 id: u64,
575 with_position: bool,
576 token_set_format: TokenSetFormat,
577 format_version: InvertedListFormatVersion,
578 ) -> Self {
579 Self {
580 id,
581 with_position,
582 token_set_format,
583 format_version,
584 posting_tail_codec: format_version.posting_tail_codec(),
585 tokens: TokenSet::default(),
586 posting_lists: Vec::new(),
587 docs: DocSet::default(),
588 }
589 }
590
591 pub fn new_with_posting_tail_codec(
592 id: u64,
593 with_position: bool,
594 token_set_format: TokenSetFormat,
595 posting_tail_codec: PostingTailCodec,
596 ) -> Self {
597 let format_version = if posting_tail_codec == PostingTailCodec::Fixed32 {
598 InvertedListFormatVersion::V1
599 } else {
600 InvertedListFormatVersion::V2
601 };
602 let mut builder =
603 Self::new_with_format_version(id, with_position, token_set_format, format_version);
604 builder.posting_tail_codec = posting_tail_codec;
605 builder
606 }
607
608 pub fn id(&self) -> u64 {
609 self.id
610 }
611
612 pub fn set_tokens(&mut self, tokens: TokenSet) {
614 self.tokens = tokens;
615 }
616
617 pub fn set_docs(&mut self, docs: DocSet) {
619 self.docs = docs;
620 }
621
622 pub fn set_posting_lists(&mut self, posting_lists: Vec<PostingListBuilder>) {
624 self.posting_lists = posting_lists;
625 }
626
627 pub async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
628 let removed = self.docs.remap(mapping);
631
632 let mut token_id = 0;
636 let mut removed_token_ids = Vec::new();
637 self.posting_lists.retain_mut(|posting_list| {
638 posting_list.remap(&removed);
639 let keep = !posting_list.is_empty();
640 if !keep {
641 removed_token_ids.push(token_id as u32);
642 }
643 token_id += 1;
644 keep
645 });
646
647 self.tokens.remap(&removed_token_ids);
649
650 Ok(())
651 }
652
653 pub fn merge_from(&mut self, other: Self) -> Result<()> {
654 let Self {
655 id: _,
656 with_position,
657 token_set_format,
658 format_version,
659 posting_tail_codec,
660 tokens,
661 posting_lists,
662 docs,
663 } = other;
664
665 if self.with_position != with_position {
666 return Err(Error::index(format!(
667 "cannot merge partitions with mismatched positions settings: {} vs {}",
668 self.with_position, with_position
669 )));
670 }
671 if self.token_set_format != token_set_format {
672 return Err(Error::index(format!(
673 "cannot merge partitions with mismatched token set formats: {:?} vs {:?}",
674 self.token_set_format, token_set_format
675 )));
676 }
677 if self.format_version != format_version {
678 return Err(Error::index(format!(
679 "cannot merge partitions with mismatched FTS format versions: {:?} vs {:?}",
680 self.format_version, format_version
681 )));
682 }
683 if self.posting_tail_codec != posting_tail_codec {
684 return Err(Error::index(format!(
685 "cannot merge partitions with mismatched posting tail codecs: {:?} vs {:?}",
686 self.posting_tail_codec, posting_tail_codec
687 )));
688 }
689
690 let mut token_id_map = vec![u32::MAX; posting_lists.len()];
691 match tokens.tokens {
692 TokenMap::HashMap(map) => {
693 for (token, token_id) in map {
694 let new_token_id = self.tokens.get_or_add(token.as_str());
695 token_id_map[token_id as usize] = new_token_id;
696 }
697 }
698 TokenMap::Fst(map) => {
699 let mut stream = map.stream();
700 while let Some((token, token_id)) = stream.next() {
701 let new_token_id = self
702 .tokens
703 .get_or_add(String::from_utf8_lossy(token).as_ref());
704 token_id_map[token_id as usize] = new_token_id;
705 }
706 }
707 }
708
709 let doc_id_offset = self.docs.len() as u32;
710 for (row_id, num_tokens) in docs.iter() {
711 self.docs.append(*row_id, *num_tokens);
712 }
713 self.posting_lists.resize_with(self.tokens.len(), || {
714 PostingListBuilder::new_with_posting_tail_codec(with_position, self.posting_tail_codec)
715 });
716
717 for (token_id, posting_list) in posting_lists.into_iter().enumerate() {
718 if posting_list.is_empty() {
719 continue;
720 }
721 let new_token_id = token_id_map[token_id];
722 debug_assert_ne!(new_token_id, u32::MAX);
723 let merged_posting = &mut self.posting_lists[new_token_id as usize];
724 posting_list.for_each_entry(|doc_id, freq, positions| {
725 let positions = match positions {
726 Some(positions) => PositionRecorder::Position(positions.into()),
727 None => PositionRecorder::Count(freq),
728 };
729 merged_posting.add(doc_id_offset + doc_id, positions);
730 Ok::<(), Error>(())
731 })?;
732 }
733
734 Ok(())
735 }
736
737 pub async fn write(&mut self, store: &dyn IndexStore) -> Result<()> {
738 let docs = Arc::new(std::mem::take(&mut self.docs));
739 self.write_posting_lists(store, docs.clone()).await?;
740 self.write_tokens(store).await?;
741 self.write_docs(store, docs).await?;
742 Ok(())
743 }
744
745 #[instrument(level = "debug", skip_all)]
746 async fn write_posting_lists(
747 &mut self,
748 store: &dyn IndexStore,
749 docs: Arc<DocSet>,
750 ) -> Result<()> {
751 let id = self.id;
752 let mut writer = store
753 .new_index_file(
754 &posting_file_path(self.id),
755 inverted_list_schema_for_version(self.with_position, self.format_version),
756 )
757 .await?;
758 let posting_lists = std::mem::take(&mut self.posting_lists);
759
760 log::info!(
761 "writing {} posting lists of partition {}, with position {}",
762 posting_lists.len(),
763 id,
764 self.with_position
765 );
766 let with_position = self.with_position;
767 let format_version = self.format_version;
768 let schema = inverted_list_schema_for_version(self.with_position, self.format_version);
769 let docs_for_batches = docs.clone();
770 let schema_for_batches = schema.clone();
771 let batch_rows = *LANCE_FTS_POSTING_BATCH_ROWS;
772 let (tx, rx) = async_channel::bounded(*LANCE_FTS_WRITE_QUEUE_SIZE);
773 let producer = spawn_cpu(move || {
774 let mut batch_builder = PostingListBatchBuilder::new(
775 schema_for_batches.clone(),
776 with_position,
777 format_version,
778 batch_rows,
779 );
780 for posting_list in posting_lists {
781 posting_list.append_to_batch_with_docs(
782 &docs_for_batches,
783 &mut batch_builder,
784 format_version,
785 )?;
786 if batch_builder.len() < batch_rows {
787 continue;
788 }
789
790 let batch = batch_builder.finish()?;
791 if let Err(err) = tx.send_blocking(batch) {
792 return Err(Error::execution(format!(
793 "failed to send posting list batch to writer: {err}"
794 )));
795 }
796 }
797
798 if !batch_builder.is_empty() {
799 let batch = batch_builder.finish()?;
800 if let Err(err) = tx.send_blocking(batch) {
801 return Err(Error::execution(format!(
802 "failed to send posting list batch to writer: {err}"
803 )));
804 }
805 }
806
807 Result::Ok(())
808 });
809
810 while let Ok(batch) = rx.recv().await {
811 if let Err(err) = writer.write_record_batch(batch).await {
812 drop(rx);
813 let _ = producer.await;
815 return Err(err);
816 }
817 }
818 drop(rx);
819 producer.await?;
820
821 writer.finish().await?;
822 Ok(())
823 }
824
825 #[instrument(level = "debug", skip_all)]
826 async fn write_tokens(&mut self, store: &dyn IndexStore) -> Result<()> {
827 log::info!("writing tokens of partition {}", self.id);
828 let tokens = std::mem::take(&mut self.tokens);
829 let batch = tokens.to_batch(self.token_set_format)?;
830 let mut writer = store
831 .new_index_file(&token_file_path(self.id), batch.schema())
832 .await?;
833 writer.write_record_batch(batch).await?;
834 writer.finish().await?;
835 Ok(())
836 }
837
838 #[instrument(level = "debug", skip_all)]
839 async fn write_docs(&mut self, store: &dyn IndexStore, docs: Arc<DocSet>) -> Result<()> {
840 log::info!("writing docs of partition {}", self.id);
841 let batch = docs.to_batch()?;
842 let mut writer = store
843 .new_index_file(&doc_file_path(self.id), batch.schema())
844 .await?;
845 writer.write_record_batch(batch).await?;
846 writer.finish().await?;
847 Ok(())
848 }
849}
850
/// A single tokenizing worker. It accumulates documents into an in-memory
/// partition builder and tracks an estimate of the memory that builder holds.
struct IndexWorker {
    // Tokenizer applied to every document.
    tokenizer: Box<dyn LanceTokenizer>,
    // Store the worker's partitions are written to.
    dest_store: Arc<dyn IndexStore>,
    // Shared counter used to allocate partition ids across workers.
    id_alloc: Arc<AtomicU64>,
    // Partition currently being filled.
    builder: InnerBuilder,
    // Ids of partitions this worker has produced so far.
    partitions: Vec<u64>,
    // Posting-list schema; `has_position` inspects it for position columns.
    schema: SchemaRef,
    // Running estimate, in bytes, of memory held by `builder` and the
    // scratch buffers.
    memory_size: u64,
    // Limit compared against `memory_size` while processing batches.
    worker_memory_limit_bytes: u64,
    // Sum of `doc.len()` over all processed documents.
    total_doc_length: usize,
    // Optional mask OR-ed into allocated partition ids.
    fragment_mask: Option<u64>,
    // Token set format for new partition builders.
    token_set_format: TokenSetFormat,
    // Scratch buffer of token ids for the current document.
    token_ids: Vec<u32>,
    // Number of token ids collected for the previous document; used to
    // presize `token_ids`.
    last_token_count: usize,
}
866
/// A worker's final partition, handed back unwritten. `update_index` merges
/// the tails of all workers into one partition before writing it.
struct TailPartition {
    builder: InnerBuilder,
}
870
/// Result returned by a worker task when its input channel closes.
struct WorkerOutput {
    // Ids of partitions produced by the worker; appended to the builder's
    // new partitions by `update_index`.
    partitions: Vec<u64>,
    // The worker's leftover partition, if any, to be merged with other tails.
    tail_partition: Option<TailPartition>,
}
875
/// Settings copied into every worker spawned by `update_index`.
#[derive(Debug, Clone, Copy)]
struct IndexWorkerConfig {
    // Whether posting lists record token positions.
    with_position: bool,
    // FTS format version of the partitions the worker creates.
    format_version: InvertedListFormatVersion,
    // Optional mask OR-ed into allocated partition ids.
    fragment_mask: Option<u64>,
    // Token set format of the partitions the worker creates.
    token_set_format: TokenSetFormat,
    // Per-worker memory budget in bytes.
    worker_memory_limit_bytes: u64,
}
884
885impl IndexWorker {
886 fn posting_lists_overhead_size(&self) -> u64 {
887 (self.builder.posting_lists.capacity() * std::mem::size_of::<PostingListBuilder>()) as u64
888 }
889
890 fn adjust_tracked_value(tracked: &mut u64, old: u64, new: u64) {
891 if new >= old {
892 *tracked += new - old;
893 } else {
894 *tracked -= old - new;
895 }
896 }
897
898 fn adjust_tracked_memory_size(&mut self, old_memory_size: u64, new_memory_size: u64) {
899 Self::adjust_tracked_value(&mut self.memory_size, old_memory_size, new_memory_size);
900 }
901
902 fn apply_delta(total: &mut u64, delta: i64) {
903 if delta >= 0 {
904 *total += delta as u64;
905 } else {
906 *total -= (-delta) as u64;
907 }
908 }
909
910 fn temporary_memory_size(&self) -> u64 {
911 (self.token_ids.capacity() * std::mem::size_of::<u32>()) as u64
912 }
913
914 fn trim_temporary_buffers(&mut self) {
915 if self.token_ids.capacity() > MAX_RETAINED_TOKEN_IDS {
916 self.token_ids = Vec::with_capacity(self.last_token_count.min(MAX_RETAINED_TOKEN_IDS));
917 }
918 }
919
920 async fn new(
921 tokenizer: Box<dyn LanceTokenizer>,
922 dest_store: Arc<dyn IndexStore>,
923 id_alloc: Arc<AtomicU64>,
924 config: IndexWorkerConfig,
925 ) -> Result<Self> {
926 let schema = inverted_list_schema_for_version(config.with_position, config.format_version);
927
928 Ok(Self {
929 tokenizer,
930 dest_store,
931 builder: InnerBuilder::new_with_format_version(
932 id_alloc.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
933 | config.fragment_mask.unwrap_or(0),
934 config.with_position,
935 config.token_set_format,
936 config.format_version,
937 ),
938 partitions: Vec::new(),
939 id_alloc,
940 schema,
941 memory_size: 0,
942 worker_memory_limit_bytes: config.worker_memory_limit_bytes,
943 total_doc_length: 0,
944 fragment_mask: config.fragment_mask,
945 token_set_format: config.token_set_format,
946 token_ids: Vec::new(),
947 last_token_count: 0,
948 })
949 }
950
951 fn has_position(&self) -> bool {
952 self.schema
953 .column_with_name(COMPRESSED_POSITION_COL)
954 .is_some()
955 || self.schema.column_with_name(POSITION_COL).is_some()
956 }
957
958 async fn process_batch(&mut self, batch: RecordBatch) -> Result<()> {
959 let doc_col = batch.column(0);
960 let doc_iter = iter_str_array(doc_col);
961 let row_id_col = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
962 let docs = doc_iter
963 .zip(row_id_col.values().iter())
964 .filter_map(|(doc, row_id)| doc.map(|doc| (doc, *row_id)));
965
966 let with_position = self.has_position();
967 for (doc, row_id) in docs {
968 let builder_was_empty = self.builder.docs.is_empty();
969 let old_temporary_memory_size = self.temporary_memory_size();
970 let old_token_memory_size = self.builder.tokens.memory_size() as u64;
971 let doc_id = self.builder.docs.len() as u32;
972 let mut token_num: u32 = 0;
973 let mut posting_memory_delta = 0i64;
974 if with_position {
975 if self.token_ids.capacity() < self.last_token_count {
976 self.token_ids
977 .reserve(self.last_token_count - self.token_ids.capacity());
978 }
979 self.token_ids.clear();
980 let builder = &mut self.builder;
981 let token_ids = &mut self.token_ids;
982 let memory_size = &mut self.memory_size;
983 let posting_tail_codec = builder.posting_tail_codec;
984
985 let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
986 while token_stream.advance() {
987 let token = token_stream.token_mut();
988 let token_text = std::mem::take(&mut token.text);
989 let token_id = builder.tokens.add(token_text);
990 if token_id as usize == builder.posting_lists.len() {
991 let old_posting_lists_overhead_size = (builder.posting_lists.capacity()
992 * std::mem::size_of::<PostingListBuilder>())
993 as u64;
994 builder.posting_lists.push(
995 PostingListBuilder::new_with_posting_tail_codec(
996 true,
997 posting_tail_codec,
998 ),
999 );
1000 let new_posting_lists_overhead_size = (builder.posting_lists.capacity()
1001 * std::mem::size_of::<PostingListBuilder>())
1002 as u64;
1003 Self::adjust_tracked_value(
1004 memory_size,
1005 old_posting_lists_overhead_size,
1006 new_posting_lists_overhead_size,
1007 );
1008 }
1009 let posting_list = &mut builder.posting_lists[token_id as usize];
1010 let old_posting_memory_size = posting_list.size();
1011 if posting_list.add_occurrence(doc_id, token.position as u32)? {
1012 token_ids.push(token_id);
1013 }
1014 let new_posting_memory_size = posting_list.size();
1015 posting_memory_delta +=
1016 new_posting_memory_size as i64 - old_posting_memory_size as i64;
1017 token_num += 1;
1018 }
1019 } else {
1020 if self.token_ids.capacity() < self.last_token_count {
1021 self.token_ids
1022 .reserve(self.last_token_count - self.token_ids.capacity());
1023 }
1024 self.token_ids.clear();
1025
1026 let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
1027 while token_stream.advance() {
1028 let token = token_stream.token_mut();
1029 let token_text = std::mem::take(&mut token.text);
1030 let token_id = self.builder.tokens.add(token_text);
1031 self.token_ids.push(token_id);
1032 token_num += 1;
1033 }
1034 }
1035 self.adjust_tracked_memory_size(
1036 old_token_memory_size,
1037 self.builder.tokens.memory_size() as u64,
1038 );
1039
1040 if !with_position {
1041 let old_posting_lists_overhead_size = self.posting_lists_overhead_size();
1042 self.builder
1043 .posting_lists
1044 .resize_with(self.builder.tokens.len(), || {
1045 PostingListBuilder::new_with_posting_tail_codec(
1046 false,
1047 self.builder.posting_tail_codec,
1048 )
1049 });
1050 let new_posting_lists_overhead_size = self.posting_lists_overhead_size();
1051 Self::adjust_tracked_value(
1052 &mut self.memory_size,
1053 old_posting_lists_overhead_size,
1054 new_posting_lists_overhead_size,
1055 );
1056 }
1057
1058 let old_doc_memory_size = self.builder.docs.memory_size() as u64;
1059 let appended_doc_id = self.builder.docs.append(row_id, token_num);
1060 debug_assert_eq!(appended_doc_id, doc_id);
1061 self.adjust_tracked_memory_size(
1062 old_doc_memory_size,
1063 self.builder.docs.memory_size() as u64,
1064 );
1065 self.total_doc_length += doc.len();
1066
1067 if with_position {
1068 for &token_id in &self.token_ids {
1069 let (old_posting_memory_size, new_posting_memory_size) = {
1070 let posting_list = &mut self.builder.posting_lists[token_id as usize];
1071 let old_posting_memory_size = posting_list.size();
1072 posting_list.finish_open_doc(doc_id)?;
1073 let new_posting_memory_size = posting_list.size();
1074 (old_posting_memory_size, new_posting_memory_size)
1075 };
1076 posting_memory_delta +=
1077 new_posting_memory_size as i64 - old_posting_memory_size as i64;
1078 }
1079 Self::apply_delta(&mut self.memory_size, posting_memory_delta);
1080 } else if token_num > 0 {
1081 self.token_ids.sort_unstable();
1082 let mut iter = self.token_ids.iter();
1083 let mut current = *iter.next().unwrap();
1084 let mut count = 1u32;
1085 for &token_id in iter {
1086 if token_id == current {
1087 count += 1;
1088 continue;
1089 }
1090
1091 let (old_posting_memory_size, new_posting_memory_size) = {
1092 let posting_list = &mut self.builder.posting_lists[current as usize];
1093 let old_posting_memory_size = posting_list.size();
1094 posting_list.add(doc_id, PositionRecorder::Count(count));
1095 let new_posting_memory_size = posting_list.size();
1096 (old_posting_memory_size, new_posting_memory_size)
1097 };
1098 posting_memory_delta +=
1099 new_posting_memory_size as i64 - old_posting_memory_size as i64;
1100
1101 current = token_id;
1102 count = 1;
1103 }
1104 let (old_posting_memory_size, new_posting_memory_size) = {
1105 let posting_list = &mut self.builder.posting_lists[current as usize];
1106 let old_posting_memory_size = posting_list.size();
1107 posting_list.add(doc_id, PositionRecorder::Count(count));
1108 let new_posting_memory_size = posting_list.size();
1109 (old_posting_memory_size, new_posting_memory_size)
1110 };
1111 posting_memory_delta +=
1112 new_posting_memory_size as i64 - old_posting_memory_size as i64;
1113 Self::apply_delta(&mut self.memory_size, posting_memory_delta);
1114 }
1115 self.last_token_count = self.token_ids.len();
1116 self.trim_temporary_buffers();
1117 self.adjust_tracked_memory_size(
1118 old_temporary_memory_size,
1119 self.temporary_memory_size(),
1120 );
1121
1122 if self.builder.docs.len() == 1 && self.memory_size > self.worker_memory_limit_bytes {
1123 return Err(Error::invalid_input(format!(
1124 "single document row_id={} exceeds worker memory limit: {} > {} bytes",
1125 row_id, self.memory_size, self.worker_memory_limit_bytes
1126 )));
1127 }
1128
1129 if self.builder.docs.len() as u32 == u32::MAX
1130 || (!builder_was_empty && self.memory_size >= self.worker_memory_limit_bytes)
1131 {
1132 self.flush().await?;
1133 }
1134 }
1135
1136 Ok(())
1137 }
1138
1139 #[instrument(level = "debug", skip_all)]
1140 async fn flush(&mut self) -> Result<()> {
1141 if self.builder.tokens.is_empty() {
1142 return Ok(());
1143 }
1144
1145 log::info!(
1146 "flushing posting lists, memory size: {} MiB",
1147 self.memory_size / (1024 * 1024)
1148 );
1149 self.memory_size = self.temporary_memory_size();
1150 let with_position = self.has_position();
1151 let format_version = self.builder.format_version;
1152 let builder = std::mem::replace(
1153 &mut self.builder,
1154 InnerBuilder::new_with_format_version(
1155 self.id_alloc
1156 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
1157 | self.fragment_mask.unwrap_or(0),
1158 with_position,
1159 self.token_set_format,
1160 format_version,
1161 ),
1162 );
1163 let written_partition_id = builder.id();
1164 let mut builder = builder;
1165 builder
1166 .write(self.dest_store.as_ref())
1167 .await
1168 .map_err(|err| {
1169 Error::execution(format!(
1170 "failed to write finalized partition {}: {err}",
1171 written_partition_id
1172 ))
1173 })?;
1174 self.partitions.push(written_partition_id);
1175 Ok(())
1176 }
1177
1178 async fn finish(self) -> Result<WorkerOutput> {
1179 let tail_partition = if self.builder.tokens.is_empty() {
1180 None
1181 } else {
1182 Some(TailPartition {
1183 builder: self.builder,
1184 })
1185 };
1186 Ok(WorkerOutput {
1187 partitions: self.partitions,
1188 tail_partition,
1189 })
1190 }
1191}
1192
1193#[derive(Debug, Clone)]
1194pub enum PositionRecorder {
1195 Position(SmallVec<[u32; 2]>),
1196 Count(u32),
1197}
1198
1199impl PositionRecorder {
1200 pub fn len(&self) -> u32 {
1201 match self {
1202 Self::Position(positions) => positions.len() as u32,
1203 Self::Count(count) => *count,
1204 }
1205 }
1206
1207 pub fn is_empty(&self) -> bool {
1208 self.len() == 0
1209 }
1210
1211 pub fn into_vec(self) -> Vec<u32> {
1212 match self {
1213 Self::Position(positions) => positions.into_vec(),
1214 Self::Count(_) => vec![0],
1215 }
1216 }
1217}
1218
1219#[derive(Debug, Eq, PartialEq, Clone, DeepSizeOf)]
1220pub struct ScoredDoc {
1221 pub row_id: u64,
1222 pub score: OrderedFloat,
1223}
1224
1225impl ScoredDoc {
1226 pub fn new(row_id: u64, score: f32) -> Self {
1227 Self {
1228 row_id,
1229 score: OrderedFloat(score),
1230 }
1231 }
1232}
1233
1234impl PartialOrd for ScoredDoc {
1235 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1236 Some(self.cmp(other))
1237 }
1238}
1239
1240impl Ord for ScoredDoc {
1241 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1242 self.score.cmp(&other.score)
1243 }
1244}
1245
1246pub fn legacy_inverted_list_schema(with_position: bool) -> SchemaRef {
1247 let mut fields = vec![
1248 arrow_schema::Field::new(ROW_ID, arrow_schema::DataType::UInt64, false),
1249 arrow_schema::Field::new(FREQUENCY_COL, arrow_schema::DataType::Float32, false),
1250 ];
1251 if with_position {
1252 fields.push(arrow_schema::Field::new(
1253 POSITION_COL,
1254 arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1255 "item",
1256 arrow_schema::DataType::Int32,
1257 true,
1258 ))),
1259 false,
1260 ));
1261 }
1262 Arc::new(arrow_schema::Schema::new(fields))
1263}
1264
1265pub fn inverted_list_schema(with_position: bool) -> SchemaRef {
1266 inverted_list_schema_for_version(with_position, current_fts_format_version())
1267}
1268
1269pub fn inverted_list_schema_for_version(
1270 with_position: bool,
1271 format_version: InvertedListFormatVersion,
1272) -> SchemaRef {
1273 match format_version {
1274 InvertedListFormatVersion::V1 => inverted_list_schema_v1(with_position),
1275 InvertedListFormatVersion::V2 => inverted_list_schema_with_tail_codec_and_position_codec(
1276 with_position,
1277 PostingTailCodec::VarintDelta,
1278 Some(PositionStreamCodec::PackedDelta),
1279 ),
1280 }
1281}
1282
1283fn inverted_list_schema_v1(with_position: bool) -> SchemaRef {
1284 let mut fields = vec![
1285 arrow_schema::Field::new(
1286 POSTING_COL,
1287 datatypes::DataType::List(Arc::new(Field::new(
1288 "item",
1289 datatypes::DataType::LargeBinary,
1290 true,
1291 ))),
1292 false,
1293 ),
1294 arrow_schema::Field::new(MAX_SCORE_COL, datatypes::DataType::Float32, false),
1295 arrow_schema::Field::new(LENGTH_COL, datatypes::DataType::UInt32, false),
1296 ];
1297 if with_position {
1298 fields.push(arrow_schema::Field::new(
1299 POSITION_COL,
1300 arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1301 "item",
1302 arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1303 "item",
1304 arrow_schema::DataType::LargeBinary,
1305 true,
1306 ))),
1307 true,
1308 ))),
1309 false,
1310 ));
1311 }
1312 Arc::new(arrow_schema::Schema::new(fields))
1313}
1314
1315pub fn inverted_list_schema_with_tail_codec(
1316 with_position: bool,
1317 posting_tail_codec: PostingTailCodec,
1318) -> SchemaRef {
1319 inverted_list_schema_with_tail_codec_and_position_codec(
1320 with_position,
1321 posting_tail_codec,
1322 Some(PositionStreamCodec::PackedDelta),
1323 )
1324}
1325
1326fn inverted_list_schema_with_tail_codec_and_position_codec(
1327 with_position: bool,
1328 posting_tail_codec: PostingTailCodec,
1329 position_codec: Option<PositionStreamCodec>,
1330) -> SchemaRef {
1331 let mut fields = vec![
1332 arrow_schema::Field::new(
1335 POSTING_COL,
1336 datatypes::DataType::List(Arc::new(Field::new(
1337 "item",
1338 datatypes::DataType::LargeBinary,
1339 true,
1340 ))),
1341 false,
1342 ),
1343 arrow_schema::Field::new(MAX_SCORE_COL, datatypes::DataType::Float32, false),
1344 arrow_schema::Field::new(LENGTH_COL, datatypes::DataType::UInt32, false),
1345 ];
1346 if with_position {
1347 fields.push(arrow_schema::Field::new(
1348 COMPRESSED_POSITION_COL,
1349 arrow_schema::DataType::LargeBinary,
1350 false,
1351 ));
1352 fields.push(arrow_schema::Field::new(
1353 POSITION_BLOCK_OFFSET_COL,
1354 arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1355 "item",
1356 arrow_schema::DataType::UInt32,
1357 true,
1358 ))),
1359 false,
1360 ));
1361 }
1362 let mut metadata = HashMap::from([(
1363 POSTING_TAIL_CODEC_KEY.to_owned(),
1364 posting_tail_codec.as_str().to_owned(),
1365 )]);
1366 if let Some(position_codec) = position_codec.filter(|_| with_position) {
1367 metadata.insert(
1368 POSITIONS_LAYOUT_KEY.to_owned(),
1369 POSITIONS_LAYOUT_SHARED_STREAM_V2.to_owned(),
1370 );
1371 metadata.insert(
1372 POSITIONS_CODEC_KEY.to_owned(),
1373 position_codec.as_str().to_owned(),
1374 );
1375 }
1376 Arc::new(arrow_schema::Schema::new_with_metadata(fields, metadata))
1377}
1378
1379pub struct FlattenStream {
1381 inner: SendableRecordBatchStream,
1385 field_type: DataType,
1386 data_type: DataType,
1387}
1388
1389impl FlattenStream {
1390 pub fn new(input: SendableRecordBatchStream) -> Self {
1391 let schema = input.schema();
1392 let field = schema.field(0);
1393 let data_type = match field.data_type() {
1394 DataType::List(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
1395 DataType::List(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
1396 DataType::LargeUtf8
1397 }
1398 DataType::LargeList(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
1399 DataType::LargeList(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
1400 DataType::LargeUtf8
1401 }
1402 _ => panic!(
1403 "expect data type List(Utf8) or List(LargeUtf8) but got {:?}",
1404 field.data_type()
1405 ),
1406 };
1407 Self {
1408 inner: input,
1409 field_type: field.data_type().clone(),
1410 data_type,
1411 }
1412 }
1413}
1414
1415impl Stream for FlattenStream {
1416 type Item = datafusion_common::Result<RecordBatch>;
1417
1418 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1419 match Pin::new(&mut self.inner).poll_next(cx) {
1420 Poll::Ready(Some(Ok(batch))) => {
1421 let doc_col = batch.column(0);
1422 let batch = match self.field_type {
1423 DataType::List(_) => flatten_string_list::<i32>(&batch, doc_col).map_err(|e| {
1424 datafusion_common::error::DataFusionError::Execution(format!(
1425 "flatten string list error: {}",
1426 e
1427 ))
1428 }),
1429 DataType::LargeList(_) => {
1430 flatten_string_list::<i64>(&batch, doc_col).map_err(|e| {
1431 datafusion_common::error::DataFusionError::Execution(format!(
1432 "flatten string list error: {}",
1433 e
1434 ))
1435 })
1436 }
1437 _ => unreachable!(
1438 "expect data type List or LargeList but got {:?}",
1439 self.field_type
1440 ),
1441 };
1442 Poll::Ready(Some(batch))
1443 }
1444 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
1445 Poll::Ready(None) => Poll::Ready(None),
1446 Poll::Pending => Poll::Pending,
1447 }
1448 }
1449}
1450
1451impl RecordBatchStream for FlattenStream {
1452 fn schema(&self) -> SchemaRef {
1453 let schema = Schema::new(vec![
1454 Field::new(
1455 self.inner.schema().field(0).name(),
1456 self.data_type.clone(),
1457 true,
1458 ),
1459 ROW_ID_FIELD.clone(),
1460 ]);
1461
1462 Arc::new(schema)
1463 }
1464}
1465
1466fn flatten_string_list<Offset: arrow::array::OffsetSizeTrait>(
1467 batch: &RecordBatch,
1468 doc_col: &Arc<dyn Array>,
1469) -> Result<RecordBatch> {
1470 let docs = doc_col.as_list::<Offset>();
1471 let row_ids = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
1472
1473 let row_ids = row_ids
1474 .values()
1475 .iter()
1476 .zip(docs.iter())
1477 .flat_map(|(row_id, doc)| std::iter::repeat_n(*row_id, doc.map(|d| d.len()).unwrap_or(0)));
1478
1479 let row_ids = Arc::new(UInt64Array::from_iter_values(row_ids));
1480 let docs = match docs.value_type() {
1481 datatypes::DataType::Utf8 | datatypes::DataType::LargeUtf8 => docs.values().clone(),
1482 _ => {
1483 return Err(Error::index(format!(
1484 "expect data type String or LargeString but got {}",
1485 docs.value_type()
1486 )));
1487 }
1488 };
1489
1490 let schema = Schema::new(vec![
1491 Field::new(
1492 batch.schema().field(0).name(),
1493 docs.data_type().clone(),
1494 true,
1495 ),
1496 ROW_ID_FIELD.clone(),
1497 ]);
1498 let batch = RecordBatch::try_new(Arc::new(schema), vec![docs, row_ids])?;
1499 Ok(batch)
1500}
1501
1502pub(crate) fn token_file_path(partition_id: u64) -> String {
1503 format!("part_{}_{}", partition_id, TOKENS_FILE)
1504}
1505
1506pub(crate) fn posting_file_path(partition_id: u64) -> String {
1507 format!("part_{}_{}", partition_id, INVERT_LIST_FILE)
1508}
1509
1510pub(crate) fn doc_file_path(partition_id: u64) -> String {
1511 format!("part_{}_{}", partition_id, DOCS_FILE)
1512}
1513
1514pub(crate) fn part_metadata_file_path(partition_id: u64) -> String {
1515 format!("part_{}_{}", partition_id, METADATA_FILE)
1516}
1517
/// Suffixes of the files that store one partition (tokens, posting lists, docs).
/// Each is combined with a `part_{id}_` prefix when partition files are renamed
/// during a merge. The per-partition metadata file is not included.
const PARTITION_FILE_SUFFIXES: [&str; 3] = [TOKENS_FILE, INVERT_LIST_FILE, DOCS_FILE];
/// Number of rename passes each remapped file goes through (old -> temp, temp -> final);
/// used to size the remap progress total.
const PARTITION_FILE_RENAME_PHASES: u64 = 2;
1523
1524pub async fn merge_index_files(
1525 object_store: &ObjectStore,
1526 index_dir: &Path,
1527 store: Arc<dyn IndexStore>,
1528 progress: Arc<dyn IndexBuildProgress>,
1529) -> Result<()> {
1530 let part_metadata_files = list_metadata_files(object_store, index_dir).await?;
1532
1533 merge_metadata_files(store, &part_metadata_files, progress).await
1535}
1536
1537async fn list_metadata_files(object_store: &ObjectStore, index_dir: &Path) -> Result<Vec<String>> {
1540 let mut part_metadata_files = Vec::new();
1542 let mut list_stream = object_store.list(Some(index_dir.clone()));
1543
1544 while let Some(item) = list_stream.next().await {
1545 match item {
1546 Ok(meta) => {
1547 let file_name = meta.location.filename().unwrap_or_default();
1548 if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
1550 part_metadata_files.push(file_name.to_string());
1551 }
1552 }
1553 Err(_) => continue,
1554 }
1555 }
1556
1557 if part_metadata_files.is_empty() {
1558 return Err(Error::invalid_input_source(
1559 format!(
1560 "No partition metadata files found in index directory: {}",
1561 index_dir
1562 )
1563 .into(),
1564 ));
1565 }
1566
1567 Ok(part_metadata_files)
1568}
1569
1570async fn merge_metadata_files(
1572 store: Arc<dyn IndexStore>,
1573 part_metadata_files: &[String],
1574 progress: Arc<dyn IndexBuildProgress>,
1575) -> Result<()> {
1576 let mut all_partitions = Vec::new();
1578 let mut params = None;
1579 let mut token_set_format = None;
1580 let mut format_version = None;
1581 let mut posting_tail_codec = None;
1582 let mut deleted_fragments = RoaringBitmap::new();
1583 progress
1584 .stage_start(
1585 "read_partition_metadata",
1586 Some(part_metadata_files.len() as u64),
1587 "files",
1588 )
1589 .await?;
1590
1591 for (idx, file_name) in part_metadata_files.iter().enumerate() {
1592 let reader = store.open_index_file(file_name).await?;
1593 let metadata = &reader.schema().metadata;
1594
1595 let partitions_str = metadata.get("partitions").ok_or(Error::index(format!(
1596 "partitions not found in {}",
1597 file_name
1598 )))?;
1599
1600 let partition_ids: Vec<u64> = serde_json::from_str(partitions_str)
1601 .map_err(|e| Error::index(format!("Failed to parse partitions: {}", e)))?;
1602
1603 all_partitions.extend(partition_ids);
1604
1605 if params.is_none() {
1606 let params_str = metadata
1607 .get("params")
1608 .ok_or(Error::index(format!("params not found in {}", file_name)))?;
1609 params = Some(
1610 serde_json::from_str::<InvertedIndexParams>(params_str)
1611 .map_err(|e| Error::index(format!("Failed to parse params: {}", e)))?,
1612 );
1613 }
1614
1615 if token_set_format.is_none()
1616 && let Some(name) = metadata.get(TOKEN_SET_FORMAT_KEY)
1617 {
1618 token_set_format = Some(TokenSetFormat::from_str(name)?);
1619 }
1620 if format_version.is_none() {
1621 format_version = Some(parse_format_version_from_metadata(metadata)?);
1622 }
1623 if posting_tail_codec.is_none() {
1624 posting_tail_codec = Some(parse_posting_tail_codec(metadata)?);
1625 }
1626
1627 if reader.num_rows() > 0 {
1628 let metadata_batch = reader.read_range(0..1, None).await?;
1629 let deleted_fragments_col = metadata_batch
1630 .column_by_name(DELETED_FRAGMENTS_COL)
1631 .expect_ok()?;
1632 let deleted_fragments_arr = deleted_fragments_col
1633 .as_any()
1634 .downcast_ref::<BinaryArray>()
1635 .expect_ok()?;
1636 let part_deleted_fragments =
1637 RoaringBitmap::deserialize_from(deleted_fragments_arr.value(0))?;
1638 deleted_fragments.extend(part_deleted_fragments);
1639 }
1640 progress
1641 .stage_progress("read_partition_metadata", idx as u64 + 1)
1642 .await?;
1643 }
1644 progress.stage_complete("read_partition_metadata").await?;
1645
1646 let mut sorted_ids = all_partitions.clone();
1648 sorted_ids.sort();
1649 sorted_ids.dedup();
1650
1651 let id_mapping: HashMap<u64, u64> = sorted_ids
1652 .iter()
1653 .enumerate()
1654 .map(|(new_id, &old_id)| (old_id, new_id as u64))
1655 .collect();
1656
1657 let timestamp = std::time::SystemTime::now()
1659 .duration_since(std::time::UNIX_EPOCH)
1660 .unwrap()
1661 .as_secs();
1662
1663 let changed_partition_count = id_mapping
1664 .iter()
1665 .filter(|(old_id, new_id)| old_id != new_id)
1666 .count() as u64;
1667 let total_renames = changed_partition_count
1668 * PARTITION_FILE_SUFFIXES.len() as u64
1669 * PARTITION_FILE_RENAME_PHASES;
1670 progress
1671 .stage_start("remap_partition_files", Some(total_renames), "files")
1672 .await?;
1673
1674 let mut temp_files: Vec<(String, String, String)> = Vec::new(); let mut renamed_files = 0u64;
1677
1678 for (&old_id, &new_id) in &id_mapping {
1679 if old_id != new_id {
1680 for suffix in PARTITION_FILE_SUFFIXES {
1681 let old_path = format!("part_{}_{}", old_id, suffix);
1682 let new_path = format!("part_{}_{}", new_id, suffix);
1683 let temp_path = format!("temp_{}_{}", timestamp, old_path);
1684
1685 if let Err(e) = store.rename_index_file(&old_path, &temp_path).await {
1687 for (temp_name, old_name, _) in temp_files.iter().rev() {
1689 let _ = store.rename_index_file(temp_name, old_name).await;
1690 }
1691 return Err(Error::index(format!(
1692 "Failed to move {} to temp {}: {}",
1693 old_path, temp_path, e
1694 )));
1695 }
1696 temp_files.push((temp_path, old_path, new_path));
1697 renamed_files += 1;
1698 progress
1699 .stage_progress("remap_partition_files", renamed_files)
1700 .await?;
1701 }
1702 }
1703 }
1704
1705 let mut completed_renames: Vec<(String, String)> = Vec::new(); for (temp_path, _old_path, final_path) in &temp_files {
1709 if let Err(e) = store.rename_index_file(temp_path, final_path).await {
1710 for (final_name, temp_name) in completed_renames.iter().rev() {
1712 let _ = store.rename_index_file(final_name, temp_name).await;
1713 }
1714 for (temp_name, orig_name, _) in temp_files.iter() {
1716 if !completed_renames.iter().any(|(_, t)| t == temp_name) {
1717 let _ = store.rename_index_file(temp_name, orig_name).await;
1718 }
1719 }
1720 return Err(Error::index(format!(
1721 "Failed to rename {} to {}: {}",
1722 temp_path, final_path, e
1723 )));
1724 }
1725 completed_renames.push((final_path.clone(), temp_path.clone()));
1726 renamed_files += 1;
1727 progress
1728 .stage_progress("remap_partition_files", renamed_files)
1729 .await?;
1730 }
1731 progress.stage_complete("remap_partition_files").await?;
1732
1733 let remapped_partitions: Vec<u64> = (0..id_mapping.len() as u64).collect();
1735 let params = params.unwrap_or_default();
1736 let token_set_format = token_set_format.unwrap_or(TokenSetFormat::Arrow);
1737 let builder = InvertedIndexBuilder::from_existing_index(
1738 params,
1739 None,
1740 remapped_partitions.clone(),
1741 token_set_format,
1742 None,
1743 deleted_fragments,
1744 )
1745 .with_format_version(format_version.unwrap_or(InvertedListFormatVersion::V1))
1746 .with_posting_tail_codec(posting_tail_codec.unwrap_or(PostingTailCodec::Fixed32));
1747 progress
1748 .stage_start("write_merged_metadata", Some(1), "files")
1749 .await?;
1750 builder
1751 .write_metadata(&*store, &remapped_partitions)
1752 .await?;
1753 progress.stage_progress("write_merged_metadata", 1).await?;
1754 progress.stage_complete("write_merged_metadata").await?;
1755
1756 for file_name in part_metadata_files {
1758 if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
1759 let _ = store.delete_index_file(file_name).await;
1760 }
1761 }
1762
1763 Ok(())
1764}
1765
1766pub fn document_input(
1773 input: SendableRecordBatchStream,
1774 column: &str,
1775) -> Result<SendableRecordBatchStream> {
1776 let schema = input.schema();
1777 let field = schema.column_with_name(column).expect_ok()?.1;
1778 match field.data_type() {
1779 DataType::Utf8 | DataType::LargeUtf8 => Ok(input),
1780 DataType::List(field) | DataType::LargeList(field)
1781 if matches!(field.data_type(), DataType::Utf8 | DataType::LargeUtf8) =>
1782 {
1783 Ok(Box::pin(FlattenStream::new(input)))
1784 }
1785 DataType::LargeBinary => match field.metadata().get(ARROW_EXT_NAME_KEY) {
1786 Some(name) if name.as_str() == JSON_EXT_NAME => {
1787 Ok(Box::pin(JsonTextStream::new(input, column.to_string())))
1788 }
1789 _ => Err(Error::invalid_input_source(
1790 format!("column {} is not json", column).into(),
1791 )),
1792 },
1793 _ => Err(Error::invalid_input_source(
1794 format!(
1795 "column {} has type {}, is not utf8, large utf8 type/list, or large binary",
1796 column,
1797 field.data_type()
1798 )
1799 .into(),
1800 )),
1801 }
1802}
1803
1804#[cfg(test)]
1805mod tests {
1806 use super::*;
1807 use crate::metrics::NoOpMetricsCollector;
1808 use crate::progress::IndexBuildProgress;
1809 use crate::scalar::{IndexFile, IndexReader, IndexWriter, ScalarIndex};
1810 use arrow_array::{RecordBatch, StringArray, UInt64Array};
1811 use arrow_schema::{DataType, Field, Schema};
1812 use async_trait::async_trait;
1813 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
1814 use futures::stream;
1815 use lance_core::ROW_ID;
1816 use lance_core::cache::LanceCache;
1817 use lance_core::utils::tempfile::TempDir;
1818 use std::any::Any;
1819 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1820 use std::time::Duration;
1821
1822 fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch {
1823 let schema = Arc::new(Schema::new(vec![
1824 Field::new("doc", DataType::Utf8, true),
1825 Field::new(ROW_ID, DataType::UInt64, false),
1826 ]));
1827 let docs = Arc::new(StringArray::from(vec![Some(doc)]));
1828 let row_ids = Arc::new(UInt64Array::from(vec![row_id]));
1829 RecordBatch::try_new(schema, vec![docs, row_ids]).unwrap()
1830 }
1831
1832 #[derive(Debug, Default, Clone)]
1833 struct CountingStore {
1834 write_count: Arc<AtomicUsize>,
1835 }
1836
1837 impl CountingStore {
1838 fn new() -> Self {
1839 Self {
1840 write_count: Arc::new(AtomicUsize::new(0)),
1841 }
1842 }
1843
1844 fn write_count(&self) -> usize {
1845 self.write_count.load(Ordering::SeqCst)
1846 }
1847 }
1848
1849 impl DeepSizeOf for CountingStore {
1850 fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
1851 0
1852 }
1853 }
1854
1855 #[derive(Debug)]
1856 struct CountingWriter {
1857 write_count: Arc<AtomicUsize>,
1858 }
1859
1860 #[async_trait]
1861 impl IndexWriter for CountingWriter {
1862 async fn write_record_batch(&mut self, _batch: RecordBatch) -> Result<u64> {
1863 Ok(self.write_count.fetch_add(1, Ordering::SeqCst) as u64)
1864 }
1865
1866 async fn finish(&mut self) -> Result<()> {
1867 Ok(())
1868 }
1869
1870 async fn finish_with_metadata(&mut self, _metadata: HashMap<String, String>) -> Result<()> {
1871 Ok(())
1872 }
1873 }
1874
1875 #[async_trait]
1876 impl IndexStore for CountingStore {
1877 fn as_any(&self) -> &dyn Any {
1878 self
1879 }
1880
1881 fn clone_arc(&self) -> Arc<dyn IndexStore> {
1882 Arc::new(self.clone())
1883 }
1884
1885 fn io_parallelism(&self) -> usize {
1886 1
1887 }
1888
1889 async fn new_index_file(
1890 &self,
1891 _name: &str,
1892 _schema: Arc<Schema>,
1893 ) -> Result<Box<dyn IndexWriter>> {
1894 Ok(Box::new(CountingWriter {
1895 write_count: self.write_count.clone(),
1896 }))
1897 }
1898
1899 async fn open_index_file(&self, _name: &str) -> Result<Arc<dyn IndexReader>> {
1900 Err(Error::not_supported(
1901 "CountingStore does not support reading",
1902 ))
1903 }
1904
1905 async fn copy_index_file(&self, _name: &str, _dest_store: &dyn IndexStore) -> Result<()> {
1906 Err(Error::not_supported(
1907 "CountingStore does not support copying",
1908 ))
1909 }
1910
1911 async fn rename_index_file(&self, _name: &str, _new_name: &str) -> Result<()> {
1912 Err(Error::not_supported(
1913 "CountingStore does not support renaming",
1914 ))
1915 }
1916
1917 async fn delete_index_file(&self, _name: &str) -> Result<()> {
1918 Err(Error::not_supported(
1919 "CountingStore does not support deleting",
1920 ))
1921 }
1922
1923 async fn list_files_with_sizes(&self) -> Result<Vec<IndexFile>> {
1924 Ok(vec![])
1925 }
1926 }
1927
1928 #[tokio::test]
1929 async fn test_write_posting_lists_batches_multiple_rows() -> Result<()> {
1930 let mut builder = InnerBuilder::new(0, false, TokenSetFormat::default());
1931 for doc_id in 0..3u64 {
1932 builder.docs.append(doc_id, 1);
1933 }
1934
1935 for doc_id in 0..3u32 {
1936 let mut posting_list = PostingListBuilder::new(false);
1937 posting_list.add(doc_id, PositionRecorder::Count(1));
1938 builder.posting_lists.push(posting_list);
1939 }
1940
1941 let store = CountingStore::new();
1942 let docs = Arc::new(std::mem::take(&mut builder.docs));
1943 builder.write_posting_lists(&store, docs).await?;
1944
1945 assert_eq!(store.write_count(), 1);
1946 Ok(())
1947 }
1948
1949 #[tokio::test]
1950 async fn test_build_only_path_writes_partitions_as_is() -> Result<()> {
1951 let src_dir = TempDir::default();
1952 let dest_dir = TempDir::default();
1953 let src_store = Arc::new(LanceIndexStore::new(
1954 ObjectStore::local().into(),
1955 src_dir.obj_path(),
1956 Arc::new(LanceCache::no_cache()),
1957 ));
1958 let dest_store = Arc::new(LanceIndexStore::new(
1959 ObjectStore::local().into(),
1960 dest_dir.obj_path(),
1961 Arc::new(LanceCache::no_cache()),
1962 ));
1963
1964 let params = InvertedIndexParams::default();
1965 let tokenizer = params.build()?;
1966 let token_set_format = TokenSetFormat::default();
1967 let id_alloc = Arc::new(AtomicU64::new(0));
1968
1969 let mut worker1 = IndexWorker::new(
1970 tokenizer.clone(),
1971 src_store.clone(),
1972 id_alloc.clone(),
1973 IndexWorkerConfig {
1974 with_position: params.with_position,
1975 format_version: InvertedListFormatVersion::V1,
1976 fragment_mask: None,
1977 token_set_format,
1978 worker_memory_limit_bytes: u64::MAX,
1979 },
1980 )
1981 .await?;
1982 worker1
1983 .process_batch(make_doc_batch("hello world", 0))
1984 .await?;
1985 let output1 = worker1.finish().await?;
1986 let mut partitions = output1.partitions;
1987 if let Some(mut tail_partition) = output1.tail_partition {
1988 partitions.push(tail_partition.builder.id());
1989 tail_partition.builder.write(src_store.as_ref()).await?;
1990 }
1991
1992 let mut worker2 = IndexWorker::new(
1993 tokenizer.clone(),
1994 src_store.clone(),
1995 id_alloc.clone(),
1996 IndexWorkerConfig {
1997 with_position: params.with_position,
1998 format_version: InvertedListFormatVersion::V1,
1999 fragment_mask: None,
2000 token_set_format,
2001 worker_memory_limit_bytes: u64::MAX,
2002 },
2003 )
2004 .await?;
2005 worker2
2006 .process_batch(make_doc_batch("goodbye world", 1))
2007 .await?;
2008 let output2 = worker2.finish().await?;
2009 partitions.extend(output2.partitions);
2010 if let Some(mut tail_partition) = output2.tail_partition {
2011 partitions.push(tail_partition.builder.id());
2012 tail_partition.builder.write(src_store.as_ref()).await?;
2013 }
2014 partitions.sort_unstable();
2015 assert_eq!(partitions.len(), 2);
2016 assert_ne!(partitions[0], partitions[1]);
2017
2018 let builder = InvertedIndexBuilder::from_existing_index(
2019 InvertedIndexParams::default(),
2020 Some(src_store.clone()),
2021 partitions.clone(),
2022 token_set_format,
2023 None,
2024 RoaringBitmap::new(),
2025 );
2026 builder.write(dest_store.as_ref()).await?;
2027
2028 let metadata_reader = dest_store.open_index_file(METADATA_FILE).await?;
2029 let metadata = &metadata_reader.schema().metadata;
2030 let partitions_str = metadata
2031 .get("partitions")
2032 .expect("partitions missing from metadata");
2033 let written_partitions: Vec<u64> = serde_json::from_str(partitions_str).unwrap();
2034 assert_eq!(written_partitions, partitions);
2035
2036 for id in &partitions {
2037 dest_store.open_index_file(&token_file_path(*id)).await?;
2038 dest_store.open_index_file(&posting_file_path(*id)).await?;
2039 dest_store.open_index_file(&doc_file_path(*id)).await?;
2040 }
2041
2042 Ok(())
2043 }
2044
2045 #[tokio::test]
2046 async fn test_update_preserves_existing_posting_tail_codec() -> Result<()> {
2047 let src_dir = TempDir::default();
2048 let dest_dir = TempDir::default();
2049 let src_store = Arc::new(LanceIndexStore::new(
2050 ObjectStore::local().into(),
2051 src_dir.obj_path(),
2052 Arc::new(LanceCache::no_cache()),
2053 ));
2054 let dest_store = Arc::new(LanceIndexStore::new(
2055 ObjectStore::local().into(),
2056 dest_dir.obj_path(),
2057 Arc::new(LanceCache::no_cache()),
2058 ));
2059
2060 let posting_tail_codec = PostingTailCodec::Fixed32;
2061 let mut partition = InnerBuilder::new_with_posting_tail_codec(
2062 0,
2063 false,
2064 TokenSetFormat::default(),
2065 posting_tail_codec,
2066 );
2067 partition.tokens.add("hello".to_owned());
2068 let mut posting_list =
2069 PostingListBuilder::new_with_posting_tail_codec(false, posting_tail_codec);
2070 posting_list.add(0, PositionRecorder::Count(1));
2071 partition.posting_lists.push(posting_list);
2072 partition.docs.append(100, 1);
2073 partition.write(src_store.as_ref()).await?;
2074
2075 let metadata_writer = InvertedIndexBuilder::from_existing_index(
2076 InvertedIndexParams::default(),
2077 Some(src_store.clone()),
2078 vec![0],
2079 TokenSetFormat::default(),
2080 None,
2081 RoaringBitmap::new(),
2082 )
2083 .with_posting_tail_codec(posting_tail_codec);
2084 metadata_writer
2085 .write_metadata(src_store.as_ref(), &[0])
2086 .await?;
2087
2088 let index = InvertedIndex::load(src_store, None, &LanceCache::no_cache()).await?;
2089 let schema = Arc::new(Schema::new(vec![
2090 Field::new("doc", DataType::Utf8, true),
2091 Field::new(ROW_ID, DataType::UInt64, false),
2092 ]));
2093 let docs = Arc::new(StringArray::from(vec![Some("hello again")]));
2094 let row_ids = Arc::new(UInt64Array::from(vec![101u64]));
2095 let batch = RecordBatch::try_new(schema.clone(), vec![docs, row_ids])?;
2096 let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)]));
2097 index
2098 .update(Box::pin(stream), dest_store.as_ref(), None)
2099 .await?;
2100
2101 let updated =
2102 InvertedIndex::load(dest_store.clone(), None, &LanceCache::no_cache()).await?;
2103 assert_eq!(updated.partitions.len(), 2);
2104 for partition in &updated.partitions {
2105 assert_eq!(
2106 partition.inverted_list.posting_tail_codec(),
2107 posting_tail_codec
2108 );
2109 }
2110
2111 let metadata = dest_store.open_index_file(METADATA_FILE).await?;
2112 assert_eq!(
2113 metadata.schema().metadata.get(POSTING_TAIL_CODEC_KEY),
2114 Some(&posting_tail_codec.as_str().to_owned())
2115 );
2116
2117 Ok(())
2118 }
2119
2120 #[test]
2121 fn test_with_posting_tail_codec_syncs_format_version() {
2122 let builder = InvertedIndexBuilder::from_existing_index(
2123 InvertedIndexParams::default(),
2124 None,
2125 Vec::new(),
2126 TokenSetFormat::default(),
2127 None,
2128 RoaringBitmap::new(),
2129 )
2130 .with_format_version(InvertedListFormatVersion::V2)
2131 .with_posting_tail_codec(PostingTailCodec::Fixed32);
2132 assert_eq!(builder.format_version, InvertedListFormatVersion::V1);
2133 assert_eq!(builder.posting_tail_codec, PostingTailCodec::Fixed32);
2134
2135 let builder = builder.with_posting_tail_codec(PostingTailCodec::VarintDelta);
2136 assert_eq!(builder.format_version, InvertedListFormatVersion::V2);
2137 assert_eq!(builder.posting_tail_codec, PostingTailCodec::VarintDelta);
2138 }
2139
2140 #[tokio::test]
2141 async fn test_inverted_index_without_positions_tracks_frequency() -> Result<()> {
2142 let index_dir = TempDir::default();
2143 let store = Arc::new(LanceIndexStore::new(
2144 ObjectStore::local().into(),
2145 index_dir.obj_path(),
2146 Arc::new(LanceCache::no_cache()),
2147 ));
2148
2149 let schema = Arc::new(Schema::new(vec![
2150 Field::new("doc", DataType::Utf8, true),
2151 Field::new(ROW_ID, DataType::UInt64, false),
2152 ]));
2153 let docs = Arc::new(StringArray::from(vec![Some("hello hello world")]));
2154 let row_ids = Arc::new(UInt64Array::from(vec![0u64]));
2155 let batch = RecordBatch::try_new(schema.clone(), vec![docs, row_ids])?;
2156 let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)]));
2157 let stream = Box::pin(stream);
2158
2159 let params =
2160 InvertedIndexParams::new("whitespace".to_string(), lance_tokenizer::Language::English)
2161 .with_position(false)
2162 .remove_stop_words(false)
2163 .stem(false)
2164 .max_token_length(None);
2165
2166 let mut builder = InvertedIndexBuilder::new(params);
2167 builder.update(stream, store.as_ref(), None).await?;
2168
2169 let index = InvertedIndex::load(store, None, &LanceCache::no_cache()).await?;
2170 assert_eq!(index.partitions.len(), 1);
2171 let partition = &index.partitions[0];
2172 let token_id = partition.tokens.get("hello").unwrap();
2173 let posting = partition
2174 .inverted_list
2175 .posting_list(token_id, false, &NoOpMetricsCollector)
2176 .await?;
2177
2178 let mut iter = posting.iter();
2179 let (doc_id, freq, positions) = iter.next().unwrap();
2180 assert_eq!(doc_id, 0);
2181 assert_eq!(freq, 2);
2182 assert!(positions.is_none());
2183 assert!(iter.next().is_none());
2184
2185 Ok(())
2186 }
2187
    // Generates the `RecordingProgress` test double for `IndexBuildProgress`.
    // The tests below build it with `RecordingProgress::default()`, pass it to
    // `with_progress` / `merge_index_files`, and read back `recorded_events()` as
    // `(kind, stage, completed)` tuples. The kind is compared against "start",
    // "progress" and "complete". The exact macro expansion lives in
    // `lance_testing` — confirm details there.
    lance_testing::define_stage_event_progress!(RecordingProgress, IndexBuildProgress, Result<()>);
2189
2190 #[derive(Debug, Default)]
2191 struct FailingProgress;
2192
2193 #[async_trait]
2194 impl IndexBuildProgress for FailingProgress {
2195 async fn stage_start(&self, _stage: &str, _total: Option<u64>, _unit: &str) -> Result<()> {
2196 Ok(())
2197 }
2198
2199 async fn stage_progress(&self, _stage: &str, _completed: u64) -> Result<()> {
2200 Err(Error::io("injected progress failure"))
2201 }
2202
2203 async fn stage_complete(&self, _stage: &str) -> Result<()> {
2204 Ok(())
2205 }
2206 }
2207
2208 #[tokio::test]
2209 async fn test_builder_reports_progress_stages() -> Result<()> {
2210 let index_dir = TempDir::default();
2211 let store = Arc::new(LanceIndexStore::new(
2212 ObjectStore::local().into(),
2213 index_dir.obj_path(),
2214 Arc::new(LanceCache::no_cache()),
2215 ));
2216
2217 let batch1 = make_doc_batch("hello world", 0);
2218 let batch2 = make_doc_batch("goodbye world", 1);
2219 let total_rows = 2u64;
2220 let stream = RecordBatchStreamAdapter::new(
2221 batch1.schema(),
2222 stream::iter(vec![Ok(batch1), Ok(batch2)]),
2223 );
2224 let stream = Box::pin(stream);
2225
2226 let progress = Arc::new(RecordingProgress::default());
2227 let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default())
2228 .with_progress(progress.clone());
2229 builder.update(stream, store.as_ref(), None).await?;
2230
2231 let events = progress.recorded_events();
2232 let tags = events
2233 .iter()
2234 .map(|(kind, stage, _)| format!("{kind}:{stage}"))
2235 .collect::<Vec<_>>();
2236 let tokenize_progress = events
2237 .iter()
2238 .filter_map(|(kind, stage, completed)| {
2239 if kind == "progress" && stage == "tokenize_docs" {
2240 Some(*completed)
2241 } else {
2242 None
2243 }
2244 })
2245 .collect::<Vec<_>>();
2246
2247 let tokenize_start = tags
2248 .iter()
2249 .position(|e| e == "start:tokenize_docs")
2250 .expect("missing tokenize_docs start");
2251 let tokenize_complete = tags
2252 .iter()
2253 .position(|e| e == "complete:tokenize_docs")
2254 .expect("missing tokenize_docs complete");
2255 let copy_start = tags
2256 .iter()
2257 .position(|e| e == "start:copy_partitions")
2258 .expect("missing copy_partitions start");
2259 let copy_complete = tags
2260 .iter()
2261 .position(|e| e == "complete:copy_partitions")
2262 .expect("missing copy_partitions complete");
2263 let metadata_start = tags
2264 .iter()
2265 .position(|e| e == "start:write_metadata")
2266 .expect("missing write_metadata start");
2267 let metadata_complete = tags
2268 .iter()
2269 .position(|e| e == "complete:write_metadata")
2270 .expect("missing write_metadata complete");
2271
2272 assert!(tokenize_start < tokenize_complete);
2273 assert!(tokenize_complete < copy_start);
2274 assert!(copy_start < copy_complete);
2275 assert!(copy_complete < metadata_start);
2276 assert!(metadata_start < metadata_complete);
2277
2278 assert!(
2279 tags.iter().any(|e| e == "progress:tokenize_docs"),
2280 "expected progress callback for tokenize_docs"
2281 );
2282 assert!(
2283 tokenize_progress.len() >= 2,
2284 "expected at least two progress callbacks for tokenize_docs, got {tokenize_progress:?}"
2285 );
2286 assert_eq!(
2287 tokenize_progress.iter().copied().max().unwrap_or_default(),
2288 total_rows,
2289 "expected tokenize_docs progress to reach all rows"
2290 );
2291 assert!(
2292 tags.iter().any(|e| e == "progress:copy_partitions"),
2293 "expected progress callback for copy_partitions"
2294 );
2295 assert!(
2296 tags.iter().any(|e| e == "progress:write_metadata"),
2297 "expected progress callback for write_metadata"
2298 );
2299 assert!(
2300 !tags.iter().any(|e| e == "start:merge_partitions"),
2301 "merge_partitions should not run in the build-only path"
2302 );
2303
2304 Ok(())
2305 }
2306
2307 #[tokio::test]
2308 async fn test_builder_default_path_skips_merge_stage() -> Result<()> {
2309 let index_dir = TempDir::default();
2310 let store = Arc::new(LanceIndexStore::new(
2311 ObjectStore::local().into(),
2312 index_dir.obj_path(),
2313 Arc::new(LanceCache::no_cache()),
2314 ));
2315
2316 let batch = make_doc_batch("hello world", 0);
2317 let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2318 let stream = Box::pin(stream);
2319
2320 let progress = Arc::new(RecordingProgress::default());
2321 let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default())
2322 .with_progress(progress.clone());
2323 builder.update(stream, store.as_ref(), None).await?;
2324
2325 let tags = progress
2326 .recorded_events()
2327 .iter()
2328 .map(|(kind, stage, _)| format!("{kind}:{stage}"))
2329 .collect::<Vec<_>>();
2330
2331 assert!(
2332 tags.iter().any(|e| e == "start:copy_partitions"),
2333 "default path should copy finalized partitions"
2334 );
2335 assert!(
2336 !tags.iter().any(|e| e == "start:merge_partitions"),
2337 "default path should not run merge_partitions"
2338 );
2339 Ok(())
2340 }
2341
2342 #[tokio::test]
2343 async fn test_merge_index_files_reports_progress_stages() -> Result<()> {
2344 let index_dir = TempDir::default();
2345 let index_path = index_dir.obj_path();
2346 let object_store = ObjectStore::local();
2347 let store = Arc::new(LanceIndexStore::new(
2348 object_store.clone().into(),
2349 index_path.clone(),
2350 Arc::new(LanceCache::no_cache()),
2351 ));
2352
2353 for (fragment_id, row_id, doc) in [
2354 (1_u64 << 32, 0_u64, "hello world"),
2355 (2_u64 << 32, 1_u64, "goodbye world"),
2356 ] {
2357 let batch = make_doc_batch(doc, row_id);
2358 let stream =
2359 RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2360 let stream = Box::pin(stream);
2361 let mut builder = InvertedIndexBuilder::new_with_fragment_mask(
2362 InvertedIndexParams::default(),
2363 Some(fragment_id),
2364 )
2365 .with_progress(noop_progress());
2366 builder.update(stream, store.as_ref(), None).await?;
2367 }
2368
2369 let progress = Arc::new(RecordingProgress::default());
2370 merge_index_files(&object_store, &index_path, store.clone(), progress.clone()).await?;
2371
2372 let events = progress.recorded_events();
2373 let tags = events
2374 .iter()
2375 .map(|(kind, stage, _)| format!("{kind}:{stage}"))
2376 .collect::<Vec<_>>();
2377 let remap_progress = events
2378 .iter()
2379 .filter_map(|(kind, stage, completed)| {
2380 if kind == "progress" && stage == "remap_partition_files" {
2381 Some(*completed)
2382 } else {
2383 None
2384 }
2385 })
2386 .collect::<Vec<_>>();
2387
2388 let read_start = tags
2389 .iter()
2390 .position(|e| e == "start:read_partition_metadata")
2391 .expect("missing read_partition_metadata start");
2392 let read_complete = tags
2393 .iter()
2394 .position(|e| e == "complete:read_partition_metadata")
2395 .expect("missing read_partition_metadata complete");
2396 let remap_start = tags
2397 .iter()
2398 .position(|e| e == "start:remap_partition_files")
2399 .expect("missing remap_partition_files start");
2400 let remap_complete = tags
2401 .iter()
2402 .position(|e| e == "complete:remap_partition_files")
2403 .expect("missing remap_partition_files complete");
2404 let metadata_start = tags
2405 .iter()
2406 .position(|e| e == "start:write_merged_metadata")
2407 .expect("missing write_merged_metadata start");
2408 let metadata_complete = tags
2409 .iter()
2410 .position(|e| e == "complete:write_merged_metadata")
2411 .expect("missing write_merged_metadata complete");
2412
2413 assert!(read_start < read_complete);
2414 assert!(read_complete < remap_start);
2415 assert!(remap_start < remap_complete);
2416 assert!(remap_complete < metadata_start);
2417 assert!(metadata_start < metadata_complete);
2418
2419 assert!(
2420 tags.iter().any(|e| e == "progress:read_partition_metadata"),
2421 "expected progress callback for read_partition_metadata"
2422 );
2423 assert_eq!(
2424 remap_progress.last().copied().unwrap_or_default(),
2425 12,
2426 "expected remap_partition_files progress to cover both rename phases"
2427 );
2428 assert!(
2429 tags.iter().any(|e| e == "progress:write_merged_metadata"),
2430 "expected progress callback for write_merged_metadata"
2431 );
2432
2433 Ok(())
2434 }
2435
2436 #[tokio::test]
2437 async fn test_worker_memory_limit_rejects_single_large_doc() {
2438 let index_dir = TempDir::default();
2439 let store = Arc::new(LanceIndexStore::new(
2440 ObjectStore::local().into(),
2441 index_dir.obj_path(),
2442 Arc::new(LanceCache::no_cache()),
2443 ));
2444
2445 let batch = make_doc_batch("hello world", 42);
2446 let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2447 let stream = Box::pin(stream);
2448
2449 let mut builder =
2450 InvertedIndexBuilder::new(InvertedIndexParams::default().memory_limit_mb(0));
2451 let err = builder
2452 .update(stream, store.as_ref(), None)
2453 .await
2454 .expect_err("single doc should exceed zero worker memory limit");
2455 assert!(
2456 err.to_string().contains("row_id=42"),
2457 "unexpected error: {err}"
2458 );
2459 }
2460
2461 #[tokio::test]
2462 async fn test_worker_trims_position_temp_buffers() -> Result<()> {
2463 let tokenizer = InvertedIndexParams::default().with_position(true).build()?;
2464 let store = Arc::new(CountingStore::new());
2465 let id_alloc = Arc::new(AtomicU64::new(0));
2466 let mut worker = IndexWorker::new(
2467 tokenizer,
2468 store,
2469 id_alloc,
2470 IndexWorkerConfig {
2471 with_position: true,
2472 format_version: InvertedListFormatVersion::V1,
2473 fragment_mask: None,
2474 token_set_format: TokenSetFormat::default(),
2475 worker_memory_limit_bytes: u64::MAX,
2476 },
2477 )
2478 .await?;
2479
2480 let doc = (0..(MAX_RETAINED_TOKEN_IDS * 2))
2481 .map(|i| format!("tok{i}"))
2482 .collect::<Vec<_>>()
2483 .join(" ");
2484 worker.process_batch(make_doc_batch(&doc, 0)).await?;
2485
2486 assert!(worker.token_ids.is_empty());
2487 assert!(worker.token_ids.capacity() <= MAX_RETAINED_TOKEN_IDS);
2488 assert!(worker.memory_size >= worker.temporary_memory_size());
2489 Ok(())
2490 }
2491
2492 #[tokio::test]
2493 async fn test_worker_flush_keeps_position_temp_memory_bounded() -> Result<()> {
2494 let tokenizer = InvertedIndexParams::default().with_position(true).build()?;
2495 let store = Arc::new(CountingStore::new());
2496 let id_alloc = Arc::new(AtomicU64::new(0));
2497 let mut worker = IndexWorker::new(
2498 tokenizer,
2499 store,
2500 id_alloc,
2501 IndexWorkerConfig {
2502 with_position: true,
2503 format_version: InvertedListFormatVersion::V1,
2504 fragment_mask: None,
2505 token_set_format: TokenSetFormat::default(),
2506 worker_memory_limit_bytes: u64::MAX,
2507 },
2508 )
2509 .await?;
2510
2511 let doc = std::iter::repeat_n("common", 32_768)
2512 .collect::<Vec<_>>()
2513 .join(" ");
2514 let mut observed_post_flush_memory = Vec::new();
2515 for row_id in 0..8 {
2516 worker.process_batch(make_doc_batch(&doc, row_id)).await?;
2517 worker.flush().await?;
2518 observed_post_flush_memory.push(worker.memory_size);
2519 }
2520
2521 let max_memory = *observed_post_flush_memory.iter().max().unwrap();
2522 let min_memory = *observed_post_flush_memory.iter().min().unwrap();
2523 assert!(
2524 max_memory <= min_memory.saturating_add(256 * 1024),
2525 "post-flush worker memory drifted upward: {observed_post_flush_memory:?}"
2526 );
2527 Ok(())
2528 }
2529
2530 #[tokio::test]
2531 async fn test_worker_flush_writes_partition_directly() -> Result<()> {
2532 let tokenizer = InvertedIndexParams::default().with_position(true).build()?;
2533 let store = Arc::new(CountingStore::new());
2534 let id_alloc = Arc::new(AtomicU64::new(0));
2535 let mut worker = IndexWorker::new(
2536 tokenizer,
2537 store.clone(),
2538 id_alloc,
2539 IndexWorkerConfig {
2540 with_position: true,
2541 format_version: InvertedListFormatVersion::V1,
2542 fragment_mask: None,
2543 token_set_format: TokenSetFormat::default(),
2544 worker_memory_limit_bytes: u64::MAX,
2545 },
2546 )
2547 .await?;
2548 worker
2549 .process_batch(make_doc_batch("alpha beta gamma", 0))
2550 .await?;
2551 worker.flush().await?;
2552 assert!(store.write_count() > 0);
2553 Ok(())
2554 }
2555
2556 #[test]
2557 fn test_resolve_worker_memory_limit_uses_default_when_unset() {
2558 let params = InvertedIndexParams::default();
2559 assert_eq!(
2560 resolve_worker_memory_limit_bytes(¶ms, 8),
2561 *LANCE_FTS_PARTITION_SIZE << 20
2562 );
2563 }
2564
2565 #[test]
2566 fn test_resolve_num_workers_uses_default_when_unset() {
2567 let expected = default_num_workers().clamp(1, get_num_compute_intensive_cpus().max(1));
2568 assert_eq!(
2569 resolve_num_workers(&InvertedIndexParams::default()),
2570 expected
2571 );
2572 }
2573
2574 #[test]
2575 fn test_resolve_num_workers_clamps_requested_value() {
2576 let max_workers = get_num_compute_intensive_cpus().max(1);
2577 assert_eq!(
2578 resolve_num_workers(&InvertedIndexParams::default().num_workers(0)),
2579 1
2580 );
2581 assert_eq!(
2582 resolve_num_workers(&InvertedIndexParams::default().num_workers(max_workers + 10)),
2583 max_workers
2584 );
2585 }
2586
2587 #[test]
2588 fn test_resolve_worker_memory_limit_splits_total_memory_limit() {
2589 let params = InvertedIndexParams::default().memory_limit_mb(4096);
2590 assert_eq!(resolve_worker_memory_limit_bytes(¶ms, 16), 256 << 20);
2591 }
2592
2593 #[test]
2594 fn test_merge_all_tail_partitions_combines_everything() -> Result<()> {
2595 let merged = merge_all_tail_partitions(vec![
2596 TailPartition {
2597 builder: InnerBuilder::new(0, false, TokenSetFormat::default()),
2598 },
2599 TailPartition {
2600 builder: InnerBuilder::new(1, false, TokenSetFormat::default()),
2601 },
2602 TailPartition {
2603 builder: InnerBuilder::new(2, false, TokenSetFormat::default()),
2604 },
2605 ])?;
2606
2607 assert_eq!(merged.expect("merged builder should exist").id(), 0);
2608 Ok(())
2609 }
2610
2611 #[test]
2612 fn test_merge_all_tail_partitions_returns_none_for_empty_input() -> Result<()> {
2613 assert!(merge_all_tail_partitions(Vec::new())?.is_none());
2614 Ok(())
2615 }
2616
2617 #[test]
2618 fn test_merge_tail_partition_group_combines_tail_builders() -> Result<()> {
2619 let mut first = InnerBuilder::new(0, false, TokenSetFormat::default());
2620 let hello = first.tokens.add("hello".to_owned());
2621 first
2622 .posting_lists
2623 .resize_with(first.tokens.len(), || PostingListBuilder::new(false));
2624 let first_doc = first.docs.append(10, 1);
2625 first.posting_lists[hello as usize].add(first_doc, PositionRecorder::Count(1));
2626
2627 let mut second = InnerBuilder::new(1, false, TokenSetFormat::default());
2628 let world = second.tokens.add("world".to_owned());
2629 second
2630 .posting_lists
2631 .resize_with(second.tokens.len(), || PostingListBuilder::new(false));
2632 let second_doc = second.docs.append(20, 2);
2633 second.posting_lists[world as usize].add(second_doc, PositionRecorder::Count(2));
2634
2635 let merged = merge_tail_partition_group(vec![
2636 TailPartition { builder: first },
2637 TailPartition { builder: second },
2638 ])?;
2639
2640 assert_eq!(merged.id(), 0);
2641 assert_eq!(merged.docs.len(), 2);
2642 assert_eq!(merged.tokens.len(), 2);
2643 assert_eq!(merged.posting_lists.len(), 2);
2644 assert_eq!(
2645 merged.posting_lists[merged.tokens.get("hello").unwrap() as usize].len(),
2646 1
2647 );
2648 assert_eq!(
2649 merged.posting_lists[merged.tokens.get("world").unwrap() as usize].len(),
2650 1
2651 );
2652 Ok(())
2653 }
2654
2655 #[tokio::test]
2656 async fn test_update_index_returns_worker_error_when_workers_exit_during_dispatch() {
2657 let num_batches = (*LANCE_FTS_NUM_SHARDS * 2 + 1) as u64;
2658 let index_dir = TempDir::default();
2659 let store = Arc::new(LanceIndexStore::new(
2660 ObjectStore::local().into(),
2661 index_dir.obj_path(),
2662 Arc::new(LanceCache::no_cache()),
2663 ));
2664 let schema = make_doc_batch("hello world", 0).schema();
2665 let stream = RecordBatchStreamAdapter::new(
2666 schema,
2667 stream::iter((0..num_batches).map(|row_id| Ok(make_doc_batch("hello world", row_id)))),
2668 );
2669 let stream = Box::pin(stream);
2670
2671 let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default())
2672 .with_progress(Arc::new(FailingProgress));
2673
2674 let result = tokio::time::timeout(
2675 Duration::from_secs(5),
2676 builder.update_index(stream, store.as_ref()),
2677 )
2678 .await
2679 .expect("update_index should not hang")
2680 .expect_err("worker failure should be returned");
2681
2682 assert!(
2683 result.to_string().contains("injected progress failure"),
2684 "unexpected error: {result}"
2685 );
2686 }
2687
2688 #[tokio::test]
2689 async fn test_new_index_has_empty_deleted_fragments() {
2690 let index_dir = TempDir::default();
2691 let store = Arc::new(LanceIndexStore::new(
2692 ObjectStore::local().into(),
2693 index_dir.obj_path(),
2694 Arc::new(LanceCache::no_cache()),
2695 ));
2696
2697 let batch = make_doc_batch("hello world", 0);
2698 let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2699 let stream = Box::pin(stream);
2700
2701 let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
2702 builder.update(stream, store.as_ref(), None).await.unwrap();
2703
2704 let index = InvertedIndex::load(store, None, &LanceCache::no_cache())
2705 .await
2706 .unwrap();
2707 assert!(
2708 index.deleted_fragments().is_empty(),
2709 "new index should have empty deleted fragments, got {:?}",
2710 index.deleted_fragments()
2711 );
2712 }
2713
2714 #[tokio::test]
2715 async fn test_remap_preserves_deleted_fragments() {
2716 let src_dir = TempDir::default();
2717 let dest_dir = TempDir::default();
2718 let src_store = Arc::new(LanceIndexStore::new(
2719 ObjectStore::local().into(),
2720 src_dir.obj_path(),
2721 Arc::new(LanceCache::no_cache()),
2722 ));
2723 let dest_store = Arc::new(LanceIndexStore::new(
2724 ObjectStore::local().into(),
2725 dest_dir.obj_path(),
2726 Arc::new(LanceCache::no_cache()),
2727 ));
2728
2729 let batch = make_doc_batch("hello world", 0);
2731 let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2732 let stream = Box::pin(stream);
2733
2734 let initial_deleted = RoaringBitmap::from_iter([5, 10, 42]);
2735 let mut builder = InvertedIndexBuilder::from_existing_index(
2736 InvertedIndexParams::default(),
2737 None,
2738 Vec::new(),
2739 TokenSetFormat::default(),
2740 None,
2741 initial_deleted.clone(),
2742 );
2743 builder
2744 .update(stream, src_store.as_ref(), None)
2745 .await
2746 .unwrap();
2747
2748 let index = InvertedIndex::load(src_store.clone(), None, &LanceCache::no_cache())
2750 .await
2751 .unwrap();
2752 assert_eq!(index.deleted_fragments(), &initial_deleted);
2753
2754 use crate::scalar::ScalarIndex;
2756 let mapping = HashMap::from([(0u64, Some(50 << 32))]);
2757 index.remap(&mapping, dest_store.as_ref()).await.unwrap();
2758
2759 let remapped_index = InvertedIndex::load(dest_store.clone(), None, &LanceCache::no_cache())
2761 .await
2762 .unwrap();
2763 assert_eq!(
2764 remapped_index.deleted_fragments(),
2765 &initial_deleted,
2766 "remap should preserve deleted fragments"
2767 );
2768 }
2769
2770 #[tokio::test]
2771 async fn test_update_grows_deleted_fragments_from_old_data_filter() {
2772 let index_dir = TempDir::default();
2773 let store = Arc::new(LanceIndexStore::new(
2774 ObjectStore::local().into(),
2775 index_dir.obj_path(),
2776 Arc::new(LanceCache::no_cache()),
2777 ));
2778
2779 let batch = make_doc_batch("hello world", 0);
2781 let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2782 let stream = Box::pin(stream);
2783
2784 let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
2785 builder.update(stream, store.as_ref(), None).await.unwrap();
2786
2787 let index = InvertedIndex::load(store.clone(), None, &LanceCache::no_cache())
2789 .await
2790 .unwrap();
2791 assert!(index.deleted_fragments().is_empty());
2792
2793 let update_dir = TempDir::default();
2794 let update_store = Arc::new(LanceIndexStore::new(
2795 ObjectStore::local().into(),
2796 update_dir.obj_path(),
2797 Arc::new(LanceCache::no_cache()),
2798 ));
2799
2800 let batch2 = make_doc_batch("new document", 1 << 32 | 1);
2801 let stream2 =
2802 RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)]));
2803 let stream2 = Box::pin(stream2);
2804
2805 let old_data_filter = Some(crate::scalar::OldIndexDataFilter::Fragments {
2806 to_keep: RoaringBitmap::from_iter([0]),
2807 to_remove: RoaringBitmap::from_iter([3, 7]),
2808 });
2809
2810 use crate::scalar::ScalarIndex;
2812 index
2813 .update(stream2, update_store.as_ref(), old_data_filter)
2814 .await
2815 .unwrap();
2816
2817 let updated_index =
2818 InvertedIndex::load(update_store.clone(), None, &LanceCache::no_cache())
2819 .await
2820 .unwrap();
2821 assert_eq!(
2822 updated_index.deleted_fragments(),
2823 &RoaringBitmap::from_iter([3, 7]),
2824 "update should add deleted fragments from old_data_filter"
2825 );
2826 }
2827
2828 #[tokio::test]
2829 async fn test_update_accumulates_deleted_fragments() {
2830 let dir1 = TempDir::default();
2831 let store1 = Arc::new(LanceIndexStore::new(
2832 ObjectStore::local().into(),
2833 dir1.obj_path(),
2834 Arc::new(LanceCache::no_cache()),
2835 ));
2836
2837 let batch = make_doc_batch("hello world", 0);
2839 let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2840 let stream = Box::pin(stream);
2841
2842 let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
2843 builder.update(stream, store1.as_ref(), None).await.unwrap();
2844
2845 let index = InvertedIndex::load(store1.clone(), None, &LanceCache::no_cache())
2847 .await
2848 .unwrap();
2849
2850 let dir2 = TempDir::default();
2851 let store2 = Arc::new(LanceIndexStore::new(
2852 ObjectStore::local().into(),
2853 dir2.obj_path(),
2854 Arc::new(LanceCache::no_cache()),
2855 ));
2856
2857 let batch2 = make_doc_batch("second doc", 1 << 32 | 1);
2858 let stream2 =
2859 RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)]));
2860 let stream2 = Box::pin(stream2);
2861
2862 use crate::scalar::ScalarIndex;
2863 index
2864 .update(
2865 stream2,
2866 store2.as_ref(),
2867 Some(crate::scalar::OldIndexDataFilter::Fragments {
2868 to_keep: RoaringBitmap::from_iter([0]),
2869 to_remove: RoaringBitmap::from_iter([3, 7]),
2870 }),
2871 )
2872 .await
2873 .unwrap();
2874
2875 let index2 = InvertedIndex::load(store2.clone(), None, &LanceCache::no_cache())
2877 .await
2878 .unwrap();
2879 assert_eq!(
2880 index2.deleted_fragments(),
2881 &RoaringBitmap::from_iter([3, 7])
2882 );
2883
2884 let dir3 = TempDir::default();
2885 let store3 = Arc::new(LanceIndexStore::new(
2886 ObjectStore::local().into(),
2887 dir3.obj_path(),
2888 Arc::new(LanceCache::no_cache()),
2889 ));
2890
2891 let batch3 = make_doc_batch("third doc", 2 << 32 | 2);
2892 let stream3 =
2893 RecordBatchStreamAdapter::new(batch3.schema(), stream::iter(vec![Ok(batch3)]));
2894 let stream3 = Box::pin(stream3);
2895
2896 index2
2897 .update(
2898 stream3,
2899 store3.as_ref(),
2900 Some(crate::scalar::OldIndexDataFilter::Fragments {
2901 to_keep: RoaringBitmap::from_iter([0, 1]),
2902 to_remove: RoaringBitmap::from_iter([12, 15]),
2903 }),
2904 )
2905 .await
2906 .unwrap();
2907
2908 let index3 = InvertedIndex::load(store3.clone(), None, &LanceCache::no_cache())
2909 .await
2910 .unwrap();
2911 assert_eq!(
2912 index3.deleted_fragments(),
2913 &RoaringBitmap::from_iter([3, 7, 12, 15]),
2914 "deleted fragments should accumulate across updates"
2915 );
2916 }
2917
2918 #[tokio::test]
2919 async fn test_update_with_rowid_filter_does_not_grow_deleted_fragments() {
2920 let index_dir = TempDir::default();
2921 let store = Arc::new(LanceIndexStore::new(
2922 ObjectStore::local().into(),
2923 index_dir.obj_path(),
2924 Arc::new(LanceCache::no_cache()),
2925 ));
2926
2927 let batch = make_doc_batch("hello world", 0);
2928 let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2929 let stream = Box::pin(stream);
2930
2931 let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
2932 builder.update(stream, store.as_ref(), None).await.unwrap();
2933
2934 let index = InvertedIndex::load(store.clone(), None, &LanceCache::no_cache())
2935 .await
2936 .unwrap();
2937
2938 let update_dir = TempDir::default();
2939 let update_store = Arc::new(LanceIndexStore::new(
2940 ObjectStore::local().into(),
2941 update_dir.obj_path(),
2942 Arc::new(LanceCache::no_cache()),
2943 ));
2944
2945 let batch2 = make_doc_batch("new doc", 1);
2946 let stream2 =
2947 RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)]));
2948 let stream2 = Box::pin(stream2);
2949
2950 let mut valid_ids = lance_core::utils::mask::RowAddrTreeMap::new();
2952 valid_ids.insert(0);
2953 let old_data_filter = Some(crate::scalar::OldIndexDataFilter::RowIds(valid_ids));
2954
2955 use crate::scalar::ScalarIndex;
2956 index
2957 .update(stream2, update_store.as_ref(), old_data_filter)
2958 .await
2959 .unwrap();
2960
2961 let updated_index =
2962 InvertedIndex::load(update_store.clone(), None, &LanceCache::no_cache())
2963 .await
2964 .unwrap();
2965 assert!(
2966 updated_index.deleted_fragments().is_empty(),
2967 "RowIds filter should not add to deleted fragments"
2968 );
2969 }
2970}