Skip to main content

lance_index/scalar/inverted/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use super::{InvertedIndexParams, index::*};
5use crate::scalar::inverted::document_tokenizer::DocType;
6use crate::scalar::inverted::json::JsonTextStream;
7use crate::scalar::inverted::tokenizer::document_tokenizer::LanceTokenizer;
8#[cfg(test)]
9use crate::scalar::lance_format::LanceIndexStore;
10use crate::scalar::{IndexStore, OldIndexDataFilter};
11use crate::vector::graph::OrderedFloat;
12use crate::{progress::IndexBuildProgress, progress::noop_progress};
13use arrow::array::AsArray;
14use arrow::datatypes;
15use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array};
16use arrow_schema::{DataType, Field, Schema, SchemaRef};
17use bitpacking::{BitPacker, BitPacker4x};
18use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream};
19use deepsize::DeepSizeOf;
20use fst::Streamer;
21use futures::{Stream, StreamExt, TryStreamExt};
22use lance_arrow::json::JSON_EXT_NAME;
23use lance_arrow::{ARROW_EXT_NAME_KEY, iter_str_array};
24use lance_core::cache::LanceCache;
25use lance_core::error::LanceOptionExt;
26use lance_core::utils::tokio::{IO_CORE_RESERVATION, get_num_compute_intensive_cpus, spawn_cpu};
27use lance_core::{Error, ROW_ID, ROW_ID_FIELD, Result};
28use lance_io::object_store::ObjectStore;
29use object_store::path::Path;
30use roaring::RoaringBitmap;
31use smallvec::SmallVec;
32use std::collections::HashMap;
33use std::pin::Pin;
34use std::str::FromStr;
35use std::sync::Arc;
36use std::sync::LazyLock;
37use std::task::{Context, Poll};
38use std::{fmt::Debug, sync::atomic::AtomicU64};
39use tracing::instrument;
40
41// the number of elements in each block
42// each block contains 128 row ids and 128 frequencies
43// WARNING: changing this value will break the compatibility with existing indexes
44pub const BLOCK_SIZE: usize = BitPacker4x::BLOCK_LEN;
45
46// The default number of workers to use for FTS builds.
47// By default this is roughly `num_cpus / 2`, but it can be overridden
48// with `LANCE_FTS_NUM_SHARDS`.
49pub static LANCE_FTS_NUM_SHARDS: LazyLock<usize> = LazyLock::new(|| {
50    std::env::var("LANCE_FTS_NUM_SHARDS")
51        .unwrap_or_else(|_| default_num_workers().to_string())
52        .parse()
53        .expect("failed to parse LANCE_FTS_NUM_SHARDS")
54});
55// The default per-worker memory limit in MiB for FTS builds.
56pub static LANCE_FTS_PARTITION_SIZE: LazyLock<u64> = LazyLock::new(|| {
57    std::env::var("LANCE_FTS_PARTITION_SIZE")
58        .unwrap_or_else(|_| "2048".to_string())
59        .parse()
60        .expect("failed to parse LANCE_FTS_PARTITION_SIZE")
61});
62static LANCE_FTS_WRITE_QUEUE_SIZE: LazyLock<usize> = LazyLock::new(|| {
63    std::env::var("LANCE_FTS_WRITE_QUEUE_SIZE")
64        .unwrap_or_else(|_| "1".to_string())
65        .parse()
66        .expect("failed to parse LANCE_FTS_WRITE_QUEUE_SIZE")
67});
68static LANCE_FTS_POSTING_BATCH_ROWS: LazyLock<usize> = LazyLock::new(|| {
69    std::env::var("LANCE_FTS_POSTING_BATCH_ROWS")
70        .unwrap_or_else(|_| "256".to_string())
71        .parse()
72        .expect("failed to parse LANCE_FTS_POSTING_BATCH_ROWS")
73});
74const MAX_RETAINED_TOKEN_IDS: usize = 8 * 1024;
75
76fn default_num_workers() -> usize {
77    let total_cpus = get_num_compute_intensive_cpus() + *IO_CORE_RESERVATION;
78    std::cmp::max(1, total_cpus / 2)
79}
80
81fn resolve_num_workers(params: &InvertedIndexParams) -> usize {
82    let max_workers = get_num_compute_intensive_cpus().max(1);
83    params
84        .num_workers
85        .unwrap_or(*LANCE_FTS_NUM_SHARDS)
86        .clamp(1, max_workers)
87}
88
89fn resolve_worker_memory_limit_bytes(params: &InvertedIndexParams, num_workers: usize) -> u64 {
90    let default_worker_memory_limit_bytes = *LANCE_FTS_PARTITION_SIZE << 20;
91    params
92        .memory_limit_mb
93        .map(|memory_limit_mb| (memory_limit_mb << 20) / num_workers as u64)
94        .unwrap_or(default_worker_memory_limit_bytes)
95}
96
97fn merge_all_tail_partitions(tails: Vec<TailPartition>) -> Result<Option<InnerBuilder>> {
98    if tails.is_empty() {
99        return Ok(None);
100    }
101    merge_tail_partition_group(tails).map(Some)
102}
103
104fn merge_tail_partition_group(group: Vec<TailPartition>) -> Result<InnerBuilder> {
105    let mut group = group.into_iter();
106    let mut merged = group
107        .next()
108        .ok_or_else(|| {
109            Error::invalid_input("cannot merge an empty tail partition group".to_owned())
110        })?
111        .builder;
112    for tail in group {
113        merged.merge_from(tail.builder)?;
114    }
115    Ok(merged)
116}
117
/// Builds (or incrementally updates) a multi-partition inverted (FTS) index.
#[derive(Debug)]
pub struct InvertedIndexBuilder {
    /// Index parameters; serialized into the index metadata.
    params: InvertedIndexParams,
    /// Ids of partitions that already exist in `src_store`.
    pub(crate) partitions: Vec<u64>,
    /// Ids of partitions produced by the current build.
    new_partitions: Vec<u64>,
    /// Optional fragment mask (fragment id in the high 32 bits) for distributed builds.
    fragment_mask: Option<u64>,
    token_set_format: TokenSetFormat,
    format_version: InvertedListFormatVersion,
    posting_tail_codec: PostingTailCodec,
    /// Store holding the existing `partitions`, if any.
    src_store: Option<Arc<dyn IndexStore>>,
    /// Progress reporter for the build stages.
    progress: Arc<dyn IndexBuildProgress>,
    /// Fragments recorded as deleted in the metadata file.
    deleted_fragments: RoaringBitmap,
}
131
132impl InvertedIndexBuilder {
133    pub fn new(params: InvertedIndexParams) -> Self {
134        Self::new_with_fragment_mask(params, None)
135    }
136
137    pub fn new_with_fragment_mask(params: InvertedIndexParams, fragment_mask: Option<u64>) -> Self {
138        Self::from_existing_index(
139            params,
140            None,
141            Vec::new(),
142            TokenSetFormat::default(),
143            fragment_mask,
144            RoaringBitmap::new(),
145        )
146    }
147
148    /// Creates an InvertedIndexBuilder from existing index with fragment filtering.
149    /// This method is used to create a builder from an existing index while applying
150    /// fragment-based filtering for distributed indexing scenarios.
151    /// fragment_mask Optional mask with fragment_id in high 32 bits for filtering.
152    /// Constructed as `(fragment_id as u64) << 32`.
153    /// When provided, ensures that generated IDs belong to the specified fragment.
154    pub fn from_existing_index(
155        params: InvertedIndexParams,
156        store: Option<Arc<dyn IndexStore>>,
157        partitions: Vec<u64>,
158        token_set_format: TokenSetFormat,
159        fragment_mask: Option<u64>,
160        deleted_fragments: RoaringBitmap,
161    ) -> Self {
162        Self {
163            params,
164            partitions,
165            new_partitions: Vec::new(),
166            src_store: store,
167            token_set_format,
168            fragment_mask,
169            format_version: current_fts_format_version(),
170            posting_tail_codec: current_fts_format_version().posting_tail_codec(),
171            progress: noop_progress(),
172            deleted_fragments,
173        }
174    }
175
176    pub fn with_posting_tail_codec(mut self, posting_tail_codec: PostingTailCodec) -> Self {
177        self.format_version =
178            InvertedListFormatVersion::from_posting_tail_codec(posting_tail_codec);
179        self.posting_tail_codec = posting_tail_codec;
180        self
181    }
182
183    pub fn with_format_version(mut self, format_version: InvertedListFormatVersion) -> Self {
184        self.format_version = format_version;
185        self.posting_tail_codec = format_version.posting_tail_codec();
186        self
187    }
188
189    pub fn with_progress(mut self, progress: Arc<dyn IndexBuildProgress>) -> Self {
190        self.progress = progress;
191        self
192    }
193
194    pub async fn update(
195        &mut self,
196        new_data: SendableRecordBatchStream,
197        dest_store: &dyn IndexStore,
198        old_data_filter: Option<crate::scalar::OldIndexDataFilter>,
199    ) -> Result<()> {
200        let schema = new_data.schema();
201        let doc_col = schema.field(0).name();
202
203        // infer lance_tokenizer based on document type
204        if self.params.lance_tokenizer.is_none() {
205            let schema = new_data.schema();
206            let field = schema.column_with_name(doc_col).expect_ok()?.1;
207            let doc_type = DocType::try_from(field)?;
208            self.params.lance_tokenizer = Some(doc_type.as_ref().to_string());
209        }
210
211        let new_data = document_input(new_data, doc_col)?;
212
213        self.progress
214            .stage_start("tokenize_docs", None, "rows")
215            .await?;
216        self.update_index(new_data, dest_store).await?;
217
218        if let Some(OldIndexDataFilter::Fragments { to_remove, .. }) = old_data_filter {
219            self.deleted_fragments.extend(to_remove);
220        }
221
222        self.progress.stage_complete("tokenize_docs").await?;
223        self.write(dest_store).await?;
224        Ok(())
225    }
226
227    #[instrument(level = "debug", skip_all)]
228    async fn update_index(
229        &mut self,
230        stream: SendableRecordBatchStream,
231        dest_store: &dyn IndexStore,
232    ) -> Result<()> {
233        let num_workers = resolve_num_workers(&self.params);
234        let tokenizer = self.params.build()?;
235        let with_position = self.params.with_position;
236        let worker_memory_limit_bytes =
237            resolve_worker_memory_limit_bytes(&self.params, num_workers);
238        let worker_config = IndexWorkerConfig {
239            with_position,
240            format_version: self.format_version,
241            fragment_mask: self.fragment_mask,
242            token_set_format: self.token_set_format,
243            worker_memory_limit_bytes,
244        };
245        let next_id = self.partitions.iter().map(|id| id + 1).max().unwrap_or(0);
246        let id_alloc = Arc::new(AtomicU64::new(next_id));
247        let tokenized_count = Arc::new(AtomicU64::new(0));
248        let (sender, receiver) = async_channel::bounded(num_workers);
249        let dest_store = dest_store.clone_arc();
250        let mut index_tasks = Vec::with_capacity(num_workers);
251        for _ in 0..num_workers {
252            let tokenizer = tokenizer.clone();
253            let receiver: async_channel::Receiver<RecordBatch> = receiver.clone();
254            let dest_store = dest_store.clone();
255            let id_alloc = id_alloc.clone();
256            let progress = self.progress.clone();
257            let tokenized_count = tokenized_count.clone();
258            index_tasks.push(tokio::task::spawn(async move {
259                let mut worker =
260                    IndexWorker::new(tokenizer, dest_store, id_alloc, worker_config).await?;
261                while let Ok(batch) = receiver.recv().await {
262                    let num_rows = batch.num_rows();
263                    worker.process_batch(batch).await?;
264                    let tokenized_count = tokenized_count
265                        .fetch_add(num_rows as u64, std::sync::atomic::Ordering::Relaxed)
266                        + num_rows as u64;
267                    progress
268                        .stage_progress("tokenize_docs", tokenized_count)
269                        .await?;
270                }
271                worker.finish().await
272            }));
273        }
274
275        let index_build = async {
276            // Keep the channel lifetime tied to the worker tasks so senders observe
277            // worker exits instead of blocking on an orphaned receiver handle.
278            drop(receiver);
279
280            let mut stream = Box::pin(stream);
281            log::info!("indexing FTS with {} workers", num_workers);
282
283            let mut last_num_rows = 0;
284            let mut total_num_rows = 0;
285            let start = std::time::Instant::now();
286            while let Some(batch) = stream.try_next().await? {
287                let num_rows = batch.num_rows();
288
289                if sender.send(batch).await.is_err() {
290                    // this only happens if all workers have exited,
291                    // so we don't return the send error here,
292                    // avoiding hiding the real error from workers.
293                    break;
294                }
295
296                total_num_rows += num_rows;
297                if total_num_rows >= last_num_rows + 1_000_000 {
298                    log::debug!(
299                        "indexed {} documents, elapsed: {:?}, speed: {}rows/s",
300                        total_num_rows,
301                        start.elapsed(),
302                        total_num_rows as f32 / start.elapsed().as_secs_f32()
303                    );
304                    last_num_rows = total_num_rows;
305                }
306            }
307            // drop the sender to stop receivers
308            drop(stream);
309            drop(sender);
310            log::info!("dispatching elapsed: {:?}", start.elapsed());
311
312            // wait for the workers to finish
313            let start = std::time::Instant::now();
314            let mut tail_partitions = Vec::new();
315            for index_task in index_tasks {
316                let output = index_task.await??;
317                self.new_partitions.extend(output.partitions);
318                if let Some(tail_partition) = output.tail_partition {
319                    tail_partitions.push(tail_partition);
320                }
321            }
322            let merged_tail_partitions =
323                spawn_cpu(move || merge_all_tail_partitions(tail_partitions)).await?;
324            if let Some(builder) = merged_tail_partitions {
325                self.new_partitions.push(builder.id());
326                let mut builder = builder;
327                builder.write(dest_store.as_ref()).await?;
328            }
329            log::info!("wait workers indexing elapsed: {:?}", start.elapsed());
330            Result::Ok(())
331        };
332
333        index_build.await
334    }
335
336    pub async fn remap(
337        &mut self,
338        mapping: &HashMap<u64, Option<u64>>,
339        src_store: Arc<dyn IndexStore>,
340        dest_store: &dyn IndexStore,
341    ) -> Result<()> {
342        for part in self.partitions.iter() {
343            let part = InvertedPartition::load(
344                src_store.clone(),
345                *part,
346                None,
347                &LanceCache::no_cache(),
348                self.token_set_format,
349            )
350            .await?;
351            let mut builder = part.into_builder().await?;
352            builder.remap(mapping).await?;
353            builder.write(dest_store).await?;
354        }
355        if self.fragment_mask.is_none() {
356            self.write_metadata(dest_store, &self.partitions).await?;
357        } else {
358            // in distributed mode, the part_temp_metadata is written by the worker
359            for &partition_id in &self.partitions {
360                self.write_part_metadata(dest_store, partition_id).await?;
361            }
362        }
363        Ok(())
364    }
365
366    async fn write_metadata(&self, dest_store: &dyn IndexStore, partitions: &[u64]) -> Result<()> {
367        let mut serialized_deleted_fragments =
368            Vec::with_capacity(self.deleted_fragments.serialized_size());
369        self.deleted_fragments
370            .serialize_into(&mut serialized_deleted_fragments)?;
371
372        let mut metadata = HashMap::from_iter(vec![
373            ("partitions".to_owned(), serde_json::to_string(&partitions)?),
374            ("params".to_owned(), serde_json::to_string(&self.params)?),
375            (
376                TOKEN_SET_FORMAT_KEY.to_owned(),
377                self.token_set_format.to_string(),
378            ),
379            (
380                POSTING_TAIL_CODEC_KEY.to_owned(),
381                self.posting_tail_codec.as_str().to_owned(),
382            ),
383        ]);
384
385        if self.params.with_position && self.format_version.uses_shared_position_stream() {
386            metadata.insert(
387                POSITIONS_LAYOUT_KEY.to_owned(),
388                POSITIONS_LAYOUT_SHARED_STREAM_V2.to_owned(),
389            );
390            metadata.insert(
391                POSITIONS_CODEC_KEY.to_owned(),
392                self.format_version
393                    .position_codec()
394                    .expect("shared positions require a codec")
395                    .as_str()
396                    .to_owned(),
397            );
398        }
399
400        let metadata_file_schema = Arc::new(Schema::new(vec![Field::new(
401            DELETED_FRAGMENTS_COL,
402            DataType::Binary,
403            false,
404        )]));
405        let deleted_fragments_col = Arc::new(BinaryArray::from(vec![
406            serialized_deleted_fragments.as_slice(),
407        ])) as Arc<dyn Array>;
408        let record_batch =
409            RecordBatch::try_new(metadata_file_schema.clone(), vec![deleted_fragments_col])?;
410
411        let mut writer = dest_store
412            .new_index_file(METADATA_FILE, metadata_file_schema)
413            .await?;
414        writer.write_record_batch(record_batch).await?;
415        writer.finish_with_metadata(metadata).await?;
416        Ok(())
417    }
418
419    /// Write partition metadata file for a single partition
420    ///
421    /// In a distributed environment, each worker node can write partition metadata files for the partitions it processes,
422    /// which are then merged into a final metadata file using the `merge_metadata_files` function.
423    pub(crate) async fn write_part_metadata(
424        &self,
425        dest_store: &dyn IndexStore,
426        partition: u64, // Modify parameter type
427    ) -> Result<()> {
428        let partitions = vec![partition];
429        let mut metadata = HashMap::from_iter(vec![
430            ("partitions".to_owned(), serde_json::to_string(&partitions)?),
431            ("params".to_owned(), serde_json::to_string(&self.params)?),
432            (
433                TOKEN_SET_FORMAT_KEY.to_owned(),
434                self.token_set_format.to_string(),
435            ),
436            (
437                POSTING_TAIL_CODEC_KEY.to_owned(),
438                self.posting_tail_codec.as_str().to_owned(),
439            ),
440        ]);
441        if self.params.with_position && self.format_version.uses_shared_position_stream() {
442            metadata.insert(
443                POSITIONS_LAYOUT_KEY.to_owned(),
444                POSITIONS_LAYOUT_SHARED_STREAM_V2.to_owned(),
445            );
446            metadata.insert(
447                POSITIONS_CODEC_KEY.to_owned(),
448                self.format_version
449                    .position_codec()
450                    .expect("shared positions require a codec")
451                    .as_str()
452                    .to_owned(),
453            );
454        }
455        // Use partition ID to generate a unique temporary filename
456        let file_name = part_metadata_file_path(partition);
457        let mut writer = dest_store
458            .new_index_file(&file_name, Arc::new(Schema::empty()))
459            .await?;
460        writer.finish_with_metadata(metadata).await?;
461        Ok(())
462    }
463
464    async fn write_metadata_with_progress(
465        &self,
466        dest_store: &dyn IndexStore,
467        partitions: &[u64],
468    ) -> Result<()> {
469        let total = if self.fragment_mask.is_none() {
470            Some(1)
471        } else {
472            Some(partitions.len() as u64)
473        };
474        self.progress
475            .stage_start("write_metadata", total, "files")
476            .await?;
477        if self.fragment_mask.is_none() {
478            self.write_metadata(dest_store, partitions).await?;
479            self.progress.stage_progress("write_metadata", 1).await?;
480        } else {
481            let mut completed = 0;
482            for &partition_id in partitions {
483                self.write_part_metadata(dest_store, partition_id).await?;
484                completed += 1;
485                self.progress
486                    .stage_progress("write_metadata", completed)
487                    .await?;
488            }
489        }
490        self.progress.stage_complete("write_metadata").await?;
491        Ok(())
492    }
493
494    async fn write(&self, dest_store: &dyn IndexStore) -> Result<()> {
495        let mut partitions = Vec::with_capacity(self.partitions.len() + self.new_partitions.len());
496        partitions.extend_from_slice(&self.partitions);
497        partitions.extend_from_slice(&self.new_partitions);
498        partitions.sort_unstable();
499
500        self.progress
501            .stage_start(
502                "copy_partitions",
503                Some(partitions.len() as u64),
504                "partitions",
505            )
506            .await?;
507        let mut copied = 0;
508        for part in self.partitions.iter() {
509            self.src_store
510                .as_ref()
511                .expect("existing partitions require a source store")
512                .copy_index_file(&token_file_path(*part), dest_store)
513                .await?;
514            self.src_store
515                .as_ref()
516                .expect("existing partitions require a source store")
517                .copy_index_file(&posting_file_path(*part), dest_store)
518                .await?;
519            self.src_store
520                .as_ref()
521                .expect("existing partitions require a source store")
522                .copy_index_file(&doc_file_path(*part), dest_store)
523                .await?;
524            copied += 1;
525            self.progress
526                .stage_progress("copy_partitions", copied)
527                .await?;
528        }
529        for _part in self.new_partitions.iter() {
530            copied += 1;
531            self.progress
532                .stage_progress("copy_partitions", copied)
533                .await?;
534        }
535        self.progress.stage_complete("copy_partitions").await?;
536
537        self.write_metadata_with_progress(dest_store, &partitions)
538            .await?;
539        Ok(())
540    }
541}
542
543impl Default for InvertedIndexBuilder {
544    fn default() -> Self {
545        let params = InvertedIndexParams::default();
546        Self::new(params)
547    }
548}
549
/// Builder for a single inverted-index partition: its token set, one posting list per
/// token id, and the document set.
#[derive(Debug)]
pub struct InnerBuilder {
    /// Partition id; used to derive the partition's token/posting/doc file names.
    id: u64,
    /// Whether posting lists record token positions.
    with_position: bool,
    token_set_format: TokenSetFormat,
    format_version: InvertedListFormatVersion,
    posting_tail_codec: PostingTailCodec,
    pub(crate) tokens: TokenSet,
    /// Posting lists indexed by token id.
    pub(crate) posting_lists: Vec<PostingListBuilder>,
    pub(crate) docs: DocSet,
}
562
563impl InnerBuilder {
564    pub fn new(id: u64, with_position: bool, token_set_format: TokenSetFormat) -> Self {
565        Self::new_with_format_version(
566            id,
567            with_position,
568            token_set_format,
569            current_fts_format_version(),
570        )
571    }
572
573    pub fn new_with_format_version(
574        id: u64,
575        with_position: bool,
576        token_set_format: TokenSetFormat,
577        format_version: InvertedListFormatVersion,
578    ) -> Self {
579        Self {
580            id,
581            with_position,
582            token_set_format,
583            format_version,
584            posting_tail_codec: format_version.posting_tail_codec(),
585            tokens: TokenSet::default(),
586            posting_lists: Vec::new(),
587            docs: DocSet::default(),
588        }
589    }
590
591    pub fn new_with_posting_tail_codec(
592        id: u64,
593        with_position: bool,
594        token_set_format: TokenSetFormat,
595        posting_tail_codec: PostingTailCodec,
596    ) -> Self {
597        let format_version = if posting_tail_codec == PostingTailCodec::Fixed32 {
598            InvertedListFormatVersion::V1
599        } else {
600            InvertedListFormatVersion::V2
601        };
602        let mut builder =
603            Self::new_with_format_version(id, with_position, token_set_format, format_version);
604        builder.posting_tail_codec = posting_tail_codec;
605        builder
606    }
607
608    pub fn id(&self) -> u64 {
609        self.id
610    }
611
612    /// Set the token set for this builder.
613    pub fn set_tokens(&mut self, tokens: TokenSet) {
614        self.tokens = tokens;
615    }
616
617    /// Set the document set for this builder.
618    pub fn set_docs(&mut self, docs: DocSet) {
619        self.docs = docs;
620    }
621
622    /// Set the posting lists for this builder.
623    pub fn set_posting_lists(&mut self, posting_lists: Vec<PostingListBuilder>) {
624        self.posting_lists = posting_lists;
625    }
626
627    pub async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
628        // for the docs, we need to remove the rows that are removed from the doc set,
629        // and update the row ids of the rows that are updated
630        let removed = self.docs.remap(mapping);
631
632        // for the posting lists, we need to remap the doc ids:
633        // - if the a row is removed, we need to shift the doc ids of the following rows
634        // - if a row is updated (assigned a new row id), we don't need to do anything with the posting lists
635        let mut token_id = 0;
636        let mut removed_token_ids = Vec::new();
637        self.posting_lists.retain_mut(|posting_list| {
638            posting_list.remap(&removed);
639            let keep = !posting_list.is_empty();
640            if !keep {
641                removed_token_ids.push(token_id as u32);
642            }
643            token_id += 1;
644            keep
645        });
646
647        // for the tokens, remap the token ids if any posting list is empty
648        self.tokens.remap(&removed_token_ids);
649
650        Ok(())
651    }
652
653    pub fn merge_from(&mut self, other: Self) -> Result<()> {
654        let Self {
655            id: _,
656            with_position,
657            token_set_format,
658            format_version,
659            posting_tail_codec,
660            tokens,
661            posting_lists,
662            docs,
663        } = other;
664
665        if self.with_position != with_position {
666            return Err(Error::index(format!(
667                "cannot merge partitions with mismatched positions settings: {} vs {}",
668                self.with_position, with_position
669            )));
670        }
671        if self.token_set_format != token_set_format {
672            return Err(Error::index(format!(
673                "cannot merge partitions with mismatched token set formats: {:?} vs {:?}",
674                self.token_set_format, token_set_format
675            )));
676        }
677        if self.format_version != format_version {
678            return Err(Error::index(format!(
679                "cannot merge partitions with mismatched FTS format versions: {:?} vs {:?}",
680                self.format_version, format_version
681            )));
682        }
683        if self.posting_tail_codec != posting_tail_codec {
684            return Err(Error::index(format!(
685                "cannot merge partitions with mismatched posting tail codecs: {:?} vs {:?}",
686                self.posting_tail_codec, posting_tail_codec
687            )));
688        }
689
690        let mut token_id_map = vec![u32::MAX; posting_lists.len()];
691        match tokens.tokens {
692            TokenMap::HashMap(map) => {
693                for (token, token_id) in map {
694                    let new_token_id = self.tokens.get_or_add(token.as_str());
695                    token_id_map[token_id as usize] = new_token_id;
696                }
697            }
698            TokenMap::Fst(map) => {
699                let mut stream = map.stream();
700                while let Some((token, token_id)) = stream.next() {
701                    let new_token_id = self
702                        .tokens
703                        .get_or_add(String::from_utf8_lossy(token).as_ref());
704                    token_id_map[token_id as usize] = new_token_id;
705                }
706            }
707        }
708
709        let doc_id_offset = self.docs.len() as u32;
710        for (row_id, num_tokens) in docs.iter() {
711            self.docs.append(*row_id, *num_tokens);
712        }
713        self.posting_lists.resize_with(self.tokens.len(), || {
714            PostingListBuilder::new_with_posting_tail_codec(with_position, self.posting_tail_codec)
715        });
716
717        for (token_id, posting_list) in posting_lists.into_iter().enumerate() {
718            if posting_list.is_empty() {
719                continue;
720            }
721            let new_token_id = token_id_map[token_id];
722            debug_assert_ne!(new_token_id, u32::MAX);
723            let merged_posting = &mut self.posting_lists[new_token_id as usize];
724            posting_list.for_each_entry(|doc_id, freq, positions| {
725                let positions = match positions {
726                    Some(positions) => PositionRecorder::Position(positions.into()),
727                    None => PositionRecorder::Count(freq),
728                };
729                merged_posting.add(doc_id_offset + doc_id, positions);
730                Ok::<(), Error>(())
731            })?;
732        }
733
734        Ok(())
735    }
736
737    pub async fn write(&mut self, store: &dyn IndexStore) -> Result<()> {
738        let docs = Arc::new(std::mem::take(&mut self.docs));
739        self.write_posting_lists(store, docs.clone()).await?;
740        self.write_tokens(store).await?;
741        self.write_docs(store, docs).await?;
742        Ok(())
743    }
744
745    #[instrument(level = "debug", skip_all)]
746    async fn write_posting_lists(
747        &mut self,
748        store: &dyn IndexStore,
749        docs: Arc<DocSet>,
750    ) -> Result<()> {
751        let id = self.id;
752        let mut writer = store
753            .new_index_file(
754                &posting_file_path(self.id),
755                inverted_list_schema_for_version(self.with_position, self.format_version),
756            )
757            .await?;
758        let posting_lists = std::mem::take(&mut self.posting_lists);
759
760        log::info!(
761            "writing {} posting lists of partition {}, with position {}",
762            posting_lists.len(),
763            id,
764            self.with_position
765        );
766        let with_position = self.with_position;
767        let format_version = self.format_version;
768        let schema = inverted_list_schema_for_version(self.with_position, self.format_version);
769        let docs_for_batches = docs.clone();
770        let schema_for_batches = schema.clone();
771        let batch_rows = *LANCE_FTS_POSTING_BATCH_ROWS;
772        let (tx, rx) = async_channel::bounded(*LANCE_FTS_WRITE_QUEUE_SIZE);
773        let producer = spawn_cpu(move || {
774            let mut batch_builder = PostingListBatchBuilder::new(
775                schema_for_batches.clone(),
776                with_position,
777                format_version,
778                batch_rows,
779            );
780            for posting_list in posting_lists {
781                posting_list.append_to_batch_with_docs(
782                    &docs_for_batches,
783                    &mut batch_builder,
784                    format_version,
785                )?;
786                if batch_builder.len() < batch_rows {
787                    continue;
788                }
789
790                let batch = batch_builder.finish()?;
791                if let Err(err) = tx.send_blocking(batch) {
792                    return Err(Error::execution(format!(
793                        "failed to send posting list batch to writer: {err}"
794                    )));
795                }
796            }
797
798            if !batch_builder.is_empty() {
799                let batch = batch_builder.finish()?;
800                if let Err(err) = tx.send_blocking(batch) {
801                    return Err(Error::execution(format!(
802                        "failed to send posting list batch to writer: {err}"
803                    )));
804                }
805            }
806
807            Result::Ok(())
808        });
809
810        while let Ok(batch) = rx.recv().await {
811            if let Err(err) = writer.write_record_batch(batch).await {
812                drop(rx);
813                // Wait for producer to stop; preserve the write error as the primary failure.
814                let _ = producer.await;
815                return Err(err);
816            }
817        }
818        drop(rx);
819        producer.await?;
820
821        writer.finish().await?;
822        Ok(())
823    }
824
825    #[instrument(level = "debug", skip_all)]
826    async fn write_tokens(&mut self, store: &dyn IndexStore) -> Result<()> {
827        log::info!("writing tokens of partition {}", self.id);
828        let tokens = std::mem::take(&mut self.tokens);
829        let batch = tokens.to_batch(self.token_set_format)?;
830        let mut writer = store
831            .new_index_file(&token_file_path(self.id), batch.schema())
832            .await?;
833        writer.write_record_batch(batch).await?;
834        writer.finish().await?;
835        Ok(())
836    }
837
838    #[instrument(level = "debug", skip_all)]
839    async fn write_docs(&mut self, store: &dyn IndexStore, docs: Arc<DocSet>) -> Result<()> {
840        log::info!("writing docs of partition {}", self.id);
841        let batch = docs.to_batch()?;
842        let mut writer = store
843            .new_index_file(&doc_file_path(self.id), batch.schema())
844            .await?;
845        writer.write_record_batch(batch).await?;
846        writer.finish().await?;
847        Ok(())
848    }
849}
850
/// Tokenizes documents into an in-memory partition (`builder`) and flushes that
/// partition to `dest_store` when it reaches its document or memory limit.
struct IndexWorker {
    /// Tokenizer applied to each document's text.
    tokenizer: Box<dyn LanceTokenizer>,
    /// Store that flushed partitions are written to.
    dest_store: Arc<dyn IndexStore>,
    /// Counter (behind an `Arc`) from which new partition ids are allocated.
    id_alloc: Arc<AtomicU64>,
    /// The partition currently being built in memory.
    builder: InnerBuilder,
    /// Ids of the partitions this worker has already flushed.
    partitions: Vec<u64>,
    /// Posting-list schema; also used to detect whether positions are recorded.
    schema: SchemaRef,
    /// Running estimate, in bytes, of memory held by `builder` and the scratch buffer.
    memory_size: u64,
    /// Threshold on `memory_size` that triggers a flush.
    worker_memory_limit_bytes: u64,
    /// Sum of the byte lengths of all documents processed by this worker.
    total_doc_length: usize,
    /// Optional bits OR-ed into every allocated partition id.
    fragment_mask: Option<u64>,
    /// Token-set format used for new partitions.
    token_set_format: TokenSetFormat,
    /// Scratch buffer of token ids for the document currently being processed.
    token_ids: Vec<u32>,
    /// `token_ids.len()` after the previous document; used to pre-size the scratch buffer.
    last_token_count: usize,
}
866
/// A partition still held in memory (never flushed) when `IndexWorker::finish` runs.
struct TailPartition {
    /// Builder holding the unflushed partition's tokens, docs and posting lists.
    builder: InnerBuilder,
}
870
/// What an `IndexWorker` produces when it finishes.
struct WorkerOutput {
    /// Ids of the partitions the worker flushed to its store.
    partitions: Vec<u64>,
    /// The last in-memory partition, if it contained any tokens.
    tail_partition: Option<TailPartition>,
}
875
/// Settings passed to `IndexWorker::new`.
#[derive(Debug, Clone, Copy)]
struct IndexWorkerConfig {
    /// Whether token positions are recorded in posting lists.
    with_position: bool,
    /// On-disk format version of the posting lists.
    format_version: InvertedListFormatVersion,
    /// Optional bits OR-ed into every partition id the worker allocates.
    fragment_mask: Option<u64>,
    /// Token-set format used for each partition.
    token_set_format: TokenSetFormat,
    /// Tracked-memory threshold at which the worker flushes its partition.
    worker_memory_limit_bytes: u64,
}
884
impl IndexWorker {
    /// Bytes reserved by the `posting_lists` vector itself (capacity times the size of a
    /// `PostingListBuilder`). Memory owned by each builder is tracked separately through
    /// `PostingListBuilder::size` deltas.
    fn posting_lists_overhead_size(&self) -> u64 {
        (self.builder.posting_lists.capacity() * std::mem::size_of::<PostingListBuilder>()) as u64
    }

