1use super::{InvertedIndexParams, index::*};
5use crate::scalar::inverted::document_tokenizer::DocType;
6use crate::scalar::inverted::json::JsonTextStream;
7use crate::scalar::inverted::tokenizer::document_tokenizer::LanceTokenizer;
8#[cfg(test)]
9use crate::scalar::lance_format::LanceIndexStore;
10use crate::scalar::{IndexStore, OldIndexDataFilter};
11use crate::vector::graph::OrderedFloat;
12use crate::{progress::IndexBuildProgress, progress::noop_progress};
13use arrow::array::AsArray;
14use arrow::datatypes;
15use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array};
16use arrow_schema::{DataType, Field, Schema, SchemaRef};
17use bitpacking::{BitPacker, BitPacker4x};
18use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream};
19use deepsize::DeepSizeOf;
20use fst::Streamer;
21use futures::{Stream, StreamExt, TryStreamExt};
22use lance_arrow::json::JSON_EXT_NAME;
23use lance_arrow::{ARROW_EXT_NAME_KEY, iter_str_array};
24use lance_core::cache::LanceCache;
25use lance_core::error::LanceOptionExt;
26use lance_core::utils::mask::RowSetOps;
27use lance_core::utils::tokio::{IO_CORE_RESERVATION, get_num_compute_intensive_cpus, spawn_cpu};
28use lance_core::{Error, ROW_ID, ROW_ID_FIELD, Result};
29use lance_io::object_store::ObjectStore;
30use object_store::path::Path;
31use roaring::RoaringBitmap;
32use smallvec::SmallVec;
33use std::collections::HashMap;
34use std::pin::Pin;
35use std::str::FromStr;
36use std::sync::Arc;
37use std::sync::LazyLock;
38use std::task::{Context, Poll};
39use std::{fmt::Debug, sync::atomic::AtomicU64};
40use tracing::instrument;
41
42pub const BLOCK_SIZE: usize = BitPacker4x::BLOCK_LEN;
46
47pub static LANCE_FTS_NUM_SHARDS: LazyLock<usize> = LazyLock::new(|| {
51 std::env::var("LANCE_FTS_NUM_SHARDS")
52 .unwrap_or_else(|_| default_num_workers().to_string())
53 .parse()
54 .expect("failed to parse LANCE_FTS_NUM_SHARDS")
55});
56pub static LANCE_FTS_PARTITION_SIZE: LazyLock<u64> = LazyLock::new(|| {
58 std::env::var("LANCE_FTS_PARTITION_SIZE")
59 .unwrap_or_else(|_| "2048".to_string())
60 .parse()
61 .expect("failed to parse LANCE_FTS_PARTITION_SIZE")
62});
63static LANCE_FTS_WRITE_QUEUE_SIZE: LazyLock<usize> = LazyLock::new(|| {
64 std::env::var("LANCE_FTS_WRITE_QUEUE_SIZE")
65 .unwrap_or_else(|_| "1".to_string())
66 .parse()
67 .expect("failed to parse LANCE_FTS_WRITE_QUEUE_SIZE")
68});
69static LANCE_FTS_POSTING_BATCH_ROWS: LazyLock<usize> = LazyLock::new(|| {
70 std::env::var("LANCE_FTS_POSTING_BATCH_ROWS")
71 .unwrap_or_else(|_| "256".to_string())
72 .parse()
73 .expect("failed to parse LANCE_FTS_POSTING_BATCH_ROWS")
74});
75const MAX_RETAINED_TOKEN_IDS: usize = 8 * 1024;
76
77fn default_num_workers() -> usize {
78 let total_cpus = get_num_compute_intensive_cpus() + *IO_CORE_RESERVATION;
79 std::cmp::max(1, total_cpus / 2)
80}
81
82fn resolve_num_workers(params: &InvertedIndexParams) -> usize {
83 let max_workers = get_num_compute_intensive_cpus().max(1);
84 params
85 .num_workers
86 .unwrap_or(*LANCE_FTS_NUM_SHARDS)
87 .clamp(1, max_workers)
88}
89
90fn resolve_worker_memory_limit_bytes(params: &InvertedIndexParams, num_workers: usize) -> u64 {
91 let default_worker_memory_limit_bytes = *LANCE_FTS_PARTITION_SIZE << 20;
92 params
93 .memory_limit_mb
94 .map(|memory_limit_mb| (memory_limit_mb << 20) / num_workers as u64)
95 .unwrap_or(default_worker_memory_limit_bytes)
96}
97
98fn merge_all_tail_partitions(tails: Vec<TailPartition>) -> Result<Option<InnerBuilder>> {
99 if tails.is_empty() {
100 return Ok(None);
101 }
102 merge_tail_partition_group(tails).map(Some)
103}
104
105fn merge_tail_partition_group(group: Vec<TailPartition>) -> Result<InnerBuilder> {
106 let mut group = group.into_iter();
107 let mut merged = group
108 .next()
109 .ok_or_else(|| {
110 Error::invalid_input("cannot merge an empty tail partition group".to_owned())
111 })?
112 .builder;
113 for tail in group {
114 merged.merge_from(tail.builder)?;
115 }
116 Ok(merged)
117}
118
/// Builds, or incrementally updates, an inverted full-text-search index.
///
/// Partitions that already exist (`partitions`) are copied from `src_store` when the index
/// is written. Partitions produced by this builder are recorded in `new_partitions`.
#[derive(Debug)]
pub struct InvertedIndexBuilder {
    /// Index parameters; `lance_tokenizer` is filled in from the document column type when unset.
    params: InvertedIndexParams,
    /// Ids of partitions that already exist in `src_store`.
    pub(crate) partitions: Vec<u64>,
    /// Ids of partitions written by this builder.
    new_partitions: Vec<u64>,
    /// Bits OR-ed into every newly allocated partition id.
    ///
    /// When set, metadata is written per partition instead of as a single metadata file.
    fragment_mask: Option<u64>,
    /// On-disk format of the token sets.
    token_set_format: TokenSetFormat,
    /// FTS posting-list format version.
    format_version: InvertedListFormatVersion,
    /// Codec for posting-list tails (derived from `format_version` unless overridden).
    posting_tail_codec: PostingTailCodec,
    /// Store holding the existing `partitions`.
    src_store: Option<Arc<dyn IndexStore>>,
    /// Progress reporter for the build stages.
    progress: Arc<dyn IndexBuildProgress>,
    /// Fragments recorded as deleted; serialized into the global metadata file.
    deleted_fragments: RoaringBitmap,
}
132
133impl InvertedIndexBuilder {
134 pub fn new(params: InvertedIndexParams) -> Self {
135 Self::new_with_fragment_mask(params, None)
136 }
137
138 pub fn new_with_fragment_mask(params: InvertedIndexParams, fragment_mask: Option<u64>) -> Self {
139 Self::from_existing_index(
140 params,
141 None,
142 Vec::new(),
143 TokenSetFormat::default(),
144 fragment_mask,
145 RoaringBitmap::new(),
146 )
147 }
148
149 pub fn from_existing_index(
156 params: InvertedIndexParams,
157 store: Option<Arc<dyn IndexStore>>,
158 partitions: Vec<u64>,
159 token_set_format: TokenSetFormat,
160 fragment_mask: Option<u64>,
161 deleted_fragments: RoaringBitmap,
162 ) -> Self {
163 Self {
164 params,
165 partitions,
166 new_partitions: Vec::new(),
167 src_store: store,
168 token_set_format,
169 fragment_mask,
170 format_version: current_fts_format_version(),
171 posting_tail_codec: current_fts_format_version().posting_tail_codec(),
172 progress: noop_progress(),
173 deleted_fragments,
174 }
175 }
176
177 pub fn with_posting_tail_codec(mut self, posting_tail_codec: PostingTailCodec) -> Self {
178 self.format_version =
179 InvertedListFormatVersion::from_posting_tail_codec(posting_tail_codec);
180 self.posting_tail_codec = posting_tail_codec;
181 self
182 }
183
184 pub fn with_format_version(mut self, format_version: InvertedListFormatVersion) -> Self {
185 self.format_version = format_version;
186 self.posting_tail_codec = format_version.posting_tail_codec();
187 self
188 }
189
190 pub fn with_token_set_format(mut self, token_set_format: TokenSetFormat) -> Self {
191 self.token_set_format = token_set_format;
192 self
193 }
194
195 pub fn with_progress(mut self, progress: Arc<dyn IndexBuildProgress>) -> Self {
196 self.progress = progress;
197 self
198 }
199
200 pub async fn update(
201 &mut self,
202 new_data: SendableRecordBatchStream,
203 dest_store: &dyn IndexStore,
204 old_data_filter: Option<crate::scalar::OldIndexDataFilter>,
205 ) -> Result<()> {
206 let schema = new_data.schema();
207 let doc_col = schema.field(0).name();
208
209 if self.params.lance_tokenizer.is_none() {
211 let schema = new_data.schema();
212 let field = schema.column_with_name(doc_col).expect_ok()?.1;
213 let doc_type = DocType::try_from(field)?;
214 self.params.lance_tokenizer = Some(doc_type.as_ref().to_string());
215 }
216
217 let new_data = document_input(new_data, doc_col)?;
218
219 self.progress
220 .stage_start("tokenize_docs", None, "rows")
221 .await?;
222 self.update_index(new_data, dest_store).await?;
223
224 if let Some(OldIndexDataFilter::Fragments { to_remove, .. }) = old_data_filter {
225 self.deleted_fragments.extend(to_remove);
226 }
227
228 self.progress.stage_complete("tokenize_docs").await?;
229 self.write(dest_store).await?;
230 Ok(())
231 }
232
233 pub async fn update_from_segments(
234 &mut self,
235 new_data: SendableRecordBatchStream,
236 dest_store: &dyn IndexStore,
237 old_segments: &[Arc<InvertedIndex>],
238 old_data_filter: Option<crate::scalar::OldIndexDataFilter>,
239 ) -> Result<()> {
240 let schema = new_data.schema();
241 let doc_col = schema.field(0).name();
242
243 if self.params.lance_tokenizer.is_none() {
244 let field = schema.column_with_name(doc_col).expect_ok()?.1;
245 let doc_type = DocType::try_from(field)?;
246 self.params.lance_tokenizer = Some(doc_type.as_ref().to_string());
247 }
248
249 self.merge_existing_segments(dest_store, old_segments, old_data_filter.as_ref())
250 .await?;
251
252 let new_data = document_input(new_data, doc_col)?;
253
254 self.progress
255 .stage_start("tokenize_docs", None, "rows")
256 .await?;
257 self.update_index(new_data, dest_store).await?;
258 self.progress.stage_complete("tokenize_docs").await?;
259
260 self.write(dest_store).await?;
261 Ok(())
262 }
263
264 async fn merge_existing_segments(
265 &mut self,
266 dest_store: &dyn IndexStore,
267 old_segments: &[Arc<InvertedIndex>],
268 old_data_filter: Option<&crate::scalar::OldIndexDataFilter>,
269 ) -> Result<()> {
270 let num_workers = resolve_num_workers(&self.params);
271 let memory_limit_bytes = resolve_worker_memory_limit_bytes(&self.params, num_workers);
272 let mut merged: Option<InnerBuilder> = None;
273 for index in old_segments {
274 if old_data_filter.is_none() {
275 self.deleted_fragments
276 .extend(index.deleted_fragments().iter());
277 }
278 for partition in &index.partitions {
279 let mut partition_builder = partition.as_ref().clone().into_builder().await?;
280 if let Some(filter) = old_data_filter {
281 partition_builder.filter_old_data(filter).await?;
282 }
283 if partition_builder.is_empty() {
284 continue;
285 }
286 match &mut merged {
287 Some(merged) => {
288 let would_exceed_memory = merged
289 .memory_size()
290 .saturating_add(partition_builder.memory_size())
291 >= memory_limit_bytes;
292 let would_exceed_doc_ids = merged
293 .docs
294 .len()
295 .saturating_add(partition_builder.docs.len())
296 > u32::MAX as usize;
297 if would_exceed_memory || would_exceed_doc_ids {
298 let builder = std::mem::replace(merged, partition_builder);
299 self.write_new_partition(dest_store, builder).await?;
300 } else {
301 merged.merge_from(partition_builder)?;
302 }
303 }
304 None => merged = Some(partition_builder),
305 }
306 }
307 }
308
309 if let Some(builder) = merged {
310 self.write_new_partition(dest_store, builder).await?;
311 }
312 Ok(())
313 }
314
315 async fn write_new_partition(
316 &mut self,
317 dest_store: &dyn IndexStore,
318 mut builder: InnerBuilder,
319 ) -> Result<()> {
320 let partition_id = self.next_partition_id() | self.fragment_mask.unwrap_or(0);
321 builder.set_id(partition_id);
322 builder.write(dest_store).await?;
323 self.new_partitions.push(partition_id);
324 Ok(())
325 }
326
327 fn next_partition_id(&self) -> u64 {
328 self.partitions
329 .iter()
330 .chain(self.new_partitions.iter())
331 .map(|id| id + 1)
332 .max()
333 .unwrap_or(0)
334 }
335
336 #[instrument(level = "debug", skip_all)]
337 async fn update_index(
338 &mut self,
339 stream: SendableRecordBatchStream,
340 dest_store: &dyn IndexStore,
341 ) -> Result<()> {
342 let num_workers = resolve_num_workers(&self.params);
343 let tokenizer = self.params.build()?;
344 let with_position = self.params.with_position;
345 let worker_memory_limit_bytes =
346 resolve_worker_memory_limit_bytes(&self.params, num_workers);
347 let worker_config = IndexWorkerConfig {
348 with_position,
349 format_version: self.format_version,
350 fragment_mask: self.fragment_mask,
351 token_set_format: self.token_set_format,
352 worker_memory_limit_bytes,
353 };
354 let next_id = self.next_partition_id();
355 let id_alloc = Arc::new(AtomicU64::new(next_id));
356 let tokenized_count = Arc::new(AtomicU64::new(0));
357 let (sender, receiver) = async_channel::bounded(num_workers);
358 let dest_store = dest_store.clone_arc();
359 let mut index_tasks = Vec::with_capacity(num_workers);
360 for _ in 0..num_workers {
361 let tokenizer = tokenizer.clone();
362 let receiver: async_channel::Receiver<RecordBatch> = receiver.clone();
363 let dest_store = dest_store.clone();
364 let id_alloc = id_alloc.clone();
365 let progress = self.progress.clone();
366 let tokenized_count = tokenized_count.clone();
367 index_tasks.push(tokio::task::spawn(async move {
368 let mut worker =
369 IndexWorker::new(tokenizer, dest_store, id_alloc, worker_config).await?;
370 while let Ok(batch) = receiver.recv().await {
371 let num_rows = batch.num_rows();
372 worker.process_batch(batch).await?;
373 let tokenized_count = tokenized_count
374 .fetch_add(num_rows as u64, std::sync::atomic::Ordering::Relaxed)
375 + num_rows as u64;
376 progress
377 .stage_progress("tokenize_docs", tokenized_count)
378 .await?;
379 }
380 worker.finish().await
381 }));
382 }
383
384 let index_build = async {
385 drop(receiver);
388
389 let mut stream = Box::pin(stream);
390 log::info!("indexing FTS with {} workers", num_workers);
391
392 let mut last_num_rows = 0;
393 let mut total_num_rows = 0;
394 let start = std::time::Instant::now();
395 while let Some(batch) = stream.try_next().await? {
396 let num_rows = batch.num_rows();
397
398 if sender.send(batch).await.is_err() {
399 break;
403 }
404
405 total_num_rows += num_rows;
406 if total_num_rows >= last_num_rows + 1_000_000 {
407 log::debug!(
408 "indexed {} documents, elapsed: {:?}, speed: {}rows/s",
409 total_num_rows,
410 start.elapsed(),
411 total_num_rows as f32 / start.elapsed().as_secs_f32()
412 );
413 last_num_rows = total_num_rows;
414 }
415 }
416 drop(stream);
418 drop(sender);
419 log::info!("dispatching elapsed: {:?}", start.elapsed());
420
421 let start = std::time::Instant::now();
423 let mut tail_partitions = Vec::new();
424 for index_task in index_tasks {
425 let output = index_task.await??;
426 self.new_partitions.extend(output.partitions);
427 if let Some(tail_partition) = output.tail_partition {
428 tail_partitions.push(tail_partition);
429 }
430 }
431 let merged_tail_partitions =
432 spawn_cpu(move || merge_all_tail_partitions(tail_partitions)).await?;
433 if let Some(builder) = merged_tail_partitions {
434 self.new_partitions.push(builder.id());
435 let mut builder = builder;
436 builder.write(dest_store.as_ref()).await?;
437 }
438 log::info!("wait workers indexing elapsed: {:?}", start.elapsed());
439 Result::Ok(())
440 };
441
442 index_build.await
443 }
444
445 pub async fn remap(
446 &mut self,
447 mapping: &HashMap<u64, Option<u64>>,
448 src_store: Arc<dyn IndexStore>,
449 dest_store: &dyn IndexStore,
450 ) -> Result<()> {
451 for part in self.partitions.iter() {
452 let part = InvertedPartition::load(
453 src_store.clone(),
454 *part,
455 None,
456 &LanceCache::no_cache(),
457 self.token_set_format,
458 )
459 .await?;
460 let mut builder = part.into_builder().await?;
461 builder.remap(mapping).await?;
462 builder.write(dest_store).await?;
463 }
464 if self.fragment_mask.is_none() {
465 self.write_metadata(dest_store, &self.partitions).await?;
466 } else {
467 for &partition_id in &self.partitions {
469 self.write_part_metadata(dest_store, partition_id).await?;
470 }
471 }
472 Ok(())
473 }
474
475 async fn write_metadata(&self, dest_store: &dyn IndexStore, partitions: &[u64]) -> Result<()> {
476 let mut serialized_deleted_fragments =
477 Vec::with_capacity(self.deleted_fragments.serialized_size());
478 self.deleted_fragments
479 .serialize_into(&mut serialized_deleted_fragments)?;
480
481 let mut metadata = HashMap::from_iter(vec![
482 ("partitions".to_owned(), serde_json::to_string(&partitions)?),
483 ("params".to_owned(), serde_json::to_string(&self.params)?),
484 (
485 TOKEN_SET_FORMAT_KEY.to_owned(),
486 self.token_set_format.to_string(),
487 ),
488 (
489 POSTING_TAIL_CODEC_KEY.to_owned(),
490 self.posting_tail_codec.as_str().to_owned(),
491 ),
492 ]);
493
494 if self.params.with_position && self.format_version.uses_shared_position_stream() {
495 metadata.insert(
496 POSITIONS_LAYOUT_KEY.to_owned(),
497 POSITIONS_LAYOUT_SHARED_STREAM_V2.to_owned(),
498 );
499 metadata.insert(
500 POSITIONS_CODEC_KEY.to_owned(),
501 self.format_version
502 .position_codec()
503 .expect("shared positions require a codec")
504 .as_str()
505 .to_owned(),
506 );
507 }
508
509 let metadata_file_schema = Arc::new(Schema::new(vec![Field::new(
510 DELETED_FRAGMENTS_COL,
511 DataType::Binary,
512 false,
513 )]));
514 let deleted_fragments_col = Arc::new(BinaryArray::from(vec![
515 serialized_deleted_fragments.as_slice(),
516 ])) as Arc<dyn Array>;
517 let record_batch =
518 RecordBatch::try_new(metadata_file_schema.clone(), vec![deleted_fragments_col])?;
519
520 let mut writer = dest_store
521 .new_index_file(METADATA_FILE, metadata_file_schema)
522 .await?;
523 writer.write_record_batch(record_batch).await?;
524 writer.finish_with_metadata(metadata).await?;
525 Ok(())
526 }
527
528 pub(crate) async fn write_part_metadata(
533 &self,
534 dest_store: &dyn IndexStore,
535 partition: u64, ) -> Result<()> {
537 let partitions = vec![partition];
538 let mut metadata = HashMap::from_iter(vec![
539 ("partitions".to_owned(), serde_json::to_string(&partitions)?),
540 ("params".to_owned(), serde_json::to_string(&self.params)?),
541 (
542 TOKEN_SET_FORMAT_KEY.to_owned(),
543 self.token_set_format.to_string(),
544 ),
545 (
546 POSTING_TAIL_CODEC_KEY.to_owned(),
547 self.posting_tail_codec.as_str().to_owned(),
548 ),
549 ]);
550 if self.params.with_position && self.format_version.uses_shared_position_stream() {
551 metadata.insert(
552 POSITIONS_LAYOUT_KEY.to_owned(),
553 POSITIONS_LAYOUT_SHARED_STREAM_V2.to_owned(),
554 );
555 metadata.insert(
556 POSITIONS_CODEC_KEY.to_owned(),
557 self.format_version
558 .position_codec()
559 .expect("shared positions require a codec")
560 .as_str()
561 .to_owned(),
562 );
563 }
564 let file_name = part_metadata_file_path(partition);
566 let mut writer = dest_store
567 .new_index_file(&file_name, Arc::new(Schema::empty()))
568 .await?;
569 writer.finish_with_metadata(metadata).await?;
570 Ok(())
571 }
572
573 async fn write_metadata_with_progress(
574 &self,
575 dest_store: &dyn IndexStore,
576 partitions: &[u64],
577 ) -> Result<()> {
578 let total = if self.fragment_mask.is_none() {
579 Some(1)
580 } else {
581 Some(partitions.len() as u64)
582 };
583 self.progress
584 .stage_start("write_metadata", total, "files")
585 .await?;
586 if self.fragment_mask.is_none() {
587 self.write_metadata(dest_store, partitions).await?;
588 self.progress.stage_progress("write_metadata", 1).await?;
589 } else {
590 let mut completed = 0;
591 for &partition_id in partitions {
592 self.write_part_metadata(dest_store, partition_id).await?;
593 completed += 1;
594 self.progress
595 .stage_progress("write_metadata", completed)
596 .await?;
597 }
598 }
599 self.progress.stage_complete("write_metadata").await?;
600 Ok(())
601 }
602
603 async fn write(&self, dest_store: &dyn IndexStore) -> Result<()> {
604 let mut partitions = Vec::with_capacity(self.partitions.len() + self.new_partitions.len());
605 partitions.extend_from_slice(&self.partitions);
606 partitions.extend_from_slice(&self.new_partitions);
607 partitions.sort_unstable();
608
609 self.progress
610 .stage_start(
611 "copy_partitions",
612 Some(partitions.len() as u64),
613 "partitions",
614 )
615 .await?;
616 let mut copied = 0;
617 for part in self.partitions.iter() {
618 self.src_store
619 .as_ref()
620 .expect("existing partitions require a source store")
621 .copy_index_file(&token_file_path(*part), dest_store)
622 .await?;
623 self.src_store
624 .as_ref()
625 .expect("existing partitions require a source store")
626 .copy_index_file(&posting_file_path(*part), dest_store)
627 .await?;
628 self.src_store
629 .as_ref()
630 .expect("existing partitions require a source store")
631 .copy_index_file(&doc_file_path(*part), dest_store)
632 .await?;
633 copied += 1;
634 self.progress
635 .stage_progress("copy_partitions", copied)
636 .await?;
637 }
638 for _part in self.new_partitions.iter() {
639 copied += 1;
640 self.progress
641 .stage_progress("copy_partitions", copied)
642 .await?;
643 }
644 self.progress.stage_complete("copy_partitions").await?;
645
646 self.write_metadata_with_progress(dest_store, &partitions)
647 .await?;
648 Ok(())
649 }
650}
651
652impl Default for InvertedIndexBuilder {
653 fn default() -> Self {
654 let params = InvertedIndexParams::default();
655 Self::new(params)
656 }
657}
658
/// In-memory builder for a single inverted-index partition.
///
/// It holds the partition's token set, doc set and one posting list per token.
#[derive(Debug)]
pub struct InnerBuilder {
    /// Partition id; it determines the names of the files written for this partition.
    id: u64,
    /// Whether term positions are recorded in the posting lists.
    with_position: bool,
    /// On-disk format of the token set.
    token_set_format: TokenSetFormat,
    /// FTS posting-list format version.
    format_version: InvertedListFormatVersion,
    /// Codec for posting-list tails.
    posting_tail_codec: PostingTailCodec,
    /// Token text to token id mapping.
    pub(crate) tokens: TokenSet,
    /// Posting lists indexed by token id.
    pub(crate) posting_lists: Vec<PostingListBuilder>,
    /// Documents in this partition; a doc's position in the set is its doc id.
    pub(crate) docs: DocSet,
}
671
672impl InnerBuilder {
673 pub fn new(id: u64, with_position: bool, token_set_format: TokenSetFormat) -> Self {
674 Self::new_with_format_version(
675 id,
676 with_position,
677 token_set_format,
678 current_fts_format_version(),
679 )
680 }
681
682 pub fn new_with_format_version(
683 id: u64,
684 with_position: bool,
685 token_set_format: TokenSetFormat,
686 format_version: InvertedListFormatVersion,
687 ) -> Self {
688 Self {
689 id,
690 with_position,
691 token_set_format,
692 format_version,
693 posting_tail_codec: format_version.posting_tail_codec(),
694 tokens: TokenSet::default(),
695 posting_lists: Vec::new(),
696 docs: DocSet::default(),
697 }
698 }
699
700 pub fn new_with_posting_tail_codec(
701 id: u64,
702 with_position: bool,
703 token_set_format: TokenSetFormat,
704 posting_tail_codec: PostingTailCodec,
705 ) -> Self {
706 let format_version = if posting_tail_codec == PostingTailCodec::Fixed32 {
707 InvertedListFormatVersion::V1
708 } else {
709 InvertedListFormatVersion::V2
710 };
711 let mut builder =
712 Self::new_with_format_version(id, with_position, token_set_format, format_version);
713 builder.posting_tail_codec = posting_tail_codec;
714 builder
715 }
716
717 pub fn id(&self) -> u64 {
718 self.id
719 }
720
721 fn set_id(&mut self, id: u64) {
722 self.id = id;
723 }
724
725 pub fn is_empty(&self) -> bool {
726 self.docs.is_empty()
727 }
728
729 pub fn set_tokens(&mut self, tokens: TokenSet) {
731 self.tokens = tokens;
732 }
733
734 pub fn set_docs(&mut self, docs: DocSet) {
736 self.docs = docs;
737 }
738
739 pub fn set_posting_lists(&mut self, posting_lists: Vec<PostingListBuilder>) {
741 self.posting_lists = posting_lists;
742 }
743
744 pub async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
745 let removed = self.docs.remap(mapping);
748
749 let mut token_id = 0;
753 let mut removed_token_ids = Vec::new();
754 self.posting_lists.retain_mut(|posting_list| {
755 posting_list.remap(&removed);
756 let keep = !posting_list.is_empty();
757 if !keep {
758 removed_token_ids.push(token_id as u32);
759 }
760 token_id += 1;
761 keep
762 });
763
764 self.tokens.remap(&removed_token_ids);
766
767 Ok(())
768 }
769
770 async fn filter_old_data(&mut self, filter: &OldIndexDataFilter) -> Result<()> {
771 let mut mapping = HashMap::new();
772 for (row_id, _) in self.docs.iter() {
773 let keep = match filter {
774 OldIndexDataFilter::Fragments { to_keep, .. } => {
775 to_keep.contains((*row_id >> 32) as u32)
776 }
777 OldIndexDataFilter::RowIds(valid_row_ids) => valid_row_ids.contains(*row_id),
778 };
779 if !keep {
780 mapping.insert(*row_id, None);
781 }
782 }
783 self.remap(&mapping).await
784 }
785
786 pub fn merge_from(&mut self, other: Self) -> Result<()> {
787 let Self {
788 id: _,
789 with_position,
790 token_set_format,
791 format_version,
792 posting_tail_codec,
793 tokens,
794 posting_lists,
795 docs,
796 } = other;
797
798 if self.with_position != with_position {
799 return Err(Error::index(format!(
800 "cannot merge partitions with mismatched positions settings: {} vs {}",
801 self.with_position, with_position
802 )));
803 }
804 if self.token_set_format != token_set_format {
805 return Err(Error::index(format!(
806 "cannot merge partitions with mismatched token set formats: {:?} vs {:?}",
807 self.token_set_format, token_set_format
808 )));
809 }
810 if self.format_version != format_version {
811 return Err(Error::index(format!(
812 "cannot merge partitions with mismatched FTS format versions: {:?} vs {:?}",
813 self.format_version, format_version
814 )));
815 }
816 if self.posting_tail_codec != posting_tail_codec {
817 return Err(Error::index(format!(
818 "cannot merge partitions with mismatched posting tail codecs: {:?} vs {:?}",
819 self.posting_tail_codec, posting_tail_codec
820 )));
821 }
822
823 let mut token_id_map = vec![u32::MAX; posting_lists.len()];
824 match tokens.tokens {
825 TokenMap::HashMap(map) => {
826 for (token, token_id) in map {
827 let new_token_id = self.tokens.get_or_add(token.as_str());
828 token_id_map[token_id as usize] = new_token_id;
829 }
830 }
831 TokenMap::Fst(map) => {
832 let mut stream = map.stream();
833 while let Some((token, token_id)) = stream.next() {
834 let new_token_id = self
835 .tokens
836 .get_or_add(String::from_utf8_lossy(token).as_ref());
837 token_id_map[token_id as usize] = new_token_id;
838 }
839 }
840 }
841
842 let doc_id_offset = self.docs.len() as u32;
843 for (row_id, num_tokens) in docs.iter() {
844 self.docs.append(*row_id, *num_tokens);
845 }
846 self.posting_lists.resize_with(self.tokens.len(), || {
847 PostingListBuilder::new_with_posting_tail_codec(with_position, self.posting_tail_codec)
848 });
849
850 for (token_id, posting_list) in posting_lists.into_iter().enumerate() {
851 if posting_list.is_empty() {
852 continue;
853 }
854 let new_token_id = token_id_map[token_id];
855 debug_assert_ne!(new_token_id, u32::MAX);
856 let merged_posting = &mut self.posting_lists[new_token_id as usize];
857 posting_list.for_each_entry(|doc_id, freq, positions| {
858 let positions = match positions {
859 Some(positions) => PositionRecorder::Position(positions.into()),
860 None => PositionRecorder::Count(freq),
861 };
862 merged_posting.add(doc_id_offset + doc_id, positions);
863 Ok::<(), Error>(())
864 })?;
865 }
866
867 Ok(())
868 }
869
870 fn memory_size(&self) -> u64 {
871 let posting_lists_overhead =
872 self.posting_lists.capacity() * std::mem::size_of::<PostingListBuilder>();
873 let posting_lists_size: u64 = self
874 .posting_lists
875 .iter()
876 .map(|posting| posting.size())
877 .sum();
878 (self.tokens.memory_size() + self.docs.memory_size() + posting_lists_overhead) as u64
879 + posting_lists_size
880 }
881
882 pub async fn write(&mut self, store: &dyn IndexStore) -> Result<()> {
883 let docs = Arc::new(std::mem::take(&mut self.docs));
884 self.write_posting_lists(store, docs.clone()).await?;
885 self.write_tokens(store).await?;
886 self.write_docs(store, docs).await?;
887 Ok(())
888 }
889
890 #[instrument(level = "debug", skip_all)]
891 async fn write_posting_lists(
892 &mut self,
893 store: &dyn IndexStore,
894 docs: Arc<DocSet>,
895 ) -> Result<()> {
896 let id = self.id;
897 let mut writer = store
898 .new_index_file(
899 &posting_file_path(self.id),
900 inverted_list_schema_for_version(self.with_position, self.format_version),
901 )
902 .await?;
903 let posting_lists = std::mem::take(&mut self.posting_lists);
904
905 log::info!(
906 "writing {} posting lists of partition {}, with position {}",
907 posting_lists.len(),
908 id,
909 self.with_position
910 );
911 let with_position = self.with_position;
912 let format_version = self.format_version;
913 let schema = inverted_list_schema_for_version(self.with_position, self.format_version);
914 let docs_for_batches = docs.clone();
915 let schema_for_batches = schema.clone();
916 let batch_rows = *LANCE_FTS_POSTING_BATCH_ROWS;
917 let (tx, rx) = async_channel::bounded(*LANCE_FTS_WRITE_QUEUE_SIZE);
918 let producer = spawn_cpu(move || {
919 let mut batch_builder = PostingListBatchBuilder::new(
920 schema_for_batches.clone(),
921 with_position,
922 format_version,
923 batch_rows,
924 );
925 for posting_list in posting_lists {
926 posting_list.append_to_batch_with_docs(
927 &docs_for_batches,
928 &mut batch_builder,
929 format_version,
930 )?;
931 if batch_builder.len() < batch_rows {
932 continue;
933 }
934
935 let batch = batch_builder.finish()?;
936 if let Err(err) = tx.send_blocking(batch) {
937 return Err(Error::execution(format!(
938 "failed to send posting list batch to writer: {err}"
939 )));
940 }
941 }
942
943 if !batch_builder.is_empty() {
944 let batch = batch_builder.finish()?;
945 if let Err(err) = tx.send_blocking(batch) {
946 return Err(Error::execution(format!(
947 "failed to send posting list batch to writer: {err}"
948 )));
949 }
950 }
951
952 Result::Ok(())
953 });
954
955 while let Ok(batch) = rx.recv().await {
956 if let Err(err) = writer.write_record_batch(batch).await {
957 drop(rx);
958 let _ = producer.await;
960 return Err(err);
961 }
962 }
963 drop(rx);
964 producer.await?;
965
966 writer.finish().await?;
967 Ok(())
968 }
969
970 #[instrument(level = "debug", skip_all)]
971 async fn write_tokens(&mut self, store: &dyn IndexStore) -> Result<()> {
972 log::info!("writing tokens of partition {}", self.id);
973 let tokens = std::mem::take(&mut self.tokens);
974 let batch = tokens.to_batch(self.token_set_format)?;
975 let mut writer = store
976 .new_index_file(&token_file_path(self.id), batch.schema())
977 .await?;
978 writer.write_record_batch(batch).await?;
979 writer.finish().await?;
980 Ok(())
981 }
982
983 #[instrument(level = "debug", skip_all)]
984 async fn write_docs(&mut self, store: &dyn IndexStore, docs: Arc<DocSet>) -> Result<()> {
985 log::info!("writing docs of partition {}", self.id);
986 let batch = docs.to_batch()?;
987 let mut writer = store
988 .new_index_file(&doc_file_path(self.id), batch.schema())
989 .await?;
990 writer.write_record_batch(batch).await?;
991 writer.finish().await?;
992 Ok(())
993 }
994}
995
/// A tokenization worker that accumulates documents into an in-memory partition.
struct IndexWorker {
    /// Tokenizer applied to each document.
    tokenizer: Box<dyn LanceTokenizer>,
    /// Destination store for this worker's partitions.
    dest_store: Arc<dyn IndexStore>,
    /// Shared counter from which partition ids are allocated.
    id_alloc: Arc<AtomicU64>,
    /// Partition currently being built.
    builder: InnerBuilder,
    /// Ids of partitions this worker has produced so far.
    partitions: Vec<u64>,
    /// Posting-list schema; also used to detect whether positions are recorded.
    schema: SchemaRef,
    /// Running estimate of the memory held by this worker, in bytes.
    memory_size: u64,
    /// Memory budget for this worker, in bytes.
    worker_memory_limit_bytes: u64,
    /// NOTE(review): presumably the sum of document lengths seen so far — confirm.
    total_doc_length: usize,
    /// Bits OR-ed into every partition id this worker allocates.
    fragment_mask: Option<u64>,
    /// On-disk format of the token sets.
    token_set_format: TokenSetFormat,
    /// Scratch buffer of token ids, cleared for each document and reused across documents.
    token_ids: Vec<u32>,
    /// Used to pre-size `token_ids` (presumably the previous document's token count).
    last_token_count: usize,
}
1011
/// A worker's final in-memory partition that was not written by the worker.
///
/// All workers' tails are merged into a single partition before being written.
struct TailPartition {
    builder: InnerBuilder,
}
1015
/// Result returned by an indexing worker once its input channel is closed.
struct WorkerOutput {
    /// Ids of partitions the worker produced; appended to the builder's new partitions.
    partitions: Vec<u64>,
    /// Remaining in-memory partition, if any, to be merged with the other workers' tails.
    tail_partition: Option<TailPartition>,
}
1020
/// Settings copied into every indexing worker.
#[derive(Debug, Clone, Copy)]
struct IndexWorkerConfig {
    /// Whether term positions are recorded.
    with_position: bool,
    /// FTS posting-list format version.
    format_version: InvertedListFormatVersion,
    /// Bits OR-ed into every partition id the worker allocates.
    fragment_mask: Option<u64>,
    /// On-disk format of the token sets.
    token_set_format: TokenSetFormat,
    /// Memory budget per worker, in bytes.
    worker_memory_limit_bytes: u64,
}
1029
1030impl IndexWorker {
1031 fn posting_lists_overhead_size(&self) -> u64 {
1032 (self.builder.posting_lists.capacity() * std::mem::size_of::<PostingListBuilder>()) as u64
1033 }
1034
/// Moves `tracked` by the signed difference `new - old` using only unsigned arithmetic.
fn adjust_tracked_value(tracked: &mut u64, old: u64, new: u64) {
    *tracked = match new.cmp(&old) {
        std::cmp::Ordering::Less => *tracked - (old - new),
        _ => *tracked + (new - old),
    };
}
1042
1043 fn adjust_tracked_memory_size(&mut self, old_memory_size: u64, new_memory_size: u64) {
1044 Self::adjust_tracked_value(&mut self.memory_size, old_memory_size, new_memory_size);
1045 }
1046
/// Adds a signed `delta` to the unsigned running `total`.
///
/// `unsigned_abs` is used because negating `i64::MIN` overflows (the previous
/// `(-delta) as u64` panicked in debug builds for that input).
///
/// # Panics
/// In debug builds, panics if the result would leave the `u64` range, as ordinary
/// `+=`/`-=` do.
fn apply_delta(total: &mut u64, delta: i64) {
    let magnitude = delta.unsigned_abs();
    if delta >= 0 {
        *total += magnitude;
    } else {
        *total -= magnitude;
    }
}
1054
1055 fn temporary_memory_size(&self) -> u64 {
1056 (self.token_ids.capacity() * std::mem::size_of::<u32>()) as u64
1057 }
1058
1059 fn trim_temporary_buffers(&mut self) {
1060 if self.token_ids.capacity() > MAX_RETAINED_TOKEN_IDS {
1061 self.token_ids = Vec::with_capacity(self.last_token_count.min(MAX_RETAINED_TOKEN_IDS));
1062 }
1063 }
1064
1065 async fn new(
1066 tokenizer: Box<dyn LanceTokenizer>,
1067 dest_store: Arc<dyn IndexStore>,
1068 id_alloc: Arc<AtomicU64>,
1069 config: IndexWorkerConfig,
1070 ) -> Result<Self> {
1071 let schema = inverted_list_schema_for_version(config.with_position, config.format_version);
1072
1073 Ok(Self {
1074 tokenizer,
1075 dest_store,
1076 builder: InnerBuilder::new_with_format_version(
1077 id_alloc.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
1078 | config.fragment_mask.unwrap_or(0),
1079 config.with_position,
1080 config.token_set_format,
1081 config.format_version,
1082 ),
1083 partitions: Vec::new(),
1084 id_alloc,
1085 schema,
1086 memory_size: 0,
1087 worker_memory_limit_bytes: config.worker_memory_limit_bytes,
1088 total_doc_length: 0,
1089 fragment_mask: config.fragment_mask,
1090 token_set_format: config.token_set_format,
1091 token_ids: Vec::new(),
1092 last_token_count: 0,
1093 })
1094 }
1095
1096 fn has_position(&self) -> bool {
1097 self.schema
1098 .column_with_name(COMPRESSED_POSITION_COL)
1099 .is_some()
1100 || self.schema.column_with_name(POSITION_COL).is_some()
1101 }
1102
    /// Tokenizes each non-null document in `batch` into the in-memory partition
    /// builder, keeping `self.memory_size` updated and flushing the builder to
    /// `dest_store` when it fills up.
    ///
    /// Column 0 is read as the document text and the `ROW_ID` column as `UInt64`
    /// row ids; rows whose document is null are skipped.
    ///
    /// # Errors
    /// - Returns an invalid-input error when a document that is the only document
    ///   in the builder pushes tracked memory above `worker_memory_limit_bytes`.
    /// - Propagates errors from posting-list updates and from `flush`.
    async fn process_batch(&mut self, batch: RecordBatch) -> Result<()> {
        let doc_col = batch.column(0);
        let doc_iter = iter_str_array(doc_col);
        let row_id_col = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
        // Pair every document with its row id, dropping rows with a null document.
        let docs = doc_iter
            .zip(row_id_col.values().iter())
            .filter_map(|(doc, row_id)| doc.map(|doc| (doc, *row_id)));

        let with_position = self.has_position();
        for (doc, row_id) in docs {
            // Snapshot state before this document; the sizes below are used to
            // apply deltas to the running `memory_size` estimate.
            let builder_was_empty = self.builder.docs.is_empty();
            let old_temporary_memory_size = self.temporary_memory_size();
            let old_token_memory_size = self.builder.tokens.memory_size() as u64;
            let doc_id = self.builder.docs.len() as u32;
            let mut token_num: u32 = 0;
            // Accumulated signed change in posting-list memory for this document.
            let mut posting_memory_delta = 0i64;
            if with_position {
                // NOTE(review): `Vec::reserve(n)` guarantees capacity >= len() + n,
                // so reserving the capacity shortfall here (before `clear()`) may
                // leave capacity below `last_token_count`; confirm the intent.
                if self.token_ids.capacity() < self.last_token_count {
                    self.token_ids
                        .reserve(self.last_token_count - self.token_ids.capacity());
                }
                self.token_ids.clear();
                // Local handles to the fields mutated inside the token loop.
                let builder = &mut self.builder;
                let token_ids = &mut self.token_ids;
                let memory_size = &mut self.memory_size;
                let posting_tail_codec = builder.posting_tail_codec;

                let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
                while token_stream.advance() {
                    let token = token_stream.token_mut();
                    let token_text = std::mem::take(&mut token.text);
                    let token_id = builder.tokens.add(token_text);
                    // A token id equal to the current number of posting lists gets
                    // a fresh posting list appended (assumes `tokens.add` hands out
                    // ids densely in insertion order — confirm).
                    if token_id as usize == builder.posting_lists.len() {
                        let old_posting_lists_overhead_size = (builder.posting_lists.capacity()
                            * std::mem::size_of::<PostingListBuilder>())
                            as u64;
                        builder.posting_lists.push(
                            PostingListBuilder::new_with_posting_tail_codec(
                                true,
                                posting_tail_codec,
                            ),
                        );
                        let new_posting_lists_overhead_size = (builder.posting_lists.capacity()
                            * std::mem::size_of::<PostingListBuilder>())
                            as u64;
                        // Account for any growth of the posting-list vector itself.
                        Self::adjust_tracked_value(
                            memory_size,
                            old_posting_lists_overhead_size,
                            new_posting_lists_overhead_size,
                        );
                    }
                    let posting_list = &mut builder.posting_lists[token_id as usize];
                    let old_posting_memory_size = posting_list.size();
                    // Remember the token only when `add_occurrence` returns true
                    // (presumably its first occurrence in this document), so its
                    // open document entry is finished exactly once below.
                    if posting_list.add_occurrence(doc_id, token.position as u32)? {
                        token_ids.push(token_id);
                    }
                    let new_posting_memory_size = posting_list.size();
                    posting_memory_delta +=
                        new_posting_memory_size as i64 - old_posting_memory_size as i64;
                    token_num += 1;
                }
            } else {
                if self.token_ids.capacity() < self.last_token_count {
                    self.token_ids
                        .reserve(self.last_token_count - self.token_ids.capacity());
                }
                self.token_ids.clear();

                // Without positions, collect every token id (duplicates included);
                // per-token counts are derived after the document is appended.
                let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
                while token_stream.advance() {
                    let token = token_stream.token_mut();
                    let token_text = std::mem::take(&mut token.text);
                    let token_id = self.builder.tokens.add(token_text);
                    self.token_ids.push(token_id);
                    token_num += 1;
                }
            }
            self.adjust_tracked_memory_size(
                old_token_memory_size,
                self.builder.tokens.memory_size() as u64,
            );

            if !with_position {
                // Ensure every token id seen so far has a posting list, and account
                // for the growth of the posting-list vector.
                let old_posting_lists_overhead_size = self.posting_lists_overhead_size();
                self.builder
                    .posting_lists
                    .resize_with(self.builder.tokens.len(), || {
                        PostingListBuilder::new_with_posting_tail_codec(
                            false,
                            self.builder.posting_tail_codec,
                        )
                    });
                let new_posting_lists_overhead_size = self.posting_lists_overhead_size();
                Self::adjust_tracked_value(
                    &mut self.memory_size,
                    old_posting_lists_overhead_size,
                    new_posting_lists_overhead_size,
                );
            }

            // Record the document (row id + token count) and account for its memory.
            let old_doc_memory_size = self.builder.docs.memory_size() as u64;
            let appended_doc_id = self.builder.docs.append(row_id, token_num);
            debug_assert_eq!(appended_doc_id, doc_id);
            self.adjust_tracked_memory_size(
                old_doc_memory_size,
                self.builder.docs.memory_size() as u64,
            );
            // Byte length of the UTF-8 text, not a token count.
            self.total_doc_length += doc.len();

            if with_position {
                // Close this document's entry in every posting list it touched.
                for &token_id in &self.token_ids {
                    let (old_posting_memory_size, new_posting_memory_size) = {
                        let posting_list = &mut self.builder.posting_lists[token_id as usize];
                        let old_posting_memory_size = posting_list.size();
                        posting_list.finish_open_doc(doc_id)?;
                        let new_posting_memory_size = posting_list.size();
                        (old_posting_memory_size, new_posting_memory_size)
                    };
                    posting_memory_delta +=
                        new_posting_memory_size as i64 - old_posting_memory_size as i64;
                }
                Self::apply_delta(&mut self.memory_size, posting_memory_delta);
            } else if token_num > 0 {
                // Sort so equal token ids are adjacent, then add one
                // `PositionRecorder::Count` per distinct token (run-length count).
                // `token_num > 0` guarantees the `unwrap` below has an element.
                self.token_ids.sort_unstable();
                let mut iter = self.token_ids.iter();
                let mut current = *iter.next().unwrap();
                let mut count = 1u32;
                for &token_id in iter {
                    if token_id == current {
                        count += 1;
                        continue;
                    }

                    let (old_posting_memory_size, new_posting_memory_size) = {
                        let posting_list = &mut self.builder.posting_lists[current as usize];
                        let old_posting_memory_size = posting_list.size();
                        posting_list.add(doc_id, PositionRecorder::Count(count));
                        let new_posting_memory_size = posting_list.size();
                        (old_posting_memory_size, new_posting_memory_size)
                    };
                    posting_memory_delta +=
                        new_posting_memory_size as i64 - old_posting_memory_size as i64;

                    current = token_id;
                    count = 1;
                }
                // Emit the final run.
                let (old_posting_memory_size, new_posting_memory_size) = {
                    let posting_list = &mut self.builder.posting_lists[current as usize];
                    let old_posting_memory_size = posting_list.size();
                    posting_list.add(doc_id, PositionRecorder::Count(count));
                    let new_posting_memory_size = posting_list.size();
                    (old_posting_memory_size, new_posting_memory_size)
                };
                posting_memory_delta +=
                    new_posting_memory_size as i64 - old_posting_memory_size as i64;
                Self::apply_delta(&mut self.memory_size, posting_memory_delta);
            }
            // Remember this document's buffer length for pre-sizing the next one,
            // then shrink the scratch buffer if it grew past the retention cap.
            self.last_token_count = self.token_ids.len();
            self.trim_temporary_buffers();
            self.adjust_tracked_memory_size(
                old_temporary_memory_size,
                self.temporary_memory_size(),
            );

            // Reject a single document that on its own exceeds the memory limit.
            if self.builder.docs.len() == 1 && self.memory_size > self.worker_memory_limit_bytes {
                return Err(Error::invalid_input(format!(
                    "single document row_id={} exceeds worker memory limit: {} > {} bytes",
                    row_id, self.memory_size, self.worker_memory_limit_bytes
                )));
            }

            // Flush when doc ids would overflow `u32`, or when the memory limit is
            // reached by a builder that already held documents before this one.
            if self.builder.docs.len() as u32 == u32::MAX
                || (!builder_was_empty && self.memory_size >= self.worker_memory_limit_bytes)
            {
                self.flush().await?;
            }
        }

        Ok(())
    }
1283
1284 #[instrument(level = "debug", skip_all)]
1285 async fn flush(&mut self) -> Result<()> {
1286 if self.builder.tokens.is_empty() {
1287 return Ok(());
1288 }
1289
1290 log::info!(
1291 "flushing posting lists, memory size: {} MiB",
1292 self.memory_size / (1024 * 1024)
1293 );
1294 self.memory_size = self.temporary_memory_size();
1295 let with_position = self.has_position();
1296 let format_version = self.builder.format_version;
1297 let builder = std::mem::replace(
1298 &mut self.builder,
1299 InnerBuilder::new_with_format_version(
1300 self.id_alloc
1301 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
1302 | self.fragment_mask.unwrap_or(0),
1303 with_position,
1304 self.token_set_format,
1305 format_version,
1306 ),
1307 );
1308 let written_partition_id = builder.id();
1309 let mut builder = builder;
1310 builder
1311 .write(self.dest_store.as_ref())
1312 .await
1313 .map_err(|err| {
1314 Error::execution(format!(
1315 "failed to write finalized partition {}: {err}",
1316 written_partition_id
1317 ))
1318 })?;
1319 self.partitions.push(written_partition_id);
1320 Ok(())
1321 }
1322
1323 async fn finish(self) -> Result<WorkerOutput> {
1324 let tail_partition = if self.builder.tokens.is_empty() {
1325 None
1326 } else {
1327 Some(TailPartition {
1328 builder: self.builder,
1329 })
1330 };
1331 Ok(WorkerOutput {
1332 partitions: self.partitions,
1333 tail_partition,
1334 })
1335 }
1336}
1337
1338#[derive(Debug, Clone)]
1339pub enum PositionRecorder {
1340 Position(SmallVec<[u32; 2]>),
1341 Count(u32),
1342}
1343
1344impl PositionRecorder {
1345 pub fn len(&self) -> u32 {
1346 match self {
1347 Self::Position(positions) => positions.len() as u32,
1348 Self::Count(count) => *count,
1349 }
1350 }
1351
1352 pub fn is_empty(&self) -> bool {
1353 self.len() == 0
1354 }
1355
1356 pub fn into_vec(self) -> Vec<u32> {
1357 match self {
1358 Self::Position(positions) => positions.into_vec(),
1359 Self::Count(_) => vec![0],
1360 }
1361 }
1362}
1363
1364#[derive(Debug, Eq, PartialEq, Clone, DeepSizeOf)]
1365pub struct ScoredDoc {
1366 pub row_id: u64,
1367 pub score: OrderedFloat,
1368}
1369
1370impl ScoredDoc {
1371 pub fn new(row_id: u64, score: f32) -> Self {
1372 Self {
1373 row_id,
1374 score: OrderedFloat(score),
1375 }
1376 }
1377}
1378
1379impl PartialOrd for ScoredDoc {
1380 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1381 Some(self.cmp(other))
1382 }
1383}
1384
1385impl Ord for ScoredDoc {
1386 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1387 self.score.cmp(&other.score)
1388 }
1389}
1390
1391pub fn legacy_inverted_list_schema(with_position: bool) -> SchemaRef {
1392 let mut fields = vec![
1393 arrow_schema::Field::new(ROW_ID, arrow_schema::DataType::UInt64, false),
1394 arrow_schema::Field::new(FREQUENCY_COL, arrow_schema::DataType::Float32, false),
1395 ];
1396 if with_position {
1397 fields.push(arrow_schema::Field::new(
1398 POSITION_COL,
1399 arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1400 "item",
1401 arrow_schema::DataType::Int32,
1402 true,
1403 ))),
1404 false,
1405 ));
1406 }
1407 Arc::new(arrow_schema::Schema::new(fields))
1408}
1409
1410pub fn inverted_list_schema(with_position: bool) -> SchemaRef {
1411 inverted_list_schema_for_version(with_position, current_fts_format_version())
1412}
1413
1414pub fn inverted_list_schema_for_version(
1415 with_position: bool,
1416 format_version: InvertedListFormatVersion,
1417) -> SchemaRef {
1418 match format_version {
1419 InvertedListFormatVersion::V1 => inverted_list_schema_v1(with_position),
1420 InvertedListFormatVersion::V2 => inverted_list_schema_with_tail_codec_and_position_codec(
1421 with_position,
1422 PostingTailCodec::VarintDelta,
1423 Some(PositionStreamCodec::PackedDelta),
1424 ),
1425 }
1426}
1427
1428fn inverted_list_schema_v1(with_position: bool) -> SchemaRef {
1429 let mut fields = vec![
1430 arrow_schema::Field::new(
1431 POSTING_COL,
1432 datatypes::DataType::List(Arc::new(Field::new(
1433 "item",
1434 datatypes::DataType::LargeBinary,
1435 true,
1436 ))),
1437 false,
1438 ),
1439 arrow_schema::Field::new(MAX_SCORE_COL, datatypes::DataType::Float32, false),
1440 arrow_schema::Field::new(LENGTH_COL, datatypes::DataType::UInt32, false),
1441 ];
1442 if with_position {
1443 fields.push(arrow_schema::Field::new(
1444 POSITION_COL,
1445 arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1446 "item",
1447 arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1448 "item",
1449 arrow_schema::DataType::LargeBinary,
1450 true,
1451 ))),
1452 true,
1453 ))),
1454 false,
1455 ));
1456 }
1457 Arc::new(arrow_schema::Schema::new(fields))
1458}
1459
1460pub fn inverted_list_schema_with_tail_codec(
1461 with_position: bool,
1462 posting_tail_codec: PostingTailCodec,
1463) -> SchemaRef {
1464 inverted_list_schema_with_tail_codec_and_position_codec(
1465 with_position,
1466 posting_tail_codec,
1467 Some(PositionStreamCodec::PackedDelta),
1468 )
1469}
1470
1471fn inverted_list_schema_with_tail_codec_and_position_codec(
1472 with_position: bool,
1473 posting_tail_codec: PostingTailCodec,
1474 position_codec: Option<PositionStreamCodec>,
1475) -> SchemaRef {
1476 let mut fields = vec![
1477 arrow_schema::Field::new(
1480 POSTING_COL,
1481 datatypes::DataType::List(Arc::new(Field::new(
1482 "item",
1483 datatypes::DataType::LargeBinary,
1484 true,
1485 ))),
1486 false,
1487 ),
1488 arrow_schema::Field::new(MAX_SCORE_COL, datatypes::DataType::Float32, false),
1489 arrow_schema::Field::new(LENGTH_COL, datatypes::DataType::UInt32, false),
1490 ];
1491 if with_position {
1492 fields.push(arrow_schema::Field::new(
1493 COMPRESSED_POSITION_COL,
1494 arrow_schema::DataType::LargeBinary,
1495 false,
1496 ));
1497 fields.push(arrow_schema::Field::new(
1498 POSITION_BLOCK_OFFSET_COL,
1499 arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1500 "item",
1501 arrow_schema::DataType::UInt32,
1502 true,
1503 ))),
1504 false,
1505 ));
1506 }
1507 let mut metadata = HashMap::from([(
1508 POSTING_TAIL_CODEC_KEY.to_owned(),
1509 posting_tail_codec.as_str().to_owned(),
1510 )]);
1511 if let Some(position_codec) = position_codec.filter(|_| with_position) {
1512 metadata.insert(
1513 POSITIONS_LAYOUT_KEY.to_owned(),
1514 POSITIONS_LAYOUT_SHARED_STREAM_V2.to_owned(),
1515 );
1516 metadata.insert(
1517 POSITIONS_CODEC_KEY.to_owned(),
1518 position_codec.as_str().to_owned(),
1519 );
1520 }
1521 Arc::new(arrow_schema::Schema::new_with_metadata(fields, metadata))
1522}
1523
1524pub struct FlattenStream {
1526 inner: SendableRecordBatchStream,
1530 field_type: DataType,
1531 data_type: DataType,
1532}
1533
1534impl FlattenStream {
1535 pub fn new(input: SendableRecordBatchStream) -> Self {
1536 let schema = input.schema();
1537 let field = schema.field(0);
1538 let data_type = match field.data_type() {
1539 DataType::List(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
1540 DataType::List(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
1541 DataType::LargeUtf8
1542 }
1543 DataType::LargeList(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
1544 DataType::LargeList(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
1545 DataType::LargeUtf8
1546 }
1547 _ => panic!(
1548 "expect data type List(Utf8) or List(LargeUtf8) but got {:?}",
1549 field.data_type()
1550 ),
1551 };
1552 Self {
1553 inner: input,
1554 field_type: field.data_type().clone(),
1555 data_type,
1556 }
1557 }
1558}
1559
1560impl Stream for FlattenStream {
1561 type Item = datafusion_common::Result<RecordBatch>;
1562
1563 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1564 match Pin::new(&mut self.inner).poll_next(cx) {
1565 Poll::Ready(Some(Ok(batch))) => {
1566 let doc_col = batch.column(0);
1567 let batch = match self.field_type {
1568 DataType::List(_) => flatten_string_list::<i32>(&batch, doc_col).map_err(|e| {
1569 datafusion_common::error::DataFusionError::Execution(format!(
1570 "flatten string list error: {}",
1571 e
1572 ))
1573 }),
1574 DataType::LargeList(_) => {
1575 flatten_string_list::<i64>(&batch, doc_col).map_err(|e| {
1576 datafusion_common::error::DataFusionError::Execution(format!(
1577 "flatten string list error: {}",
1578 e
1579 ))
1580 })
1581 }
1582 _ => unreachable!(
1583 "expect data type List or LargeList but got {:?}",
1584 self.field_type
1585 ),
1586 };
1587 Poll::Ready(Some(batch))
1588 }
1589 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
1590 Poll::Ready(None) => Poll::Ready(None),
1591 Poll::Pending => Poll::Pending,
1592 }
1593 }
1594}
1595
1596impl RecordBatchStream for FlattenStream {
1597 fn schema(&self) -> SchemaRef {
1598 let schema = Schema::new(vec![
1599 Field::new(
1600 self.inner.schema().field(0).name(),
1601 self.data_type.clone(),
1602 true,
1603 ),
1604 ROW_ID_FIELD.clone(),
1605 ]);
1606
1607 Arc::new(schema)
1608 }
1609}
1610
1611fn flatten_string_list<Offset: arrow::array::OffsetSizeTrait>(
1612 batch: &RecordBatch,
1613 doc_col: &Arc<dyn Array>,
1614) -> Result<RecordBatch> {
1615 let docs = doc_col.as_list::<Offset>();
1616 let row_ids = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
1617
1618 let row_ids = row_ids
1619 .values()
1620 .iter()
1621 .zip(docs.iter())
1622 .flat_map(|(row_id, doc)| std::iter::repeat_n(*row_id, doc.map(|d| d.len()).unwrap_or(0)));
1623
1624 let row_ids = Arc::new(UInt64Array::from_iter_values(row_ids));
1625 let docs = match docs.value_type() {
1626 datatypes::DataType::Utf8 | datatypes::DataType::LargeUtf8 => docs.values().clone(),
1627 _ => {
1628 return Err(Error::index(format!(
1629 "expect data type String or LargeString but got {}",
1630 docs.value_type()
1631 )));
1632 }
1633 };
1634
1635 let schema = Schema::new(vec![
1636 Field::new(
1637 batch.schema().field(0).name(),
1638 docs.data_type().clone(),
1639 true,
1640 ),
1641 ROW_ID_FIELD.clone(),
1642 ]);
1643 let batch = RecordBatch::try_new(Arc::new(schema), vec![docs, row_ids])?;
1644 Ok(batch)
1645}
1646
1647pub(crate) fn token_file_path(partition_id: u64) -> String {
1648 format!("part_{}_{}", partition_id, TOKENS_FILE)
1649}
1650
1651pub(crate) fn posting_file_path(partition_id: u64) -> String {
1652 format!("part_{}_{}", partition_id, INVERT_LIST_FILE)
1653}
1654
1655pub(crate) fn doc_file_path(partition_id: u64) -> String {
1656 format!("part_{}_{}", partition_id, DOCS_FILE)
1657}
1658
1659pub(crate) fn part_metadata_file_path(partition_id: u64) -> String {
1660 format!("part_{}_{}", partition_id, METADATA_FILE)
1661}
1662
/// Suffixes of the per-partition files (`part_{id}_{suffix}`) that are renamed
/// when partition ids are remapped during a merge.
const PARTITION_FILE_SUFFIXES: [&str; 3] = [TOKENS_FILE, INVERT_LIST_FILE, DOCS_FILE];
/// Renames performed per remapped file (original -> temp, then temp -> final);
/// used to size the remap progress stage.
const PARTITION_FILE_RENAME_PHASES: u64 = 2;
1668
1669pub async fn merge_index_files(
1670 object_store: &ObjectStore,
1671 index_dir: &Path,
1672 store: Arc<dyn IndexStore>,
1673 progress: Arc<dyn IndexBuildProgress>,
1674) -> Result<()> {
1675 let part_metadata_files = list_metadata_files(object_store, index_dir).await?;
1677
1678 merge_metadata_files(store, &part_metadata_files, progress).await
1680}
1681
1682async fn list_metadata_files(object_store: &ObjectStore, index_dir: &Path) -> Result<Vec<String>> {
1685 let mut part_metadata_files = Vec::new();
1687 let mut list_stream = object_store.list(Some(index_dir.clone()));
1688
1689 while let Some(item) = list_stream.next().await {
1690 match item {
1691 Ok(meta) => {
1692 let file_name = meta.location.filename().unwrap_or_default();
1693 if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
1695 part_metadata_files.push(file_name.to_string());
1696 }
1697 }
1698 Err(_) => continue,
1699 }
1700 }
1701
1702 if part_metadata_files.is_empty() {
1703 return Err(Error::invalid_input_source(
1704 format!(
1705 "No partition metadata files found in index directory: {}",
1706 index_dir
1707 )
1708 .into(),
1709 ));
1710 }
1711
1712 Ok(part_metadata_files)
1713}
1714
1715async fn merge_metadata_files(
1717 store: Arc<dyn IndexStore>,
1718 part_metadata_files: &[String],
1719 progress: Arc<dyn IndexBuildProgress>,
1720) -> Result<()> {
1721 let mut all_partitions = Vec::new();
1723 let mut params = None;
1724 let mut token_set_format = None;
1725 let mut format_version = None;
1726 let mut posting_tail_codec = None;
1727 let mut deleted_fragments = RoaringBitmap::new();
1728 progress
1729 .stage_start(
1730 "read_partition_metadata",
1731 Some(part_metadata_files.len() as u64),
1732 "files",
1733 )
1734 .await?;
1735
1736 for (idx, file_name) in part_metadata_files.iter().enumerate() {
1737 let reader = store.open_index_file(file_name).await?;
1738 let metadata = &reader.schema().metadata;
1739
1740 let partitions_str = metadata.get("partitions").ok_or(Error::index(format!(
1741 "partitions not found in {}",
1742 file_name
1743 )))?;
1744
1745 let partition_ids: Vec<u64> = serde_json::from_str(partitions_str)
1746 .map_err(|e| Error::index(format!("Failed to parse partitions: {}", e)))?;
1747
1748 all_partitions.extend(partition_ids);
1749
1750 if params.is_none() {
1751 let params_str = metadata
1752 .get("params")
1753 .ok_or(Error::index(format!("params not found in {}", file_name)))?;
1754 params = Some(
1755 serde_json::from_str::<InvertedIndexParams>(params_str)
1756 .map_err(|e| Error::index(format!("Failed to parse params: {}", e)))?,
1757 );
1758 }
1759
1760 if token_set_format.is_none()
1761 && let Some(name) = metadata.get(TOKEN_SET_FORMAT_KEY)
1762 {
1763 token_set_format = Some(TokenSetFormat::from_str(name)?);
1764 }
1765 if format_version.is_none() {
1766 format_version = Some(parse_format_version_from_metadata(metadata)?);
1767 }
1768 if posting_tail_codec.is_none() {
1769 posting_tail_codec = Some(parse_posting_tail_codec(metadata)?);
1770 }
1771
1772 if reader.num_rows() > 0 {
1773 let metadata_batch = reader.read_range(0..1, None).await?;
1774 let deleted_fragments_col = metadata_batch
1775 .column_by_name(DELETED_FRAGMENTS_COL)
1776 .expect_ok()?;
1777 let deleted_fragments_arr = deleted_fragments_col
1778 .as_any()
1779 .downcast_ref::<BinaryArray>()
1780 .expect_ok()?;
1781 let part_deleted_fragments =
1782 RoaringBitmap::deserialize_from(deleted_fragments_arr.value(0))?;
1783 deleted_fragments.extend(part_deleted_fragments);
1784 }
1785 progress
1786 .stage_progress("read_partition_metadata", idx as u64 + 1)
1787 .await?;
1788 }
1789 progress.stage_complete("read_partition_metadata").await?;
1790
1791 let mut sorted_ids = all_partitions.clone();
1793 sorted_ids.sort();
1794 sorted_ids.dedup();
1795
1796 let id_mapping: HashMap<u64, u64> = sorted_ids
1797 .iter()
1798 .enumerate()
1799 .map(|(new_id, &old_id)| (old_id, new_id as u64))
1800 .collect();
1801
1802 let timestamp = std::time::SystemTime::now()
1804 .duration_since(std::time::UNIX_EPOCH)
1805 .unwrap()
1806 .as_secs();
1807
1808 let changed_partition_count = id_mapping
1809 .iter()
1810 .filter(|(old_id, new_id)| old_id != new_id)
1811 .count() as u64;
1812 let total_renames = changed_partition_count
1813 * PARTITION_FILE_SUFFIXES.len() as u64
1814 * PARTITION_FILE_RENAME_PHASES;
1815 progress
1816 .stage_start("remap_partition_files", Some(total_renames), "files")
1817 .await?;
1818
1819 let mut temp_files: Vec<(String, String, String)> = Vec::new(); let mut renamed_files = 0u64;
1822
1823 for (&old_id, &new_id) in &id_mapping {
1824 if old_id != new_id {
1825 for suffix in PARTITION_FILE_SUFFIXES {
1826 let old_path = format!("part_{}_{}", old_id, suffix);
1827 let new_path = format!("part_{}_{}", new_id, suffix);
1828 let temp_path = format!("temp_{}_{}", timestamp, old_path);
1829
1830 if let Err(e) = store.rename_index_file(&old_path, &temp_path).await {
1832 for (temp_name, old_name, _) in temp_files.iter().rev() {
1834 let _ = store.rename_index_file(temp_name, old_name).await;
1835 }
1836 return Err(Error::index(format!(
1837 "Failed to move {} to temp {}: {}",
1838 old_path, temp_path, e
1839 )));
1840 }
1841 temp_files.push((temp_path, old_path, new_path));
1842 renamed_files += 1;
1843 progress
1844 .stage_progress("remap_partition_files", renamed_files)
1845 .await?;
1846 }
1847 }
1848 }
1849
1850 let mut completed_renames: Vec<(String, String)> = Vec::new(); for (temp_path, _old_path, final_path) in &temp_files {
1854 if let Err(e) = store.rename_index_file(temp_path, final_path).await {
1855 for (final_name, temp_name) in completed_renames.iter().rev() {
1857 let _ = store.rename_index_file(final_name, temp_name).await;
1858 }
1859 for (temp_name, orig_name, _) in temp_files.iter() {
1861 if !completed_renames.iter().any(|(_, t)| t == temp_name) {
1862 let _ = store.rename_index_file(temp_name, orig_name).await;
1863 }
1864 }
1865 return Err(Error::index(format!(
1866 "Failed to rename {} to {}: {}",
1867 temp_path, final_path, e
1868 )));
1869 }
1870 completed_renames.push((final_path.clone(), temp_path.clone()));
1871 renamed_files += 1;
1872 progress
1873 .stage_progress("remap_partition_files", renamed_files)
1874 .await?;
1875 }
1876 progress.stage_complete("remap_partition_files").await?;
1877
1878 let remapped_partitions: Vec<u64> = (0..id_mapping.len() as u64).collect();
1880 let params = params.unwrap_or_default();
1881 let token_set_format = token_set_format.unwrap_or(TokenSetFormat::Arrow);
1882 let builder = InvertedIndexBuilder::from_existing_index(
1883 params,
1884 None,
1885 remapped_partitions.clone(),
1886 token_set_format,
1887 None,
1888 deleted_fragments,
1889 )
1890 .with_format_version(format_version.unwrap_or(InvertedListFormatVersion::V1))
1891 .with_posting_tail_codec(posting_tail_codec.unwrap_or(PostingTailCodec::Fixed32));
1892 progress
1893 .stage_start("write_merged_metadata", Some(1), "files")
1894 .await?;
1895 builder
1896 .write_metadata(&*store, &remapped_partitions)
1897 .await?;
1898 progress.stage_progress("write_merged_metadata", 1).await?;
1899 progress.stage_complete("write_merged_metadata").await?;
1900
1901 for file_name in part_metadata_files {
1903 if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
1904 let _ = store.delete_index_file(file_name).await;
1905 }
1906 }
1907
1908 Ok(())
1909}
1910
1911pub fn document_input(
1918 input: SendableRecordBatchStream,
1919 column: &str,
1920) -> Result<SendableRecordBatchStream> {
1921 let schema = input.schema();
1922 let field = schema.column_with_name(column).expect_ok()?.1;
1923 match field.data_type() {
1924 DataType::Utf8 | DataType::LargeUtf8 => Ok(input),
1925 DataType::List(field) | DataType::LargeList(field)
1926 if matches!(field.data_type(), DataType::Utf8 | DataType::LargeUtf8) =>
1927 {
1928 Ok(Box::pin(FlattenStream::new(input)))
1929 }
1930 DataType::LargeBinary => match field.metadata().get(ARROW_EXT_NAME_KEY) {
1931 Some(name) if name.as_str() == JSON_EXT_NAME => {
1932 Ok(Box::pin(JsonTextStream::new(input, column.to_string())))
1933 }
1934 _ => Err(Error::invalid_input_source(
1935 format!("column {} is not json", column).into(),
1936 )),
1937 },
1938 _ => Err(Error::invalid_input_source(
1939 format!(
1940 "column {} has type {}, is not utf8, large utf8 type/list, or large binary",
1941 column,
1942 field.data_type()
1943 )
1944 .into(),
1945 )),
1946 }
1947}
1948
1949#[cfg(test)]
1950mod tests {
1951 use super::*;
1952 use crate::metrics::NoOpMetricsCollector;
1953 use crate::progress::IndexBuildProgress;
1954 use crate::scalar::{IndexFile, IndexReader, IndexWriter, ScalarIndex};
1955 use arrow_array::{RecordBatch, StringArray, UInt64Array};
1956 use arrow_schema::{DataType, Field, Schema};
1957 use async_trait::async_trait;
1958 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
1959 use futures::stream;
1960 use lance_core::ROW_ID;
1961 use lance_core::cache::LanceCache;
1962 use lance_core::utils::tempfile::TempDir;
1963 use std::any::Any;
1964 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1965 use std::time::Duration;
1966
1967 fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch {
1968 let schema = Arc::new(Schema::new(vec![
1969 Field::new("doc", DataType::Utf8, true),
1970 Field::new(ROW_ID, DataType::UInt64, false),
1971 ]));
1972 let docs = Arc::new(StringArray::from(vec![Some(doc)]));
1973 let row_ids = Arc::new(UInt64Array::from(vec![row_id]));
1974 RecordBatch::try_new(schema, vec![docs, row_ids]).unwrap()
1975 }
1976
1977 #[derive(Debug, Default, Clone)]
1978 struct CountingStore {
1979 write_count: Arc<AtomicUsize>,
1980 }
1981
1982 impl CountingStore {
1983 fn new() -> Self {
1984 Self {
1985 write_count: Arc::new(AtomicUsize::new(0)),
1986 }
1987 }
1988
1989 fn write_count(&self) -> usize {
1990 self.write_count.load(Ordering::SeqCst)
1991 }
1992 }
1993
1994 impl DeepSizeOf for CountingStore {
1995 fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
1996 0
1997 }
1998 }
1999
2000 #[derive(Debug)]
2001 struct CountingWriter {
2002 write_count: Arc<AtomicUsize>,
2003 }
2004
2005 #[async_trait]
2006 impl IndexWriter for CountingWriter {
2007 async fn write_record_batch(&mut self, _batch: RecordBatch) -> Result<u64> {
2008 Ok(self.write_count.fetch_add(1, Ordering::SeqCst) as u64)
2009 }
2010
2011 async fn finish(&mut self) -> Result<()> {
2012 Ok(())
2013 }
2014
2015 async fn finish_with_metadata(&mut self, _metadata: HashMap<String, String>) -> Result<()> {
2016 Ok(())
2017 }
2018 }
2019
2020 #[async_trait]
2021 impl IndexStore for CountingStore {
2022 fn as_any(&self) -> &dyn Any {
2023 self
2024 }
2025
2026 fn clone_arc(&self) -> Arc<dyn IndexStore> {
2027 Arc::new(self.clone())
2028 }
2029
2030 fn io_parallelism(&self) -> usize {
2031 1
2032 }
2033
2034 async fn new_index_file(
2035 &self,
2036 _name: &str,
2037 _schema: Arc<Schema>,
2038 ) -> Result<Box<dyn IndexWriter>> {
2039 Ok(Box::new(CountingWriter {
2040 write_count: self.write_count.clone(),
2041 }))
2042 }
2043
2044 async fn open_index_file(&self, _name: &str) -> Result<Arc<dyn IndexReader>> {
2045 Err(Error::not_supported(
2046 "CountingStore does not support reading",
2047 ))
2048 }
2049
2050 async fn copy_index_file(&self, _name: &str, _dest_store: &dyn IndexStore) -> Result<()> {
2051 Err(Error::not_supported(
2052 "CountingStore does not support copying",
2053 ))
2054 }
2055
2056 async fn rename_index_file(&self, _name: &str, _new_name: &str) -> Result<()> {
2057 Err(Error::not_supported(
2058 "CountingStore does not support renaming",
2059 ))
2060 }
2061
2062 async fn delete_index_file(&self, _name: &str) -> Result<()> {
2063 Err(Error::not_supported(
2064 "CountingStore does not support deleting",
2065 ))
2066 }
2067
2068 async fn list_files_with_sizes(&self) -> Result<Vec<IndexFile>> {
2069 Ok(vec![])
2070 }
2071 }
2072
2073 #[tokio::test]
2074 async fn test_write_posting_lists_batches_multiple_rows() -> Result<()> {
2075 let mut builder = InnerBuilder::new(0, false, TokenSetFormat::default());
2076 for doc_id in 0..3u64 {
2077 builder.docs.append(doc_id, 1);
2078 }
2079
2080 for doc_id in 0..3u32 {
2081 let mut posting_list = PostingListBuilder::new(false);
2082 posting_list.add(doc_id, PositionRecorder::Count(1));
2083 builder.posting_lists.push(posting_list);
2084 }
2085
2086 let store = CountingStore::new();
2087 let docs = Arc::new(std::mem::take(&mut builder.docs));
2088 builder.write_posting_lists(&store, docs).await?;
2089
2090 assert_eq!(store.write_count(), 1);
2091 Ok(())
2092 }
2093
2094 #[tokio::test]
2095 async fn test_build_only_path_writes_partitions_as_is() -> Result<()> {
2096 let src_dir = TempDir::default();
2097 let dest_dir = TempDir::default();
2098 let src_store = Arc::new(LanceIndexStore::new(
2099 ObjectStore::local().into(),
2100 src_dir.obj_path(),
2101 Arc::new(LanceCache::no_cache()),
2102 ));
2103 let dest_store = Arc::new(LanceIndexStore::new(
2104 ObjectStore::local().into(),
2105 dest_dir.obj_path(),
2106 Arc::new(LanceCache::no_cache()),
2107 ));
2108
2109 let params = InvertedIndexParams::default();
2110 let tokenizer = params.build()?;
2111 let token_set_format = TokenSetFormat::default();
2112 let id_alloc = Arc::new(AtomicU64::new(0));
2113
2114 let mut worker1 = IndexWorker::new(
2115 tokenizer.clone(),
2116 src_store.clone(),
2117 id_alloc.clone(),
2118 IndexWorkerConfig {
2119 with_position: params.with_position,
2120 format_version: InvertedListFormatVersion::V1,
2121 fragment_mask: None,
2122 token_set_format,
2123 worker_memory_limit_bytes: u64::MAX,
2124 },
2125 )
2126 .await?;
2127 worker1
2128 .process_batch(make_doc_batch("hello world", 0))
2129 .await?;
2130 let output1 = worker1.finish().await?;
2131 let mut partitions = output1.partitions;
2132 if let Some(mut tail_partition) = output1.tail_partition {
2133 partitions.push(tail_partition.builder.id());
2134 tail_partition.builder.write(src_store.as_ref()).await?;
2135 }
2136
2137 let mut worker2 = IndexWorker::new(
2138 tokenizer.clone(),
2139 src_store.clone(),
2140 id_alloc.clone(),
2141 IndexWorkerConfig {
2142 with_position: params.with_position,
2143 format_version: InvertedListFormatVersion::V1,
2144 fragment_mask: None,
2145 token_set_format,
2146 worker_memory_limit_bytes: u64::MAX,
2147 },
2148 )
2149 .await?;
2150 worker2
2151 .process_batch(make_doc_batch("goodbye world", 1))
2152 .await?;
2153 let output2 = worker2.finish().await?;
2154 partitions.extend(output2.partitions);
2155 if let Some(mut tail_partition) = output2.tail_partition {
2156 partitions.push(tail_partition.builder.id());
2157 tail_partition.builder.write(src_store.as_ref()).await?;
2158 }
2159 partitions.sort_unstable();
2160 assert_eq!(partitions.len(), 2);
2161 assert_ne!(partitions[0], partitions[1]);
2162
2163 let builder = InvertedIndexBuilder::from_existing_index(
2164 InvertedIndexParams::default(),
2165 Some(src_store.clone()),
2166 partitions.clone(),
2167 token_set_format,
2168 None,
2169 RoaringBitmap::new(),
2170 );
2171 builder.write(dest_store.as_ref()).await?;
2172
2173 let metadata_reader = dest_store.open_index_file(METADATA_FILE).await?;
2174 let metadata = &metadata_reader.schema().metadata;
2175 let partitions_str = metadata
2176 .get("partitions")
2177 .expect("partitions missing from metadata");
2178 let written_partitions: Vec<u64> = serde_json::from_str(partitions_str).unwrap();
2179 assert_eq!(written_partitions, partitions);
2180
2181 for id in &partitions {
2182 dest_store.open_index_file(&token_file_path(*id)).await?;
2183 dest_store.open_index_file(&posting_file_path(*id)).await?;
2184 dest_store.open_index_file(&doc_file_path(*id)).await?;
2185 }
2186
2187 Ok(())
2188 }
2189
    /// Updating an index whose existing partition was written with the `Fixed32`
    /// posting tail codec must keep that codec. After the update, every partition
    /// loaded from the destination store reports `Fixed32`, and the destination
    /// metadata file records it under `POSTING_TAIL_CODEC_KEY`.
    #[tokio::test]
    async fn test_update_preserves_existing_posting_tail_codec() -> Result<()> {
        let src_dir = TempDir::default();
        let dest_dir = TempDir::default();
        let src_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            src_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));
        let dest_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            dest_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        // Hand-build partition 0 with one token ("hello") and one document
        // (row id 100), encoded with the `Fixed32` tail codec.
        let posting_tail_codec = PostingTailCodec::Fixed32;
        let mut partition = InnerBuilder::new_with_posting_tail_codec(
            0,
            false,
            TokenSetFormat::default(),
            posting_tail_codec,
        );
        partition.tokens.add("hello".to_owned());
        let mut posting_list =
            PostingListBuilder::new_with_posting_tail_codec(false, posting_tail_codec);
        posting_list.add(0, PositionRecorder::Count(1));
        partition.posting_lists.push(posting_list);
        partition.docs.append(100, 1);
        partition.write(src_store.as_ref()).await?;

        // Write index metadata for partition 0 that declares the same tail codec,
        // so the source store can be loaded as a complete index.
        let metadata_writer = InvertedIndexBuilder::from_existing_index(
            InvertedIndexParams::default(),
            Some(src_store.clone()),
            vec![0],
            TokenSetFormat::default(),
            None,
            RoaringBitmap::new(),
        )
        .with_posting_tail_codec(posting_tail_codec);
        metadata_writer
            .write_metadata(src_store.as_ref(), &[0])
            .await?;

        // Update with one new document (row id 101) into a separate store.
        let index = InvertedIndex::load(src_store, None, &LanceCache::no_cache()).await?;
        let schema = Arc::new(Schema::new(vec![
            Field::new("doc", DataType::Utf8, true),
            Field::new(ROW_ID, DataType::UInt64, false),
        ]));
        let docs = Arc::new(StringArray::from(vec![Some("hello again")]));
        let row_ids = Arc::new(UInt64Array::from(vec![101u64]));
        let batch = RecordBatch::try_new(schema.clone(), vec![docs, row_ids])?;
        let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)]));
        index
            .update(Box::pin(stream), dest_store.as_ref(), None)
            .await?;

        // The updated index has two partitions, and every one of them must still
        // use the original codec.
        let updated =
            InvertedIndex::load(dest_store.clone(), None, &LanceCache::no_cache()).await?;
        assert_eq!(updated.partitions.len(), 2);
        for partition in &updated.partitions {
            assert_eq!(
                partition.inverted_list.posting_tail_codec(),
                posting_tail_codec
            );
        }

        let metadata = dest_store.open_index_file(METADATA_FILE).await?;
        assert_eq!(
            metadata.schema().metadata.get(POSTING_TAIL_CODEC_KEY),
            Some(&posting_tail_codec.as_str().to_owned())
        );

        Ok(())
    }
