Skip to main content

lance_index/scalar/inverted/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use super::{InvertedIndexParams, index::*};
5use crate::scalar::inverted::document_tokenizer::DocType;
6use crate::scalar::inverted::json::JsonTextStream;
7use crate::scalar::inverted::tokenizer::document_tokenizer::LanceTokenizer;
8#[cfg(test)]
9use crate::scalar::lance_format::LanceIndexStore;
10use crate::scalar::{IndexStore, OldIndexDataFilter};
11use crate::vector::graph::OrderedFloat;
12use crate::{progress::IndexBuildProgress, progress::noop_progress};
13use arrow::array::AsArray;
14use arrow::datatypes;
15use arrow_array::{Array, BinaryArray, RecordBatch, UInt64Array};
16use arrow_schema::{DataType, Field, Schema, SchemaRef};
17use bitpacking::{BitPacker, BitPacker4x};
18use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream};
19use deepsize::DeepSizeOf;
20use fst::Streamer;
21use futures::{Stream, StreamExt, TryStreamExt};
22use lance_arrow::json::JSON_EXT_NAME;
23use lance_arrow::{ARROW_EXT_NAME_KEY, iter_str_array};
24use lance_core::cache::LanceCache;
25use lance_core::error::LanceOptionExt;
26use lance_core::utils::mask::RowSetOps;
27use lance_core::utils::tokio::{IO_CORE_RESERVATION, get_num_compute_intensive_cpus, spawn_cpu};
28use lance_core::{Error, ROW_ID, ROW_ID_FIELD, Result};
29use lance_io::object_store::ObjectStore;
30use object_store::path::Path;
31use roaring::RoaringBitmap;
32use smallvec::SmallVec;
33use std::collections::HashMap;
34use std::pin::Pin;
35use std::str::FromStr;
36use std::sync::Arc;
37use std::sync::LazyLock;
38use std::task::{Context, Poll};
39use std::{fmt::Debug, sync::atomic::AtomicU64};
40use tracing::instrument;
41
42// the number of elements in each block
43// each block contains 128 row ids and 128 frequencies
44// WARNING: changing this value will break the compatibility with existing indexes
45pub const BLOCK_SIZE: usize = BitPacker4x::BLOCK_LEN;
46
47// The default number of workers to use for FTS builds.
48// By default this is roughly `num_cpus / 2`, but it can be overridden
49// with `LANCE_FTS_NUM_SHARDS`.
50pub static LANCE_FTS_NUM_SHARDS: LazyLock<usize> = LazyLock::new(|| {
51    std::env::var("LANCE_FTS_NUM_SHARDS")
52        .unwrap_or_else(|_| default_num_workers().to_string())
53        .parse()
54        .expect("failed to parse LANCE_FTS_NUM_SHARDS")
55});
56// The default per-worker memory limit in MiB for FTS builds.
57pub static LANCE_FTS_PARTITION_SIZE: LazyLock<u64> = LazyLock::new(|| {
58    std::env::var("LANCE_FTS_PARTITION_SIZE")
59        .unwrap_or_else(|_| "2048".to_string())
60        .parse()
61        .expect("failed to parse LANCE_FTS_PARTITION_SIZE")
62});
63static LANCE_FTS_WRITE_QUEUE_SIZE: LazyLock<usize> = LazyLock::new(|| {
64    std::env::var("LANCE_FTS_WRITE_QUEUE_SIZE")
65        .unwrap_or_else(|_| "1".to_string())
66        .parse()
67        .expect("failed to parse LANCE_FTS_WRITE_QUEUE_SIZE")
68});
69static LANCE_FTS_POSTING_BATCH_ROWS: LazyLock<usize> = LazyLock::new(|| {
70    std::env::var("LANCE_FTS_POSTING_BATCH_ROWS")
71        .unwrap_or_else(|_| "256".to_string())
72        .parse()
73        .expect("failed to parse LANCE_FTS_POSTING_BATCH_ROWS")
74});
75const MAX_RETAINED_TOKEN_IDS: usize = 8 * 1024;
76
77fn default_num_workers() -> usize {
78    let total_cpus = get_num_compute_intensive_cpus() + *IO_CORE_RESERVATION;
79    std::cmp::max(1, total_cpus / 2)
80}
81
82fn resolve_num_workers(params: &InvertedIndexParams) -> usize {
83    let max_workers = get_num_compute_intensive_cpus().max(1);
84    params
85        .num_workers
86        .unwrap_or(*LANCE_FTS_NUM_SHARDS)
87        .clamp(1, max_workers)
88}
89
90fn resolve_worker_memory_limit_bytes(params: &InvertedIndexParams, num_workers: usize) -> u64 {
91    let default_worker_memory_limit_bytes = *LANCE_FTS_PARTITION_SIZE << 20;
92    params
93        .memory_limit_mb
94        .map(|memory_limit_mb| (memory_limit_mb << 20) / num_workers as u64)
95        .unwrap_or(default_worker_memory_limit_bytes)
96}
97
98fn merge_all_tail_partitions(tails: Vec<TailPartition>) -> Result<Option<InnerBuilder>> {
99    if tails.is_empty() {
100        return Ok(None);
101    }
102    merge_tail_partition_group(tails).map(Some)
103}
104
105fn merge_tail_partition_group(group: Vec<TailPartition>) -> Result<InnerBuilder> {
106    let mut group = group.into_iter();
107    let mut merged = group
108        .next()
109        .ok_or_else(|| {
110            Error::invalid_input("cannot merge an empty tail partition group".to_owned())
111        })?
112        .builder;
113    for tail in group {
114        merged.merge_from(tail.builder)?;
115    }
116    Ok(merged)
117}
118
119#[derive(Debug)]
120pub struct InvertedIndexBuilder {
121    params: InvertedIndexParams,
122    pub(crate) partitions: Vec<u64>,
123    new_partitions: Vec<u64>,
124    fragment_mask: Option<u64>,
125    token_set_format: TokenSetFormat,
126    format_version: InvertedListFormatVersion,
127    posting_tail_codec: PostingTailCodec,
128    src_store: Option<Arc<dyn IndexStore>>,
129    progress: Arc<dyn IndexBuildProgress>,
130    deleted_fragments: RoaringBitmap,
131}
132
133impl InvertedIndexBuilder {
134    pub fn new(params: InvertedIndexParams) -> Self {
135        Self::new_with_fragment_mask(params, None)
136    }
137
138    pub fn new_with_fragment_mask(params: InvertedIndexParams, fragment_mask: Option<u64>) -> Self {
139        Self::from_existing_index(
140            params,
141            None,
142            Vec::new(),
143            TokenSetFormat::default(),
144            fragment_mask,
145            RoaringBitmap::new(),
146        )
147    }
148
149    /// Creates an InvertedIndexBuilder from existing index with fragment filtering.
150    /// This method is used to create a builder from an existing index while applying
151    /// fragment-based filtering for distributed indexing scenarios.
152    /// fragment_mask Optional mask with fragment_id in high 32 bits for filtering.
153    /// Constructed as `(fragment_id as u64) << 32`.
154    /// When provided, ensures that generated IDs belong to the specified fragment.
155    pub fn from_existing_index(
156        params: InvertedIndexParams,
157        store: Option<Arc<dyn IndexStore>>,
158        partitions: Vec<u64>,
159        token_set_format: TokenSetFormat,
160        fragment_mask: Option<u64>,
161        deleted_fragments: RoaringBitmap,
162    ) -> Self {
163        Self {
164            params,
165            partitions,
166            new_partitions: Vec::new(),
167            src_store: store,
168            token_set_format,
169            fragment_mask,
170            format_version: current_fts_format_version(),
171            posting_tail_codec: current_fts_format_version().posting_tail_codec(),
172            progress: noop_progress(),
173            deleted_fragments,
174        }
175    }
176
177    pub fn with_posting_tail_codec(mut self, posting_tail_codec: PostingTailCodec) -> Self {
178        self.format_version =
179            InvertedListFormatVersion::from_posting_tail_codec(posting_tail_codec);
180        self.posting_tail_codec = posting_tail_codec;
181        self
182    }
183
184    pub fn with_format_version(mut self, format_version: InvertedListFormatVersion) -> Self {
185        self.format_version = format_version;
186        self.posting_tail_codec = format_version.posting_tail_codec();
187        self
188    }
189
190    pub fn with_token_set_format(mut self, token_set_format: TokenSetFormat) -> Self {
191        self.token_set_format = token_set_format;
192        self
193    }
194
195    pub fn with_progress(mut self, progress: Arc<dyn IndexBuildProgress>) -> Self {
196        self.progress = progress;
197        self
198    }
199
200    pub async fn update(
201        &mut self,
202        new_data: SendableRecordBatchStream,
203        dest_store: &dyn IndexStore,
204        old_data_filter: Option<crate::scalar::OldIndexDataFilter>,
205    ) -> Result<()> {
206        let schema = new_data.schema();
207        let doc_col = schema.field(0).name();
208
209        // infer lance_tokenizer based on document type
210        if self.params.lance_tokenizer.is_none() {
211            let schema = new_data.schema();
212            let field = schema.column_with_name(doc_col).expect_ok()?.1;
213            let doc_type = DocType::try_from(field)?;
214            self.params.lance_tokenizer = Some(doc_type.as_ref().to_string());
215        }
216
217        let new_data = document_input(new_data, doc_col)?;
218
219        self.progress
220            .stage_start("tokenize_docs", None, "rows")
221            .await?;
222        self.update_index(new_data, dest_store).await?;
223
224        if let Some(OldIndexDataFilter::Fragments { to_remove, .. }) = old_data_filter {
225            self.deleted_fragments.extend(to_remove);
226        }
227
228        self.progress.stage_complete("tokenize_docs").await?;
229        self.write(dest_store).await?;
230        Ok(())
231    }
232
233    pub async fn update_from_segments(
234        &mut self,
235        new_data: SendableRecordBatchStream,
236        dest_store: &dyn IndexStore,
237        old_segments: &[Arc<InvertedIndex>],
238        old_data_filter: Option<crate::scalar::OldIndexDataFilter>,
239    ) -> Result<()> {
240        let schema = new_data.schema();
241        let doc_col = schema.field(0).name();
242
243        if self.params.lance_tokenizer.is_none() {
244            let field = schema.column_with_name(doc_col).expect_ok()?.1;
245            let doc_type = DocType::try_from(field)?;
246            self.params.lance_tokenizer = Some(doc_type.as_ref().to_string());
247        }
248
249        self.merge_existing_segments(dest_store, old_segments, old_data_filter.as_ref())
250            .await?;
251
252        let new_data = document_input(new_data, doc_col)?;
253
254        self.progress
255            .stage_start("tokenize_docs", None, "rows")
256            .await?;
257        self.update_index(new_data, dest_store).await?;
258        self.progress.stage_complete("tokenize_docs").await?;
259
260        self.write(dest_store).await?;
261        Ok(())
262    }
263
264    async fn merge_existing_segments(
265        &mut self,
266        dest_store: &dyn IndexStore,
267        old_segments: &[Arc<InvertedIndex>],
268        old_data_filter: Option<&crate::scalar::OldIndexDataFilter>,
269    ) -> Result<()> {
270        let num_workers = resolve_num_workers(&self.params);
271        let memory_limit_bytes = resolve_worker_memory_limit_bytes(&self.params, num_workers);
272        let mut merged: Option<InnerBuilder> = None;
273        for index in old_segments {
274            if old_data_filter.is_none() {
275                self.deleted_fragments
276                    .extend(index.deleted_fragments().iter());
277            }
278            for partition in &index.partitions {
279                let mut partition_builder = partition.as_ref().clone().into_builder().await?;
280                if let Some(filter) = old_data_filter {
281                    partition_builder.filter_old_data(filter).await?;
282                }
283                if partition_builder.is_empty() {
284                    continue;
285                }
286                match &mut merged {
287                    Some(merged) => {
288                        let would_exceed_memory = merged
289                            .memory_size()
290                            .saturating_add(partition_builder.memory_size())
291                            >= memory_limit_bytes;
292                        let would_exceed_doc_ids = merged
293                            .docs
294                            .len()
295                            .saturating_add(partition_builder.docs.len())
296                            > u32::MAX as usize;
297                        if would_exceed_memory || would_exceed_doc_ids {
298                            let builder = std::mem::replace(merged, partition_builder);
299                            self.write_new_partition(dest_store, builder).await?;
300                        } else {
301                            merged.merge_from(partition_builder)?;
302                        }
303                    }
304                    None => merged = Some(partition_builder),
305                }
306            }
307        }
308
309        if let Some(builder) = merged {
310            self.write_new_partition(dest_store, builder).await?;
311        }
312        Ok(())
313    }
314
315    async fn write_new_partition(
316        &mut self,
317        dest_store: &dyn IndexStore,
318        mut builder: InnerBuilder,
319    ) -> Result<()> {
320        let partition_id = self.next_partition_id() | self.fragment_mask.unwrap_or(0);
321        builder.set_id(partition_id);
322        builder.write(dest_store).await?;
323        self.new_partitions.push(partition_id);
324        Ok(())
325    }
326
327    fn next_partition_id(&self) -> u64 {
328        self.partitions
329            .iter()
330            .chain(self.new_partitions.iter())
331            .map(|id| id + 1)
332            .max()
333            .unwrap_or(0)
334    }
335
336    #[instrument(level = "debug", skip_all)]
337    async fn update_index(
338        &mut self,
339        stream: SendableRecordBatchStream,
340        dest_store: &dyn IndexStore,
341    ) -> Result<()> {
342        let num_workers = resolve_num_workers(&self.params);
343        let tokenizer = self.params.build()?;
344        let with_position = self.params.with_position;
345        let worker_memory_limit_bytes =
346            resolve_worker_memory_limit_bytes(&self.params, num_workers);
347        let worker_config = IndexWorkerConfig {
348            with_position,
349            format_version: self.format_version,
350            fragment_mask: self.fragment_mask,
351            token_set_format: self.token_set_format,
352            worker_memory_limit_bytes,
353        };
354        let next_id = self.next_partition_id();
355        let id_alloc = Arc::new(AtomicU64::new(next_id));
356        let tokenized_count = Arc::new(AtomicU64::new(0));
357        let (sender, receiver) = async_channel::bounded(num_workers);
358        let dest_store = dest_store.clone_arc();
359        let mut index_tasks = Vec::with_capacity(num_workers);
360        for _ in 0..num_workers {
361            let tokenizer = tokenizer.clone();
362            let receiver: async_channel::Receiver<RecordBatch> = receiver.clone();
363            let dest_store = dest_store.clone();
364            let id_alloc = id_alloc.clone();
365            let progress = self.progress.clone();
366            let tokenized_count = tokenized_count.clone();
367            index_tasks.push(tokio::task::spawn(async move {
368                let mut worker =
369                    IndexWorker::new(tokenizer, dest_store, id_alloc, worker_config).await?;
370                while let Ok(batch) = receiver.recv().await {
371                    let num_rows = batch.num_rows();
372                    worker.process_batch(batch).await?;
373                    let tokenized_count = tokenized_count
374                        .fetch_add(num_rows as u64, std::sync::atomic::Ordering::Relaxed)
375                        + num_rows as u64;
376                    progress
377                        .stage_progress("tokenize_docs", tokenized_count)
378                        .await?;
379                }
380                worker.finish().await
381            }));
382        }
383
384        let index_build = async {
385            // Keep the channel lifetime tied to the worker tasks so senders observe
386            // worker exits instead of blocking on an orphaned receiver handle.
387            drop(receiver);
388
389            let mut stream = Box::pin(stream);
390            log::info!("indexing FTS with {} workers", num_workers);
391
392            let mut last_num_rows = 0;
393            let mut total_num_rows = 0;
394            let start = std::time::Instant::now();
395            while let Some(batch) = stream.try_next().await? {
396                let num_rows = batch.num_rows();
397
398                if sender.send(batch).await.is_err() {
399                    // this only happens if all workers have exited,
400                    // so we don't return the send error here,
401                    // avoiding hiding the real error from workers.
402                    break;
403                }
404
405                total_num_rows += num_rows;
406                if total_num_rows >= last_num_rows + 1_000_000 {
407                    log::debug!(
408                        "indexed {} documents, elapsed: {:?}, speed: {}rows/s",
409                        total_num_rows,
410                        start.elapsed(),
411                        total_num_rows as f32 / start.elapsed().as_secs_f32()
412                    );
413                    last_num_rows = total_num_rows;
414                }
415            }
416            // drop the sender to stop receivers
417            drop(stream);
418            drop(sender);
419            log::info!("dispatching elapsed: {:?}", start.elapsed());
420
421            // wait for the workers to finish
422            let start = std::time::Instant::now();
423            let mut tail_partitions = Vec::new();
424            for index_task in index_tasks {
425                let output = index_task.await??;
426                self.new_partitions.extend(output.partitions);
427                if let Some(tail_partition) = output.tail_partition {
428                    tail_partitions.push(tail_partition);
429                }
430            }
431            let merged_tail_partitions =
432                spawn_cpu(move || merge_all_tail_partitions(tail_partitions)).await?;
433            if let Some(builder) = merged_tail_partitions {
434                self.new_partitions.push(builder.id());
435                let mut builder = builder;
436                builder.write(dest_store.as_ref()).await?;
437            }
438            log::info!("wait workers indexing elapsed: {:?}", start.elapsed());
439            Result::Ok(())
440        };
441
442        index_build.await
443    }
444
445    pub async fn remap(
446        &mut self,
447        mapping: &HashMap<u64, Option<u64>>,
448        src_store: Arc<dyn IndexStore>,
449        dest_store: &dyn IndexStore,
450    ) -> Result<()> {
451        for part in self.partitions.iter() {
452            let part = InvertedPartition::load(
453                src_store.clone(),
454                *part,
455                None,
456                &LanceCache::no_cache(),
457                self.token_set_format,
458            )
459            .await?;
460            let mut builder = part.into_builder().await?;
461            builder.remap(mapping).await?;
462            builder.write(dest_store).await?;
463        }
464        if self.fragment_mask.is_none() {
465            self.write_metadata(dest_store, &self.partitions).await?;
466        } else {
467            // in distributed mode, the part_temp_metadata is written by the worker
468            for &partition_id in &self.partitions {
469                self.write_part_metadata(dest_store, partition_id).await?;
470            }
471        }
472        Ok(())
473    }
474
475    async fn write_metadata(&self, dest_store: &dyn IndexStore, partitions: &[u64]) -> Result<()> {
476        let mut serialized_deleted_fragments =
477            Vec::with_capacity(self.deleted_fragments.serialized_size());
478        self.deleted_fragments
479            .serialize_into(&mut serialized_deleted_fragments)?;
480
481        let mut metadata = HashMap::from_iter(vec![
482            ("partitions".to_owned(), serde_json::to_string(&partitions)?),
483            ("params".to_owned(), serde_json::to_string(&self.params)?),
484            (
485                TOKEN_SET_FORMAT_KEY.to_owned(),
486                self.token_set_format.to_string(),
487            ),
488            (
489                POSTING_TAIL_CODEC_KEY.to_owned(),
490                self.posting_tail_codec.as_str().to_owned(),
491            ),
492        ]);
493
494        if self.params.with_position && self.format_version.uses_shared_position_stream() {
495            metadata.insert(
496                POSITIONS_LAYOUT_KEY.to_owned(),
497                POSITIONS_LAYOUT_SHARED_STREAM_V2.to_owned(),
498            );
499            metadata.insert(
500                POSITIONS_CODEC_KEY.to_owned(),
501                self.format_version
502                    .position_codec()
503                    .expect("shared positions require a codec")
504                    .as_str()
505                    .to_owned(),
506            );
507        }
508
509        let metadata_file_schema = Arc::new(Schema::new(vec![Field::new(
510            DELETED_FRAGMENTS_COL,
511            DataType::Binary,
512            false,
513        )]));
514        let deleted_fragments_col = Arc::new(BinaryArray::from(vec![
515            serialized_deleted_fragments.as_slice(),
516        ])) as Arc<dyn Array>;
517        let record_batch =
518            RecordBatch::try_new(metadata_file_schema.clone(), vec![deleted_fragments_col])?;
519
520        let mut writer = dest_store
521            .new_index_file(METADATA_FILE, metadata_file_schema)
522            .await?;
523        writer.write_record_batch(record_batch).await?;
524        writer.finish_with_metadata(metadata).await?;
525        Ok(())
526    }
527
528    /// Write partition metadata file for a single partition
529    ///
530    /// In a distributed environment, each worker node can write partition metadata files for the partitions it processes,
531    /// which are then merged into a final metadata file using the `merge_metadata_files` function.
532    pub(crate) async fn write_part_metadata(
533        &self,
534        dest_store: &dyn IndexStore,
535        partition: u64, // Modify parameter type
536    ) -> Result<()> {
537        let partitions = vec![partition];
538        let mut metadata = HashMap::from_iter(vec![
539            ("partitions".to_owned(), serde_json::to_string(&partitions)?),
540            ("params".to_owned(), serde_json::to_string(&self.params)?),
541            (
542                TOKEN_SET_FORMAT_KEY.to_owned(),
543                self.token_set_format.to_string(),
544            ),
545            (
546                POSTING_TAIL_CODEC_KEY.to_owned(),
547                self.posting_tail_codec.as_str().to_owned(),
548            ),
549        ]);
550        if self.params.with_position && self.format_version.uses_shared_position_stream() {
551            metadata.insert(
552                POSITIONS_LAYOUT_KEY.to_owned(),
553                POSITIONS_LAYOUT_SHARED_STREAM_V2.to_owned(),
554            );
555            metadata.insert(
556                POSITIONS_CODEC_KEY.to_owned(),
557                self.format_version
558                    .position_codec()
559                    .expect("shared positions require a codec")
560                    .as_str()
561                    .to_owned(),
562            );
563        }
564        // Use partition ID to generate a unique temporary filename
565        let file_name = part_metadata_file_path(partition);
566        let mut writer = dest_store
567            .new_index_file(&file_name, Arc::new(Schema::empty()))
568            .await?;
569        writer.finish_with_metadata(metadata).await?;
570        Ok(())
571    }
572
573    async fn write_metadata_with_progress(
574        &self,
575        dest_store: &dyn IndexStore,
576        partitions: &[u64],
577    ) -> Result<()> {
578        let total = if self.fragment_mask.is_none() {
579            Some(1)
580        } else {
581            Some(partitions.len() as u64)
582        };
583        self.progress
584            .stage_start("write_metadata", total, "files")
585            .await?;
586        if self.fragment_mask.is_none() {
587            self.write_metadata(dest_store, partitions).await?;
588            self.progress.stage_progress("write_metadata", 1).await?;
589        } else {
590            let mut completed = 0;
591            for &partition_id in partitions {
592                self.write_part_metadata(dest_store, partition_id).await?;
593                completed += 1;
594                self.progress
595                    .stage_progress("write_metadata", completed)
596                    .await?;
597            }
598        }
599        self.progress.stage_complete("write_metadata").await?;
600        Ok(())
601    }
602
603    async fn write(&self, dest_store: &dyn IndexStore) -> Result<()> {
604        let mut partitions = Vec::with_capacity(self.partitions.len() + self.new_partitions.len());
605        partitions.extend_from_slice(&self.partitions);
606        partitions.extend_from_slice(&self.new_partitions);
607        partitions.sort_unstable();
608
609        self.progress
610            .stage_start(
611                "copy_partitions",
612                Some(partitions.len() as u64),
613                "partitions",
614            )
615            .await?;
616        let mut copied = 0;
617        for part in self.partitions.iter() {
618            self.src_store
619                .as_ref()
620                .expect("existing partitions require a source store")
621                .copy_index_file(&token_file_path(*part), dest_store)
622                .await?;
623            self.src_store
624                .as_ref()
625                .expect("existing partitions require a source store")
626                .copy_index_file(&posting_file_path(*part), dest_store)
627                .await?;
628            self.src_store
629                .as_ref()
630                .expect("existing partitions require a source store")
631                .copy_index_file(&doc_file_path(*part), dest_store)
632                .await?;
633            copied += 1;
634            self.progress
635                .stage_progress("copy_partitions", copied)
636                .await?;
637        }
638        for _part in self.new_partitions.iter() {
639            copied += 1;
640            self.progress
641                .stage_progress("copy_partitions", copied)
642                .await?;
643        }
644        self.progress.stage_complete("copy_partitions").await?;
645
646        self.write_metadata_with_progress(dest_store, &partitions)
647            .await?;
648        Ok(())
649    }
650}
651
652impl Default for InvertedIndexBuilder {
653    fn default() -> Self {
654        let params = InvertedIndexParams::default();
655        Self::new(params)
656    }
657}
658
659// builder for single partition
660#[derive(Debug)]
661pub struct InnerBuilder {
662    id: u64,
663    with_position: bool,
664    token_set_format: TokenSetFormat,
665    format_version: InvertedListFormatVersion,
666    posting_tail_codec: PostingTailCodec,
667    pub(crate) tokens: TokenSet,
668    pub(crate) posting_lists: Vec<PostingListBuilder>,
669    pub(crate) docs: DocSet,
670}
671
672impl InnerBuilder {
673    pub fn new(id: u64, with_position: bool, token_set_format: TokenSetFormat) -> Self {
674        Self::new_with_format_version(
675            id,
676            with_position,
677            token_set_format,
678            current_fts_format_version(),
679        )
680    }
681
682    pub fn new_with_format_version(
683        id: u64,
684        with_position: bool,
685        token_set_format: TokenSetFormat,
686        format_version: InvertedListFormatVersion,
687    ) -> Self {
688        Self {
689            id,
690            with_position,
691            token_set_format,
692            format_version,
693            posting_tail_codec: format_version.posting_tail_codec(),
694            tokens: TokenSet::default(),
695            posting_lists: Vec::new(),
696            docs: DocSet::default(),
697        }
698    }
699
700    pub fn new_with_posting_tail_codec(
701        id: u64,
702        with_position: bool,
703        token_set_format: TokenSetFormat,
704        posting_tail_codec: PostingTailCodec,
705    ) -> Self {
706        let format_version = if posting_tail_codec == PostingTailCodec::Fixed32 {
707            InvertedListFormatVersion::V1
708        } else {
709            InvertedListFormatVersion::V2
710        };
711        let mut builder =
712            Self::new_with_format_version(id, with_position, token_set_format, format_version);
713        builder.posting_tail_codec = posting_tail_codec;
714        builder
715    }
716
717    pub fn id(&self) -> u64 {
718        self.id
719    }
720
721    fn set_id(&mut self, id: u64) {
722        self.id = id;
723    }
724
725    pub fn is_empty(&self) -> bool {
726        self.docs.is_empty()
727    }
728
729    /// Set the token set for this builder.
730    pub fn set_tokens(&mut self, tokens: TokenSet) {
731        self.tokens = tokens;
732    }
733
734    /// Set the document set for this builder.
735    pub fn set_docs(&mut self, docs: DocSet) {
736        self.docs = docs;
737    }
738
739    /// Set the posting lists for this builder.
740    pub fn set_posting_lists(&mut self, posting_lists: Vec<PostingListBuilder>) {
741        self.posting_lists = posting_lists;
742    }
743
744    pub async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
745        // for the docs, we need to remove the rows that are removed from the doc set,
746        // and update the row ids of the rows that are updated
747        let removed = self.docs.remap(mapping);
748
749        // for the posting lists, we need to remap the doc ids:
750        // - if the a row is removed, we need to shift the doc ids of the following rows
751        // - if a row is updated (assigned a new row id), we don't need to do anything with the posting lists
752        let mut token_id = 0;
753        let mut removed_token_ids = Vec::new();
754        self.posting_lists.retain_mut(|posting_list| {
755            posting_list.remap(&removed);
756            let keep = !posting_list.is_empty();
757            if !keep {
758                removed_token_ids.push(token_id as u32);
759            }
760            token_id += 1;
761            keep
762        });
763
764        // for the tokens, remap the token ids if any posting list is empty
765        self.tokens.remap(&removed_token_ids);
766
767        Ok(())
768    }
769
770    async fn filter_old_data(&mut self, filter: &OldIndexDataFilter) -> Result<()> {
771        let mut mapping = HashMap::new();
772        for (row_id, _) in self.docs.iter() {
773            let keep = match filter {
774                OldIndexDataFilter::Fragments { to_keep, .. } => {
775                    to_keep.contains((*row_id >> 32) as u32)
776                }
777                OldIndexDataFilter::RowIds(valid_row_ids) => valid_row_ids.contains(*row_id),
778            };
779            if !keep {
780                mapping.insert(*row_id, None);
781            }
782        }
783        self.remap(&mapping).await
784    }
785
786    pub fn merge_from(&mut self, other: Self) -> Result<()> {
787        let Self {
788            id: _,
789            with_position,
790            token_set_format,
791            format_version,
792            posting_tail_codec,
793            tokens,
794            posting_lists,
795            docs,
796        } = other;
797
798        if self.with_position != with_position {
799            return Err(Error::index(format!(
800                "cannot merge partitions with mismatched positions settings: {} vs {}",
801                self.with_position, with_position
802            )));
803        }
804        if self.token_set_format != token_set_format {
805            return Err(Error::index(format!(
806                "cannot merge partitions with mismatched token set formats: {:?} vs {:?}",
807                self.token_set_format, token_set_format
808            )));
809        }
810        if self.format_version != format_version {
811            return Err(Error::index(format!(
812                "cannot merge partitions with mismatched FTS format versions: {:?} vs {:?}",
813                self.format_version, format_version
814            )));
815        }
816        if self.posting_tail_codec != posting_tail_codec {
817            return Err(Error::index(format!(
818                "cannot merge partitions with mismatched posting tail codecs: {:?} vs {:?}",
819                self.posting_tail_codec, posting_tail_codec
820            )));
821        }
822
823        let mut token_id_map = vec![u32::MAX; posting_lists.len()];
824        match tokens.tokens {
825            TokenMap::HashMap(map) => {
826                for (token, token_id) in map {
827                    let new_token_id = self.tokens.get_or_add(token.as_str());
828                    token_id_map[token_id as usize] = new_token_id;
829                }
830            }
831            TokenMap::Fst(map) => {
832                let mut stream = map.stream();
833                while let Some((token, token_id)) = stream.next() {
834                    let new_token_id = self
835                        .tokens
836                        .get_or_add(String::from_utf8_lossy(token).as_ref());
837                    token_id_map[token_id as usize] = new_token_id;
838                }
839            }
840        }
841
842        let doc_id_offset = self.docs.len() as u32;
843        for (row_id, num_tokens) in docs.iter() {
844            self.docs.append(*row_id, *num_tokens);
845        }
846        self.posting_lists.resize_with(self.tokens.len(), || {
847            PostingListBuilder::new_with_posting_tail_codec(with_position, self.posting_tail_codec)
848        });
849
850        for (token_id, posting_list) in posting_lists.into_iter().enumerate() {
851            if posting_list.is_empty() {
852                continue;
853            }
854            let new_token_id = token_id_map[token_id];
855            debug_assert_ne!(new_token_id, u32::MAX);
856            let merged_posting = &mut self.posting_lists[new_token_id as usize];
857            posting_list.for_each_entry(|doc_id, freq, positions| {
858                let positions = match positions {
859                    Some(positions) => PositionRecorder::Position(positions.into()),
860                    None => PositionRecorder::Count(freq),
861                };
862                merged_posting.add(doc_id_offset + doc_id, positions);
863                Ok::<(), Error>(())
864            })?;
865        }
866
867        Ok(())
868    }
869
870    fn memory_size(&self) -> u64 {
871        let posting_lists_overhead =
872            self.posting_lists.capacity() * std::mem::size_of::<PostingListBuilder>();
873        let posting_lists_size: u64 = self
874            .posting_lists
875            .iter()
876            .map(|posting| posting.size())
877            .sum();
878        (self.tokens.memory_size() + self.docs.memory_size() + posting_lists_overhead) as u64
879            + posting_lists_size
880    }
881
882    pub async fn write(&mut self, store: &dyn IndexStore) -> Result<()> {
883        let docs = Arc::new(std::mem::take(&mut self.docs));
884        self.write_posting_lists(store, docs.clone()).await?;
885        self.write_tokens(store).await?;
886        self.write_docs(store, docs).await?;
887        Ok(())
888    }
889
890    #[instrument(level = "debug", skip_all)]
891    async fn write_posting_lists(
892        &mut self,
893        store: &dyn IndexStore,
894        docs: Arc<DocSet>,
895    ) -> Result<()> {
896        let id = self.id;
897        let mut writer = store
898            .new_index_file(
899                &posting_file_path(self.id),
900                inverted_list_schema_for_version(self.with_position, self.format_version),
901            )
902            .await?;
903        let posting_lists = std::mem::take(&mut self.posting_lists);
904
905        log::info!(
906            "writing {} posting lists of partition {}, with position {}",
907            posting_lists.len(),
908            id,
909            self.with_position
910        );
911        let with_position = self.with_position;
912        let format_version = self.format_version;
913        let schema = inverted_list_schema_for_version(self.with_position, self.format_version);
914        let docs_for_batches = docs.clone();
915        let schema_for_batches = schema.clone();
916        let batch_rows = *LANCE_FTS_POSTING_BATCH_ROWS;
917        let (tx, rx) = async_channel::bounded(*LANCE_FTS_WRITE_QUEUE_SIZE);
918        let producer = spawn_cpu(move || {
919            let mut batch_builder = PostingListBatchBuilder::new(
920                schema_for_batches.clone(),
921                with_position,
922                format_version,
923                batch_rows,
924            );
925            for posting_list in posting_lists {
926                posting_list.append_to_batch_with_docs(
927                    &docs_for_batches,
928                    &mut batch_builder,
929                    format_version,
930                )?;
931                if batch_builder.len() < batch_rows {
932                    continue;
933                }
934
935                let batch = batch_builder.finish()?;
936                if let Err(err) = tx.send_blocking(batch) {
937                    return Err(Error::execution(format!(
938                        "failed to send posting list batch to writer: {err}"
939                    )));
940                }
941            }
942
943            if !batch_builder.is_empty() {
944                let batch = batch_builder.finish()?;
945                if let Err(err) = tx.send_blocking(batch) {
946                    return Err(Error::execution(format!(
947                        "failed to send posting list batch to writer: {err}"
948                    )));
949                }
950            }
951
952            Result::Ok(())
953        });
954
955        while let Ok(batch) = rx.recv().await {
956            if let Err(err) = writer.write_record_batch(batch).await {
957                drop(rx);
958                // Wait for producer to stop; preserve the write error as the primary failure.
959                let _ = producer.await;
960                return Err(err);
961            }
962        }
963        drop(rx);
964        producer.await?;
965
966        writer.finish().await?;
967        Ok(())
968    }
969
970    #[instrument(level = "debug", skip_all)]
971    async fn write_tokens(&mut self, store: &dyn IndexStore) -> Result<()> {
972        log::info!("writing tokens of partition {}", self.id);
973        let tokens = std::mem::take(&mut self.tokens);
974        let batch = tokens.to_batch(self.token_set_format)?;
975        let mut writer = store
976            .new_index_file(&token_file_path(self.id), batch.schema())
977            .await?;
978        writer.write_record_batch(batch).await?;
979        writer.finish().await?;
980        Ok(())
981    }
982
983    #[instrument(level = "debug", skip_all)]
984    async fn write_docs(&mut self, store: &dyn IndexStore, docs: Arc<DocSet>) -> Result<()> {
985        log::info!("writing docs of partition {}", self.id);
986        let batch = docs.to_batch()?;
987        let mut writer = store
988            .new_index_file(&doc_file_path(self.id), batch.schema())
989            .await?;
990        writer.write_record_batch(batch).await?;
991        writer.finish().await?;
992        Ok(())
993    }
994}
995
/// Tokenizes documents into an in-memory partition builder and writes the
/// builder out as a finished partition whenever it is flushed.
struct IndexWorker {
    /// Produces the token stream for each document.
    tokenizer: Box<dyn LanceTokenizer>,
    /// Store that flushed partitions are written to.
    dest_store: Arc<dyn IndexStore>,
    /// Shared counter from which new partition ids are taken.
    id_alloc: Arc<AtomicU64>,
    /// Partition currently being built in memory.
    builder: InnerBuilder,
    /// Ids of the partitions already written by `flush`.
    partitions: Vec<u64>,
    /// Posting list schema for the configured position setting and format
    /// version; `has_position` inspects its columns.
    schema: SchemaRef,
    /// Running estimate (in bytes) of the memory held by `builder` and the
    /// temporary buffers; compared against `worker_memory_limit_bytes`.
    memory_size: u64,
    /// Memory estimate at which the current builder is flushed.
    worker_memory_limit_bytes: u64,
    /// Sum of `doc.len()` over all processed documents.
    total_doc_length: usize,
    /// When set, OR-ed into every partition id taken from `id_alloc`.
    fragment_mask: Option<u64>,
    /// Token set format passed to every new builder.
    token_set_format: TokenSetFormat,
    /// Scratch buffer of token ids, reused from one document to the next.
    token_ids: Vec<u32>,
    /// Length of `token_ids` after the previously processed document.
    last_token_count: usize,
}
1011
/// The in-memory builder a worker still holds when it finishes, i.e. data
/// that has not been written by a flush.
struct TailPartition {
    /// Builder holding the unwritten partition.
    builder: InnerBuilder,
}
1015
/// Result of `IndexWorker::finish`.
struct WorkerOutput {
    /// Ids of the partitions the worker has already written.
    partitions: Vec<u64>,
    /// The worker's remaining in-memory builder; `None` when that builder
    /// holds no tokens.
    tail_partition: Option<TailPartition>,
}
1020
/// Settings copied into every `IndexWorker` on construction.
#[derive(Debug, Clone, Copy)]
struct IndexWorkerConfig {
    /// Whether posting lists are built with positions.
    with_position: bool,
    /// Posting list format version used for the schema and the builders.
    format_version: InvertedListFormatVersion,
    /// When set, OR-ed into every partition id the worker allocates.
    fragment_mask: Option<u64>,
    /// Token set format passed to the builders.
    token_set_format: TokenSetFormat,
    /// Tracked memory estimate (in bytes) at which a worker flushes.
    worker_memory_limit_bytes: u64,
}
1029
1030impl IndexWorker {
1031    fn posting_lists_overhead_size(&self) -> u64 {
1032        (self.builder.posting_lists.capacity() * std::mem::size_of::<PostingListBuilder>()) as u64
1033    }
1034
1035    fn adjust_tracked_value(tracked: &mut u64, old: u64, new: u64) {
1036        if new >= old {
1037            *tracked += new - old;
1038        } else {
1039            *tracked -= old - new;
1040        }
1041    }
1042
1043    fn adjust_tracked_memory_size(&mut self, old_memory_size: u64, new_memory_size: u64) {
1044        Self::adjust_tracked_value(&mut self.memory_size, old_memory_size, new_memory_size);
1045    }
1046
1047    fn apply_delta(total: &mut u64, delta: i64) {
1048        if delta >= 0 {
1049            *total += delta as u64;
1050        } else {
1051            *total -= (-delta) as u64;
1052        }
1053    }
1054
1055    fn temporary_memory_size(&self) -> u64 {
1056        (self.token_ids.capacity() * std::mem::size_of::<u32>()) as u64
1057    }
1058
1059    fn trim_temporary_buffers(&mut self) {
1060        if self.token_ids.capacity() > MAX_RETAINED_TOKEN_IDS {
1061            self.token_ids = Vec::with_capacity(self.last_token_count.min(MAX_RETAINED_TOKEN_IDS));
1062        }
1063    }
1064
1065    async fn new(
1066        tokenizer: Box<dyn LanceTokenizer>,
1067        dest_store: Arc<dyn IndexStore>,
1068        id_alloc: Arc<AtomicU64>,
1069        config: IndexWorkerConfig,
1070    ) -> Result<Self> {
1071        let schema = inverted_list_schema_for_version(config.with_position, config.format_version);
1072
1073        Ok(Self {
1074            tokenizer,
1075            dest_store,
1076            builder: InnerBuilder::new_with_format_version(
1077                id_alloc.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
1078                    | config.fragment_mask.unwrap_or(0),
1079                config.with_position,
1080                config.token_set_format,
1081                config.format_version,
1082            ),
1083            partitions: Vec::new(),
1084            id_alloc,
1085            schema,
1086            memory_size: 0,
1087            worker_memory_limit_bytes: config.worker_memory_limit_bytes,
1088            total_doc_length: 0,
1089            fragment_mask: config.fragment_mask,
1090            token_set_format: config.token_set_format,
1091            token_ids: Vec::new(),
1092            last_token_count: 0,
1093        })
1094    }
1095
1096    fn has_position(&self) -> bool {
1097        self.schema
1098            .column_with_name(COMPRESSED_POSITION_COL)
1099            .is_some()
1100            || self.schema.column_with_name(POSITION_COL).is_some()
1101    }
1102
    /// Tokenizes every document of `batch` and adds it to the current builder.
    ///
    /// Column 0 of `batch` is read as the document column and the `ROW_ID`
    /// column as the row ids; rows whose document is `None` are skipped. For
    /// each document the tracked memory estimate is updated from the size
    /// changes of the token set, the posting lists, the doc set and the
    /// `token_ids` scratch buffer.
    ///
    /// # Errors
    ///
    /// Returns an invalid-input error when the first document of a builder
    /// alone pushes the estimate above the worker memory limit, and propagates
    /// errors from the posting list builders and from `flush`.
    async fn process_batch(&mut self, batch: RecordBatch) -> Result<()> {
        let doc_col = batch.column(0);
        let doc_iter = iter_str_array(doc_col);
        let row_id_col = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
        // Pair each non-null document with its row id.
        let docs = doc_iter
            .zip(row_id_col.values().iter())
            .filter_map(|(doc, row_id)| doc.map(|doc| (doc, *row_id)));

        let with_position = self.has_position();
        for (doc, row_id) in docs {
            // Snapshot the sizes that may change while this document is
            // processed, so the deltas can be applied to `memory_size`.
            let builder_was_empty = self.builder.docs.is_empty();
            let old_temporary_memory_size = self.temporary_memory_size();
            let old_token_memory_size = self.builder.tokens.memory_size() as u64;
            // The doc id is the position the document will get in the doc set.
            let doc_id = self.builder.docs.len() as u32;
            let mut token_num: u32 = 0;
            let mut posting_memory_delta = 0i64;
            if with_position {
                // NOTE(review): `Vec::reserve` counts its argument from `len()`,
                // not from `capacity()`, so this does not necessarily grow the
                // buffer to `last_token_count` — confirm the intent. The same
                // pattern is repeated in the `else` branch below.
                if self.token_ids.capacity() < self.last_token_count {
                    self.token_ids
                        .reserve(self.last_token_count - self.token_ids.capacity());
                }
                self.token_ids.clear();
                // Split the borrows so the tokenizer can be used while the
                // builder, scratch buffer and memory counter are mutated.
                let builder = &mut self.builder;
                let token_ids = &mut self.token_ids;
                let memory_size = &mut self.memory_size;
                let posting_tail_codec = builder.posting_tail_codec;

                let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
                while token_stream.advance() {
                    let token = token_stream.token_mut();
                    // Take the text out of the token instead of cloning it.
                    let token_text = std::mem::take(&mut token.text);
                    let token_id = builder.tokens.add(token_text);
                    // A token id equal to the number of posting lists has no
                    // posting list yet: create one and track the change in
                    // the vector's reserved size.
                    if token_id as usize == builder.posting_lists.len() {
                        let old_posting_lists_overhead_size = (builder.posting_lists.capacity()
                            * std::mem::size_of::<PostingListBuilder>())
                            as u64;
                        builder.posting_lists.push(
                            PostingListBuilder::new_with_posting_tail_codec(
                                true,
                                posting_tail_codec,
                            ),
                        );
                        let new_posting_lists_overhead_size = (builder.posting_lists.capacity()
                            * std::mem::size_of::<PostingListBuilder>())
                            as u64;
                        Self::adjust_tracked_value(
                            memory_size,
                            old_posting_lists_overhead_size,
                            new_posting_lists_overhead_size,
                        );
                    }
                    let posting_list = &mut builder.posting_lists[token_id as usize];
                    let old_posting_memory_size = posting_list.size();
                    // Remember the token id when `add_occurrence` returns true
                    // (presumably the token's first occurrence in this
                    // document — verify against `PostingListBuilder`), so the
                    // open entry can be finished once the document is done.
                    if posting_list.add_occurrence(doc_id, token.position as u32)? {
                        token_ids.push(token_id);
                    }
                    let new_posting_memory_size = posting_list.size();
                    posting_memory_delta +=
                        new_posting_memory_size as i64 - old_posting_memory_size as i64;
                    token_num += 1;
                }
            } else {
                if self.token_ids.capacity() < self.last_token_count {
                    self.token_ids
                        .reserve(self.last_token_count - self.token_ids.capacity());
                }
                self.token_ids.clear();

                // Without positions only the token ids are collected here; the
                // posting lists are updated after the document is tokenized.
                let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
                while token_stream.advance() {
                    let token = token_stream.token_mut();
                    let token_text = std::mem::take(&mut token.text);
                    let token_id = self.builder.tokens.add(token_text);
                    self.token_ids.push(token_id);
                    token_num += 1;
                }
            }
            self.adjust_tracked_memory_size(
                old_token_memory_size,
                self.builder.tokens.memory_size() as u64,
            );

            if !with_position {
                // Give every token seen so far a posting list and track the
                // change in the vector's reserved size.
                let old_posting_lists_overhead_size = self.posting_lists_overhead_size();
                self.builder
                    .posting_lists
                    .resize_with(self.builder.tokens.len(), || {
                        PostingListBuilder::new_with_posting_tail_codec(
                            false,
                            self.builder.posting_tail_codec,
                        )
                    });
                let new_posting_lists_overhead_size = self.posting_lists_overhead_size();
                Self::adjust_tracked_value(
                    &mut self.memory_size,
                    old_posting_lists_overhead_size,
                    new_posting_lists_overhead_size,
                );
            }

            // Record the document with its token count.
            let old_doc_memory_size = self.builder.docs.memory_size() as u64;
            let appended_doc_id = self.builder.docs.append(row_id, token_num);
            debug_assert_eq!(appended_doc_id, doc_id);
            self.adjust_tracked_memory_size(
                old_doc_memory_size,
                self.builder.docs.memory_size() as u64,
            );
            self.total_doc_length += doc.len();

            if with_position {
                // Finish the open entry of every posting list this document
                // touched.
                for &token_id in &self.token_ids {
                    let (old_posting_memory_size, new_posting_memory_size) = {
                        let posting_list = &mut self.builder.posting_lists[token_id as usize];
                        let old_posting_memory_size = posting_list.size();
                        posting_list.finish_open_doc(doc_id)?;
                        let new_posting_memory_size = posting_list.size();
                        (old_posting_memory_size, new_posting_memory_size)
                    };
                    posting_memory_delta +=
                        new_posting_memory_size as i64 - old_posting_memory_size as i64;
                }
                Self::apply_delta(&mut self.memory_size, posting_memory_delta);
            } else if token_num > 0 {
                // Sort the token ids so equal ids are adjacent, then add one
                // posting entry per distinct token with its occurrence count.
                self.token_ids.sort_unstable();
                let mut iter = self.token_ids.iter();
                let mut current = *iter.next().unwrap();
                let mut count = 1u32;
                for &token_id in iter {
                    if token_id == current {
                        count += 1;
                        continue;
                    }

                    let (old_posting_memory_size, new_posting_memory_size) = {
                        let posting_list = &mut self.builder.posting_lists[current as usize];
                        let old_posting_memory_size = posting_list.size();
                        posting_list.add(doc_id, PositionRecorder::Count(count));
                        let new_posting_memory_size = posting_list.size();
                        (old_posting_memory_size, new_posting_memory_size)
                    };
                    posting_memory_delta +=
                        new_posting_memory_size as i64 - old_posting_memory_size as i64;

                    current = token_id;
                    count = 1;
                }
                // Emit the entry for the last run of equal token ids.
                let (old_posting_memory_size, new_posting_memory_size) = {
                    let posting_list = &mut self.builder.posting_lists[current as usize];
                    let old_posting_memory_size = posting_list.size();
                    posting_list.add(doc_id, PositionRecorder::Count(count));
                    let new_posting_memory_size = posting_list.size();
                    (old_posting_memory_size, new_posting_memory_size)
                };
                posting_memory_delta +=
                    new_posting_memory_size as i64 - old_posting_memory_size as i64;
                Self::apply_delta(&mut self.memory_size, posting_memory_delta);
            }
            // Remember this document's token count, shrink the scratch buffer
            // if it grew too large, and track the resulting capacity change.
            self.last_token_count = self.token_ids.len();
            self.trim_temporary_buffers();
            self.adjust_tracked_memory_size(
                old_temporary_memory_size,
                self.temporary_memory_size(),
            );

            // A builder holding a single document that already exceeds the
            // limit cannot be made smaller by flushing.
            if self.builder.docs.len() == 1 && self.memory_size > self.worker_memory_limit_bytes {
                return Err(Error::invalid_input(format!(
                    "single document row_id={} exceeds worker memory limit: {} > {} bytes",
                    row_id, self.memory_size, self.worker_memory_limit_bytes
                )));
            }

            // Flush when the doc count reaches u32::MAX, or when the memory
            // limit is reached by a builder that already held documents
            // before this one.
            if self.builder.docs.len() as u32 == u32::MAX
                || (!builder_was_empty && self.memory_size >= self.worker_memory_limit_bytes)
            {
                self.flush().await?;
            }
        }

        Ok(())
    }
1283
1284    #[instrument(level = "debug", skip_all)]
1285    async fn flush(&mut self) -> Result<()> {
1286        if self.builder.tokens.is_empty() {
1287            return Ok(());
1288        }
1289
1290        log::info!(
1291            "flushing posting lists, memory size: {} MiB",
1292            self.memory_size / (1024 * 1024)
1293        );
1294        self.memory_size = self.temporary_memory_size();
1295        let with_position = self.has_position();
1296        let format_version = self.builder.format_version;
1297        let builder = std::mem::replace(
1298            &mut self.builder,
1299            InnerBuilder::new_with_format_version(
1300                self.id_alloc
1301                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
1302                    | self.fragment_mask.unwrap_or(0),
1303                with_position,
1304                self.token_set_format,
1305                format_version,
1306            ),
1307        );
1308        let written_partition_id = builder.id();
1309        let mut builder = builder;
1310        builder
1311            .write(self.dest_store.as_ref())
1312            .await
1313            .map_err(|err| {
1314                Error::execution(format!(
1315                    "failed to write finalized partition {}: {err}",
1316                    written_partition_id
1317                ))
1318            })?;
1319        self.partitions.push(written_partition_id);
1320        Ok(())
1321    }
1322
1323    async fn finish(self) -> Result<WorkerOutput> {
1324        let tail_partition = if self.builder.tokens.is_empty() {
1325            None
1326        } else {
1327            Some(TailPartition {
1328                builder: self.builder,
1329            })
1330        };
1331        Ok(WorkerOutput {
1332            partitions: self.partitions,
1333            tail_partition,
1334        })
1335    }
1336}
1337
/// What is recorded for a token within one document: either the individual
/// positions or only the number of occurrences.
#[derive(Debug, Clone)]
pub enum PositionRecorder {
    /// The recorded positions of the token.
    Position(SmallVec<[u32; 2]>),
    /// Only the occurrence count of the token.
    Count(u32),
}
1343
1344impl PositionRecorder {
1345    pub fn len(&self) -> u32 {
1346        match self {
1347            Self::Position(positions) => positions.len() as u32,
1348            Self::Count(count) => *count,
1349        }
1350    }
1351
1352    pub fn is_empty(&self) -> bool {
1353        self.len() == 0
1354    }
1355
1356    pub fn into_vec(self) -> Vec<u32> {
1357        match self {
1358            Self::Position(positions) => positions.into_vec(),
1359            Self::Count(_) => vec![0],
1360        }
1361    }
1362}
1363
/// A row id together with its score.
///
/// Equality (derived) compares both fields.
#[derive(Debug, Eq, PartialEq, Clone, DeepSizeOf)]
pub struct ScoredDoc {
    /// Row id of the document.
    pub row_id: u64,
    /// Score of the document, wrapped so it can be totally ordered.
    pub score: OrderedFloat,
}
1369
1370impl ScoredDoc {
1371    pub fn new(row_id: u64, score: f32) -> Self {
1372        Self {
1373            row_id,
1374            score: OrderedFloat(score),
1375        }
1376    }
1377}
1378
1379impl PartialOrd for ScoredDoc {
1380    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1381        Some(self.cmp(other))
1382    }
1383}
1384
1385impl Ord for ScoredDoc {
1386    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1387        self.score.cmp(&other.score)
1388    }
1389}
1390
1391pub fn legacy_inverted_list_schema(with_position: bool) -> SchemaRef {
1392    let mut fields = vec![
1393        arrow_schema::Field::new(ROW_ID, arrow_schema::DataType::UInt64, false),
1394        arrow_schema::Field::new(FREQUENCY_COL, arrow_schema::DataType::Float32, false),
1395    ];
1396    if with_position {
1397        fields.push(arrow_schema::Field::new(
1398            POSITION_COL,
1399            arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1400                "item",
1401                arrow_schema::DataType::Int32,
1402                true,
1403            ))),
1404            false,
1405        ));
1406    }
1407    Arc::new(arrow_schema::Schema::new(fields))
1408}
1409
1410pub fn inverted_list_schema(with_position: bool) -> SchemaRef {
1411    inverted_list_schema_for_version(with_position, current_fts_format_version())
1412}
1413
1414pub fn inverted_list_schema_for_version(
1415    with_position: bool,
1416    format_version: InvertedListFormatVersion,
1417) -> SchemaRef {
1418    match format_version {
1419        InvertedListFormatVersion::V1 => inverted_list_schema_v1(with_position),
1420        InvertedListFormatVersion::V2 => inverted_list_schema_with_tail_codec_and_position_codec(
1421            with_position,
1422            PostingTailCodec::VarintDelta,
1423            Some(PositionStreamCodec::PackedDelta),
1424        ),
1425    }
1426}
1427
1428fn inverted_list_schema_v1(with_position: bool) -> SchemaRef {
1429    let mut fields = vec![
1430        arrow_schema::Field::new(
1431            POSTING_COL,
1432            datatypes::DataType::List(Arc::new(Field::new(
1433                "item",
1434                datatypes::DataType::LargeBinary,
1435                true,
1436            ))),
1437            false,
1438        ),
1439        arrow_schema::Field::new(MAX_SCORE_COL, datatypes::DataType::Float32, false),
1440        arrow_schema::Field::new(LENGTH_COL, datatypes::DataType::UInt32, false),
1441    ];
1442    if with_position {
1443        fields.push(arrow_schema::Field::new(
1444            POSITION_COL,
1445            arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1446                "item",
1447                arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1448                    "item",
1449                    arrow_schema::DataType::LargeBinary,
1450                    true,
1451                ))),
1452                true,
1453            ))),
1454            false,
1455        ));
1456    }
1457    Arc::new(arrow_schema::Schema::new(fields))
1458}
1459
1460pub fn inverted_list_schema_with_tail_codec(
1461    with_position: bool,
1462    posting_tail_codec: PostingTailCodec,
1463) -> SchemaRef {
1464    inverted_list_schema_with_tail_codec_and_position_codec(
1465        with_position,
1466        posting_tail_codec,
1467        Some(PositionStreamCodec::PackedDelta),
1468    )
1469}
1470
1471fn inverted_list_schema_with_tail_codec_and_position_codec(
1472    with_position: bool,
1473    posting_tail_codec: PostingTailCodec,
1474    position_codec: Option<PositionStreamCodec>,
1475) -> SchemaRef {
1476    let mut fields = vec![
1477        // we compress the posting lists (including row ids and frequencies),
1478        // and store the compressed posting lists, so it's a large binary array
1479        arrow_schema::Field::new(
1480            POSTING_COL,
1481            datatypes::DataType::List(Arc::new(Field::new(
1482                "item",
1483                datatypes::DataType::LargeBinary,
1484                true,
1485            ))),
1486            false,
1487        ),
1488        arrow_schema::Field::new(MAX_SCORE_COL, datatypes::DataType::Float32, false),
1489        arrow_schema::Field::new(LENGTH_COL, datatypes::DataType::UInt32, false),
1490    ];
1491    if with_position {
1492        fields.push(arrow_schema::Field::new(
1493            COMPRESSED_POSITION_COL,
1494            arrow_schema::DataType::LargeBinary,
1495            false,
1496        ));
1497        fields.push(arrow_schema::Field::new(
1498            POSITION_BLOCK_OFFSET_COL,
1499            arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
1500                "item",
1501                arrow_schema::DataType::UInt32,
1502                true,
1503            ))),
1504            false,
1505        ));
1506    }
1507    let mut metadata = HashMap::from([(
1508        POSTING_TAIL_CODEC_KEY.to_owned(),
1509        posting_tail_codec.as_str().to_owned(),
1510    )]);
1511    if let Some(position_codec) = position_codec.filter(|_| with_position) {
1512        metadata.insert(
1513            POSITIONS_LAYOUT_KEY.to_owned(),
1514            POSITIONS_LAYOUT_SHARED_STREAM_V2.to_owned(),
1515        );
1516        metadata.insert(
1517            POSITIONS_CODEC_KEY.to_owned(),
1518            position_codec.as_str().to_owned(),
1519        );
1520    }
1521    Arc::new(arrow_schema::Schema::new_with_metadata(fields, metadata))
1522}
1523
/// Flatten the string list stream into a string stream
pub struct FlattenStream {
    /// Inner record batch stream with 2 columns:
    /// 1. doc_col: List or LargeList of Utf8 or LargeUtf8
    /// 2. row_id_col: UInt64
    inner: SendableRecordBatchStream,
    /// Data type of the inner stream's first column (the list type).
    field_type: DataType,
    /// String type of the list items (Utf8 or LargeUtf8); used as the type of
    /// the flattened document column.
    data_type: DataType,
}
1533
1534impl FlattenStream {
1535    pub fn new(input: SendableRecordBatchStream) -> Self {
1536        let schema = input.schema();
1537        let field = schema.field(0);
1538        let data_type = match field.data_type() {
1539            DataType::List(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
1540            DataType::List(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
1541                DataType::LargeUtf8
1542            }
1543            DataType::LargeList(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
1544            DataType::LargeList(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
1545                DataType::LargeUtf8
1546            }
1547            _ => panic!(
1548                "expect data type List(Utf8) or List(LargeUtf8) but got {:?}",
1549                field.data_type()
1550            ),
1551        };
1552        Self {
1553            inner: input,
1554            field_type: field.data_type().clone(),
1555            data_type,
1556        }
1557    }
1558}
1559
1560impl Stream for FlattenStream {
1561    type Item = datafusion_common::Result<RecordBatch>;
1562
1563    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1564        match Pin::new(&mut self.inner).poll_next(cx) {
1565            Poll::Ready(Some(Ok(batch))) => {
1566                let doc_col = batch.column(0);
1567                let batch = match self.field_type {
1568                    DataType::List(_) => flatten_string_list::<i32>(&batch, doc_col).map_err(|e| {
1569                        datafusion_common::error::DataFusionError::Execution(format!(
1570                            "flatten string list error: {}",
1571                            e
1572                        ))
1573                    }),
1574                    DataType::LargeList(_) => {
1575                        flatten_string_list::<i64>(&batch, doc_col).map_err(|e| {
1576                            datafusion_common::error::DataFusionError::Execution(format!(
1577                                "flatten string list error: {}",
1578                                e
1579                            ))
1580                        })
1581                    }
1582                    _ => unreachable!(
1583                        "expect data type List or LargeList but got {:?}",
1584                        self.field_type
1585                    ),
1586                };
1587                Poll::Ready(Some(batch))
1588            }
1589            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
1590            Poll::Ready(None) => Poll::Ready(None),
1591            Poll::Pending => Poll::Pending,
1592        }
1593    }
1594}
1595
1596impl RecordBatchStream for FlattenStream {
1597    fn schema(&self) -> SchemaRef {
1598        let schema = Schema::new(vec![
1599            Field::new(
1600                self.inner.schema().field(0).name(),
1601                self.data_type.clone(),
1602                true,
1603            ),
1604            ROW_ID_FIELD.clone(),
1605        ]);
1606
1607        Arc::new(schema)
1608    }
1609}
1610
1611fn flatten_string_list<Offset: arrow::array::OffsetSizeTrait>(
1612    batch: &RecordBatch,
1613    doc_col: &Arc<dyn Array>,
1614) -> Result<RecordBatch> {
1615    let docs = doc_col.as_list::<Offset>();
1616    let row_ids = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
1617
1618    let row_ids = row_ids
1619        .values()
1620        .iter()
1621        .zip(docs.iter())
1622        .flat_map(|(row_id, doc)| std::iter::repeat_n(*row_id, doc.map(|d| d.len()).unwrap_or(0)));
1623
1624    let row_ids = Arc::new(UInt64Array::from_iter_values(row_ids));
1625    let docs = match docs.value_type() {
1626        datatypes::DataType::Utf8 | datatypes::DataType::LargeUtf8 => docs.values().clone(),
1627        _ => {
1628            return Err(Error::index(format!(
1629                "expect data type String or LargeString but got {}",
1630                docs.value_type()
1631            )));
1632        }
1633    };
1634
1635    let schema = Schema::new(vec![
1636        Field::new(
1637            batch.schema().field(0).name(),
1638            docs.data_type().clone(),
1639            true,
1640        ),
1641        ROW_ID_FIELD.clone(),
1642    ]);
1643    let batch = RecordBatch::try_new(Arc::new(schema), vec![docs, row_ids])?;
1644    Ok(batch)
1645}
1646
1647pub(crate) fn token_file_path(partition_id: u64) -> String {
1648    format!("part_{}_{}", partition_id, TOKENS_FILE)
1649}
1650
1651pub(crate) fn posting_file_path(partition_id: u64) -> String {
1652    format!("part_{}_{}", partition_id, INVERT_LIST_FILE)
1653}
1654
1655pub(crate) fn doc_file_path(partition_id: u64) -> String {
1656    format!("part_{}_{}", partition_id, DOCS_FILE)
1657}
1658
1659pub(crate) fn part_metadata_file_path(partition_id: u64) -> String {
1660    format!("part_{}_{}", partition_id, METADATA_FILE)
1661}
1662
// File-name suffixes of the per-partition files that `merge_metadata_files` renames
// when partition IDs are remapped (one file per suffix and partition).
const PARTITION_FILE_SUFFIXES: [&str; 3] = [TOKENS_FILE, INVERT_LIST_FILE, DOCS_FILE];
// Each remapped file is renamed twice: first to a temp path (phase 1), then to
// its final path (phase 2). Keep in sync with the two rename loops below in
// `merge_metadata_files`.
const PARTITION_FILE_RENAME_PHASES: u64 = 2;
1668
1669pub async fn merge_index_files(
1670    object_store: &ObjectStore,
1671    index_dir: &Path,
1672    store: Arc<dyn IndexStore>,
1673    progress: Arc<dyn IndexBuildProgress>,
1674) -> Result<()> {
1675    // List all partition metadata files in the index directory
1676    let part_metadata_files = list_metadata_files(object_store, index_dir).await?;
1677
1678    // Call merge_metadata_files function for inverted index
1679    merge_metadata_files(store, &part_metadata_files, progress).await
1680}
1681
1682/// List and filter metadata files from the index directory
1683/// Returns partition metadata files
1684async fn list_metadata_files(object_store: &ObjectStore, index_dir: &Path) -> Result<Vec<String>> {
1685    // List all partition metadata files in the index directory
1686    let mut part_metadata_files = Vec::new();
1687    let mut list_stream = object_store.list(Some(index_dir.clone()));
1688
1689    while let Some(item) = list_stream.next().await {
1690        match item {
1691            Ok(meta) => {
1692                let file_name = meta.location.filename().unwrap_or_default();
1693                // Filter files matching the pattern part_*_metadata.lance
1694                if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
1695                    part_metadata_files.push(file_name.to_string());
1696                }
1697            }
1698            Err(_) => continue,
1699        }
1700    }
1701
1702    if part_metadata_files.is_empty() {
1703        return Err(Error::invalid_input_source(
1704            format!(
1705                "No partition metadata files found in index directory: {}",
1706                index_dir
1707            )
1708            .into(),
1709        ));
1710    }
1711
1712    Ok(part_metadata_files)
1713}
1714
1715/// Merge partition metadata files with partition ID remapping to sequential IDs starting from 0
1716async fn merge_metadata_files(
1717    store: Arc<dyn IndexStore>,
1718    part_metadata_files: &[String],
1719    progress: Arc<dyn IndexBuildProgress>,
1720) -> Result<()> {
1721    // Collect all partition IDs and params
1722    let mut all_partitions = Vec::new();
1723    let mut params = None;
1724    let mut token_set_format = None;
1725    let mut format_version = None;
1726    let mut posting_tail_codec = None;
1727    let mut deleted_fragments = RoaringBitmap::new();
1728    progress
1729        .stage_start(
1730            "read_partition_metadata",
1731            Some(part_metadata_files.len() as u64),
1732            "files",
1733        )
1734        .await?;
1735
1736    for (idx, file_name) in part_metadata_files.iter().enumerate() {
1737        let reader = store.open_index_file(file_name).await?;
1738        let metadata = &reader.schema().metadata;
1739
1740        let partitions_str = metadata.get("partitions").ok_or(Error::index(format!(
1741            "partitions not found in {}",
1742            file_name
1743        )))?;
1744
1745        let partition_ids: Vec<u64> = serde_json::from_str(partitions_str)
1746            .map_err(|e| Error::index(format!("Failed to parse partitions: {}", e)))?;
1747
1748        all_partitions.extend(partition_ids);
1749
1750        if params.is_none() {
1751            let params_str = metadata
1752                .get("params")
1753                .ok_or(Error::index(format!("params not found in {}", file_name)))?;
1754            params = Some(
1755                serde_json::from_str::<InvertedIndexParams>(params_str)
1756                    .map_err(|e| Error::index(format!("Failed to parse params: {}", e)))?,
1757            );
1758        }
1759
1760        if token_set_format.is_none()
1761            && let Some(name) = metadata.get(TOKEN_SET_FORMAT_KEY)
1762        {
1763            token_set_format = Some(TokenSetFormat::from_str(name)?);
1764        }
1765        if format_version.is_none() {
1766            format_version = Some(parse_format_version_from_metadata(metadata)?);
1767        }
1768        if posting_tail_codec.is_none() {
1769            posting_tail_codec = Some(parse_posting_tail_codec(metadata)?);
1770        }
1771
1772        if reader.num_rows() > 0 {
1773            let metadata_batch = reader.read_range(0..1, None).await?;
1774            let deleted_fragments_col = metadata_batch
1775                .column_by_name(DELETED_FRAGMENTS_COL)
1776                .expect_ok()?;
1777            let deleted_fragments_arr = deleted_fragments_col
1778                .as_any()
1779                .downcast_ref::<BinaryArray>()
1780                .expect_ok()?;
1781            let part_deleted_fragments =
1782                RoaringBitmap::deserialize_from(deleted_fragments_arr.value(0))?;
1783            deleted_fragments.extend(part_deleted_fragments);
1784        }
1785        progress
1786            .stage_progress("read_partition_metadata", idx as u64 + 1)
1787            .await?;
1788    }
1789    progress.stage_complete("read_partition_metadata").await?;
1790
1791    // Create ID mapping: sorted original IDs -> 0,1,2...
1792    let mut sorted_ids = all_partitions.clone();
1793    sorted_ids.sort();
1794    sorted_ids.dedup();
1795
1796    let id_mapping: HashMap<u64, u64> = sorted_ids
1797        .iter()
1798        .enumerate()
1799        .map(|(new_id, &old_id)| (old_id, new_id as u64))
1800        .collect();
1801
1802    // Safe rename partition files using temporary files to avoid overwrite
1803    let timestamp = std::time::SystemTime::now()
1804        .duration_since(std::time::UNIX_EPOCH)
1805        .unwrap()
1806        .as_secs();
1807
1808    let changed_partition_count = id_mapping
1809        .iter()
1810        .filter(|(old_id, new_id)| old_id != new_id)
1811        .count() as u64;
1812    let total_renames = changed_partition_count
1813        * PARTITION_FILE_SUFFIXES.len() as u64
1814        * PARTITION_FILE_RENAME_PHASES;
1815    progress
1816        .stage_start("remap_partition_files", Some(total_renames), "files")
1817        .await?;
1818
1819    // Phase 1: Move files to temporary locations
1820    let mut temp_files: Vec<(String, String, String)> = Vec::new(); // (temp_path, old_path, final_path)
1821    let mut renamed_files = 0u64;
1822
1823    for (&old_id, &new_id) in &id_mapping {
1824        if old_id != new_id {
1825            for suffix in PARTITION_FILE_SUFFIXES {
1826                let old_path = format!("part_{}_{}", old_id, suffix);
1827                let new_path = format!("part_{}_{}", new_id, suffix);
1828                let temp_path = format!("temp_{}_{}", timestamp, old_path);
1829
1830                // Move to temporary location first to avoid overwrite
1831                if let Err(e) = store.rename_index_file(&old_path, &temp_path).await {
1832                    // Rollback phase 1: restore files from temp locations
1833                    for (temp_name, old_name, _) in temp_files.iter().rev() {
1834                        let _ = store.rename_index_file(temp_name, old_name).await;
1835                    }
1836                    return Err(Error::index(format!(
1837                        "Failed to move {} to temp {}: {}",
1838                        old_path, temp_path, e
1839                    )));
1840                }
1841                temp_files.push((temp_path, old_path, new_path));
1842                renamed_files += 1;
1843                progress
1844                    .stage_progress("remap_partition_files", renamed_files)
1845                    .await?;
1846            }
1847        }
1848    }
1849
1850    // Phase 2: Move from temporary to final locations
1851    let mut completed_renames: Vec<(String, String)> = Vec::new(); // (final_path, temp_path)
1852
1853    for (temp_path, _old_path, final_path) in &temp_files {
1854        if let Err(e) = store.rename_index_file(temp_path, final_path).await {
1855            // Rollback phase 2: restore completed renames and remaining temps
1856            for (final_name, temp_name) in completed_renames.iter().rev() {
1857                let _ = store.rename_index_file(final_name, temp_name).await;
1858            }
1859            // Restore remaining temp files to original locations
1860            for (temp_name, orig_name, _) in temp_files.iter() {
1861                if !completed_renames.iter().any(|(_, t)| t == temp_name) {
1862                    let _ = store.rename_index_file(temp_name, orig_name).await;
1863                }
1864            }
1865            return Err(Error::index(format!(
1866                "Failed to rename {} to {}: {}",
1867                temp_path, final_path, e
1868            )));
1869        }
1870        completed_renames.push((final_path.clone(), temp_path.clone()));
1871        renamed_files += 1;
1872        progress
1873            .stage_progress("remap_partition_files", renamed_files)
1874            .await?;
1875    }
1876    progress.stage_complete("remap_partition_files").await?;
1877
1878    // Write merged metadata with remapped IDs
1879    let remapped_partitions: Vec<u64> = (0..id_mapping.len() as u64).collect();
1880    let params = params.unwrap_or_default();
1881    let token_set_format = token_set_format.unwrap_or(TokenSetFormat::Arrow);
1882    let builder = InvertedIndexBuilder::from_existing_index(
1883        params,
1884        None,
1885        remapped_partitions.clone(),
1886        token_set_format,
1887        None,
1888        deleted_fragments,
1889    )
1890    .with_format_version(format_version.unwrap_or(InvertedListFormatVersion::V1))
1891    .with_posting_tail_codec(posting_tail_codec.unwrap_or(PostingTailCodec::Fixed32));
1892    progress
1893        .stage_start("write_merged_metadata", Some(1), "files")
1894        .await?;
1895    builder
1896        .write_metadata(&*store, &remapped_partitions)
1897        .await?;
1898    progress.stage_progress("write_merged_metadata", 1).await?;
1899    progress.stage_complete("write_merged_metadata").await?;
1900
1901    // Cleanup partition metadata files
1902    for file_name in part_metadata_files {
1903        if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
1904            let _ = store.delete_index_file(file_name).await;
1905        }
1906    }
1907
1908    Ok(())
1909}
1910
1911/// Convert input stream into a stream of documents.
1912///
1913/// The input stream must be one of:
1914/// 1. Document in Utf8 or LargeUtf8 format.
1915/// 2. Document in List(Utf8) or List(LargeUtf8) format.
1916/// 3. Json document in LargeBinary format.
1917pub fn document_input(
1918    input: SendableRecordBatchStream,
1919    column: &str,
1920) -> Result<SendableRecordBatchStream> {
1921    let schema = input.schema();
1922    let field = schema.column_with_name(column).expect_ok()?.1;
1923    match field.data_type() {
1924        DataType::Utf8 | DataType::LargeUtf8 => Ok(input),
1925        DataType::List(field) | DataType::LargeList(field)
1926            if matches!(field.data_type(), DataType::Utf8 | DataType::LargeUtf8) =>
1927        {
1928            Ok(Box::pin(FlattenStream::new(input)))
1929        }
1930        DataType::LargeBinary => match field.metadata().get(ARROW_EXT_NAME_KEY) {
1931            Some(name) if name.as_str() == JSON_EXT_NAME => {
1932                Ok(Box::pin(JsonTextStream::new(input, column.to_string())))
1933            }
1934            _ => Err(Error::invalid_input_source(
1935                format!("column {} is not json", column).into(),
1936            )),
1937        },
1938        _ => Err(Error::invalid_input_source(
1939            format!(
1940                "column {} has type {}, is not utf8, large utf8 type/list, or large binary",
1941                column,
1942                field.data_type()
1943            )
1944            .into(),
1945        )),
1946    }
1947}
1948
1949#[cfg(test)]
1950mod tests {
1951    use super::*;
1952    use crate::metrics::NoOpMetricsCollector;
1953    use crate::progress::IndexBuildProgress;
1954    use crate::scalar::{IndexFile, IndexReader, IndexWriter, ScalarIndex};
1955    use arrow_array::{RecordBatch, StringArray, UInt64Array};
1956    use arrow_schema::{DataType, Field, Schema};
1957    use async_trait::async_trait;
1958    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
1959    use futures::stream;
1960    use lance_core::ROW_ID;
1961    use lance_core::cache::LanceCache;
1962    use lance_core::utils::tempfile::TempDir;
1963    use std::any::Any;
1964    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1965    use std::time::Duration;
1966
1967    fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch {
1968        let schema = Arc::new(Schema::new(vec![
1969            Field::new("doc", DataType::Utf8, true),
1970            Field::new(ROW_ID, DataType::UInt64, false),
1971        ]));
1972        let docs = Arc::new(StringArray::from(vec![Some(doc)]));
1973        let row_ids = Arc::new(UInt64Array::from(vec![row_id]));
1974        RecordBatch::try_new(schema, vec![docs, row_ids]).unwrap()
1975    }
1976
    /// Test-only `IndexStore` that counts record batches written through its writers.
    ///
    /// The counter is shared (via `Arc`) with every `CountingWriter` the store hands out
    /// and with clones of the store.
    #[derive(Debug, Default, Clone)]
    struct CountingStore {
        // Number of `write_record_batch` calls observed so far.
        write_count: Arc<AtomicUsize>,
    }
1981
1982    impl CountingStore {
1983        fn new() -> Self {
1984            Self {
1985                write_count: Arc::new(AtomicUsize::new(0)),
1986            }
1987        }
1988
1989        fn write_count(&self) -> usize {
1990            self.write_count.load(Ordering::SeqCst)
1991        }
1992    }
1993
    // `DeepSizeOf` is implemented only so the test store can be used as an `IndexStore`
    // in these tests; it reports no heap-allocated children.
    impl DeepSizeOf for CountingStore {
        fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
            0
        }
    }