    /// Moves `tracked` from a component size of `old` to `new` (i.e. `tracked - old + new`)
    /// using only unsigned arithmetic.
    fn adjust_tracked_value(tracked: &mut u64, old: u64, new: u64) {
        if new >= old {
            *tracked += new - old;
        } else {
            *tracked -= old - new;
        }
    }

    /// Applies `adjust_tracked_value` to this worker's `memory_size` estimate.
    fn adjust_tracked_memory_size(&mut self, old_memory_size: u64, new_memory_size: u64) {
        Self::adjust_tracked_value(&mut self.memory_size, old_memory_size, new_memory_size);
    }

    /// Adds a signed `delta` to the unsigned running `total`.
    fn apply_delta(total: &mut u64, delta: i64) {
        if delta >= 0 {
            *total += delta as u64;
        } else {
            *total -= (-delta) as u64;
        }
    }

    /// Bytes reserved by the per-document `token_ids` scratch buffer.
    fn temporary_memory_size(&self) -> u64 {
        (self.token_ids.capacity() * std::mem::size_of::<u32>()) as u64
    }

    /// Releases an oversized scratch buffer: if `token_ids` can hold more than
    /// `MAX_RETAINED_TOKEN_IDS` ids, it is replaced by one sized for the last document
    /// (capped at `MAX_RETAINED_TOKEN_IDS`).
    fn trim_temporary_buffers(&mut self) {
        if self.token_ids.capacity() > MAX_RETAINED_TOKEN_IDS {
            self.token_ids = Vec::with_capacity(self.last_token_count.min(MAX_RETAINED_TOKEN_IDS));
        }
    }