2264
2265 #[test]
2266 fn test_with_posting_tail_codec_syncs_format_version() {
2267 let builder = InvertedIndexBuilder::from_existing_index(
2268 InvertedIndexParams::default(),
2269 None,
2270 Vec::new(),
2271 TokenSetFormat::default(),
2272 None,
2273 RoaringBitmap::new(),
2274 )
2275 .with_format_version(InvertedListFormatVersion::V2)
2276 .with_posting_tail_codec(PostingTailCodec::Fixed32);
2277 assert_eq!(builder.format_version, InvertedListFormatVersion::V1);
2278 assert_eq!(builder.posting_tail_codec, PostingTailCodec::Fixed32);
2279
2280 let builder = builder.with_posting_tail_codec(PostingTailCodec::VarintDelta);
2281 assert_eq!(builder.format_version, InvertedListFormatVersion::V2);
2282 assert_eq!(builder.posting_tail_codec, PostingTailCodec::VarintDelta);
2283 }
2284
    /// An index built with `with_position(false)` must still record per-document
    /// term frequency. "hello" occurs twice in the only document, so its posting
    /// list has exactly one entry: doc 0, frequency 2, no positions.
    #[tokio::test]
    async fn test_inverted_index_without_positions_tracks_frequency() -> Result<()> {
        let index_dir = TempDir::default();
        let store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            index_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let schema = Arc::new(Schema::new(vec![
            Field::new("doc", DataType::Utf8, true),
            Field::new(ROW_ID, DataType::UInt64, false),
        ]));
        let docs = Arc::new(StringArray::from(vec![Some("hello hello world")]));
        let row_ids = Arc::new(UInt64Array::from(vec![0u64]));
        let batch = RecordBatch::try_new(schema.clone(), vec![docs, row_ids])?;
        let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)]));
        let stream = Box::pin(stream);

        // Whitespace tokenizer with positions disabled, and with stop-word removal,
        // stemming and the token length cap all turned off.
        let params =
            InvertedIndexParams::new("whitespace".to_string(), lance_tokenizer::Language::English)
                .with_position(false)
                .remove_stop_words(false)
                .stem(false)
                .max_token_length(None);

        let mut builder = InvertedIndexBuilder::new(params);
        builder.update(stream, store.as_ref(), None).await?;

        let index = InvertedIndex::load(store, None, &LanceCache::no_cache()).await?;
        assert_eq!(index.partitions.len(), 1);
        let partition = &index.partitions[0];
        let token_id = partition.tokens.get("hello").unwrap();
        let posting = partition
            .inverted_list
            .posting_list(token_id, false, &NoOpMetricsCollector)
            .await?;

        // Exactly one (doc_id, freq, positions) entry is expected.
        let mut iter = posting.iter();
        let (doc_id, freq, positions) = iter.next().unwrap();
        assert_eq!(doc_id, 0);
        assert_eq!(freq, 2);
        assert!(positions.is_none());
        assert!(iter.next().is_none());

        Ok(())
    }