1999
    /// Test-only `IndexWriter` that discards batches and bumps a shared counter per write.
    #[derive(Debug)]
    struct CountingWriter {
        // Counter shared with the `CountingStore` that created this writer.
        write_count: Arc<AtomicUsize>,
    }
2004
    #[async_trait]
    impl IndexWriter for CountingWriter {
        /// Drops the batch, increments the shared counter and returns the counter's
        /// value from before the increment.
        async fn write_record_batch(&mut self, _batch: RecordBatch) -> Result<u64> {
            Ok(self.write_count.fetch_add(1, Ordering::SeqCst) as u64)
        }

        /// No-op: nothing is buffered.
        async fn finish(&mut self) -> Result<()> {
            Ok(())
        }

        /// No-op: the metadata is ignored.
        async fn finish_with_metadata(&mut self, _metadata: HashMap<String, String>) -> Result<()> {
            Ok(())
        }
    }
2019
    /// Write-only `IndexStore` implementation: creating index files works (and is counted
    /// per written batch), while reading, copying, renaming and deleting are rejected
    /// with a not-supported error.
    #[async_trait]
    impl IndexStore for CountingStore {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn clone_arc(&self) -> Arc<dyn IndexStore> {
            // The clone shares the same counter because `write_count` is an `Arc`.
            Arc::new(self.clone())
        }

        fn io_parallelism(&self) -> usize {
            1
        }

        /// Returns a `CountingWriter` sharing this store's counter; name and schema are ignored.
        async fn new_index_file(
            &self,
            _name: &str,
            _schema: Arc<Schema>,
        ) -> Result<Box<dyn IndexWriter>> {
            Ok(Box::new(CountingWriter {
                write_count: self.write_count.clone(),
            }))
        }

        async fn open_index_file(&self, _name: &str) -> Result<Arc<dyn IndexReader>> {
            Err(Error::not_supported(
                "CountingStore does not support reading",
            ))
        }

        async fn copy_index_file(&self, _name: &str, _dest_store: &dyn IndexStore) -> Result<()> {
            Err(Error::not_supported(
                "CountingStore does not support copying",
            ))
        }

        async fn rename_index_file(&self, _name: &str, _new_name: &str) -> Result<()> {
            Err(Error::not_supported(
                "CountingStore does not support renaming",
            ))
        }

        async fn delete_index_file(&self, _name: &str) -> Result<()> {
            Err(Error::not_supported(
                "CountingStore does not support deleting",
            ))
        }

        /// Always reports an empty file list.
        async fn list_files_with_sizes(&self) -> Result<Vec<IndexFile>> {
            Ok(vec![])
        }
    }