    /// Creates a worker with an empty in-memory partition.
    ///
    /// The first partition id is taken from `id_alloc` and OR-ed with
    /// `config.fragment_mask` (if set). This constructor currently never returns an error.
    async fn new(
        tokenizer: Box<dyn LanceTokenizer>,
        dest_store: Arc<dyn IndexStore>,
        id_alloc: Arc<AtomicU64>,
        config: IndexWorkerConfig,
    ) -> Result<Self> {
        let schema = inverted_list_schema_for_version(config.with_position, config.format_version);

        Ok(Self {
            tokenizer,
            dest_store,
            builder: InnerBuilder::new_with_format_version(
                id_alloc.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                    | config.fragment_mask.unwrap_or(0),
                config.with_position,
                config.token_set_format,
                config.format_version,
            ),
            partitions: Vec::new(),
            id_alloc,
            schema,
            memory_size: 0,
            worker_memory_limit_bytes: config.worker_memory_limit_bytes,
            total_doc_length: 0,
            fragment_mask: config.fragment_mask,
            token_set_format: config.token_set_format,
            token_ids: Vec::new(),
            last_token_count: 0,
        })
    }

    /// Whether this worker records positions, detected from the presence of either the
    /// V2 (`COMPRESSED_POSITION_COL`) or V1 (`POSITION_COL`) position column in `schema`.
    fn has_position(&self) -> bool {
        self.schema
            .column_with_name(COMPRESSED_POSITION_COL)
            .is_some()
            || self.schema.column_with_name(POSITION_COL).is_some()
    }

    /// Tokenizes and indexes every non-null document in `batch`.
    ///
    /// Column 0 holds the document text and the `ROW_ID` column the row ids; rows whose
    /// document is null are skipped. Each document gets the next local doc id, its tokens
    /// are added to the current partition's token set and posting lists, and
    /// `memory_size` is updated by deltas. After a document is added, the partition is
    /// flushed if it holds `u32::MAX` docs, or if its tracked memory reached
    /// `worker_memory_limit_bytes` and it was not empty before this document.
    ///
    /// # Errors
    /// Fails if the first document of a partition alone exceeds the memory limit, if a
    /// posting-list update fails, or if a flush fails.
    async fn process_batch(&mut self, batch: RecordBatch) -> Result<()> {
        let doc_col = batch.column(0);
        let doc_iter = iter_str_array(doc_col);
        let row_id_col = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
        let docs = doc_iter
            .zip(row_id_col.values().iter())
            .filter_map(|(doc, row_id)| doc.map(|doc| (doc, *row_id)));

        let with_position = self.has_position();
        for (doc, row_id) in docs {
            // Snapshot state before this document so memory can be tracked by deltas.
            let builder_was_empty = self.builder.docs.is_empty();
            let old_temporary_memory_size = self.temporary_memory_size();
            let old_token_memory_size = self.builder.tokens.memory_size() as u64;
            let doc_id = self.builder.docs.len() as u32;
            let mut token_num: u32 = 0;
            let mut posting_memory_delta = 0i64;
            if with_position {
                // Positional path: posting lists are updated while tokens stream in.
                if self.token_ids.capacity() < self.last_token_count {
                    self.token_ids
                        .reserve(self.last_token_count - self.token_ids.capacity());
                }
                self.token_ids.clear();
                // Local references to the fields mutated inside the token loop.
                let builder = &mut self.builder;
                let token_ids = &mut self.token_ids;
                let memory_size = &mut self.memory_size;
                let posting_tail_codec = builder.posting_tail_codec;

                let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
                while token_stream.advance() {
                    let token = token_stream.token_mut();
                    // Move the text out of the token instead of cloning it.
                    let token_text = std::mem::take(&mut token.text);
                    let token_id = builder.tokens.add(token_text);
                    // A token id equal to the number of posting lists means a token not
                    // seen before (presumably `tokens.add` hands out dense ids): create
                    // its posting list and track the vector's capacity growth.
                    if token_id as usize == builder.posting_lists.len() {
                        let old_posting_lists_overhead_size = (builder.posting_lists.capacity()
                            * std::mem::size_of::<PostingListBuilder>())
                            as u64;
                        builder.posting_lists.push(
                            PostingListBuilder::new_with_posting_tail_codec(
                                true,
                                posting_tail_codec,
                            ),
                        );
                        let new_posting_lists_overhead_size = (builder.posting_lists.capacity()
                            * std::mem::size_of::<PostingListBuilder>())
                            as u64;
                        Self::adjust_tracked_value(
                            memory_size,
                            old_posting_lists_overhead_size,
                            new_posting_lists_overhead_size,
                        );
                    }
                    let posting_list = &mut builder.posting_lists[token_id as usize];
                    let old_posting_memory_size = posting_list.size();
                    // NOTE(review): a `true` result appears to mean this is the token's first
                    // occurrence in the doc, so each token id is recorded once for the
                    // `finish_open_doc` pass below — confirm against `PostingListBuilder`.
                    if posting_list.add_occurrence(doc_id, token.position as u32)? {
                        token_ids.push(token_id);
                    }
                    let new_posting_memory_size = posting_list.size();
                    posting_memory_delta +=
                        new_posting_memory_size as i64 - old_posting_memory_size as i64;
                    token_num += 1;
                }
            } else {
                // Count-only path: collect every token id now; per-token counts are added
                // after the document has been appended.
                if self.token_ids.capacity() < self.last_token_count {
                    self.token_ids
                        .reserve(self.last_token_count - self.token_ids.capacity());
                }
                self.token_ids.clear();

                let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
                while token_stream.advance() {
                    let token = token_stream.token_mut();
                    let token_text = std::mem::take(&mut token.text);
                    let token_id = self.builder.tokens.add(token_text);
                    self.token_ids.push(token_id);
                    token_num += 1;
                }
            }
            self.adjust_tracked_memory_size(
                old_token_memory_size,
                self.builder.tokens.memory_size() as u64,
            );

            if !with_position {
                // Ensure every token id now has a posting list, tracking the vector's growth.
                let old_posting_lists_overhead_size = self.posting_lists_overhead_size();
                self.builder
                    .posting_lists
                    .resize_with(self.builder.tokens.len(), || {
                        PostingListBuilder::new_with_posting_tail_codec(
                            false,
                            self.builder.posting_tail_codec,
                        )
                    });
                let new_posting_lists_overhead_size = self.posting_lists_overhead_size();
                Self::adjust_tracked_value(
                    &mut self.memory_size,
                    old_posting_lists_overhead_size,
                    new_posting_lists_overhead_size,
                );
            }

            let old_doc_memory_size = self.builder.docs.memory_size() as u64;
            let appended_doc_id = self.builder.docs.append(row_id, token_num);
            debug_assert_eq!(appended_doc_id, doc_id);
            self.adjust_tracked_memory_size(
                old_doc_memory_size,
                self.builder.docs.memory_size() as u64,
            );
            // Byte length of the document text.
            self.total_doc_length += doc.len();

            if with_position {
                // Close this document's entry in each posting list it touched.
                for &token_id in &self.token_ids {
                    let (old_posting_memory_size, new_posting_memory_size) = {
                        let posting_list = &mut self.builder.posting_lists[token_id as usize];
                        let old_posting_memory_size = posting_list.size();
                        posting_list.finish_open_doc(doc_id)?;
                        let new_posting_memory_size = posting_list.size();
                        (old_posting_memory_size, new_posting_memory_size)
                    };
                    posting_memory_delta +=
                        new_posting_memory_size as i64 - old_posting_memory_size as i64;
                }
                Self::apply_delta(&mut self.memory_size, posting_memory_delta);
            } else if token_num > 0 {
                // Sort so equal ids are adjacent, then add one `Count` entry per run of
                // identical token ids.
                self.token_ids.sort_unstable();
                let mut iter = self.token_ids.iter();
                // `token_num > 0` guarantees at least one token id.
                let mut current = *iter.next().unwrap();
                let mut count = 1u32;
                for &token_id in iter {
                    if token_id == current {
                        count += 1;
                        continue;
                    }

                    let (old_posting_memory_size, new_posting_memory_size) = {
                        let posting_list = &mut self.builder.posting_lists[current as usize];
                        let old_posting_memory_size = posting_list.size();
                        posting_list.add(doc_id, PositionRecorder::Count(count));
                        let new_posting_memory_size = posting_list.size();
                        (old_posting_memory_size, new_posting_memory_size)
                    };
                    posting_memory_delta +=
                        new_posting_memory_size as i64 - old_posting_memory_size as i64;

                    current = token_id;
                    count = 1;
                }
                // Flush the final run.
                let (old_posting_memory_size, new_posting_memory_size) = {
                    let posting_list = &mut self.builder.posting_lists[current as usize];
                    let old_posting_memory_size = posting_list.size();
                    posting_list.add(doc_id, PositionRecorder::Count(count));
                    let new_posting_memory_size = posting_list.size();
                    (old_posting_memory_size, new_posting_memory_size)
                };
                posting_memory_delta +=
                    new_posting_memory_size as i64 - old_posting_memory_size as i64;
                Self::apply_delta(&mut self.memory_size, posting_memory_delta);
            }
            // Remember this doc's token count to pre-size the scratch buffer next time.
            self.last_token_count = self.token_ids.len();
            self.trim_temporary_buffers();
            self.adjust_tracked_memory_size(
                old_temporary_memory_size,
                self.temporary_memory_size(),
            );

            // A partition whose only document already exceeds the limit can never fit.
            if self.builder.docs.len() == 1 && self.memory_size > self.worker_memory_limit_bytes {
                return Err(Error::invalid_input(format!(
                    "single document row_id={} exceeds worker memory limit: {} > {} bytes",
                    row_id, self.memory_size, self.worker_memory_limit_bytes
                )));
            }

            // Doc ids are u32, so the partition must be flushed before it overflows.
            if self.builder.docs.len() as u32 == u32::MAX
                || (!builder_was_empty && self.memory_size >= self.worker_memory_limit_bytes)
            {
                self.flush().await?;
            }
        }

        Ok(())
    }