2332
    // Defines `RecordingProgress`, an `IndexBuildProgress` test sink whose
    // callbacks return `Result<()>`. The tests below read what it captured via
    // `recorded_events()`, destructured as `(kind, stage, completed)` tuples where
    // `kind` is one of "start", "progress" or "complete".
    lance_testing::define_stage_event_progress!(RecordingProgress, IndexBuildProgress, Result<()>);
2334
2335 #[derive(Debug, Default)]
2336 struct FailingProgress;
2337
2338 #[async_trait]
2339 impl IndexBuildProgress for FailingProgress {
2340 async fn stage_start(&self, _stage: &str, _total: Option<u64>, _unit: &str) -> Result<()> {
2341 Ok(())
2342 }
2343
2344 async fn stage_progress(&self, _stage: &str, _completed: u64) -> Result<()> {
2345 Err(Error::io("injected progress failure"))
2346 }
2347
2348 async fn stage_complete(&self, _stage: &str) -> Result<()> {
2349 Ok(())
2350 }
2351 }
2352
2353 #[tokio::test]
2354 async fn test_builder_reports_progress_stages() -> Result<()> {
2355 let index_dir = TempDir::default();
2356 let store = Arc::new(LanceIndexStore::new(
2357 ObjectStore::local().into(),
2358 index_dir.obj_path(),
2359 Arc::new(LanceCache::no_cache()),
2360 ));
2361
2362 let batch1 = make_doc_batch("hello world", 0);
2363 let batch2 = make_doc_batch("goodbye world", 1);
2364 let total_rows = 2u64;
2365 let stream = RecordBatchStreamAdapter::new(
2366 batch1.schema(),
2367 stream::iter(vec![Ok(batch1), Ok(batch2)]),
2368 );
2369 let stream = Box::pin(stream);
2370
2371 let progress = Arc::new(RecordingProgress::default());
2372 let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default())
2373 .with_progress(progress.clone());
2374 builder.update(stream, store.as_ref(), None).await?;
2375
2376 let events = progress.recorded_events();
2377 let tags = events
2378 .iter()
2379 .map(|(kind, stage, _)| format!("{kind}:{stage}"))
2380 .collect::<Vec<_>>();
2381 let tokenize_progress = events
2382 .iter()
2383 .filter_map(|(kind, stage, completed)| {
2384 if kind == "progress" && stage == "tokenize_docs" {
2385 Some(*completed)
2386 } else {
2387 None
2388 }
2389 })
2390 .collect::<Vec<_>>();
2391
2392 let tokenize_start = tags
2393 .iter()
2394 .position(|e| e == "start:tokenize_docs")
2395 .expect("missing tokenize_docs start");
2396 let tokenize_complete = tags
2397 .iter()
2398 .position(|e| e == "complete:tokenize_docs")
2399 .expect("missing tokenize_docs complete");
2400 let copy_start = tags
2401 .iter()
2402 .position(|e| e == "start:copy_partitions")
2403 .expect("missing copy_partitions start");
2404 let copy_complete = tags
2405 .iter()
2406 .position(|e| e == "complete:copy_partitions")
2407 .expect("missing copy_partitions complete");
2408 let metadata_start = tags
2409 .iter()
2410 .position(|e| e == "start:write_metadata")
2411 .expect("missing write_metadata start");
2412 let metadata_complete = tags
2413 .iter()
2414 .position(|e| e == "complete:write_metadata")
2415 .expect("missing write_metadata complete");
2416
2417 assert!(tokenize_start < tokenize_complete);
2418 assert!(tokenize_complete < copy_start);
2419 assert!(copy_start < copy_complete);
2420 assert!(copy_complete < metadata_start);
2421 assert!(metadata_start < metadata_complete);
2422
2423 assert!(
2424 tags.iter().any(|e| e == "progress:tokenize_docs"),
2425 "expected progress callback for tokenize_docs"
2426 );
2427 assert!(
2428 tokenize_progress.len() >= 2,
2429 "expected at least two progress callbacks for tokenize_docs, got {tokenize_progress:?}"
2430 );
2431 assert_eq!(
2432 tokenize_progress.iter().copied().max().unwrap_or_default(),
2433 total_rows,
2434 "expected tokenize_docs progress to reach all rows"
2435 );
2436 assert!(
2437 tags.iter().any(|e| e == "progress:copy_partitions"),
2438 "expected progress callback for copy_partitions"
2439 );
2440 assert!(
2441 tags.iter().any(|e| e == "progress:write_metadata"),
2442 "expected progress callback for write_metadata"
2443 );
2444 assert!(
2445 !tags.iter().any(|e| e == "start:merge_partitions"),
2446 "merge_partitions should not run in the build-only path"
2447 );
2448
2449 Ok(())
2450 }
2451
2452 #[tokio::test]
2453 async fn test_builder_default_path_skips_merge_stage() -> Result<()> {
2454 let index_dir = TempDir::default();
2455 let store = Arc::new(LanceIndexStore::new(
2456 ObjectStore::local().into(),
2457 index_dir.obj_path(),
2458 Arc::new(LanceCache::no_cache()),
2459 ));
2460
2461 let batch = make_doc_batch("hello world", 0);
2462 let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2463 let stream = Box::pin(stream);
2464
2465 let progress = Arc::new(RecordingProgress::default());
2466 let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default())
2467 .with_progress(progress.clone());
2468 builder.update(stream, store.as_ref(), None).await?;
2469
2470 let tags = progress
2471 .recorded_events()
2472 .iter()
2473 .map(|(kind, stage, _)| format!("{kind}:{stage}"))
2474 .collect::<Vec<_>>();
2475
2476 assert!(
2477 tags.iter().any(|e| e == "start:copy_partitions"),
2478 "default path should copy finalized partitions"
2479 );
2480 assert!(
2481 !tags.iter().any(|e| e == "start:merge_partitions"),
2482 "default path should not run merge_partitions"
2483 );
2484 Ok(())
2485 }
2486
    /// Builds two single-document indices into the same store, each with a
    /// different fragment mask (`1 << 32` and `2 << 32`), then merges them with
    /// `merge_index_files`. Checks that the merge reports `read_partition_metadata`
    /// → `remap_partition_files` → `write_merged_metadata` as strictly ordered
    /// start/complete pairs, with progress callbacks for each.
    #[tokio::test]
    async fn test_merge_index_files_reports_progress_stages() -> Result<()> {
        let index_dir = TempDir::default();
        let index_path = index_dir.obj_path();
        let object_store = ObjectStore::local();
        let store = Arc::new(LanceIndexStore::new(
            object_store.clone().into(),
            index_path.clone(),
            Arc::new(LanceCache::no_cache()),
        ));

        for (fragment_id, row_id, doc) in [
            (1_u64 << 32, 0_u64, "hello world"),
            (2_u64 << 32, 1_u64, "goodbye world"),
        ] {
            let batch = make_doc_batch(doc, row_id);
            let stream =
                RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
            let stream = Box::pin(stream);
            let mut builder = InvertedIndexBuilder::new_with_fragment_mask(
                InvertedIndexParams::default(),
                Some(fragment_id),
            )
            .with_progress(noop_progress());
            builder.update(stream, store.as_ref(), None).await?;
        }

        // Only the merge is recorded; the builds above used a no-op progress sink.
        let progress = Arc::new(RecordingProgress::default());
        merge_index_files(&object_store, &index_path, store.clone(), progress.clone()).await?;

        let events = progress.recorded_events();
        let tags = events
            .iter()
            .map(|(kind, stage, _)| format!("{kind}:{stage}"))
            .collect::<Vec<_>>();
        let remap_progress = events
            .iter()
            .filter_map(|(kind, stage, completed)| {
                if kind == "progress" && stage == "remap_partition_files" {
                    Some(*completed)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        let read_start = tags
            .iter()
            .position(|e| e == "start:read_partition_metadata")
            .expect("missing read_partition_metadata start");
        let read_complete = tags
            .iter()
            .position(|e| e == "complete:read_partition_metadata")
            .expect("missing read_partition_metadata complete");
        let remap_start = tags
            .iter()
            .position(|e| e == "start:remap_partition_files")
            .expect("missing remap_partition_files start");
        let remap_complete = tags
            .iter()
            .position(|e| e == "complete:remap_partition_files")
            .expect("missing remap_partition_files complete");
        let metadata_start = tags
            .iter()
            .position(|e| e == "start:write_merged_metadata")
            .expect("missing write_merged_metadata start");
        let metadata_complete = tags
            .iter()
            .position(|e| e == "complete:write_merged_metadata")
            .expect("missing write_merged_metadata complete");

        assert!(read_start < read_complete);
        assert!(read_complete < remap_start);
        assert!(remap_start < remap_complete);
        assert!(remap_complete < metadata_start);
        assert!(metadata_start < metadata_complete);

        assert!(
            tags.iter().any(|e| e == "progress:read_partition_metadata"),
            "expected progress callback for read_partition_metadata"
        );
        // NOTE(review): 12 presumably = 2 partitions x 3 files (tokens, postings,
        // docs) x 2 rename phases. Confirm against `merge_index_files` if the
        // per-partition file set changes.
        assert_eq!(
            remap_progress.last().copied().unwrap_or_default(),
            12,
            "expected remap_partition_files progress to cover both rename phases"
        );
        assert!(
            tags.iter().any(|e| e == "progress:write_merged_metadata"),
            "expected progress callback for write_merged_metadata"
        );

        Ok(())
    }
2580
2581 #[tokio::test]
2582 async fn test_worker_memory_limit_rejects_single_large_doc() {
2583 let index_dir = TempDir::default();
2584 let store = Arc::new(LanceIndexStore::new(
2585 ObjectStore::local().into(),
2586 index_dir.obj_path(),
2587 Arc::new(LanceCache::no_cache()),
2588 ));
2589
2590 let batch = make_doc_batch("hello world", 42);
2591 let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2592 let stream = Box::pin(stream);
2593
2594 let mut builder =
2595 InvertedIndexBuilder::new(InvertedIndexParams::default().memory_limit_mb(0));
2596 let err = builder
2597 .update(stream, store.as_ref(), None)
2598 .await
2599 .expect_err("single doc should exceed zero worker memory limit");
2600 assert!(
2601 err.to_string().contains("row_id=42"),
2602 "unexpected error: {err}"
2603 );
2604 }
2605
2606 #[tokio::test]
2607 async fn test_worker_trims_position_temp_buffers() -> Result<()> {
2608 let tokenizer = InvertedIndexParams::default().with_position(true).build()?;
2609 let store = Arc::new(CountingStore::new());
2610 let id_alloc = Arc::new(AtomicU64::new(0));
2611 let mut worker = IndexWorker::new(
2612 tokenizer,
2613 store,
2614 id_alloc,
2615 IndexWorkerConfig {
2616 with_position: true,
2617 format_version: InvertedListFormatVersion::V1,
2618 fragment_mask: None,
2619 token_set_format: TokenSetFormat::default(),
2620 worker_memory_limit_bytes: u64::MAX,
2621 },
2622 )
2623 .await?;
2624
2625 let doc = (0..(MAX_RETAINED_TOKEN_IDS * 2))
2626 .map(|i| format!("tok{i}"))
2627 .collect::<Vec<_>>()
2628 .join(" ");
2629 worker.process_batch(make_doc_batch(&doc, 0)).await?;
2630
2631 assert!(worker.token_ids.is_empty());
2632 assert!(worker.token_ids.capacity() <= MAX_RETAINED_TOKEN_IDS);
2633 assert!(worker.memory_size >= worker.temporary_memory_size());
2634 Ok(())
2635 }
2636
2637 #[tokio::test]
2638 async fn test_worker_flush_keeps_position_temp_memory_bounded() -> Result<()> {
2639 let tokenizer = InvertedIndexParams::default().with_position(true).build()?;
2640 let store = Arc::new(CountingStore::new());
2641 let id_alloc = Arc::new(AtomicU64::new(0));
2642 let mut worker = IndexWorker::new(
2643 tokenizer,
2644 store,
2645 id_alloc,
2646 IndexWorkerConfig {
2647 with_position: true,
2648 format_version: InvertedListFormatVersion::V1,
2649 fragment_mask: None,
2650 token_set_format: TokenSetFormat::default(),
2651 worker_memory_limit_bytes: u64::MAX,
2652 },
2653 )
2654 .await?;
2655
2656 let doc = std::iter::repeat_n("common", 32_768)
2657 .collect::<Vec<_>>()
2658 .join(" ");
2659 let mut observed_post_flush_memory = Vec::new();
2660 for row_id in 0..8 {
2661 worker.process_batch(make_doc_batch(&doc, row_id)).await?;
2662 worker.flush().await?;
2663 observed_post_flush_memory.push(worker.memory_size);
2664 }
2665
2666 let max_memory = *observed_post_flush_memory.iter().max().unwrap();
2667 let min_memory = *observed_post_flush_memory.iter().min().unwrap();
2668 assert!(
2669 max_memory <= min_memory.saturating_add(256 * 1024),
2670 "post-flush worker memory drifted upward: {observed_post_flush_memory:?}"
2671 );
2672 Ok(())
2673 }
2674
2675 #[tokio::test]
2676 async fn test_worker_flush_writes_partition_directly() -> Result<()> {
2677 let tokenizer = InvertedIndexParams::default().with_position(true).build()?;
2678 let store = Arc::new(CountingStore::new());
2679 let id_alloc = Arc::new(AtomicU64::new(0));
2680 let mut worker = IndexWorker::new(
2681 tokenizer,
2682 store.clone(),
2683 id_alloc,
2684 IndexWorkerConfig {
2685 with_position: true,
2686 format_version: InvertedListFormatVersion::V1,
2687 fragment_mask: None,
2688 token_set_format: TokenSetFormat::default(),
2689 worker_memory_limit_bytes: u64::MAX,
2690 },
2691 )
2692 .await?;
2693 worker
2694 .process_batch(make_doc_batch("alpha beta gamma", 0))
2695 .await?;
2696 worker.flush().await?;
2697 assert!(store.write_count() > 0);
2698 Ok(())
2699 }
2700
2701 #[test]
2702 fn test_resolve_worker_memory_limit_uses_default_when_unset() {
2703 let params = InvertedIndexParams::default();
2704 assert_eq!(
2705 resolve_worker_memory_limit_bytes(¶ms, 8),
2706 *LANCE_FTS_PARTITION_SIZE << 20
2707 );
2708 }
2709
2710 #[test]
2711 fn test_resolve_num_workers_uses_default_when_unset() {
2712 let expected = default_num_workers().clamp(1, get_num_compute_intensive_cpus().max(1));
2713 assert_eq!(
2714 resolve_num_workers(&InvertedIndexParams::default()),
2715 expected
2716 );
2717 }
2718
2719 #[test]
2720 fn test_resolve_num_workers_clamps_requested_value() {
2721 let max_workers = get_num_compute_intensive_cpus().max(1);
2722 assert_eq!(
2723 resolve_num_workers(&InvertedIndexParams::default().num_workers(0)),
2724 1
2725 );
2726 assert_eq!(
2727 resolve_num_workers(&InvertedIndexParams::default().num_workers(max_workers + 10)),
2728 max_workers
2729 );
2730 }
2731
2732 #[test]
2733 fn test_resolve_worker_memory_limit_splits_total_memory_limit() {
2734 let params = InvertedIndexParams::default().memory_limit_mb(4096);
2735 assert_eq!(resolve_worker_memory_limit_bytes(¶ms, 16), 256 << 20);
2736 }
2737
2738 #[test]
2739 fn test_merge_all_tail_partitions_combines_everything() -> Result<()> {
2740 let merged = merge_all_tail_partitions(vec![
2741 TailPartition {
2742 builder: InnerBuilder::new(0, false, TokenSetFormat::default()),
2743 },
2744 TailPartition {
2745 builder: InnerBuilder::new(1, false, TokenSetFormat::default()),
2746 },
2747 TailPartition {
2748 builder: InnerBuilder::new(2, false, TokenSetFormat::default()),
2749 },
2750 ])?;
2751
2752 assert_eq!(merged.expect("merged builder should exist").id(), 0);
2753 Ok(())
2754 }
2755
2756 #[test]
2757 fn test_merge_all_tail_partitions_returns_none_for_empty_input() -> Result<()> {
2758 assert!(merge_all_tail_partitions(Vec::new())?.is_none());
2759 Ok(())
2760 }
2761
    /// Merging two tail builders, one containing "hello" and one containing "world",
    /// each with a single document, yields one builder with id 0. It holds both
    /// documents, both tokens, and one single-entry posting list per token.
    #[test]
    fn test_merge_tail_partition_group_combines_tail_builders() -> Result<()> {
        // Partition 0: token "hello" in doc for row 10.
        let mut first = InnerBuilder::new(0, false, TokenSetFormat::default());
        let hello = first.tokens.add("hello".to_owned());
        // Keep posting_lists indexable by token id.
        first
            .posting_lists
            .resize_with(first.tokens.len(), || PostingListBuilder::new(false));
        let first_doc = first.docs.append(10, 1);
        first.posting_lists[hello as usize].add(first_doc, PositionRecorder::Count(1));

        // Partition 1: token "world" in doc for row 20.
        let mut second = InnerBuilder::new(1, false, TokenSetFormat::default());
        let world = second.tokens.add("world".to_owned());
        second
            .posting_lists
            .resize_with(second.tokens.len(), || PostingListBuilder::new(false));
        let second_doc = second.docs.append(20, 2);
        second.posting_lists[world as usize].add(second_doc, PositionRecorder::Count(2));

        let merged = merge_tail_partition_group(vec![
            TailPartition { builder: first },
            TailPartition { builder: second },
        ])?;

        assert_eq!(merged.id(), 0);
        assert_eq!(merged.docs.len(), 2);
        assert_eq!(merged.tokens.len(), 2);
        assert_eq!(merged.posting_lists.len(), 2);
        // Token ids may be renumbered by the merge, so look them up by text.
        assert_eq!(
            merged.posting_lists[merged.tokens.get("hello").unwrap() as usize].len(),
            1
        );
        assert_eq!(
            merged.posting_lists[merged.tokens.get("world").unwrap() as usize].len(),
            1
        );
        Ok(())
    }
2799
    /// With `FailingProgress`, every `stage_progress` call errors, which presumably
    /// makes the workers exit early. `update_index` must then return that error
    /// within 5 seconds instead of hanging while it still has batches to dispatch.
    #[tokio::test]
    async fn test_update_index_returns_worker_error_when_workers_exit_during_dispatch() {
        // NOTE(review): more than two batches per shard, presumably so dispatch is
        // still in flight when the workers fail. Confirm against the dispatch
        // channel sizing.
        let num_batches = (*LANCE_FTS_NUM_SHARDS * 2 + 1) as u64;
        let index_dir = TempDir::default();
        let store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            index_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));
        let schema = make_doc_batch("hello world", 0).schema();
        let stream = RecordBatchStreamAdapter::new(
            schema,
            stream::iter((0..num_batches).map(|row_id| Ok(make_doc_batch("hello world", row_id)))),
        );
        let stream = Box::pin(stream);

        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default())
            .with_progress(Arc::new(FailingProgress));

        // The timeout turns a hang into a test failure instead of a stuck test run.
        let result = tokio::time::timeout(
            Duration::from_secs(5),
            builder.update_index(stream, store.as_ref()),
        )
        .await
        .expect("update_index should not hang")
        .expect_err("worker failure should be returned");

        assert!(
            result.to_string().contains("injected progress failure"),
            "unexpected error: {result}"
        );
    }