2072
    /// Three single-entry posting lists must be written with exactly one
    /// `write_record_batch` call, as counted by `CountingStore`.
    #[tokio::test]
    async fn test_write_posting_lists_batches_multiple_rows() -> Result<()> {
        let mut builder = InnerBuilder::new(0, false, TokenSetFormat::default());
        // Register three documents in the doc set.
        for doc_id in 0..3u64 {
            builder.docs.append(doc_id, 1);
        }

        // One posting list per document, each with a single occurrence.
        for doc_id in 0..3u32 {
            let mut posting_list = PostingListBuilder::new(false);
            posting_list.add(doc_id, PositionRecorder::Count(1));
            builder.posting_lists.push(posting_list);
        }

        let store = CountingStore::new();
        // Move the doc set out of the builder so it can be passed separately as an `Arc`.
        let docs = Arc::new(std::mem::take(&mut builder.docs));
        builder.write_posting_lists(&store, docs).await?;

        assert_eq!(store.write_count(), 1);
        Ok(())
    }
2093
    /// Builds two partitions with two workers sharing one ID allocator, then writes an
    /// index from those existing partitions into a second store and checks that the
    /// partition IDs and the per-partition files arrive there unchanged.
    #[tokio::test]
    async fn test_build_only_path_writes_partitions_as_is() -> Result<()> {
        let src_dir = TempDir::default();
        let dest_dir = TempDir::default();
        let src_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            src_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));
        let dest_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            dest_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let params = InvertedIndexParams::default();
        let tokenizer = params.build()?;
        let token_set_format = TokenSetFormat::default();
        // Shared between both workers so they allocate distinct partition IDs.
        let id_alloc = Arc::new(AtomicU64::new(0));

        // First worker: one document.
        let mut worker1 = IndexWorker::new(
            tokenizer.clone(),
            src_store.clone(),
            id_alloc.clone(),
            IndexWorkerConfig {
                with_position: params.with_position,
                format_version: InvertedListFormatVersion::V1,
                fragment_mask: None,
                token_set_format,
                worker_memory_limit_bytes: u64::MAX,
            },
        )
        .await?;
        worker1
            .process_batch(make_doc_batch("hello world", 0))
            .await?;
        let output1 = worker1.finish().await?;
        let mut partitions = output1.partitions;
        // A tail partition, if present, has not been written yet; write it here.
        if let Some(mut tail_partition) = output1.tail_partition {
            partitions.push(tail_partition.builder.id());
            tail_partition.builder.write(src_store.as_ref()).await?;
        }

        // Second worker: another document, same source store and ID allocator.
        let mut worker2 = IndexWorker::new(
            tokenizer.clone(),
            src_store.clone(),
            id_alloc.clone(),
            IndexWorkerConfig {
                with_position: params.with_position,
                format_version: InvertedListFormatVersion::V1,
                fragment_mask: None,
                token_set_format,
                worker_memory_limit_bytes: u64::MAX,
            },
        )
        .await?;
        worker2
            .process_batch(make_doc_batch("goodbye world", 1))
            .await?;
        let output2 = worker2.finish().await?;
        partitions.extend(output2.partitions);
        if let Some(mut tail_partition) = output2.tail_partition {
            partitions.push(tail_partition.builder.id());
            tail_partition.builder.write(src_store.as_ref()).await?;
        }
        partitions.sort_unstable();
        assert_eq!(partitions.len(), 2);
        assert_ne!(partitions[0], partitions[1]);

        // Write an index from the existing partitions into the destination store.
        let builder = InvertedIndexBuilder::from_existing_index(
            InvertedIndexParams::default(),
            Some(src_store.clone()),
            partitions.clone(),
            token_set_format,
            None,
            RoaringBitmap::new(),
        );
        builder.write(dest_store.as_ref()).await?;

        // The merged metadata must list exactly the original partition IDs.
        let metadata_reader = dest_store.open_index_file(METADATA_FILE).await?;
        let metadata = &metadata_reader.schema().metadata;
        let partitions_str = metadata
            .get("partitions")
            .expect("partitions missing from metadata");
        let written_partitions: Vec<u64> = serde_json::from_str(partitions_str).unwrap();
        assert_eq!(written_partitions, partitions);

        // Every partition's token, posting and doc file must be openable in the destination.
        for id in &partitions {
            dest_store.open_index_file(&token_file_path(*id)).await?;
            dest_store.open_index_file(&posting_file_path(*id)).await?;
            dest_store.open_index_file(&doc_file_path(*id)).await?;
        }

        Ok(())
    }