    /// Writes the current in-memory partition to `dest_store` and starts a new one.
    ///
    /// No-op when the current partition has no tokens. The replacement partition gets a
    /// fresh id from `id_alloc` (OR-ed with `fragment_mask`), and the written partition's
    /// id is appended to `partitions` only if the write succeeds. `memory_size` is reset
    /// to the scratch-buffer size before the write, so it is already reset if the write
    /// fails.
    #[instrument(level = "debug", skip_all)]
    async fn flush(&mut self) -> Result<()> {
        if self.builder.tokens.is_empty() {
            return Ok(());
        }

        log::info!(
            "flushing posting lists, memory size: {} MiB",
            self.memory_size / (1024 * 1024)
        );
        // Only the scratch buffer remains accounted once the builder is swapped out.
        self.memory_size = self.temporary_memory_size();
        let with_position = self.has_position();
        let format_version = self.builder.format_version;
        let builder = std::mem::replace(
            &mut self.builder,
            InnerBuilder::new_with_format_version(
                self.id_alloc
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                    | self.fragment_mask.unwrap_or(0),
                with_position,
                self.token_set_format,
                format_version,
            ),
        );
        let written_partition_id = builder.id();
        let mut builder = builder;
        builder
            .write(self.dest_store.as_ref())
            .await
            .map_err(|err| {
                Error::execution(format!(
                    "failed to write finalized partition {}: {err}",
                    written_partition_id
                ))
            })?;
        self.partitions.push(written_partition_id);
        Ok(())
    }