2832
2833 #[tokio::test]
2834 async fn test_new_index_has_empty_deleted_fragments() {
2835 let index_dir = TempDir::default();
2836 let store = Arc::new(LanceIndexStore::new(
2837 ObjectStore::local().into(),
2838 index_dir.obj_path(),
2839 Arc::new(LanceCache::no_cache()),
2840 ));
2841
2842 let batch = make_doc_batch("hello world", 0);
2843 let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2844 let stream = Box::pin(stream);
2845
2846 let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
2847 builder.update(stream, store.as_ref(), None).await.unwrap();
2848
2849 let index = InvertedIndex::load(store, None, &LanceCache::no_cache())
2850 .await
2851 .unwrap();
2852 assert!(
2853 index.deleted_fragments().is_empty(),
2854 "new index should have empty deleted fragments, got {:?}",
2855 index.deleted_fragments()
2856 );
2857 }
2858
    /// `remap` must carry an index's deleted-fragment set over unchanged to the
    /// remapped index written in a different store.
    #[tokio::test]
    async fn test_remap_preserves_deleted_fragments() {
        let src_dir = TempDir::default();
        let dest_dir = TempDir::default();
        let src_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            src_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));
        let dest_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            dest_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let batch = make_doc_batch("hello world", 0);
        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
        let stream = Box::pin(stream);

        // A builder with no source store and no partitions, seeded with a non-empty
        // deleted-fragment set before building.
        let initial_deleted = RoaringBitmap::from_iter([5, 10, 42]);
        let mut builder = InvertedIndexBuilder::from_existing_index(
            InvertedIndexParams::default(),
            None,
            Vec::new(),
            TokenSetFormat::default(),
            None,
            initial_deleted.clone(),
        );
        builder
            .update(stream, src_store.as_ref(), None)
            .await
            .unwrap();

        let index = InvertedIndex::load(src_store.clone(), None, &LanceCache::no_cache())
            .await
            .unwrap();
        assert_eq!(index.deleted_fragments(), &initial_deleted);

        // Brings `remap` into scope for the call below.
        use crate::scalar::ScalarIndex;
        // Map the single row id 0 to `50 << 32`.
        let mapping = HashMap::from([(0u64, Some(50 << 32))]);
        index.remap(&mapping, dest_store.as_ref()).await.unwrap();

        let remapped_index = InvertedIndex::load(dest_store.clone(), None, &LanceCache::no_cache())
            .await
            .unwrap();
        assert_eq!(
            remapped_index.deleted_fragments(),
            &initial_deleted,
            "remap should preserve deleted fragments"
        );
    }