2189
    /// Updating an index that was written with the `Fixed32` posting tail codec must keep
    /// that codec for every partition of the updated index and in the written metadata.
    #[tokio::test]
    async fn test_update_preserves_existing_posting_tail_codec() -> Result<()> {
        let src_dir = TempDir::default();
        let dest_dir = TempDir::default();
        let src_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            src_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));
        let dest_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            dest_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        // Hand-build partition 0 with a single token/posting/doc using the codec under test.
        let posting_tail_codec = PostingTailCodec::Fixed32;
        let mut partition = InnerBuilder::new_with_posting_tail_codec(
            0,
            false,
            TokenSetFormat::default(),
            posting_tail_codec,
        );
        partition.tokens.add("hello".to_owned());
        let mut posting_list =
            PostingListBuilder::new_with_posting_tail_codec(false, posting_tail_codec);
        posting_list.add(0, PositionRecorder::Count(1));
        partition.posting_lists.push(posting_list);
        partition.docs.append(100, 1);
        partition.write(src_store.as_ref()).await?;

        // Write the index-level metadata for that single partition.
        let metadata_writer = InvertedIndexBuilder::from_existing_index(
            InvertedIndexParams::default(),
            Some(src_store.clone()),
            vec![0],
            TokenSetFormat::default(),
            None,
            RoaringBitmap::new(),
        )
        .with_posting_tail_codec(posting_tail_codec);
        metadata_writer
            .write_metadata(src_store.as_ref(), &[0])
            .await?;

        // Load the index and update it with one new document, writing to the destination.
        let index = InvertedIndex::load(src_store, None, &LanceCache::no_cache()).await?;
        let schema = Arc::new(Schema::new(vec![
            Field::new("doc", DataType::Utf8, true),
            Field::new(ROW_ID, DataType::UInt64, false),
        ]));
        let docs = Arc::new(StringArray::from(vec![Some("hello again")]));
        let row_ids = Arc::new(UInt64Array::from(vec![101u64]));
        let batch = RecordBatch::try_new(schema.clone(), vec![docs, row_ids])?;
        let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)]));
        index
            .update(Box::pin(stream), dest_store.as_ref(), None)
            .await?;

        // Both the original and the newly added partition must report the original codec.
        let updated =
            InvertedIndex::load(dest_store.clone(), None, &LanceCache::no_cache()).await?;
        assert_eq!(updated.partitions.len(), 2);
        for partition in &updated.partitions {
            assert_eq!(
                partition.inverted_list.posting_tail_codec(),
                posting_tail_codec
            );
        }

        // The codec must also be recorded in the metadata file's schema metadata.
        let metadata = dest_store.open_index_file(METADATA_FILE).await?;
        assert_eq!(
            metadata.schema().metadata.get(POSTING_TAIL_CODEC_KEY),
            Some(&posting_tail_codec.as_str().to_owned())
        );

        Ok(())
    }