    /// Consumes the worker, returning the ids of flushed partitions and, when it holds
    /// any tokens, the unflushed in-memory partition as `tail_partition`.
    async fn finish(self) -> Result<WorkerOutput> {
        let tail_partition = if self.builder.tokens.is_empty() {
            None
        } else {
            Some(TailPartition {
                builder: self.builder,
            })
        };
        Ok(WorkerOutput {
            partitions: self.partitions,
            tail_partition,
        })
    }
}
1192
/// Occurrences of a token within one document: either the recorded positions, or only
/// the number of occurrences when positions are not kept.
#[derive(Debug, Clone)]
pub enum PositionRecorder {
    /// Recorded token positions.
    Position(SmallVec<[u32; 2]>),
    /// Number of occurrences, without positions.
    Count(u32),
}
1198
1199impl PositionRecorder {
1200    pub fn len(&self) -> u32 {
1201        match self {
1202            Self::Position(positions) => positions.len() as u32,
1203            Self::Count(count) => *count,
1204        }
1205    }
1206
1207    pub fn is_empty(&self) -> bool {
1208        self.len() == 0
1209    }
1210
1211    pub fn into_vec(self) -> Vec<u32> {
1212        match self {
1213            Self::Position(positions) => positions.into_vec(),
1214            Self::Count(_) => vec![0],
1215        }
1216    }
1217}
1218
/// A row id paired with its score.
///
/// The derived `PartialEq`/`Eq` compare both fields, whereas the hand-written
/// `Ord`/`PartialOrd` impls compare `score` only.
#[derive(Debug, Eq, PartialEq, Clone, DeepSizeOf)]
pub struct ScoredDoc {
    /// Row id of the scored document.
    pub row_id: u64,
    /// Score wrapped in `OrderedFloat` so it can be compared with `Ord`.
    pub score: OrderedFloat,
}
1224
1225impl ScoredDoc {
1226    pub fn new(row_id: u64, score: f32) -> Self {
1227        Self {
1228            row_id,
1229            score: OrderedFloat(score),
1230        }
1231    }
1232}
1233
1234impl PartialOrd for ScoredDoc {
1235    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1236        Some(self.cmp(other))
1237    }
1238}
1239
1240impl Ord for ScoredDoc {
1241    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1242        self.score.cmp(&other.score)
1243    }
1244}
1245
1246pub fn legacy_inverted_list_schema(with_position: bool) -> SchemaRef {
1247    let mut fields = vec![
1248        arrow_schema::Field::new(ROW_ID, arrow_schema::DataType::UInt64, false),
1249        arrow_schema::Field::new(FREQUENCY_COL, arrow_schema::DataType::Float32, false),
1250    ];
1251    if with_position {
1252        fields.push(arrow_schema::Field::new(
1253            POSITION_COL,
1254            arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1255                "item",
1256                arrow_schema::DataType::Int32,
1257                true,
1258            ))),
1259            false,
1260        ));
1261    }
1262    Arc::new(arrow_schema::Schema::new(fields))
1263}
1264
1265pub fn inverted_list_schema(with_position: bool) -> SchemaRef {
1266    inverted_list_schema_for_version(with_position, current_fts_format_version())
1267}
1268
1269pub fn inverted_list_schema_for_version(
1270    with_position: bool,
1271    format_version: InvertedListFormatVersion,
1272) -> SchemaRef {
1273    match format_version {
1274        InvertedListFormatVersion::V1 => inverted_list_schema_v1(with_position),
1275        InvertedListFormatVersion::V2 => inverted_list_schema_with_tail_codec_and_position_codec(
1276            with_position,
1277            PostingTailCodec::VarintDelta,
1278            Some(PositionStreamCodec::PackedDelta),
1279        ),
1280    }
1281}
1282
1283fn inverted_list_schema_v1(with_position: bool) -> SchemaRef {
1284    let mut fields = vec![
1285        arrow_schema::Field::new(
1286            POSTING_COL,
1287            datatypes::DataType::List(Arc::new(Field::new(
1288                "item",
1289                datatypes::DataType::LargeBinary,
1290                true,
1291            ))),
1292            false,
1293        ),
1294        arrow_schema::Field::new(MAX_SCORE_COL, datatypes::DataType::Float32, false),
1295        arrow_schema::Field::new(LENGTH_COL, datatypes::DataType::UInt32, false),
1296    ];
1297    if with_position {
1298        fields.push(arrow_schema::Field::new(
1299            POSITION_COL,
1300            arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1301                "item",
1302                arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1303                    "item",
1304                    arrow_schema::DataType::LargeBinary,
1305                    true,
1306                ))),
1307                true,
1308            ))),
1309            false,
1310        ));
1311    }
1312    Arc::new(arrow_schema::Schema::new(fields))
1313}
1314
1315pub fn inverted_list_schema_with_tail_codec(
1316    with_position: bool,
1317    posting_tail_codec: PostingTailCodec,
1318) -> SchemaRef {
1319    inverted_list_schema_with_tail_codec_and_position_codec(
1320        with_position,
1321        posting_tail_codec,
1322        Some(PositionStreamCodec::PackedDelta),
1323    )
1324}
1325
1326fn inverted_list_schema_with_tail_codec_and_position_codec(
1327    with_position: bool,
1328    posting_tail_codec: PostingTailCodec,
1329    position_codec: Option<PositionStreamCodec>,
1330) -> SchemaRef {
1331    let mut fields = vec![
1332        // we compress the posting lists (including row ids and frequencies),
1333        // and store the compressed posting lists, so it's a large binary array
1334        arrow_schema::Field::new(
1335            POSTING_COL,
1336            datatypes::DataType::List(Arc::new(Field::new(
1337                "item",
1338                datatypes::DataType::LargeBinary,
1339                true,
1340            ))),
1341            false,
1342        ),
1343        arrow_schema::Field::new(MAX_SCORE_COL, datatypes::DataType::Float32, false),
1344        arrow_schema::Field::new(LENGTH_COL, datatypes::DataType::UInt32, false),
1345    ];
1346    if with_position {
1347        fields.push(arrow_schema::Field::new(
1348            COMPRESSED_POSITION_COL,
1349            arrow_schema::DataType::LargeBinary,
1350            false,
1351        ));
1352        fields.push(arrow_schema::Field::new(
1353            POSITION_BLOCK_OFFSET_COL,
1354            arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1355                "item",
1356                arrow_schema::DataType::UInt32,
1357                true,
1358            ))),
1359            false,
1360        ));
1361    }
1362    let mut metadata = HashMap::from([(
1363        POSTING_TAIL_CODEC_KEY.to_owned(),
1364        posting_tail_codec.as_str().to_owned(),
1365    )]);
1366    if let Some(position_codec) = position_codec.filter(|_| with_position) {
1367        metadata.insert(
1368            POSITIONS_LAYOUT_KEY.to_owned(),
1369            POSITIONS_LAYOUT_SHARED_STREAM_V2.to_owned(),
1370        );
1371        metadata.insert(
1372            POSITIONS_CODEC_KEY.to_owned(),
1373            position_codec.as_str().to_owned(),
1374        );
1375    }
1376    Arc::new(arrow_schema::Schema::new_with_metadata(fields, metadata))
1377}
1378
/// Flatten the string list stream into a string stream
///
/// Every string inside a (non-null) list becomes its own output row, carrying the row
/// id of the input row it came from (see `flatten_string_list`).
pub struct FlattenStream {
    /// Inner record batch stream with 2 columns:
    /// 1. doc_col: List or LargeList of Utf8 or LargeUtf8
    /// 2. row_id_col: UInt64
    inner: SendableRecordBatchStream,
    /// Data type of the input document column (`List(..)` or `LargeList(..)`).
    field_type: DataType,
    /// String type of the flattened output document column (`Utf8` or `LargeUtf8`).
    data_type: DataType,
}
1388
1389impl FlattenStream {
1390    pub fn new(input: SendableRecordBatchStream) -> Self {
1391        let schema = input.schema();
1392        let field = schema.field(0);
1393        let data_type = match field.data_type() {
1394            DataType::List(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
1395            DataType::List(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
1396                DataType::LargeUtf8
1397            }
1398            DataType::LargeList(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
1399            DataType::LargeList(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
1400                DataType::LargeUtf8
1401            }
1402            _ => panic!(
1403                "expect data type List(Utf8) or List(LargeUtf8) but got {:?}",
1404                field.data_type()
1405            ),
1406        };
1407        Self {
1408            inner: input,
1409            field_type: field.data_type().clone(),
1410            data_type,
1411        }
1412    }
1413}
1414
1415impl Stream for FlattenStream {
1416    type Item = datafusion_common::Result<RecordBatch>;
1417
1418    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1419        match Pin::new(&mut self.inner).poll_next(cx) {
1420            Poll::Ready(Some(Ok(batch))) => {
1421                let doc_col = batch.column(0);
1422                let batch = match self.field_type {
1423                    DataType::List(_) => flatten_string_list::<i32>(&batch, doc_col).map_err(|e| {
1424                        datafusion_common::error::DataFusionError::Execution(format!(
1425                            "flatten string list error: {}",
1426                            e
1427                        ))
1428                    }),
1429                    DataType::LargeList(_) => {
1430                        flatten_string_list::<i64>(&batch, doc_col).map_err(|e| {
1431                            datafusion_common::error::DataFusionError::Execution(format!(
1432                                "flatten string list error: {}",
1433                                e
1434                            ))
1435                        })
1436                    }
1437                    _ => unreachable!(
1438                        "expect data type List or LargeList but got {:?}",
1439                        self.field_type
1440                    ),
1441                };
1442                Poll::Ready(Some(batch))
1443            }
1444            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
1445            Poll::Ready(None) => Poll::Ready(None),
1446            Poll::Pending => Poll::Pending,
1447        }
1448    }
1449}
1450
1451impl RecordBatchStream for FlattenStream {
1452    fn schema(&self) -> SchemaRef {
1453        let schema = Schema::new(vec![
1454            Field::new(
1455                self.inner.schema().field(0).name(),
1456                self.data_type.clone(),
1457                true,
1458            ),
1459            ROW_ID_FIELD.clone(),
1460        ]);
1461
1462        Arc::new(schema)
1463    }
1464}
1465
1466fn flatten_string_list<Offset: arrow::array::OffsetSizeTrait>(
1467    batch: &RecordBatch,
1468    doc_col: &Arc<dyn Array>,
1469) -> Result<RecordBatch> {
1470    let docs = doc_col.as_list::<Offset>();
1471    let row_ids = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
1472
1473    let row_ids = row_ids
1474        .values()
1475        .iter()
1476        .zip(docs.iter())
1477        .flat_map(|(row_id, doc)| std::iter::repeat_n(*row_id, doc.map(|d| d.len()).unwrap_or(0)));
1478
1479    let row_ids = Arc::new(UInt64Array::from_iter_values(row_ids));
1480    let docs = match docs.value_type() {
1481        datatypes::DataType::Utf8 | datatypes::DataType::LargeUtf8 => docs.values().clone(),
1482        _ => {
1483            return Err(Error::index(format!(
1484                "expect data type String or LargeString but got {}",
1485                docs.value_type()
1486            )));
1487        }
1488    };
1489
1490    let schema = Schema::new(vec![
1491        Field::new(
1492            batch.schema().field(0).name(),
1493            docs.data_type().clone(),
1494            true,
1495        ),
1496        ROW_ID_FIELD.clone(),
1497    ]);
1498    let batch = RecordBatch::try_new(Arc::new(schema), vec![docs, row_ids])?;
1499    Ok(batch)
1500}
1501
1502pub(crate) fn token_file_path(partition_id: u64) -> String {
1503    format!("part_{}_{}", partition_id, TOKENS_FILE)
1504}
1505
1506pub(crate) fn posting_file_path(partition_id: u64) -> String {
1507    format!("part_{}_{}", partition_id, INVERT_LIST_FILE)
1508}
1509
1510pub(crate) fn doc_file_path(partition_id: u64) -> String {
1511    format!("part_{}_{}", partition_id, DOCS_FILE)
1512}
1513
1514pub(crate) fn part_metadata_file_path(partition_id: u64) -> String {
1515    format!("part_{}_{}", partition_id, METADATA_FILE)
1516}
1517
// Suffixes of the per-partition data files named `part_{id}_{suffix}` by
// `token_file_path`, `posting_file_path` and `doc_file_path` (the metadata file,
// `METADATA_FILE`, is not included).
const PARTITION_FILE_SUFFIXES: [&str; 3] = [TOKENS_FILE, INVERT_LIST_FILE, DOCS_FILE];
// Each remapped file is renamed twice: first to a temp path (phase 1), then to
// its final path (phase 2). Keep in sync with the two rename loops below in
// `merge_metadata_files`.
const PARTITION_FILE_RENAME_PHASES: u64 = 2;
1523
1524pub async fn merge_index_files(
1525    object_store: &ObjectStore,
1526    index_dir: &Path,
1527    store: Arc<dyn IndexStore>,
1528    progress: Arc<dyn IndexBuildProgress>,
1529) -> Result<()> {
1530    // List all partition metadata files in the index directory
1531    let part_metadata_files = list_metadata_files(object_store, index_dir).await?;
1532
1533    // Call merge_metadata_files function for inverted index
1534    merge_metadata_files(store, &part_metadata_files, progress).await
1535}
1536
1537/// List and filter metadata files from the index directory
1538/// Returns partition metadata files
1539async fn list_metadata_files(object_store: &ObjectStore, index_dir: &Path) -> Result<Vec<String>> {
1540    // List all partition metadata files in the index directory
1541    let mut part_metadata_files = Vec::new();
1542    let mut list_stream = object_store.list(Some(index_dir.clone()));
1543
1544    while let Some(item) = list_stream.next().await {
1545        match item {
1546            Ok(meta) => {
1547                let file_name = meta.location.filename().unwrap_or_default();
1548                // Filter files matching the pattern part_*_metadata.lance
1549                if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
1550                    part_metadata_files.push(file_name.to_string());
1551                }
1552            }
1553            Err(_) => continue,
1554        }
1555    }
1556
1557    if part_metadata_files.is_empty() {
1558        return Err(Error::invalid_input_source(
1559            format!(
1560                "No partition metadata files found in index directory: {}",
1561                index_dir
1562            )
1563            .into(),
1564        ));
1565    }
1566
1567    Ok(part_metadata_files)
1568}
1569
1570/// Merge partition metadata files with partition ID remapping to sequential IDs starting from 0
1571async fn merge_metadata_files(
1572    store: Arc<dyn IndexStore>,
1573    part_metadata_files: &[String],
1574    progress: Arc<dyn IndexBuildProgress>,
1575) -> Result<()> {
1576    // Collect all partition IDs and params
1577    let mut all_partitions = Vec::new();
1578    let mut params = None;
1579    let mut token_set_format = None;
1580    let mut format_version = None;
1581    let mut posting_tail_codec = None;
1582    let mut deleted_fragments = RoaringBitmap::new();
1583    progress
1584        .stage_start(
1585            "read_partition_metadata",
1586            Some(part_metadata_files.len() as u64),
1587            "files",
1588        )
1589        .await?;
1590
1591    for (idx, file_name) in part_metadata_files.iter().enumerate() {
1592        let reader = store.open_index_file(file_name).await?;
1593        let metadata = &reader.schema().metadata;
1594
1595        let partitions_str = metadata.get("partitions").ok_or(Error::index(format!(
1596            "partitions not found in {}",
1597            file_name
1598        )))?;
1599
1600        let partition_ids: Vec<u64> = serde_json::from_str(partitions_str)
1601            .map_err(|e| Error::index(format!("Failed to parse partitions: {}", e)))?;
1602
1603        all_partitions.extend(partition_ids);
1604
1605        if params.is_none() {
1606            let params_str = metadata
1607                .get("params")
1608                .ok_or(Error::index(format!("params not found in {}", file_name)))?;
1609            params = Some(
1610                serde_json::from_str::<InvertedIndexParams>(params_str)
1611                    .map_err(|e| Error::index(format!("Failed to parse params: {}", e)))?,
1612            );
1613        }
1614
1615        if token_set_format.is_none()
1616            && let Some(name) = metadata.get(TOKEN_SET_FORMAT_KEY)
1617        {
1618            token_set_format = Some(TokenSetFormat::from_str(name)?);
1619        }
1620        if format_version.is_none() {
1621            format_version = Some(parse_format_version_from_metadata(metadata)?);
1622        }
1623        if posting_tail_codec.is_none() {
1624            posting_tail_codec = Some(parse_posting_tail_codec(metadata)?);
1625        }
1626
1627        if reader.num_rows() > 0 {
1628            let metadata_batch = reader.read_range(0..1, None).await?;
1629            let deleted_fragments_col = metadata_batch
1630                .column_by_name(DELETED_FRAGMENTS_COL)
1631                .expect_ok()?;
1632            let deleted_fragments_arr = deleted_fragments_col
1633                .as_any()
1634                .downcast_ref::<BinaryArray>()
1635                .expect_ok()?;
1636            let part_deleted_fragments =
1637                RoaringBitmap::deserialize_from(deleted_fragments_arr.value(0))?;
1638            deleted_fragments.extend(part_deleted_fragments);
1639        }
1640        progress
1641            .stage_progress("read_partition_metadata", idx as u64 + 1)
1642            .await?;
1643    }
1644    progress.stage_complete("read_partition_metadata").await?;
1645
1646    // Create ID mapping: sorted original IDs -> 0,1,2...
1647    let mut sorted_ids = all_partitions.clone();
1648    sorted_ids.sort();
1649    sorted_ids.dedup();
1650
1651    let id_mapping: HashMap<u64, u64> = sorted_ids
1652        .iter()
1653        .enumerate()
1654        .map(|(new_id, &old_id)| (old_id, new_id as u64))
1655        .collect();
1656
1657    // Safe rename partition files using temporary files to avoid overwrite
1658    let timestamp = std::time::SystemTime::now()
1659        .duration_since(std::time::UNIX_EPOCH)
1660        .unwrap()
1661        .as_secs();
1662
1663    let changed_partition_count = id_mapping
1664        .iter()
1665        .filter(|(old_id, new_id)| old_id != new_id)
1666        .count() as u64;
1667    let total_renames = changed_partition_count
1668        * PARTITION_FILE_SUFFIXES.len() as u64
1669        * PARTITION_FILE_RENAME_PHASES;
1670    progress
1671        .stage_start("remap_partition_files", Some(total_renames), "files")
1672        .await?;
1673
1674    // Phase 1: Move files to temporary locations
1675    let mut temp_files: Vec<(String, String, String)> = Vec::new(); // (temp_path, old_path, final_path)
1676    let mut renamed_files = 0u64;
1677
1678    for (&old_id, &new_id) in &id_mapping {
1679        if old_id != new_id {
1680            for suffix in PARTITION_FILE_SUFFIXES {
1681                let old_path = format!("part_{}_{}", old_id, suffix);
1682                let new_path = format!("part_{}_{}", new_id, suffix);
1683                let temp_path = format!("temp_{}_{}", timestamp, old_path);
1684
1685                // Move to temporary location first to avoid overwrite
1686                if let Err(e) = store.rename_index_file(&old_path, &temp_path).await {
1687                    // Rollback phase 1: restore files from temp locations
1688                    for (temp_name, old_name, _) in temp_files.iter().rev() {
1689                        let _ = store.rename_index_file(temp_name, old_name).await;
1690                    }
1691                    return Err(Error::index(format!(
1692                        "Failed to move {} to temp {}: {}",
1693                        old_path, temp_path, e
1694                    )));
1695                }
1696                temp_files.push((temp_path, old_path, new_path));
1697                renamed_files += 1;
1698                progress
1699                    .stage_progress("remap_partition_files", renamed_files)
1700                    .await?;
1701            }
1702        }
1703    }
1704
1705    // Phase 2: Move from temporary to final locations
1706    let mut completed_renames: Vec<(String, String)> = Vec::new(); // (final_path, temp_path)
1707
1708    for (temp_path, _old_path, final_path) in &temp_files {
1709        if let Err(e) = store.rename_index_file(temp_path, final_path).await {
1710            // Rollback phase 2: restore completed renames and remaining temps
1711            for (final_name, temp_name) in completed_renames.iter().rev() {
1712                let _ = store.rename_index_file(final_name, temp_name).await;
1713            }
1714            // Restore remaining temp files to original locations
1715            for (temp_name, orig_name, _) in temp_files.iter() {
1716                if !completed_renames.iter().any(|(_, t)| t == temp_name) {
1717                    let _ = store.rename_index_file(temp_name, orig_name).await;
1718                }
1719            }
1720            return Err(Error::index(format!(
1721                "Failed to rename {} to {}: {}",
1722                temp_path, final_path, e
1723            )));
1724        }
1725        completed_renames.push((final_path.clone(), temp_path.clone()));
1726        renamed_files += 1;
1727        progress
1728            .stage_progress("remap_partition_files", renamed_files)
1729            .await?;
1730    }
1731    progress.stage_complete("remap_partition_files").await?;
1732
1733    // Write merged metadata with remapped IDs
1734    let remapped_partitions: Vec<u64> = (0..id_mapping.len() as u64).collect();
1735    let params = params.unwrap_or_default();
1736    let token_set_format = token_set_format.unwrap_or(TokenSetFormat::Arrow);
1737    let builder = InvertedIndexBuilder::from_existing_index(
1738        params,
1739        None,
1740        remapped_partitions.clone(),
1741        token_set_format,
1742        None,
1743        deleted_fragments,
1744    )
1745    .with_format_version(format_version.unwrap_or(InvertedListFormatVersion::V1))
1746    .with_posting_tail_codec(posting_tail_codec.unwrap_or(PostingTailCodec::Fixed32));
1747    progress
1748        .stage_start("write_merged_metadata", Some(1), "files")
1749        .await?;
1750    builder
1751        .write_metadata(&*store, &remapped_partitions)
1752        .await?;
1753    progress.stage_progress("write_merged_metadata", 1).await?;
1754    progress.stage_complete("write_merged_metadata").await?;
1755
1756    // Cleanup partition metadata files
1757    for file_name in part_metadata_files {
1758        if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
1759            let _ = store.delete_index_file(file_name).await;
1760        }
1761    }
1762
1763    Ok(())
1764}
1765
1766/// Convert input stream into a stream of documents.
1767///
1768/// The input stream must be one of:
1769/// 1. Document in Utf8 or LargeUtf8 format.
1770/// 2. Document in List(Utf8) or List(LargeUtf8) format.
1771/// 3. Json document in LargeBinary format.
1772pub fn document_input(
1773    input: SendableRecordBatchStream,
1774    column: &str,
1775) -> Result<SendableRecordBatchStream> {
1776    let schema = input.schema();
1777    let field = schema.column_with_name(column).expect_ok()?.1;
1778    match field.data_type() {
1779        DataType::Utf8 | DataType::LargeUtf8 => Ok(input),
1780        DataType::List(field) | DataType::LargeList(field)
1781            if matches!(field.data_type(), DataType::Utf8 | DataType::LargeUtf8) =>
1782        {
1783            Ok(Box::pin(FlattenStream::new(input)))
1784        }
1785        DataType::LargeBinary => match field.metadata().get(ARROW_EXT_NAME_KEY) {
1786            Some(name) if name.as_str() == JSON_EXT_NAME => {
1787                Ok(Box::pin(JsonTextStream::new(input, column.to_string())))
1788            }
1789            _ => Err(Error::invalid_input_source(
1790                format!("column {} is not json", column).into(),
1791            )),
1792        },
1793        _ => Err(Error::invalid_input_source(
1794            format!(
1795                "column {} has type {}, is not utf8, large utf8 type/list, or large binary",
1796                column,
1797                field.data_type()
1798            )
1799            .into(),
1800        )),
1801    }
1802}
1803
1804#[cfg(test)]
1805mod tests {
1806    use super::*;
1807    use crate::metrics::NoOpMetricsCollector;
1808    use crate::progress::IndexBuildProgress;
1809    use crate::scalar::{IndexFile, IndexReader, IndexWriter, ScalarIndex};
1810    use arrow_array::{RecordBatch, StringArray, UInt64Array};
1811    use arrow_schema::{DataType, Field, Schema};
1812    use async_trait::async_trait;
1813    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
1814    use futures::stream;
1815    use lance_core::ROW_ID;
1816    use lance_core::cache::LanceCache;
1817    use lance_core::utils::tempfile::TempDir;
1818    use std::any::Any;
1819    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1820    use std::time::Duration;
1821
1822    fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch {
1823        let schema = Arc::new(Schema::new(vec![
1824            Field::new("doc", DataType::Utf8, true),
1825            Field::new(ROW_ID, DataType::UInt64, false),
1826        ]));
1827        let docs = Arc::new(StringArray::from(vec![Some(doc)]));
1828        let row_ids = Arc::new(UInt64Array::from(vec![row_id]));
1829        RecordBatch::try_new(schema, vec![docs, row_ids]).unwrap()
1830    }
1831
1832    #[derive(Debug, Default, Clone)]
1833    struct CountingStore {
1834        write_count: Arc<AtomicUsize>,
1835    }
1836
1837    impl CountingStore {
1838        fn new() -> Self {
1839            Self {
1840                write_count: Arc::new(AtomicUsize::new(0)),
1841            }
1842        }
1843
1844        fn write_count(&self) -> usize {
1845            self.write_count.load(Ordering::SeqCst)
1846        }
1847    }
1848
1849    impl DeepSizeOf for CountingStore {
1850        fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
1851            0
1852        }
1853    }
1854
1855    #[derive(Debug)]
1856    struct CountingWriter {
1857        write_count: Arc<AtomicUsize>,
1858    }
1859
1860    #[async_trait]
1861    impl IndexWriter for CountingWriter {
1862        async fn write_record_batch(&mut self, _batch: RecordBatch) -> Result<u64> {
1863            Ok(self.write_count.fetch_add(1, Ordering::SeqCst) as u64)
1864        }
1865
1866        async fn finish(&mut self) -> Result<()> {
1867            Ok(())
1868        }
1869
1870        async fn finish_with_metadata(&mut self, _metadata: HashMap<String, String>) -> Result<()> {
1871            Ok(())
1872        }
1873    }
1874
    /// Minimal `IndexStore` for tests. Every new file gets a `CountingWriter` sharing
    /// this store's counter, and `list_files_with_sizes` reports no files. Reading,
    /// copying, renaming and deleting all fail with `not_supported`.
    #[async_trait]
    impl IndexStore for CountingStore {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn clone_arc(&self) -> Arc<dyn IndexStore> {
            Arc::new(self.clone())
        }

        fn io_parallelism(&self) -> usize {
            1
        }

        // The file name and schema are ignored; all writers share one counter.
        async fn new_index_file(
            &self,
            _name: &str,
            _schema: Arc<Schema>,
        ) -> Result<Box<dyn IndexWriter>> {
            Ok(Box::new(CountingWriter {
                write_count: self.write_count.clone(),
            }))
        }

        async fn open_index_file(&self, _name: &str) -> Result<Arc<dyn IndexReader>> {
            Err(Error::not_supported(
                "CountingStore does not support reading",
            ))
        }

        async fn copy_index_file(&self, _name: &str, _dest_store: &dyn IndexStore) -> Result<()> {
            Err(Error::not_supported(
                "CountingStore does not support copying",
            ))
        }

        async fn rename_index_file(&self, _name: &str, _new_name: &str) -> Result<()> {
            Err(Error::not_supported(
                "CountingStore does not support renaming",
            ))
        }

        async fn delete_index_file(&self, _name: &str) -> Result<()> {
            Err(Error::not_supported(
                "CountingStore does not support deleting",
            ))
        }

        async fn list_files_with_sizes(&self) -> Result<Vec<IndexFile>> {
            Ok(vec![])
        }
    }