2914
    /// Updating an index with an `OldIndexDataFilter::Fragments` filter must add the
    /// filter's `to_remove` fragments ({3, 7}) to the index's deleted-fragment set,
    /// which starts out empty.
    #[tokio::test]
    async fn test_update_grows_deleted_fragments_from_old_data_filter() {
        let index_dir = TempDir::default();
        let store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            index_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let batch = make_doc_batch("hello world", 0);
        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
        let stream = Box::pin(stream);

        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
        builder.update(stream, store.as_ref(), None).await.unwrap();

        let index = InvertedIndex::load(store.clone(), None, &LanceCache::no_cache())
            .await
            .unwrap();
        assert!(index.deleted_fragments().is_empty());

        let update_dir = TempDir::default();
        let update_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            update_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        // Row id `(1 << 32) | 1` (`<<` binds tighter than `|`).
        let batch2 = make_doc_batch("new document", 1 << 32 | 1);
        let stream2 =
            RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)]));
        let stream2 = Box::pin(stream2);

        let old_data_filter = Some(crate::scalar::OldIndexDataFilter::Fragments {
            to_keep: RoaringBitmap::from_iter([0]),
            to_remove: RoaringBitmap::from_iter([3, 7]),
        });

        // Brings the `ScalarIndex::update` method into scope.
        use crate::scalar::ScalarIndex;
        index
            .update(stream2, update_store.as_ref(), old_data_filter)
            .await
            .unwrap();

        let updated_index =
            InvertedIndex::load(update_store.clone(), None, &LanceCache::no_cache())
                .await
                .unwrap();
        assert_eq!(
            updated_index.deleted_fragments(),
            &RoaringBitmap::from_iter([3, 7]),
            "update should add deleted fragments from old_data_filter"
        );
    }