2264
    /// `with_posting_tail_codec` must also adjust the builder's format version:
    /// `Fixed32` yields `V1` (even after an explicit `V2`), `VarintDelta` yields `V2`.
    #[test]
    fn test_with_posting_tail_codec_syncs_format_version() {
        let builder = InvertedIndexBuilder::from_existing_index(
            InvertedIndexParams::default(),
            None,
            Vec::new(),
            TokenSetFormat::default(),
            None,
            RoaringBitmap::new(),
        )
        .with_format_version(InvertedListFormatVersion::V2)
        .with_posting_tail_codec(PostingTailCodec::Fixed32);
        // The codec set last wins over the previously requested format version.
        assert_eq!(builder.format_version, InvertedListFormatVersion::V1);
        assert_eq!(builder.posting_tail_codec, PostingTailCodec::Fixed32);

        let builder = builder.with_posting_tail_codec(PostingTailCodec::VarintDelta);
        assert_eq!(builder.format_version, InvertedListFormatVersion::V2);
        assert_eq!(builder.posting_tail_codec, PostingTailCodec::VarintDelta);
    }
2284
    /// With positions disabled, the posting list must still record the term frequency:
    /// "hello" occurs twice in the single document and no positions are stored.
    #[tokio::test]
    async fn test_inverted_index_without_positions_tracks_frequency() -> Result<()> {
        let index_dir = TempDir::default();
        let store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            index_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        // Single document containing "hello" twice.
        let schema = Arc::new(Schema::new(vec![
            Field::new("doc", DataType::Utf8, true),
            Field::new(ROW_ID, DataType::UInt64, false),
        ]));
        let docs = Arc::new(StringArray::from(vec![Some("hello hello world")]));
        let row_ids = Arc::new(UInt64Array::from(vec![0u64]));
        let batch = RecordBatch::try_new(schema.clone(), vec![docs, row_ids])?;
        let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)]));
        let stream = Box::pin(stream);

        // Whitespace tokenizer with positions, stop-word removal, stemming and the
        // token-length limit all switched off.
        let params =
            InvertedIndexParams::new("whitespace".to_string(), lance_tokenizer::Language::English)
                .with_position(false)
                .remove_stop_words(false)
                .stem(false)
                .max_token_length(None);

        let mut builder = InvertedIndexBuilder::new(params);
        builder.update(stream, store.as_ref(), None).await?;

        let index = InvertedIndex::load(store, None, &LanceCache::no_cache()).await?;
        assert_eq!(index.partitions.len(), 1);
        let partition = &index.partitions[0];
        let token_id = partition.tokens.get("hello").unwrap();
        let posting = partition
            .inverted_list
            .posting_list(token_id, false, &NoOpMetricsCollector)
            .await?;

        // Exactly one posting: doc 0, frequency 2, no positions.
        let mut iter = posting.iter();
        let (doc_id, freq, positions) = iter.next().unwrap();
        assert_eq!(doc_id, 0);
        assert_eq!(freq, 2);
        assert!(positions.is_none());
        assert!(iter.next().is_none());

        Ok(())
    }