1927
    /// Three single-entry posting lists must be written with exactly one
    /// `write_record_batch` call, i.e. batched into one record batch.
    #[tokio::test]
    async fn test_write_posting_lists_batches_multiple_rows() -> Result<()> {
        let mut builder = InnerBuilder::new(0, false, TokenSetFormat::default());
        for doc_id in 0..3u64 {
            builder.docs.append(doc_id, 1);
        }

        // One posting list per doc, each holding a single occurrence.
        for doc_id in 0..3u32 {
            let mut posting_list = PostingListBuilder::new(false);
            posting_list.add(doc_id, PositionRecorder::Count(1));
            builder.posting_lists.push(posting_list);
        }

        let store = CountingStore::new();
        let docs = Arc::new(std::mem::take(&mut builder.docs));
        builder.write_posting_lists(&store, docs).await?;

        // CountingWriter increments once per written record batch.
        assert_eq!(store.write_count(), 1);
        Ok(())
    }
1948
    /// Two workers that share one partition-ID allocator write distinct partitions to
    /// `src_store`. Writing an `InvertedIndexBuilder::from_existing_index` over those
    /// partitions to `dest_store` must keep the partition IDs unchanged, record them
    /// in the metadata, and produce the token, posting and doc files for each of them.
    #[tokio::test]
    async fn test_build_only_path_writes_partitions_as_is() -> Result<()> {
        let src_dir = TempDir::default();
        let dest_dir = TempDir::default();
        let src_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            src_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));
        let dest_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            dest_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let params = InvertedIndexParams::default();
        let tokenizer = params.build()?;
        let token_set_format = TokenSetFormat::default();
        // Shared across both workers so their partition IDs are allocated from one source.
        let id_alloc = Arc::new(AtomicU64::new(0));

        let mut worker1 = IndexWorker::new(
            tokenizer.clone(),
            src_store.clone(),
            id_alloc.clone(),
            IndexWorkerConfig {
                with_position: params.with_position,
                format_version: InvertedListFormatVersion::V1,
                fragment_mask: None,
                token_set_format,
                worker_memory_limit_bytes: u64::MAX,
            },
        )
        .await?;
        worker1
            .process_batch(make_doc_batch("hello world", 0))
            .await?;
        let output1 = worker1.finish().await?;
        let mut partitions = output1.partitions;
        // A tail partition is returned unwritten; flush it to the source store.
        if let Some(mut tail_partition) = output1.tail_partition {
            partitions.push(tail_partition.builder.id());
            tail_partition.builder.write(src_store.as_ref()).await?;
        }

        let mut worker2 = IndexWorker::new(
            tokenizer.clone(),
            src_store.clone(),
            id_alloc.clone(),
            IndexWorkerConfig {
                with_position: params.with_position,
                format_version: InvertedListFormatVersion::V1,
                fragment_mask: None,
                token_set_format,
                worker_memory_limit_bytes: u64::MAX,
            },
        )
        .await?;
        worker2
            .process_batch(make_doc_batch("goodbye world", 1))
            .await?;
        let output2 = worker2.finish().await?;
        partitions.extend(output2.partitions);
        if let Some(mut tail_partition) = output2.tail_partition {
            partitions.push(tail_partition.builder.id());
            tail_partition.builder.write(src_store.as_ref()).await?;
        }
        partitions.sort_unstable();
        assert_eq!(partitions.len(), 2);
        assert_ne!(partitions[0], partitions[1]);

        let builder = InvertedIndexBuilder::from_existing_index(
            InvertedIndexParams::default(),
            Some(src_store.clone()),
            partitions.clone(),
            token_set_format,
            None,
            RoaringBitmap::new(),
        );
        builder.write(dest_store.as_ref()).await?;

        // The written metadata must list exactly the original partition IDs.
        let metadata_reader = dest_store.open_index_file(METADATA_FILE).await?;
        let metadata = &metadata_reader.schema().metadata;
        let partitions_str = metadata
            .get("partitions")
            .expect("partitions missing from metadata");
        let written_partitions: Vec<u64> = serde_json::from_str(partitions_str).unwrap();
        assert_eq!(written_partitions, partitions);

        // Every partition's files must be openable in the destination store.
        for id in &partitions {
            dest_store.open_index_file(&token_file_path(*id)).await?;
            dest_store.open_index_file(&posting_file_path(*id)).await?;
            dest_store.open_index_file(&doc_file_path(*id)).await?;
        }

        Ok(())
    }
2044
    /// Setup: an index is written with `PostingTailCodec::Fixed32` and then updated
    /// with one more document into `dest_store`. Expected: both resulting partitions
    /// and the merged metadata still report `Fixed32`.
    #[tokio::test]
    async fn test_update_preserves_existing_posting_tail_codec() -> Result<()> {
        let src_dir = TempDir::default();
        let dest_dir = TempDir::default();
        let src_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            src_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));
        let dest_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            dest_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        // Hand-build partition 0 with one token ("hello") in one doc (row id 100).
        let posting_tail_codec = PostingTailCodec::Fixed32;
        let mut partition = InnerBuilder::new_with_posting_tail_codec(
            0,
            false,
            TokenSetFormat::default(),
            posting_tail_codec,
        );
        partition.tokens.add("hello".to_owned());
        let mut posting_list =
            PostingListBuilder::new_with_posting_tail_codec(false, posting_tail_codec);
        posting_list.add(0, PositionRecorder::Count(1));
        partition.posting_lists.push(posting_list);
        partition.docs.append(100, 1);
        partition.write(src_store.as_ref()).await?;

        let metadata_writer = InvertedIndexBuilder::from_existing_index(
            InvertedIndexParams::default(),
            Some(src_store.clone()),
            vec![0],
            TokenSetFormat::default(),
            None,
            RoaringBitmap::new(),
        )
        .with_posting_tail_codec(posting_tail_codec);
        metadata_writer
            .write_metadata(src_store.as_ref(), &[0])
            .await?;

        // Update the loaded index with one new document, writing to dest_store.
        let index = InvertedIndex::load(src_store, None, &LanceCache::no_cache()).await?;
        let schema = Arc::new(Schema::new(vec![
            Field::new("doc", DataType::Utf8, true),
            Field::new(ROW_ID, DataType::UInt64, false),
        ]));
        let docs = Arc::new(StringArray::from(vec![Some("hello again")]));
        let row_ids = Arc::new(UInt64Array::from(vec![101u64]));
        let batch = RecordBatch::try_new(schema.clone(), vec![docs, row_ids])?;
        let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)]));
        index
            .update(Box::pin(stream), dest_store.as_ref(), None)
            .await?;

        let updated =
            InvertedIndex::load(dest_store.clone(), None, &LanceCache::no_cache()).await?;
        assert_eq!(updated.partitions.len(), 2);
        for partition in &updated.partitions {
            assert_eq!(
                partition.inverted_list.posting_tail_codec(),
                posting_tail_codec
            );
        }

        let metadata = dest_store.open_index_file(METADATA_FILE).await?;
        assert_eq!(
            metadata.schema().metadata.get(POSTING_TAIL_CODEC_KEY),
            Some(&posting_tail_codec.as_str().to_owned())
        );

        Ok(())
    }
2119
    /// `with_posting_tail_codec` also sets the format version: `Fixed32` yields V1
    /// (even after an explicit V2), and `VarintDelta` yields V2.
    #[test]
    fn test_with_posting_tail_codec_syncs_format_version() {
        let builder = InvertedIndexBuilder::from_existing_index(
            InvertedIndexParams::default(),
            None,
            Vec::new(),
            TokenSetFormat::default(),
            None,
            RoaringBitmap::new(),
        )
        .with_format_version(InvertedListFormatVersion::V2)
        .with_posting_tail_codec(PostingTailCodec::Fixed32);
        assert_eq!(builder.format_version, InvertedListFormatVersion::V1);
        assert_eq!(builder.posting_tail_codec, PostingTailCodec::Fixed32);

        let builder = builder.with_posting_tail_codec(PostingTailCodec::VarintDelta);
        assert_eq!(builder.format_version, InvertedListFormatVersion::V2);
        assert_eq!(builder.posting_tail_codec, PostingTailCodec::VarintDelta);
    }
2139
    /// With positions disabled, the posting entry for "hello" in "hello hello world"
    /// still records frequency 2 and carries no positions.
    #[tokio::test]
    async fn test_inverted_index_without_positions_tracks_frequency() -> Result<()> {
        let index_dir = TempDir::default();
        let store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            index_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let schema = Arc::new(Schema::new(vec![
            Field::new("doc", DataType::Utf8, true),
            Field::new(ROW_ID, DataType::UInt64, false),
        ]));
        let docs = Arc::new(StringArray::from(vec![Some("hello hello world")]));
        let row_ids = Arc::new(UInt64Array::from(vec![0u64]));
        let batch = RecordBatch::try_new(schema.clone(), vec![docs, row_ids])?;
        let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)]));
        let stream = Box::pin(stream);

        // Plain whitespace tokenization so "hello" is kept verbatim (no stemming,
        // no stop-word removal, no length limit).
        let params =
            InvertedIndexParams::new("whitespace".to_string(), lance_tokenizer::Language::English)
                .with_position(false)
                .remove_stop_words(false)
                .stem(false)
                .max_token_length(None);

        let mut builder = InvertedIndexBuilder::new(params);
        builder.update(stream, store.as_ref(), None).await?;

        let index = InvertedIndex::load(store, None, &LanceCache::no_cache()).await?;
        assert_eq!(index.partitions.len(), 1);
        let partition = &index.partitions[0];
        let token_id = partition.tokens.get("hello").unwrap();
        let posting = partition
            .inverted_list
            .posting_list(token_id, false, &NoOpMetricsCollector)
            .await?;

        // Exactly one entry: doc 0, frequency 2, no positions.
        let mut iter = posting.iter();
        let (doc_id, freq, positions) = iter.next().unwrap();
        assert_eq!(doc_id, 0);
        assert_eq!(freq, 2);
        assert!(positions.is_none());
        assert!(iter.next().is_none());

        Ok(())
    }