2972
    /// Deleted fragments accumulate across successive `Fragments`-filtered updates.
    /// A fresh index is updated with `to_remove = {3, 7}`, then that result with
    /// `to_remove = {12, 15}`. The final index must report {3, 7, 12, 15}.
    #[tokio::test]
    async fn test_update_accumulates_deleted_fragments() {
        let dir1 = TempDir::default();
        let store1 = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            dir1.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let batch = make_doc_batch("hello world", 0);
        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
        let stream = Box::pin(stream);

        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
        builder.update(stream, store1.as_ref(), None).await.unwrap();

        let index = InvertedIndex::load(store1.clone(), None, &LanceCache::no_cache())
            .await
            .unwrap();

        // First update, written to a second store.
        let dir2 = TempDir::default();
        let store2 = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            dir2.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let batch2 = make_doc_batch("second doc", 1 << 32 | 1);
        let stream2 =
            RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)]));
        let stream2 = Box::pin(stream2);

        // Brings the `ScalarIndex::update` method into scope.
        use crate::scalar::ScalarIndex;
        index
            .update(
                stream2,
                store2.as_ref(),
                Some(crate::scalar::OldIndexDataFilter::Fragments {
                    to_keep: RoaringBitmap::from_iter([0]),
                    to_remove: RoaringBitmap::from_iter([3, 7]),
                }),
            )
            .await
            .unwrap();

        let index2 = InvertedIndex::load(store2.clone(), None, &LanceCache::no_cache())
            .await
            .unwrap();
        assert_eq!(
            index2.deleted_fragments(),
            &RoaringBitmap::from_iter([3, 7])
        );

        // Second update, applied to the result of the first and written to a
        // third store.
        let dir3 = TempDir::default();
        let store3 = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            dir3.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let batch3 = make_doc_batch("third doc", 2 << 32 | 2);
        let stream3 =
            RecordBatchStreamAdapter::new(batch3.schema(), stream::iter(vec![Ok(batch3)]));
        let stream3 = Box::pin(stream3);

        index2
            .update(
                stream3,
                store3.as_ref(),
                Some(crate::scalar::OldIndexDataFilter::Fragments {
                    to_keep: RoaringBitmap::from_iter([0, 1]),
                    to_remove: RoaringBitmap::from_iter([12, 15]),
                }),
            )
            .await
            .unwrap();

        let index3 = InvertedIndex::load(store3.clone(), None, &LanceCache::no_cache())
            .await
            .unwrap();
        assert_eq!(
            index3.deleted_fragments(),
            &RoaringBitmap::from_iter([3, 7, 12, 15]),
            "deleted fragments should accumulate across updates"
        );
    }