2332
2333    lance_testing::define_stage_event_progress!(RecordingProgress, IndexBuildProgress, Result<()>);
2334
    /// Test-only progress reporter used to inject a failure from `stage_progress`.
    #[derive(Debug, Default)]
    struct FailingProgress;
2337
    #[async_trait]
    impl IndexBuildProgress for FailingProgress {
        /// Always succeeds.
        async fn stage_start(&self, _stage: &str, _total: Option<u64>, _unit: &str) -> Result<()> {
            Ok(())
        }

        /// Always fails with an injected I/O error, regardless of stage or count.
        async fn stage_progress(&self, _stage: &str, _completed: u64) -> Result<()> {
            Err(Error::io("injected progress failure"))
        }

        /// Always succeeds.
        async fn stage_complete(&self, _stage: &str) -> Result<()> {
            Ok(())
        }
    }
2352
2353    #[tokio::test]
2354    async fn test_builder_reports_progress_stages() -> Result<()> {
2355        let index_dir = TempDir::default();
2356        let store = Arc::new(LanceIndexStore::new(
2357            ObjectStore::local().into(),
2358            index_dir.obj_path(),
2359            Arc::new(LanceCache::no_cache()),
2360        ));
2361
2362        let batch1 = make_doc_batch("hello world", 0);
2363        let batch2 = make_doc_batch("goodbye world", 1);
2364        let total_rows = 2u64;
2365        let stream = RecordBatchStreamAdapter::new(
2366            batch1.schema(),
2367            stream::iter(vec![Ok(batch1), Ok(batch2)]),
2368        );
2369        let stream = Box::pin(stream);
2370
2371        let progress = Arc::new(RecordingProgress::default());
2372        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default())
2373            .with_progress(progress.clone());
2374        builder.update(stream, store.as_ref(), None).await?;
2375
2376        let events = progress.recorded_events();
2377        let tags = events
2378            .iter()
2379            .map(|(kind, stage, _)| format!("{kind}:{stage}"))
2380            .collect::<Vec<_>>();
2381        let tokenize_progress = events
2382            .iter()
2383            .filter_map(|(kind, stage, completed)| {
2384                if kind == "progress" && stage == "tokenize_docs" {
2385                    Some(*completed)
2386                } else {
2387                    None
2388                }
2389            })
2390            .collect::<Vec<_>>();
2391
2392        let tokenize_start = tags
2393            .iter()
2394            .position(|e| e == "start:tokenize_docs")
2395            .expect("missing tokenize_docs start");
2396        let tokenize_complete = tags
2397            .iter()
2398            .position(|e| e == "complete:tokenize_docs")
2399            .expect("missing tokenize_docs complete");
2400        let copy_start = tags
2401            .iter()
2402            .position(|e| e == "start:copy_partitions")
2403            .expect("missing copy_partitions start");
2404        let copy_complete = tags
2405            .iter()
2406            .position(|e| e == "complete:copy_partitions")
2407            .expect("missing copy_partitions complete");
2408        let metadata_start = tags
2409            .iter()
2410            .position(|e| e == "start:write_metadata")
2411            .expect("missing write_metadata start");
2412        let metadata_complete = tags
2413            .iter()
2414            .position(|e| e == "complete:write_metadata")
2415            .expect("missing write_metadata complete");
2416
2417        assert!(tokenize_start < tokenize_complete);
2418        assert!(tokenize_complete < copy_start);
2419        assert!(copy_start < copy_complete);
2420        assert!(copy_complete < metadata_start);
2421        assert!(metadata_start < metadata_complete);
2422
2423        assert!(
2424            tags.iter().any(|e| e == "progress:tokenize_docs"),
2425            "expected progress callback for tokenize_docs"
2426        );
2427        assert!(
2428            tokenize_progress.len() >= 2,
2429            "expected at least two progress callbacks for tokenize_docs, got {tokenize_progress:?}"
2430        );
2431        assert_eq!(
2432            tokenize_progress.iter().copied().max().unwrap_or_default(),
2433            total_rows,
2434            "expected tokenize_docs progress to reach all rows"
2435        );
2436        assert!(
2437            tags.iter().any(|e| e == "progress:copy_partitions"),
2438            "expected progress callback for copy_partitions"
2439        );
2440        assert!(
2441            tags.iter().any(|e| e == "progress:write_metadata"),
2442            "expected progress callback for write_metadata"
2443        );
2444        assert!(
2445            !tags.iter().any(|e| e == "start:merge_partitions"),
2446            "merge_partitions should not run in the build-only path"
2447        );
2448
2449        Ok(())
2450    }
2451
2452    #[tokio::test]
2453    async fn test_builder_default_path_skips_merge_stage() -> Result<()> {
2454        let index_dir = TempDir::default();
2455        let store = Arc::new(LanceIndexStore::new(
2456            ObjectStore::local().into(),
2457            index_dir.obj_path(),
2458            Arc::new(LanceCache::no_cache()),
2459        ));
2460
2461        let batch = make_doc_batch("hello world", 0);
2462        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2463        let stream = Box::pin(stream);
2464
2465        let progress = Arc::new(RecordingProgress::default());
2466        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default())
2467            .with_progress(progress.clone());
2468        builder.update(stream, store.as_ref(), None).await?;
2469
2470        let tags = progress
2471            .recorded_events()
2472            .iter()
2473            .map(|(kind, stage, _)| format!("{kind}:{stage}"))
2474            .collect::<Vec<_>>();
2475
2476        assert!(
2477            tags.iter().any(|e| e == "start:copy_partitions"),
2478            "default path should copy finalized partitions"
2479        );
2480        assert!(
2481            !tags.iter().any(|e| e == "start:merge_partitions"),
2482            "default path should not run merge_partitions"
2483        );
2484        Ok(())
2485    }
2486
2487    #[tokio::test]
2488    async fn test_merge_index_files_reports_progress_stages() -> Result<()> {
2489        let index_dir = TempDir::default();
2490        let index_path = index_dir.obj_path();
2491        let object_store = ObjectStore::local();
2492        let store = Arc::new(LanceIndexStore::new(
2493            object_store.clone().into(),
2494            index_path.clone(),
2495            Arc::new(LanceCache::no_cache()),
2496        ));
2497
2498        for (fragment_id, row_id, doc) in [
2499            (1_u64 << 32, 0_u64, "hello world"),
2500            (2_u64 << 32, 1_u64, "goodbye world"),
2501        ] {
2502            let batch = make_doc_batch(doc, row_id);
2503            let stream =
2504                RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2505            let stream = Box::pin(stream);
2506            let mut builder = InvertedIndexBuilder::new_with_fragment_mask(
2507                InvertedIndexParams::default(),
2508                Some(fragment_id),
2509            )
2510            .with_progress(noop_progress());
2511            builder.update(stream, store.as_ref(), None).await?;
2512        }
2513
2514        let progress = Arc::new(RecordingProgress::default());
2515        merge_index_files(&object_store, &index_path, store.clone(), progress.clone()).await?;
2516
2517        let events = progress.recorded_events();
2518        let tags = events
2519            .iter()
2520            .map(|(kind, stage, _)| format!("{kind}:{stage}"))
2521            .collect::<Vec<_>>();
2522        let remap_progress = events
2523            .iter()
2524            .filter_map(|(kind, stage, completed)| {
2525                if kind == "progress" && stage == "remap_partition_files" {
2526                    Some(*completed)
2527                } else {
2528                    None
2529                }
2530            })
2531            .collect::<Vec<_>>();
2532
2533        let read_start = tags
2534            .iter()
2535            .position(|e| e == "start:read_partition_metadata")
2536            .expect("missing read_partition_metadata start");
2537        let read_complete = tags
2538            .iter()
2539            .position(|e| e == "complete:read_partition_metadata")
2540            .expect("missing read_partition_metadata complete");
2541        let remap_start = tags
2542            .iter()
2543            .position(|e| e == "start:remap_partition_files")
2544            .expect("missing remap_partition_files start");
2545        let remap_complete = tags
2546            .iter()
2547            .position(|e| e == "complete:remap_partition_files")
2548            .expect("missing remap_partition_files complete");
2549        let metadata_start = tags
2550            .iter()
2551            .position(|e| e == "start:write_merged_metadata")
2552            .expect("missing write_merged_metadata start");
2553        let metadata_complete = tags
2554            .iter()
2555            .position(|e| e == "complete:write_merged_metadata")
2556            .expect("missing write_merged_metadata complete");
2557
2558        assert!(read_start < read_complete);
2559        assert!(read_complete < remap_start);
2560        assert!(remap_start < remap_complete);
2561        assert!(remap_complete < metadata_start);
2562        assert!(metadata_start < metadata_complete);
2563
2564        assert!(
2565            tags.iter().any(|e| e == "progress:read_partition_metadata"),
2566            "expected progress callback for read_partition_metadata"
2567        );
2568        assert_eq!(
2569            remap_progress.last().copied().unwrap_or_default(),
2570            12,
2571            "expected remap_partition_files progress to cover both rename phases"
2572        );
2573        assert!(
2574            tags.iter().any(|e| e == "progress:write_merged_metadata"),
2575            "expected progress callback for write_merged_metadata"
2576        );
2577
2578        Ok(())
2579    }
2580
2581    #[tokio::test]
2582    async fn test_worker_memory_limit_rejects_single_large_doc() {
2583        let index_dir = TempDir::default();
2584        let store = Arc::new(LanceIndexStore::new(
2585            ObjectStore::local().into(),
2586            index_dir.obj_path(),
2587            Arc::new(LanceCache::no_cache()),
2588        ));
2589
2590        let batch = make_doc_batch("hello world", 42);
2591        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2592        let stream = Box::pin(stream);
2593
2594        let mut builder =
2595            InvertedIndexBuilder::new(InvertedIndexParams::default().memory_limit_mb(0));
2596        let err = builder
2597            .update(stream, store.as_ref(), None)
2598            .await
2599            .expect_err("single doc should exceed zero worker memory limit");
2600        assert!(
2601            err.to_string().contains("row_id=42"),
2602            "unexpected error: {err}"
2603        );
2604    }
2605
2606    #[tokio::test]
2607    async fn test_worker_trims_position_temp_buffers() -> Result<()> {
2608        let tokenizer = InvertedIndexParams::default().with_position(true).build()?;
2609        let store = Arc::new(CountingStore::new());
2610        let id_alloc = Arc::new(AtomicU64::new(0));
2611        let mut worker = IndexWorker::new(
2612            tokenizer,
2613            store,
2614            id_alloc,
2615            IndexWorkerConfig {
2616                with_position: true,
2617                format_version: InvertedListFormatVersion::V1,
2618                fragment_mask: None,
2619                token_set_format: TokenSetFormat::default(),
2620                worker_memory_limit_bytes: u64::MAX,
2621            },
2622        )
2623        .await?;
2624
2625        let doc = (0..(MAX_RETAINED_TOKEN_IDS * 2))
2626            .map(|i| format!("tok{i}"))
2627            .collect::<Vec<_>>()
2628            .join(" ");
2629        worker.process_batch(make_doc_batch(&doc, 0)).await?;
2630
2631        assert!(worker.token_ids.is_empty());
2632        assert!(worker.token_ids.capacity() <= MAX_RETAINED_TOKEN_IDS);
2633        assert!(worker.memory_size >= worker.temporary_memory_size());
2634        Ok(())
2635    }
2636
2637    #[tokio::test]
2638    async fn test_worker_flush_keeps_position_temp_memory_bounded() -> Result<()> {
2639        let tokenizer = InvertedIndexParams::default().with_position(true).build()?;
2640        let store = Arc::new(CountingStore::new());
2641        let id_alloc = Arc::new(AtomicU64::new(0));
2642        let mut worker = IndexWorker::new(
2643            tokenizer,
2644            store,
2645            id_alloc,
2646            IndexWorkerConfig {
2647                with_position: true,
2648                format_version: InvertedListFormatVersion::V1,
2649                fragment_mask: None,
2650                token_set_format: TokenSetFormat::default(),
2651                worker_memory_limit_bytes: u64::MAX,
2652            },
2653        )
2654        .await?;
2655
2656        let doc = std::iter::repeat_n("common", 32_768)
2657            .collect::<Vec<_>>()
2658            .join(" ");
2659        let mut observed_post_flush_memory = Vec::new();
2660        for row_id in 0..8 {
2661            worker.process_batch(make_doc_batch(&doc, row_id)).await?;
2662            worker.flush().await?;
2663            observed_post_flush_memory.push(worker.memory_size);
2664        }
2665
2666        let max_memory = *observed_post_flush_memory.iter().max().unwrap();
2667        let min_memory = *observed_post_flush_memory.iter().min().unwrap();
2668        assert!(
2669            max_memory <= min_memory.saturating_add(256 * 1024),
2670            "post-flush worker memory drifted upward: {observed_post_flush_memory:?}"
2671        );
2672        Ok(())
2673    }
2674
2675    #[tokio::test]
2676    async fn test_worker_flush_writes_partition_directly() -> Result<()> {
2677        let tokenizer = InvertedIndexParams::default().with_position(true).build()?;
2678        let store = Arc::new(CountingStore::new());
2679        let id_alloc = Arc::new(AtomicU64::new(0));
2680        let mut worker = IndexWorker::new(
2681            tokenizer,
2682            store.clone(),
2683            id_alloc,
2684            IndexWorkerConfig {
2685                with_position: true,
2686                format_version: InvertedListFormatVersion::V1,
2687                fragment_mask: None,
2688                token_set_format: TokenSetFormat::default(),
2689                worker_memory_limit_bytes: u64::MAX,
2690            },
2691        )
2692        .await?;
2693        worker
2694            .process_batch(make_doc_batch("alpha beta gamma", 0))
2695            .await?;
2696        worker.flush().await?;
2697        assert!(store.write_count() > 0);
2698        Ok(())
2699    }
2700
/// With default parameters and 8 workers, the resolved per-worker limit must
/// equal `LANCE_FTS_PARTITION_SIZE` shifted left by 20 bits.
#[test]
fn test_resolve_worker_memory_limit_uses_default_when_unset() {
    let actual = resolve_worker_memory_limit_bytes(&InvertedIndexParams::default(), 8);
    let expected = *LANCE_FTS_PARTITION_SIZE << 20;
    assert_eq!(actual, expected);
}
2709
/// With default parameters, `resolve_num_workers` must return
/// `default_num_workers()` clamped to `1..=max(compute-intensive CPUs, 1)`.
#[test]
fn test_resolve_num_workers_uses_default_when_unset() {
    let max_workers = get_num_compute_intensive_cpus().max(1);
    let expected = default_num_workers().clamp(1, max_workers);
    let actual = resolve_num_workers(&InvertedIndexParams::default());
    assert_eq!(actual, expected);
}
2718
/// An explicit worker count is clamped: a request of 0 resolves to 1, and a
/// request above `max(compute-intensive CPUs, 1)` resolves to that maximum.
#[test]
fn test_resolve_num_workers_clamps_requested_value() {
    let max_workers = get_num_compute_intensive_cpus().max(1);

    let from_zero = resolve_num_workers(&InvertedIndexParams::default().num_workers(0));
    assert_eq!(from_zero, 1);

    let from_too_many =
        resolve_num_workers(&InvertedIndexParams::default().num_workers(max_workers + 10));
    assert_eq!(from_too_many, max_workers);
}
2731
/// A total limit of 4096 MB resolved for 16 workers must yield `256 << 20`
/// bytes per worker.
#[test]
fn test_resolve_worker_memory_limit_splits_total_memory_limit() {
    let params = InvertedIndexParams::default().memory_limit_mb(4096);
    let per_worker = resolve_worker_memory_limit_bytes(&params, 16);
    assert_eq!(per_worker, 256 << 20);
}
2737
/// Merging three empty tail partitions with ids 0, 1 and 2 must produce a
/// builder, and that builder must have id 0.
#[test]
fn test_merge_all_tail_partitions_combines_everything() -> Result<()> {
    let partitions: Vec<TailPartition> = (0..3)
        .map(|id| TailPartition {
            builder: InnerBuilder::new(id, false, TokenSetFormat::default()),
        })
        .collect();

    let merged = merge_all_tail_partitions(partitions)?;
    let builder = merged.expect("merged builder should exist");

    assert_eq!(builder.id(), 0);
    Ok(())
}
2755
/// Merging an empty list of tail partitions must succeed and yield `None`.
#[test]
fn test_merge_all_tail_partitions_returns_none_for_empty_input() -> Result<()> {
    // `?` propagates a merge error; only the `None` result is asserted.
    assert!(merge_all_tail_partitions(Vec::new())?.is_none());
    Ok(())
}
2761
/// Merges two tail builders that each hold one token, one document and one
/// posting, and checks the merged builder: id 0, two documents, two tokens,
/// two posting lists, and one posting under each of "hello" and "world".
#[test]
fn test_merge_tail_partition_group_combines_tail_builders() -> Result<()> {
    // Builder 0: token "hello" with a single posting.
    let first = {
        let mut builder = InnerBuilder::new(0, false, TokenSetFormat::default());
        let token_id = builder.tokens.add(String::from("hello"));
        let token_count = builder.tokens.len();
        builder
            .posting_lists
            .resize_with(token_count, || PostingListBuilder::new(false));
        let doc_id = builder.docs.append(10, 1);
        builder.posting_lists[token_id as usize].add(doc_id, PositionRecorder::Count(1));
        builder
    };

    // Builder 1: token "world" with a single posting.
    let second = {
        let mut builder = InnerBuilder::new(1, false, TokenSetFormat::default());
        let token_id = builder.tokens.add(String::from("world"));
        let token_count = builder.tokens.len();
        builder
            .posting_lists
            .resize_with(token_count, || PostingListBuilder::new(false));
        let doc_id = builder.docs.append(20, 2);
        builder.posting_lists[token_id as usize].add(doc_id, PositionRecorder::Count(2));
        builder
    };

    let group = vec![
        TailPartition { builder: first },
        TailPartition { builder: second },
    ];
    let merged = merge_tail_partition_group(group)?;

    assert_eq!(merged.id(), 0);
    assert_eq!(merged.docs.len(), 2);
    assert_eq!(merged.tokens.len(), 2);
    assert_eq!(merged.posting_lists.len(), 2);

    let hello_id = merged.tokens.get("hello").unwrap() as usize;
    assert_eq!(merged.posting_lists[hello_id].len(), 1);
    let world_id = merged.tokens.get("world").unwrap() as usize;
    assert_eq!(merged.posting_lists[world_id].len(), 1);
    Ok(())
}
2799
2800    #[tokio::test]
2801    async fn test_update_index_returns_worker_error_when_workers_exit_during_dispatch() {
2802        let num_batches = (*LANCE_FTS_NUM_SHARDS * 2 + 1) as u64;
2803        let index_dir = TempDir::default();
2804        let store = Arc::new(LanceIndexStore::new(
2805            ObjectStore::local().into(),
2806            index_dir.obj_path(),
2807            Arc::new(LanceCache::no_cache()),
2808        ));
2809        let schema = make_doc_batch("hello world", 0).schema();
2810        let stream = RecordBatchStreamAdapter::new(
2811            schema,
2812            stream::iter((0..num_batches).map(|row_id| Ok(make_doc_batch("hello world", row_id)))),
2813        );
2814        let stream = Box::pin(stream);
2815
2816        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default())
2817            .with_progress(Arc::new(FailingProgress));
2818
2819        let result = tokio::time::timeout(
2820            Duration::from_secs(5),
2821            builder.update_index(stream, store.as_ref()),
2822        )
2823        .await
2824        .expect("update_index should not hang")
2825        .expect_err("worker failure should be returned");
2826
2827        assert!(
2828            result.to_string().contains("injected progress failure"),
2829            "unexpected error: {result}"
2830        );
2831    }
2832
2833    #[tokio::test]
2834    async fn test_new_index_has_empty_deleted_fragments() {
2835        let index_dir = TempDir::default();
2836        let store = Arc::new(LanceIndexStore::new(
2837            ObjectStore::local().into(),
2838            index_dir.obj_path(),
2839            Arc::new(LanceCache::no_cache()),
2840        ));
2841
2842        let batch = make_doc_batch("hello world", 0);
2843        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2844        let stream = Box::pin(stream);
2845
2846        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
2847        builder.update(stream, store.as_ref(), None).await.unwrap();
2848
2849        let index = InvertedIndex::load(store, None, &LanceCache::no_cache())
2850            .await
2851            .unwrap();
2852        assert!(
2853            index.deleted_fragments().is_empty(),
2854            "new index should have empty deleted fragments, got {:?}",
2855            index.deleted_fragments()
2856        );
2857    }
2858
2859    #[tokio::test]
2860    async fn test_remap_preserves_deleted_fragments() {
2861        let src_dir = TempDir::default();
2862        let dest_dir = TempDir::default();
2863        let src_store = Arc::new(LanceIndexStore::new(
2864            ObjectStore::local().into(),
2865            src_dir.obj_path(),
2866            Arc::new(LanceCache::no_cache()),
2867        ));
2868        let dest_store = Arc::new(LanceIndexStore::new(
2869            ObjectStore::local().into(),
2870            dest_dir.obj_path(),
2871            Arc::new(LanceCache::no_cache()),
2872        ));
2873
2874        // Build an initial index with some deleted fragments
2875        let batch = make_doc_batch("hello world", 0);
2876        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2877        let stream = Box::pin(stream);
2878
2879        let initial_deleted = RoaringBitmap::from_iter([5, 10, 42]);
2880        let mut builder = InvertedIndexBuilder::from_existing_index(
2881            InvertedIndexParams::default(),
2882            None,
2883            Vec::new(),
2884            TokenSetFormat::default(),
2885            None,
2886            initial_deleted.clone(),
2887        );
2888        builder
2889            .update(stream, src_store.as_ref(), None)
2890            .await
2891            .unwrap();
2892
2893        // Load it back and confirm the invalidated fragments are set
2894        let index = InvertedIndex::load(src_store.clone(), None, &LanceCache::no_cache())
2895            .await
2896            .unwrap();
2897        assert_eq!(index.deleted_fragments(), &initial_deleted);
2898
2899        // Remap the index via the ScalarIndex trait method
2900        use crate::scalar::ScalarIndex;
2901        let mapping = HashMap::from([(0u64, Some(50 << 32))]);
2902        index.remap(&mapping, dest_store.as_ref()).await.unwrap();
2903
2904        // Reload from dest and verify deleted fragments are preserved
2905        let remapped_index = InvertedIndex::load(dest_store.clone(), None, &LanceCache::no_cache())
2906            .await
2907            .unwrap();
2908        assert_eq!(
2909            remapped_index.deleted_fragments(),
2910            &initial_deleted,
2911            "remap should preserve deleted fragments"
2912        );
2913    }
2914
2915    #[tokio::test]
2916    async fn test_update_grows_deleted_fragments_from_old_data_filter() {
2917        let index_dir = TempDir::default();
2918        let store = Arc::new(LanceIndexStore::new(
2919            ObjectStore::local().into(),
2920            index_dir.obj_path(),
2921            Arc::new(LanceCache::no_cache()),
2922        ));
2923
2924        // Build an initial index with no deleted fragments
2925        let batch = make_doc_batch("hello world", 0);
2926        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2927        let stream = Box::pin(stream);
2928
2929        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
2930        builder.update(stream, store.as_ref(), None).await.unwrap();
2931
2932        // Load the index and update it with an old_data_filter that invalidates fragments
2933        let index = InvertedIndex::load(store.clone(), None, &LanceCache::no_cache())
2934            .await
2935            .unwrap();
2936        assert!(index.deleted_fragments().is_empty());
2937
2938        let update_dir = TempDir::default();
2939        let update_store = Arc::new(LanceIndexStore::new(
2940            ObjectStore::local().into(),
2941            update_dir.obj_path(),
2942            Arc::new(LanceCache::no_cache()),
2943        ));
2944
2945        let batch2 = make_doc_batch("new document", 1 << 32 | 1);
2946        let stream2 =
2947            RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)]));
2948        let stream2 = Box::pin(stream2);
2949
2950        let old_data_filter = Some(crate::scalar::OldIndexDataFilter::Fragments {
2951            to_keep: RoaringBitmap::from_iter([0]),
2952            to_remove: RoaringBitmap::from_iter([3, 7]),
2953        });
2954
2955        // Use ScalarIndex::update trait method
2956        use crate::scalar::ScalarIndex;
2957        index
2958            .update(stream2, update_store.as_ref(), old_data_filter)
2959            .await
2960            .unwrap();
2961
2962        let updated_index =
2963            InvertedIndex::load(update_store.clone(), None, &LanceCache::no_cache())
2964                .await
2965                .unwrap();
2966        assert_eq!(
2967            updated_index.deleted_fragments(),
2968            &RoaringBitmap::from_iter([3, 7]),
2969            "update should add deleted fragments from old_data_filter"
2970        );
2971    }
2972
2973    #[tokio::test]
2974    async fn test_update_accumulates_deleted_fragments() {
2975        let dir1 = TempDir::default();
2976        let store1 = Arc::new(LanceIndexStore::new(
2977            ObjectStore::local().into(),
2978            dir1.obj_path(),
2979            Arc::new(LanceCache::no_cache()),
2980        ));
2981
2982        // Build initial index
2983        let batch = make_doc_batch("hello world", 0);
2984        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
2985        let stream = Box::pin(stream);
2986
2987        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
2988        builder.update(stream, store1.as_ref(), None).await.unwrap();
2989
2990        // First update: delete fragments 3 and 7
2991        let index = InvertedIndex::load(store1.clone(), None, &LanceCache::no_cache())
2992            .await
2993            .unwrap();
2994
2995        let dir2 = TempDir::default();
2996        let store2 = Arc::new(LanceIndexStore::new(
2997            ObjectStore::local().into(),
2998            dir2.obj_path(),
2999            Arc::new(LanceCache::no_cache()),
3000        ));
3001
3002        let batch2 = make_doc_batch("second doc", 1 << 32 | 1);
3003        let stream2 =
3004            RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)]));
3005        let stream2 = Box::pin(stream2);
3006
3007        use crate::scalar::ScalarIndex;
3008        index
3009            .update(
3010                stream2,
3011                store2.as_ref(),
3012                Some(crate::scalar::OldIndexDataFilter::Fragments {
3013                    to_keep: RoaringBitmap::from_iter([0]),
3014                    to_remove: RoaringBitmap::from_iter([3, 7]),
3015                }),
3016            )
3017            .await
3018            .unwrap();
3019
3020        // Second update: invalidate additional fragments 12 and 15
3021        let index2 = InvertedIndex::load(store2.clone(), None, &LanceCache::no_cache())
3022            .await
3023            .unwrap();
3024        assert_eq!(
3025            index2.deleted_fragments(),
3026            &RoaringBitmap::from_iter([3, 7])
3027        );
3028
3029        let dir3 = TempDir::default();
3030        let store3 = Arc::new(LanceIndexStore::new(
3031            ObjectStore::local().into(),
3032            dir3.obj_path(),
3033            Arc::new(LanceCache::no_cache()),
3034        ));
3035
3036        let batch3 = make_doc_batch("third doc", 2 << 32 | 2);
3037        let stream3 =
3038            RecordBatchStreamAdapter::new(batch3.schema(), stream::iter(vec![Ok(batch3)]));
3039        let stream3 = Box::pin(stream3);
3040
3041        index2
3042            .update(
3043                stream3,
3044                store3.as_ref(),
3045                Some(crate::scalar::OldIndexDataFilter::Fragments {
3046                    to_keep: RoaringBitmap::from_iter([0, 1]),
3047                    to_remove: RoaringBitmap::from_iter([12, 15]),
3048                }),
3049            )
3050            .await
3051            .unwrap();
3052
3053        let index3 = InvertedIndex::load(store3.clone(), None, &LanceCache::no_cache())
3054            .await
3055            .unwrap();
3056        assert_eq!(
3057            index3.deleted_fragments(),
3058            &RoaringBitmap::from_iter([3, 7, 12, 15]),
3059            "deleted fragments should accumulate across updates"
3060        );
3061    }
3062
3063    #[tokio::test]
3064    async fn test_update_with_rowid_filter_does_not_grow_deleted_fragments() {
3065        let index_dir = TempDir::default();
3066        let store = Arc::new(LanceIndexStore::new(
3067            ObjectStore::local().into(),
3068            index_dir.obj_path(),
3069            Arc::new(LanceCache::no_cache()),
3070        ));
3071
3072        let batch = make_doc_batch("hello world", 0);
3073        let stream = RecordBatchStreamAdapter::new(batch.schema(), stream::iter(vec![Ok(batch)]));
3074        let stream = Box::pin(stream);
3075
3076        let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
3077        builder.update(stream, store.as_ref(), None).await.unwrap();
3078
3079        let index = InvertedIndex::load(store.clone(), None, &LanceCache::no_cache())
3080            .await
3081            .unwrap();
3082
3083        let update_dir = TempDir::default();
3084        let update_store = Arc::new(LanceIndexStore::new(
3085            ObjectStore::local().into(),
3086            update_dir.obj_path(),
3087            Arc::new(LanceCache::no_cache()),
3088        ));
3089
3090        let batch2 = make_doc_batch("new doc", 1);
3091        let stream2 =
3092            RecordBatchStreamAdapter::new(batch2.schema(), stream::iter(vec![Ok(batch2)]));
3093        let stream2 = Box::pin(stream2);
3094
3095        // Use RowIds filter instead of Fragments — should not affect deleted_fragments
3096        let mut valid_ids = lance_core::utils::mask::RowAddrTreeMap::new();
3097        valid_ids.insert(0);
3098        let old_data_filter = Some(crate::scalar::OldIndexDataFilter::RowIds(valid_ids));
3099
3100        use crate::scalar::ScalarIndex;
3101        index
3102            .update(stream2, update_store.as_ref(), old_data_filter)
3103            .await
3104            .unwrap();
3105
3106        let updated_index =
3107            InvertedIndex::load(update_store.clone(), None, &LanceCache::no_cache())
3108                .await
3109                .unwrap();
3110        assert!(
3111            updated_index.deleted_fragments().is_empty(),
3112            "RowIds filter should not add to deleted fragments"
3113        );
3114    }
3115}