2187
    // Defines `RecordingProgress`, an `IndexBuildProgress` implementation. The tests
    // below read its `recorded_events()` as `(kind, stage, completed)` tuples, with
    // kinds "start", "progress" and "complete".
    lance_testing::define_stage_event_progress!(RecordingProgress, IndexBuildProgress, Result<()>);

    /// Progress sink whose `stage_progress` always fails with an injected I/O error.
    /// `stage_start` and `stage_complete` succeed.
    #[derive(Debug, Default)]
    struct FailingProgress;

    #[async_trait]
    impl IndexBuildProgress for FailingProgress {
        async fn stage_start(&self, _stage: &str, _total: Option<u64>, _unit: &str) -> Result<()> {
            Ok(())
        }

        async fn stage_progress(&self, _stage: &str, _completed: u64) -> Result<()> {
            Err(Error::io("injected progress failure"))
        }

        async fn stage_complete(&self, _stage: &str) -> Result<()> {
            Ok(())
        }
    }
2207
    /// Building from a two-batch stream must report the stages `tokenize_docs`,
    /// `copy_partitions` and `write_metadata` in that order, each with progress
    /// callbacks. `tokenize_docs` progress must be reported at least twice and reach
    /// the total row count, and `merge_partitions` must never start.
    #[tokio::test]
    async fn test_builder_reports_progress_stages() -> Result<()> {
        let index_dir = TempDir::default();
        let store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            index_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let batch1 = make_doc_batch("hello world", 0);
        let batch2 = make_doc_batch("goodbye world", 1);
        let total_rows = 2u64;
        let stream = RecordBatchStreamAdapter::new(
            batch1.schema(),
            stream::iter(vec![Ok(batch1), Ok(batch2)]),
        );
        let stream = Box::pin(stream);

        let progress = Arc::new(RecordingProgress::default());
        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default())
            .with_progress(progress.clone());
        builder.update(stream, store.as_ref(), None).await?;

        // Flatten events into "kind:stage" tags for ordering checks.
        let events = progress.recorded_events();
        let tags = events
            .iter()
            .map(|(kind, stage, _)| format!("{kind}:{stage}"))
            .collect::<Vec<_>>();
        let tokenize_progress = events
            .iter()
            .filter_map(|(kind, stage, completed)| {
                if kind == "progress" && stage == "tokenize_docs" {
                    Some(*completed)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        let tokenize_start = tags
            .iter()
            .position(|e| e == "start:tokenize_docs")
            .expect("missing tokenize_docs start");
        let tokenize_complete = tags
            .iter()
            .position(|e| e == "complete:tokenize_docs")
            .expect("missing tokenize_docs complete");
        let copy_start = tags
            .iter()
            .position(|e| e == "start:copy_partitions")
            .expect("missing copy_partitions start");
        let copy_complete = tags
            .iter()
            .position(|e| e == "complete:copy_partitions")
            .expect("missing copy_partitions complete");
        let metadata_start = tags
            .iter()
            .position(|e| e == "start:write_metadata")
            .expect("missing write_metadata start");
        let metadata_complete = tags
            .iter()
            .position(|e| e == "complete:write_metadata")
            .expect("missing write_metadata complete");

        // Stages must be strictly sequential.
        assert!(tokenize_start < tokenize_complete);
        assert!(tokenize_complete < copy_start);
        assert!(copy_start < copy_complete);
        assert!(copy_complete < metadata_start);
        assert!(metadata_start < metadata_complete);

        assert!(
            tags.iter().any(|e| e == "progress:tokenize_docs"),
            "expected progress callback for tokenize_docs"
        );
        assert!(
            tokenize_progress.len() >= 2,
            "expected at least two progress callbacks for tokenize_docs, got {tokenize_progress:?}"
        );
        assert_eq!(
            tokenize_progress.iter().copied().max().unwrap_or_default(),
            total_rows,
            "expected tokenize_docs progress to reach all rows"
        );
        assert!(
            tags.iter().any(|e| e == "progress:copy_partitions"),
            "expected progress callback for copy_partitions"
        );
        assert!(
            tags.iter().any(|e| e == "progress:write_metadata"),
            "expected progress callback for write_metadata"
        );
        assert!(
            !tags.iter().any(|e| e == "start:merge_partitions"),
            "merge_partitions should not run in the build-only path"
        );

        Ok(())
    }
2306
    /// A default single-batch build must start `copy_partitions` and must never
    /// start `merge_partitions`.
    #[tokio::test]
    async fn test_builder_default_path_skips_merge_stage() -> Result<()> {
        let index_dir = TempDir::default();
        let store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            index_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let batch = make_doc_batch("hello world", 0);
        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
        let stream = Box::pin(stream);

        let progress = Arc::new(RecordingProgress::default());
        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default())
            .with_progress(progress.clone());
        builder.update(stream, store.as_ref(), None).await?;

        // Only the stage start/progress/complete tags matter here, not counts.
        let tags = progress
            .recorded_events()
            .iter()
            .map(|(kind, stage, _)| format!("{kind}:{stage}"))
            .collect::<Vec<_>>();

        assert!(
            tags.iter().any(|e| e == "start:copy_partitions"),
            "default path should copy finalized partitions"
        );
        assert!(
            !tags.iter().any(|e| e == "start:merge_partitions"),
            "default path should not run merge_partitions"
        );
        Ok(())
    }
2341
2342    #[tokio::test]
2343    async fn test_merge_index_files_reports_progress_stages() -> Result<()> {
2344        let index_dir = TempDir::default();
2345        let index_path = index_dir.obj_path();
2346        let object_store = ObjectStore::local();
2347        let store = Arc::new(LanceIndexStore::new(
2348            object_store.clone().into(),
2349            index_path.clone(),
2350            Arc::new(LanceCache::no_cache()),
2351        ));
2352
2353        for (fragment_id, row_id, doc) in [
2354            (1_u64 << 32, 0_u64, "hello world"),
2355            (2_u64 << 32, 1_u64, "goodbye world"),
2356        ] {
2357            let batch = make_doc_batch(doc, row_id);
2358            let stream =
2359                RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2360            let stream = Box::pin(stream);
2361            let mut builder = InvertedIndexBuilder::new_with_fragment_mask(
2362                InvertedIndexParams::default(),
2363                Some(fragment_id),
2364            )
2365            .with_progress(noop_progress());
2366            builder.update(stream, store.as_ref(), None).await?;
2367        }
2368
2369        let progress = Arc::new(RecordingProgress::default());
2370        merge_index_files(&object_store, &index_path, store.clone(), progress.clone()).await?;
2371
2372        let events = progress.recorded_events();
2373        let tags = events
2374            .iter()
2375            .map(|(kind, stage, _)| format!("{kind}:{stage}"))
2376            .collect::<Vec<_>>();
2377        let remap_progress = events
2378            .iter()
2379            .filter_map(|(kind, stage, completed)| {
2380                if kind == "progress" && stage == "remap_partition_files" {
2381                    Some(*completed)
2382                } else {
2383                    None
2384                }
2385            })
2386            .collect::<Vec<_>>();
2387
2388        let read_start = tags
2389            .iter()
2390            .position(|e| e == "start:read_partition_metadata")
2391            .expect("missing read_partition_metadata start");
2392        let read_complete = tags
2393            .iter()
2394            .position(|e| e == "complete:read_partition_metadata")
2395            .expect("missing read_partition_metadata complete");
2396        let remap_start = tags
2397            .iter()
2398            .position(|e| e == "start:remap_partition_files")
2399            .expect("missing remap_partition_files start");
2400        let remap_complete = tags
2401            .iter()
2402            .position(|e| e == "complete:remap_partition_files")
2403            .expect("missing remap_partition_files complete");
2404        let metadata_start = tags
2405            .iter()
2406            .position(|e| e == "start:write_merged_metadata")
2407            .expect("missing write_merged_metadata start");
2408        let metadata_complete = tags
2409            .iter()
2410            .position(|e| e == "complete:write_merged_metadata")
2411            .expect("missing write_merged_metadata complete");
2412
2413        assert!(read_start < read_complete);
2414        assert!(read_complete < remap_start);
2415        assert!(remap_start < remap_complete);
2416        assert!(remap_complete < metadata_start);
2417        assert!(metadata_start < metadata_complete);
2418
2419        assert!(
2420            tags.iter().any(|e| e == "progress:read_partition_metadata"),
2421            "expected progress callback for read_partition_metadata"
2422        );
2423        assert_eq!(
2424            remap_progress.last().copied().unwrap_or_default(),
2425            12,
2426            "expected remap_partition_files progress to cover both rename phases"
2427        );
2428        assert!(
2429            tags.iter().any(|e| e == "progress:write_merged_metadata"),
2430            "expected progress callback for write_merged_metadata"
2431        );
2432
2433        Ok(())
2434    }
2435
2436    #[tokio::test]
2437    async fn test_worker_memory_limit_rejects_single_large_doc() {
2438        let index_dir = TempDir::default();
2439        let store = Arc::new(LanceIndexStore::new(
2440            ObjectStore::local().into(),
2441            index_dir.obj_path(),
2442            Arc::new(LanceCache::no_cache()),
2443        ));
2444
2445        let batch = make_doc_batch("hello world", 42);
2446        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2447        let stream = Box::pin(stream);
2448
2449        let mut builder =
2450            InvertedIndexBuilder::new(InvertedIndexParams::default().memory_limit_mb(0));
2451        let err = builder
2452            .update(stream, store.as_ref(), None)
2453            .await
2454            .expect_err("single doc should exceed zero worker memory limit");
2455        assert!(
2456            err.to_string().contains("row_id=42"),
2457            "unexpected error: {err}"
2458        );
2459    }
2460
2461    #[tokio::test]
2462    async fn test_worker_trims_position_temp_buffers() -> Result<()> {
2463        let tokenizer = InvertedIndexParams::default().with_position(true).build()?;
2464        let store = Arc::new(CountingStore::new());
2465        let id_alloc = Arc::new(AtomicU64::new(0));
2466        let mut worker = IndexWorker::new(
2467            tokenizer,
2468            store,
2469            id_alloc,
2470            IndexWorkerConfig {
2471                with_position: true,
2472                format_version: InvertedListFormatVersion::V1,
2473                fragment_mask: None,
2474                token_set_format: TokenSetFormat::default(),
2475                worker_memory_limit_bytes: u64::MAX,
2476            },
2477        )
2478        .await?;
2479
2480        let doc = (0..(MAX_RETAINED_TOKEN_IDS * 2))
2481            .map(|i| format!("tok{i}"))
2482            .collect::<Vec<_>>()
2483            .join(" ");
2484        worker.process_batch(make_doc_batch(&doc, 0)).await?;
2485
2486        assert!(worker.token_ids.is_empty());
2487        assert!(worker.token_ids.capacity() <= MAX_RETAINED_TOKEN_IDS);
2488        assert!(worker.memory_size >= worker.temporary_memory_size());
2489        Ok(())
2490    }
2491
2492    #[tokio::test]
2493    async fn test_worker_flush_keeps_position_temp_memory_bounded() -> Result<()> {
2494        let tokenizer = InvertedIndexParams::default().with_position(true).build()?;
2495        let store = Arc::new(CountingStore::new());
2496        let id_alloc = Arc::new(AtomicU64::new(0));
2497        let mut worker = IndexWorker::new(
2498            tokenizer,
2499            store,
2500            id_alloc,
2501            IndexWorkerConfig {
2502                with_position: true,
2503                format_version: InvertedListFormatVersion::V1,
2504                fragment_mask: None,
2505                token_set_format: TokenSetFormat::default(),
2506                worker_memory_limit_bytes: u64::MAX,
2507            },
2508        )
2509        .await?;
2510
2511        let doc = std::iter::repeat_n("common", 32_768)
2512            .collect::<Vec<_>>()
2513            .join(" ");
2514        let mut observed_post_flush_memory = Vec::new();
2515        for row_id in 0..8 {
2516            worker.process_batch(make_doc_batch(&doc, row_id)).await?;
2517            worker.flush().await?;
2518            observed_post_flush_memory.push(worker.memory_size);
2519        }
2520
2521        let max_memory = *observed_post_flush_memory.iter().max().unwrap();
2522        let min_memory = *observed_post_flush_memory.iter().min().unwrap();
2523        assert!(
2524            max_memory <= min_memory.saturating_add(256 * 1024),
2525            "post-flush worker memory drifted upward: {observed_post_flush_memory:?}"
2526        );
2527        Ok(())
2528    }
2529
2530    #[tokio::test]
2531    async fn test_worker_flush_writes_partition_directly() -> Result<()> {
2532        let tokenizer = InvertedIndexParams::default().with_position(true).build()?;
2533        let store = Arc::new(CountingStore::new());
2534        let id_alloc = Arc::new(AtomicU64::new(0));
2535        let mut worker = IndexWorker::new(
2536            tokenizer,
2537            store.clone(),
2538            id_alloc,
2539            IndexWorkerConfig {
2540                with_position: true,
2541                format_version: InvertedListFormatVersion::V1,
2542                fragment_mask: None,
2543                token_set_format: TokenSetFormat::default(),
2544                worker_memory_limit_bytes: u64::MAX,
2545            },
2546        )
2547        .await?;
2548        worker
2549            .process_batch(make_doc_batch("alpha beta gamma", 0))
2550            .await?;
2551        worker.flush().await?;
2552        assert!(store.write_count() > 0);
2553        Ok(())
2554    }
2555
2556    #[test]
2557    fn test_resolve_worker_memory_limit_uses_default_when_unset() {
2558        let params = InvertedIndexParams::default();
2559        assert_eq!(
2560            resolve_worker_memory_limit_bytes(&params, 8),
2561            *LANCE_FTS_PARTITION_SIZE << 20
2562        );
2563    }
2564
2565    #[test]
2566    fn test_resolve_num_workers_uses_default_when_unset() {
2567        let expected = default_num_workers().clamp(1, get_num_compute_intensive_cpus().max(1));
2568        assert_eq!(
2569            resolve_num_workers(&InvertedIndexParams::default()),
2570            expected
2571        );
2572    }
2573
2574    #[test]
2575    fn test_resolve_num_workers_clamps_requested_value() {
2576        let max_workers = get_num_compute_intensive_cpus().max(1);
2577        assert_eq!(
2578            resolve_num_workers(&InvertedIndexParams::default().num_workers(0)),
2579            1
2580        );
2581        assert_eq!(
2582            resolve_num_workers(&InvertedIndexParams::default().num_workers(max_workers + 10)),
2583            max_workers
2584        );
2585    }
2586
2587    #[test]
2588    fn test_resolve_worker_memory_limit_splits_total_memory_limit() {
2589        let params = InvertedIndexParams::default().memory_limit_mb(4096);
2590        assert_eq!(resolve_worker_memory_limit_bytes(&params, 16), 256 << 20);
2591    }
2592
2593    #[test]
2594    fn test_merge_all_tail_partitions_combines_everything() -> Result<()> {
2595        let merged = merge_all_tail_partitions(vec![
2596            TailPartition {
2597                builder: InnerBuilder::new(0, false, TokenSetFormat::default()),
2598            },
2599            TailPartition {
2600                builder: InnerBuilder::new(1, false, TokenSetFormat::default()),
2601            },
2602            TailPartition {
2603                builder: InnerBuilder::new(2, false, TokenSetFormat::default()),
2604            },
2605        ])?;
2606
2607        assert_eq!(merged.expect("merged builder should exist").id(), 0);
2608        Ok(())
2609    }
2610
2611    #[test]
2612    fn test_merge_all_tail_partitions_returns_none_for_empty_input() -> Result<()> {
2613        assert!(merge_all_tail_partitions(Vec::new())?.is_none());
2614        Ok(())
2615    }
2616
2617    #[test]
2618    fn test_merge_tail_partition_group_combines_tail_builders() -> Result<()> {
2619        let mut first = InnerBuilder::new(0, false, TokenSetFormat::default());
2620        let hello = first.tokens.add("hello".to_owned());
2621        first
2622            .posting_lists
2623            .resize_with(first.tokens.len(), || PostingListBuilder::new(false));
2624        let first_doc = first.docs.append(10, 1);
2625        first.posting_lists[hello as usize].add(first_doc, PositionRecorder::Count(1));
2626
2627        let mut second = InnerBuilder::new(1, false, TokenSetFormat::default());
2628        let world = second.tokens.add("world".to_owned());
2629        second
2630            .posting_lists
2631            .resize_with(second.tokens.len(), || PostingListBuilder::new(false));
2632        let second_doc = second.docs.append(20, 2);
2633        second.posting_lists[world as usize].add(second_doc, PositionRecorder::Count(2));
2634
2635        let merged = merge_tail_partition_group(vec![
2636            TailPartition { builder: first },
2637            TailPartition { builder: second },
2638        ])?;
2639
2640        assert_eq!(merged.id(), 0);
2641        assert_eq!(merged.docs.len(), 2);
2642        assert_eq!(merged.tokens.len(), 2);
2643        assert_eq!(merged.posting_lists.len(), 2);
2644        assert_eq!(
2645            merged.posting_lists[merged.tokens.get("hello").unwrap() as usize].len(),
2646            1
2647        );
2648        assert_eq!(
2649            merged.posting_lists[merged.tokens.get("world").unwrap() as usize].len(),
2650            1
2651        );
2652        Ok(())
2653    }
2654
2655    #[tokio::test]
2656    async fn test_update_index_returns_worker_error_when_workers_exit_during_dispatch() {
2657        let num_batches = (*LANCE_FTS_NUM_SHARDS * 2 + 1) as u64;
2658        let index_dir = TempDir::default();
2659        let store = Arc::new(LanceIndexStore::new(
2660            ObjectStore::local().into(),
2661            index_dir.obj_path(),
2662            Arc::new(LanceCache::no_cache()),
2663        ));
2664        let schema = make_doc_batch("hello world", 0).schema();
2665        let stream = RecordBatchStreamAdapter::new(
2666            schema,
2667            stream::iter((0..num_batches).map(|row_id| Ok(make_doc_batch("hello world", row_id)))),
2668        );
2669        let stream = Box::pin(stream);
2670
2671        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default())
2672            .with_progress(Arc::new(FailingProgress));
2673
2674        let result = tokio::time::timeout(
2675            Duration::from_secs(5),
2676            builder.update_index(stream, store.as_ref()),
2677        )
2678        .await
2679        .expect("update_index should not hang")
2680        .expect_err("worker failure should be returned");
2681
2682        assert!(
2683            result.to_string().contains("injected progress failure"),
2684            "unexpected error: {result}"
2685        );
2686    }
2687
2688    #[tokio::test]
2689    async fn test_new_index_has_empty_deleted_fragments() {
2690        let index_dir = TempDir::default();
2691        let store = Arc::new(LanceIndexStore::new(
2692            ObjectStore::local().into(),
2693            index_dir.obj_path(),
2694            Arc::new(LanceCache::no_cache()),
2695        ));
2696
2697        let batch = make_doc_batch("hello world", 0);
2698        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2699        let stream = Box::pin(stream);
2700
2701        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
2702        builder.update(stream, store.as_ref(), None).await.unwrap();
2703
2704        let index = InvertedIndex::load(store, None, &LanceCache::no_cache())
2705            .await
2706            .unwrap();
2707        assert!(
2708            index.deleted_fragments().is_empty(),
2709            "new index should have empty deleted fragments, got {:?}",
2710            index.deleted_fragments()
2711        );
2712    }
2713
2714    #[tokio::test]
2715    async fn test_remap_preserves_deleted_fragments() {
2716        let src_dir = TempDir::default();
2717        let dest_dir = TempDir::default();
2718        let src_store = Arc::new(LanceIndexStore::new(
2719            ObjectStore::local().into(),
2720            src_dir.obj_path(),
2721            Arc::new(LanceCache::no_cache()),
2722        ));
2723        let dest_store = Arc::new(LanceIndexStore::new(
2724            ObjectStore::local().into(),
2725            dest_dir.obj_path(),
2726            Arc::new(LanceCache::no_cache()),
2727        ));
2728
2729        // Build an initial index with some deleted fragments
2730        let batch = make_doc_batch("hello world", 0);
2731        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2732        let stream = Box::pin(stream);
2733
2734        let initial_deleted = RoaringBitmap::from_iter([5, 10, 42]);
2735        let mut builder = InvertedIndexBuilder::from_existing_index(
2736            InvertedIndexParams::default(),
2737            None,
2738            Vec::new(),
2739            TokenSetFormat::default(),
2740            None,
2741            initial_deleted.clone(),
2742        );
2743        builder
2744            .update(stream, src_store.as_ref(), None)
2745            .await
2746            .unwrap();
2747
2748        // Load it back and confirm the invalidated fragments are set
2749        let index = InvertedIndex::load(src_store.clone(), None, &LanceCache::no_cache())
2750            .await
2751            .unwrap();
2752        assert_eq!(index.deleted_fragments(), &initial_deleted);
2753
2754        // Remap the index via the ScalarIndex trait method
2755        use crate::scalar::ScalarIndex;
2756        let mapping = HashMap::from([(0u64, Some(50 << 32))]);
2757        index.remap(&mapping, dest_store.as_ref()).await.unwrap();
2758
2759        // Reload from dest and verify deleted fragments are preserved
2760        let remapped_index = InvertedIndex::load(dest_store.clone(), None, &LanceCache::no_cache())
2761            .await
2762            .unwrap();
2763        assert_eq!(
2764            remapped_index.deleted_fragments(),
2765            &initial_deleted,
2766            "remap should preserve deleted fragments"
2767        );
2768    }
2769
2770    #[tokio::test]
2771    async fn test_update_grows_deleted_fragments_from_old_data_filter() {
2772        let index_dir = TempDir::default();
2773        let store = Arc::new(LanceIndexStore::new(
2774            ObjectStore::local().into(),
2775            index_dir.obj_path(),
2776            Arc::new(LanceCache::no_cache()),
2777        ));
2778
2779        // Build an initial index with no deleted fragments
2780        let batch = make_doc_batch("hello world", 0);
2781        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2782        let stream = Box::pin(stream);
2783
2784        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
2785        builder.update(stream, store.as_ref(), None).await.unwrap();
2786
2787        // Load the index and update it with an old_data_filter that invalidates fragments
2788        let index = InvertedIndex::load(store.clone(), None, &LanceCache::no_cache())
2789            .await
2790            .unwrap();
2791        assert!(index.deleted_fragments().is_empty());
2792
2793        let update_dir = TempDir::default();
2794        let update_store = Arc::new(LanceIndexStore::new(
2795            ObjectStore::local().into(),
2796            update_dir.obj_path(),
2797            Arc::new(LanceCache::no_cache()),
2798        ));
2799
2800        let batch2 = make_doc_batch("new document", 1 << 32 | 1);
2801        let stream2 =
2802            RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)]));
2803        let stream2 = Box::pin(stream2);
2804
2805        let old_data_filter = Some(crate::scalar::OldIndexDataFilter::Fragments {
2806            to_keep: RoaringBitmap::from_iter([0]),
2807            to_remove: RoaringBitmap::from_iter([3, 7]),
2808        });
2809
2810        // Use ScalarIndex::update trait method
2811        use crate::scalar::ScalarIndex;
2812        index
2813            .update(stream2, update_store.as_ref(), old_data_filter)
2814            .await
2815            .unwrap();
2816
2817        let updated_index =
2818            InvertedIndex::load(update_store.clone(), None, &LanceCache::no_cache())
2819                .await
2820                .unwrap();
2821        assert_eq!(
2822            updated_index.deleted_fragments(),
2823            &RoaringBitmap::from_iter([3, 7]),
2824            "update should add deleted fragments from old_data_filter"
2825        );
2826    }
2827
2828    #[tokio::test]
2829    async fn test_update_accumulates_deleted_fragments() {
2830        let dir1 = TempDir::default();
2831        let store1 = Arc::new(LanceIndexStore::new(
2832            ObjectStore::local().into(),
2833            dir1.obj_path(),
2834            Arc::new(LanceCache::no_cache()),
2835        ));
2836
2837        // Build initial index
2838        let batch = make_doc_batch("hello world", 0);
2839        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2840        let stream = Box::pin(stream);
2841
2842        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
2843        builder.update(stream, store1.as_ref(), None).await.unwrap();
2844
2845        // First update: delete fragments 3 and 7
2846        let index = InvertedIndex::load(store1.clone(), None, &LanceCache::no_cache())
2847            .await
2848            .unwrap();
2849
2850        let dir2 = TempDir::default();
2851        let store2 = Arc::new(LanceIndexStore::new(
2852            ObjectStore::local().into(),
2853            dir2.obj_path(),
2854            Arc::new(LanceCache::no_cache()),
2855        ));
2856
2857        let batch2 = make_doc_batch("second doc", 1 << 32 | 1);
2858        let stream2 =
2859            RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)]));
2860        let stream2 = Box::pin(stream2);
2861
2862        use crate::scalar::ScalarIndex;
2863        index
2864            .update(
2865                stream2,
2866                store2.as_ref(),
2867                Some(crate::scalar::OldIndexDataFilter::Fragments {
2868                    to_keep: RoaringBitmap::from_iter([0]),
2869                    to_remove: RoaringBitmap::from_iter([3, 7]),
2870                }),
2871            )
2872            .await
2873            .unwrap();
2874
2875        // Second update: invalidate additional fragments 12 and 15
2876        let index2 = InvertedIndex::load(store2.clone(), None, &LanceCache::no_cache())
2877            .await
2878            .unwrap();
2879        assert_eq!(
2880            index2.deleted_fragments(),
2881            &RoaringBitmap::from_iter([3, 7])
2882        );
2883
2884        let dir3 = TempDir::default();
2885        let store3 = Arc::new(LanceIndexStore::new(
2886            ObjectStore::local().into(),
2887            dir3.obj_path(),
2888            Arc::new(LanceCache::no_cache()),
2889        ));
2890
2891        let batch3 = make_doc_batch("third doc", 2 << 32 | 2);
2892        let stream3 =
2893            RecordBatchStreamAdapter::new(batch3.schema(), stream::iter(vec![Ok(batch3)]));
2894        let stream3 = Box::pin(stream3);
2895
2896        index2
2897            .update(
2898                stream3,
2899                store3.as_ref(),
2900                Some(crate::scalar::OldIndexDataFilter::Fragments {
2901                    to_keep: RoaringBitmap::from_iter([0, 1]),
2902                    to_remove: RoaringBitmap::from_iter([12, 15]),
2903                }),
2904            )
2905            .await
2906            .unwrap();
2907
2908        let index3 = InvertedIndex::load(store3.clone(), None, &LanceCache::no_cache())
2909            .await
2910            .unwrap();
2911        assert_eq!(
2912            index3.deleted_fragments(),
2913            &RoaringBitmap::from_iter([3, 7, 12, 15]),
2914            "deleted fragments should accumulate across updates"
2915        );
2916    }
2917
2918    #[tokio::test]
2919    async fn test_update_with_rowid_filter_does_not_grow_deleted_fragments() {
2920        let index_dir = TempDir::default();
2921        let store = Arc::new(LanceIndexStore::new(
2922            ObjectStore::local().into(),
2923            index_dir.obj_path(),
2924            Arc::new(LanceCache::no_cache()),
2925        ));
2926
2927        let batch = make_doc_batch("hello world", 0);
2928        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2929        let stream = Box::pin(stream);
2930
2931        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
2932        builder.update(stream, store.as_ref(), None).await.unwrap();
2933
2934        let index = InvertedIndex::load(store.clone(), None, &LanceCache::no_cache())
2935            .await
2936            .unwrap();
2937
2938        let update_dir = TempDir::default();
2939        let update_store = Arc::new(LanceIndexStore::new(
2940            ObjectStore::local().into(),
2941            update_dir.obj_path(),
2942            Arc::new(LanceCache::no_cache()),
2943        ));
2944
2945        let batch2 = make_doc_batch("new doc", 1);
2946        let stream2 =
2947            RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)]));
2948        let stream2 = Box::pin(stream2);
2949
2950        // Use RowIds filter instead of Fragments — should not affect deleted_fragments
2951        let mut valid_ids = lance_core::utils::mask::RowAddrTreeMap::new();
2952        valid_ids.insert(0);
2953        let old_data_filter = Some(crate::scalar::OldIndexDataFilter::RowIds(valid_ids));
2954
2955        use crate::scalar::ScalarIndex;
2956        index
2957            .update(stream2, update_store.as_ref(), old_data_filter)
2958            .await
2959            .unwrap();
2960
2961        let updated_index =
2962            InvertedIndex::load(update_store.clone(), None, &LanceCache::no_cache())
2963                .await
2964                .unwrap();
2965        assert!(
2966            updated_index.deleted_fragments().is_empty(),
2967            "RowIds filter should not add to deleted fragments"
2968        );
2969    }
2970}