3062
    /// An update driven by an `OldIndexDataFilter::RowIds` filter (rather than
    /// `Fragments`) must leave the deleted-fragment set empty.
    #[tokio::test]
    async fn test_update_with_rowid_filter_does_not_grow_deleted_fragments() {
        let index_dir = TempDir::default();
        let store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            index_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let batch = make_doc_batch("hello world", 0);
        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
        let stream = Box::pin(stream);

        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
        builder.update(stream, store.as_ref(), None).await.unwrap();

        let index = InvertedIndex::load(store.clone(), None, &LanceCache::no_cache())
            .await
            .unwrap();

        let update_dir = TempDir::default();
        let update_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            update_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let batch2 = make_doc_batch("new doc", 1);
        let stream2 =
            RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)]));
        let stream2 = Box::pin(stream2);

        // Keep only row 0 of the old data, expressed as a row-id set.
        let mut valid_ids = lance_core::utils::mask::RowAddrTreeMap::new();
        valid_ids.insert(0);
        let old_data_filter = Some(crate::scalar::OldIndexDataFilter::RowIds(valid_ids));

        // Brings the `ScalarIndex::update` method into scope.
        use crate::scalar::ScalarIndex;
        index
            .update(stream2, update_store.as_ref(), old_data_filter)
            .await
            .unwrap();

        let updated_index =
            InvertedIndex::load(update_store.clone(), None, &LanceCache::no_cache())
                .await
                .unwrap();
        assert!(
            updated_index.deleted_fragments().is_empty(),
            "RowIds filter should not add to deleted fragments"
        );
    }
3115}