lance_index/scalar/inverted/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use super::{
5    index::*,
6    merger::{Merger, SizeBasedMerger},
7    InvertedIndexParams,
8};
9use crate::scalar::inverted::json::JsonTextStream;
10use crate::scalar::inverted::lance_tokenizer::DocType;
11use crate::scalar::inverted::tokenizer::lance_tokenizer::LanceTokenizer;
12use crate::scalar::lance_format::LanceIndexStore;
13use crate::scalar::IndexStore;
14use crate::vector::graph::OrderedFloat;
15use arrow::datatypes;
16use arrow::{array::AsArray, compute::concat_batches};
17use arrow_array::{Array, RecordBatch, UInt64Array};
18use arrow_schema::{DataType, Field, Schema, SchemaRef};
19use bitpacking::{BitPacker, BitPacker4x};
20use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream};
21use deepsize::DeepSizeOf;
22use futures::{stream, Stream, StreamExt, TryStreamExt};
23use lance_arrow::json::JSON_EXT_NAME;
24use lance_arrow::{iter_str_array, ARROW_EXT_NAME_KEY};
25use lance_core::utils::tokio::get_num_compute_intensive_cpus;
26use lance_core::{cache::LanceCache, utils::tokio::spawn_cpu};
27use lance_core::{error::LanceOptionExt, utils::tempfile::TempDir};
28use lance_core::{Error, Result, ROW_ID, ROW_ID_FIELD};
29use lance_io::object_store::ObjectStore;
30use object_store::path::Path;
31use snafu::location;
32use std::collections::HashMap;
33use std::pin::Pin;
34use std::str::FromStr;
35use std::sync::Arc;
36use std::sync::LazyLock;
37use std::task::{Context, Poll};
38use std::{fmt::Debug, sync::atomic::AtomicU64};
39use tracing::instrument;
40
41// the number of elements in each block
42// each block contains 128 row ids and 128 frequencies
43// WARNING: changing this value will break the compatibility with existing indexes
44pub const BLOCK_SIZE: usize = BitPacker4x::BLOCK_LEN;
45
46// the (compressed) size of each flush for posting lists in MiB,
47// when the `LANCE_FTS_FLUSH_THRESHOLD` is reached, the flush will be triggered,
48// higher for better indexing performance, but more memory usage,
49// it's in 16 MiB by default
50static LANCE_FTS_FLUSH_SIZE: LazyLock<usize> = LazyLock::new(|| {
51    std::env::var("LANCE_FTS_FLUSH_SIZE")
52        .unwrap_or_else(|_| "16".to_string())
53        .parse()
54        .expect("failed to parse LANCE_FTS_FLUSH_SIZE")
55});
56// the number of shards to split the indexing work,
57// the indexing process would spawn `LANCE_FTS_NUM_SHARDS` workers to build FTS,
58// higher for faster indexing performance, but more memory usage,
59// it's `the number of compute intensive CPUs` by default
60pub static LANCE_FTS_NUM_SHARDS: LazyLock<usize> = LazyLock::new(|| {
61    std::env::var("LANCE_FTS_NUM_SHARDS")
62        .unwrap_or_else(|_| get_num_compute_intensive_cpus().to_string())
63        .parse()
64        .expect("failed to parse LANCE_FTS_NUM_SHARDS")
65});
66// the partition size limit in MiB (uncompressed format)
67// higher for better indexing & query performance, but more memory usage,
68pub static LANCE_FTS_PARTITION_SIZE: LazyLock<u64> = LazyLock::new(|| {
69    std::env::var("LANCE_FTS_PARTITION_SIZE")
70        .unwrap_or_else(|_| "256".to_string())
71        .parse()
72        .expect("failed to parse LANCE_FTS_PARTITION_SIZE")
73});
74// the target size of partition after merging in MiB (uncompressed format)
75pub static LANCE_FTS_TARGET_SIZE: LazyLock<u64> = LazyLock::new(|| {
76    std::env::var("LANCE_FTS_TARGET_SIZE")
77        .unwrap_or_else(|_| "4096".to_string())
78        .parse()
79        .expect("failed to parse LANCE_FTS_TARGET_SIZE")
80});
81
82#[derive(Debug)]
83pub struct InvertedIndexBuilder {
84    params: InvertedIndexParams,
85    pub(crate) partitions: Vec<u64>,
86    new_partitions: Vec<u64>,
87    fragment_mask: Option<u64>,
88    token_set_format: TokenSetFormat,
89    _tmpdir: TempDir,
90    local_store: Arc<dyn IndexStore>,
91    src_store: Arc<dyn IndexStore>,
92}
93
94impl InvertedIndexBuilder {
95    pub fn new(params: InvertedIndexParams) -> Self {
96        Self::new_with_fragment_mask(params, None)
97    }
98
99    pub fn new_with_fragment_mask(params: InvertedIndexParams, fragment_mask: Option<u64>) -> Self {
100        Self::from_existing_index(
101            params,
102            None,
103            Vec::new(),
104            TokenSetFormat::default(),
105            fragment_mask,
106        )
107    }
108
109    /// Creates an InvertedIndexBuilder from existing index with fragment filtering.
110    /// This method is used to create a builder from an existing index while applying
111    /// fragment-based filtering for distributed indexing scenarios.
112    /// fragment_mask Optional mask with fragment_id in high 32 bits for filtering.
113    /// Constructed as `(fragment_id as u64) << 32`.
114    /// When provided, ensures that generated IDs belong to the specified fragment.
115    pub fn from_existing_index(
116        params: InvertedIndexParams,
117        store: Option<Arc<dyn IndexStore>>,
118        partitions: Vec<u64>,
119        token_set_format: TokenSetFormat,
120        fragment_mask: Option<u64>,
121    ) -> Self {
122        let tmpdir = TempDir::default();
123        let local_store = Arc::new(LanceIndexStore::new(
124            ObjectStore::local().into(),
125            tmpdir.obj_path(),
126            Arc::new(LanceCache::no_cache()),
127        ));
128        let src_store = store.unwrap_or_else(|| local_store.clone());
129        Self {
130            params,
131            partitions,
132            new_partitions: Vec::new(),
133            _tmpdir: tmpdir,
134            local_store,
135            src_store,
136            token_set_format,
137            fragment_mask,
138        }
139    }
140
141    pub async fn update(
142        &mut self,
143        new_data: SendableRecordBatchStream,
144        dest_store: &dyn IndexStore,
145    ) -> Result<()> {
146        let schema = new_data.schema();
147        let doc_col = schema.field(0).name();
148
149        // infer lance_tokenizer based on document type
150        if self.params.lance_tokenizer.is_none() {
151            let schema = new_data.schema();
152            let field = schema.column_with_name(doc_col).expect_ok()?.1;
153            let doc_type = DocType::try_from(field)?;
154            self.params.lance_tokenizer = Some(doc_type.as_ref().to_string());
155        }
156
157        let new_data = document_input(new_data, doc_col)?;
158
159        self.update_index(new_data).await?;
160        self.write(dest_store).await?;
161        Ok(())
162    }
163
164    #[instrument(level = "debug", skip_all)]
165    async fn update_index(&mut self, stream: SendableRecordBatchStream) -> Result<()> {
166        let num_workers = *LANCE_FTS_NUM_SHARDS;
167        let tokenizer = self.params.build()?;
168        let with_position = self.params.with_position;
169        let next_id = self.partitions.iter().map(|id| id + 1).max().unwrap_or(0);
170        let id_alloc = Arc::new(AtomicU64::new(next_id));
171        let (sender, receiver) = async_channel::bounded(num_workers);
172        let mut index_tasks = Vec::with_capacity(num_workers);
173        for _ in 0..num_workers {
174            let store = self.local_store.clone();
175            let tokenizer = tokenizer.clone();
176            let receiver = receiver.clone();
177            let id_alloc = id_alloc.clone();
178            let fragment_mask = self.fragment_mask;
179            let token_set_format = self.token_set_format;
180            let task = tokio::task::spawn(async move {
181                let mut worker = IndexWorker::new(
182                    store,
183                    tokenizer,
184                    with_position,
185                    id_alloc,
186                    fragment_mask,
187                    token_set_format,
188                )
189                .await?;
190                while let Ok(batch) = receiver.recv().await {
191                    worker.process_batch(batch).await?;
192                }
193                let partitions = worker.finish().await?;
194                Result::Ok(partitions)
195            });
196            index_tasks.push(task);
197        }
198
199        let sender = Arc::new(sender);
200
201        let mut stream = Box::pin(stream.then({
202            |batch_result| {
203                let sender = sender.clone();
204                async move {
205                    let sender = sender.clone();
206                    let batch = batch_result?;
207                    let num_rows = batch.num_rows();
208                    sender.send(batch).await.expect("failed to send batch");
209                    Result::Ok(num_rows)
210                }
211            }
212        }));
213        log::info!("indexing FTS with {} workers", num_workers);
214
215        let mut last_num_rows = 0;
216        let mut total_num_rows = 0;
217        let start = std::time::Instant::now();
218        while let Some(num_rows) = stream.try_next().await? {
219            total_num_rows += num_rows;
220            if total_num_rows >= last_num_rows + 1_000_000 {
221                log::debug!(
222                    "indexed {} documents, elapsed: {:?}, speed: {}rows/s",
223                    total_num_rows,
224                    start.elapsed(),
225                    total_num_rows as f32 / start.elapsed().as_secs_f32()
226                );
227                last_num_rows = total_num_rows;
228            }
229        }
230        // drop the sender to stop receivers
231        drop(stream);
232        debug_assert_eq!(sender.sender_count(), 1);
233        drop(sender);
234        log::info!("dispatching elapsed: {:?}", start.elapsed());
235
236        // wait for the workers to finish
237        let start = std::time::Instant::now();
238        for index_task in index_tasks {
239            self.new_partitions.extend(index_task.await??);
240        }
241        log::info!("wait workers indexing elapsed: {:?}", start.elapsed());
242        Ok(())
243    }
244
245    pub async fn remap(
246        &mut self,
247        mapping: &HashMap<u64, Option<u64>>,
248        src_store: Arc<dyn IndexStore>,
249        dest_store: &dyn IndexStore,
250    ) -> Result<()> {
251        for part in self.partitions.iter() {
252            let part = InvertedPartition::load(
253                src_store.clone(),
254                *part,
255                None,
256                &LanceCache::no_cache(),
257                self.token_set_format,
258            )
259            .await?;
260            let mut builder = part.into_builder().await?;
261            builder.remap(mapping).await?;
262            builder.write(dest_store).await?;
263        }
264        if self.fragment_mask.is_none() {
265            self.write_metadata(dest_store, &self.partitions).await?;
266        } else {
267            // in distributed mode, the part_temp_metadata is written by the worker
268            for &partition_id in &self.partitions {
269                self.write_part_metadata(dest_store, partition_id).await?;
270            }
271        }
272        Ok(())
273    }
274
275    async fn write_metadata(&self, dest_store: &dyn IndexStore, partitions: &[u64]) -> Result<()> {
276        let metadata = HashMap::from_iter(vec![
277            ("partitions".to_owned(), serde_json::to_string(&partitions)?),
278            ("params".to_owned(), serde_json::to_string(&self.params)?),
279            (
280                TOKEN_SET_FORMAT_KEY.to_owned(),
281                self.token_set_format.to_string(),
282            ),
283        ]);
284        let mut writer = dest_store
285            .new_index_file(METADATA_FILE, Arc::new(Schema::empty()))
286            .await?;
287        writer.finish_with_metadata(metadata).await?;
288        Ok(())
289    }
290
291    /// Write partition metadata file for a single partition
292    ///
293    /// In a distributed environment, each worker node can write partition metadata files for the partitions it processes,
294    /// which are then merged into a final metadata file using the `merge_metadata_files` function.
295    pub(crate) async fn write_part_metadata(
296        &self,
297        dest_store: &dyn IndexStore,
298        partition: u64, // Modify parameter type
299    ) -> Result<()> {
300        let partitions = vec![partition];
301        let metadata = HashMap::from_iter(vec![
302            ("partitions".to_owned(), serde_json::to_string(&partitions)?),
303            ("params".to_owned(), serde_json::to_string(&self.params)?),
304            (
305                TOKEN_SET_FORMAT_KEY.to_owned(),
306                self.token_set_format.to_string(),
307            ),
308        ]);
309        // Use partition ID to generate a unique temporary filename
310        let file_name = part_metadata_file_path(partition);
311        let mut writer = dest_store
312            .new_index_file(&file_name, Arc::new(Schema::empty()))
313            .await?;
314        writer.finish_with_metadata(metadata).await?;
315        Ok(())
316    }
317
318    async fn write(&self, dest_store: &dyn IndexStore) -> Result<()> {
319        let no_cache = LanceCache::no_cache();
320        let partitions = futures::future::try_join_all(
321            self.partitions
322                .iter()
323                .map(|part| {
324                    InvertedPartition::load(
325                        self.src_store.clone(),
326                        *part,
327                        None,
328                        &no_cache,
329                        self.token_set_format,
330                    )
331                })
332                .chain(self.new_partitions.iter().map(|part| {
333                    InvertedPartition::load(
334                        self.local_store.clone(),
335                        *part,
336                        None,
337                        &no_cache,
338                        self.token_set_format,
339                    )
340                })),
341        )
342        .await?;
343        let mut merger = SizeBasedMerger::new(
344            dest_store,
345            partitions,
346            *LANCE_FTS_TARGET_SIZE << 20,
347            self.token_set_format,
348        );
349        let partitions = merger.merge().await?;
350
351        if self.fragment_mask.is_none() {
352            self.write_metadata(dest_store, &partitions).await?;
353        } else {
354            for &partition_id in &partitions {
355                self.write_part_metadata(dest_store, partition_id).await?;
356            }
357        }
358        Ok(())
359    }
360}
361
362impl Default for InvertedIndexBuilder {
363    fn default() -> Self {
364        let params = InvertedIndexParams::default();
365        Self::new(params)
366    }
367}
368
369// builder for single partition
370#[derive(Debug)]
371pub struct InnerBuilder {
372    id: u64,
373    with_position: bool,
374    token_set_format: TokenSetFormat,
375    pub(crate) tokens: TokenSet,
376    pub(crate) posting_lists: Vec<PostingListBuilder>,
377    pub(crate) docs: DocSet,
378}
379
380impl InnerBuilder {
381    pub fn new(id: u64, with_position: bool, token_set_format: TokenSetFormat) -> Self {
382        Self {
383            id,
384            with_position,
385            token_set_format,
386            tokens: TokenSet::default(),
387            posting_lists: Vec::new(),
388            docs: DocSet::default(),
389        }
390    }
391
392    pub fn id(&self) -> u64 {
393        self.id
394    }
395
396    pub async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
397        // for the docs, we need to remove the rows that are removed from the doc set,
398        // and update the row ids of the rows that are updated
399        let removed = self.docs.remap(mapping);
400
401        // for the posting lists, we need to remap the doc ids:
402        // - if the a row is removed, we need to shift the doc ids of the following rows
403        // - if a row is updated (assigned a new row id), we don't need to do anything with the posting lists
404        let mut token_id = 0;
405        let mut removed_token_ids = Vec::new();
406        self.posting_lists.retain_mut(|posting_list| {
407            posting_list.remap(&removed);
408            let keep = !posting_list.is_empty();
409            if !keep {
410                removed_token_ids.push(token_id as u32);
411            }
412            token_id += 1;
413            keep
414        });
415
416        // for the tokens, remap the token ids if any posting list is empty
417        self.tokens.remap(&removed_token_ids);
418
419        Ok(())
420    }
421
422    pub async fn write(&mut self, store: &dyn IndexStore) -> Result<()> {
423        let docs = Arc::new(std::mem::take(&mut self.docs));
424        self.write_posting_lists(store, docs.clone()).await?;
425        self.write_tokens(store).await?;
426        self.write_docs(store, docs).await?;
427        Ok(())
428    }
429
430    #[instrument(level = "debug", skip_all)]
431    async fn write_posting_lists(
432        &mut self,
433        store: &dyn IndexStore,
434        docs: Arc<DocSet>,
435    ) -> Result<()> {
436        let id = self.id;
437        let mut writer = store
438            .new_index_file(
439                &posting_file_path(self.id),
440                inverted_list_schema(self.with_position),
441            )
442            .await?;
443        let posting_lists = std::mem::take(&mut self.posting_lists);
444
445        log::info!(
446            "writing {} posting lists of partition {}, with position {}",
447            posting_lists.len(),
448            id,
449            self.with_position
450        );
451        let schema = inverted_list_schema(self.with_position);
452
453        let mut batches = stream::iter(posting_lists)
454            .map(|posting_list| {
455                let block_max_scores = docs.calculate_block_max_scores(
456                    posting_list.doc_ids.iter(),
457                    posting_list.frequencies.iter(),
458                );
459                spawn_cpu(move || posting_list.to_batch(block_max_scores))
460            })
461            .buffered(get_num_compute_intensive_cpus());
462
463        let mut write_duration = std::time::Duration::ZERO;
464        let mut num_posting_lists = 0;
465        let mut buffer = Vec::new();
466        let mut size_sum = 0;
467        while let Some(batch) = batches.try_next().await? {
468            num_posting_lists += 1;
469            size_sum += batch.get_array_memory_size();
470            buffer.push(batch);
471            if size_sum >= *LANCE_FTS_FLUSH_SIZE << 20 {
472                let batch = concat_batches(&schema, buffer.iter())?;
473                buffer.clear();
474                size_sum = 0;
475                let start = std::time::Instant::now();
476                writer.write_record_batch(batch).await?;
477                write_duration += start.elapsed();
478            }
479
480            if num_posting_lists % 500_000 == 0 {
481                log::info!(
482                    "wrote {} posting lists of partition {}, writing elapsed: {:?}",
483                    num_posting_lists,
484                    id,
485                    write_duration,
486                );
487            }
488        }
489        if !buffer.is_empty() {
490            let batch = concat_batches(&schema, buffer.iter())?;
491            writer.write_record_batch(batch).await?;
492        }
493
494        writer.finish().await?;
495        Ok(())
496    }
497
498    #[instrument(level = "debug", skip_all)]
499    async fn write_tokens(&mut self, store: &dyn IndexStore) -> Result<()> {
500        log::info!("writing tokens of partition {}", self.id);
501        let tokens = std::mem::take(&mut self.tokens);
502        let batch = tokens.to_batch(self.token_set_format)?;
503        let mut writer = store
504            .new_index_file(&token_file_path(self.id), batch.schema())
505            .await?;
506        writer.write_record_batch(batch).await?;
507        writer.finish().await?;
508        Ok(())
509    }
510
511    #[instrument(level = "debug", skip_all)]
512    async fn write_docs(&mut self, store: &dyn IndexStore, docs: Arc<DocSet>) -> Result<()> {
513        log::info!("writing docs of partition {}", self.id);
514        let batch = docs.to_batch()?;
515        let mut writer = store
516            .new_index_file(&doc_file_path(self.id), batch.schema())
517            .await?;
518        writer.write_record_batch(batch).await?;
519        writer.finish().await?;
520        Ok(())
521    }
522}
523
524struct IndexWorker {
525    store: Arc<dyn IndexStore>,
526    tokenizer: Box<dyn LanceTokenizer>,
527    id_alloc: Arc<AtomicU64>,
528    builder: InnerBuilder,
529    partitions: Vec<u64>,
530    schema: SchemaRef,
531    estimated_size: u64,
532    total_doc_length: usize,
533    fragment_mask: Option<u64>,
534    token_set_format: TokenSetFormat,
535}
536
537impl IndexWorker {
538    async fn new(
539        store: Arc<dyn IndexStore>,
540        tokenizer: Box<dyn LanceTokenizer>,
541        with_position: bool,
542        id_alloc: Arc<AtomicU64>,
543        fragment_mask: Option<u64>,
544        token_set_format: TokenSetFormat,
545    ) -> Result<Self> {
546        let schema = inverted_list_schema(with_position);
547
548        Ok(Self {
549            store,
550            tokenizer,
551            builder: InnerBuilder::new(
552                id_alloc.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
553                    | fragment_mask.unwrap_or(0),
554                with_position,
555                token_set_format,
556            ),
557            partitions: Vec::new(),
558            id_alloc,
559            schema,
560            estimated_size: 0,
561            total_doc_length: 0,
562            fragment_mask,
563            token_set_format,
564        })
565    }
566
567    fn has_position(&self) -> bool {
568        self.schema.column_with_name(POSITION_COL).is_some()
569    }
570
571    async fn process_batch(&mut self, batch: RecordBatch) -> Result<()> {
572        let doc_col = batch.column(0);
573        let doc_iter = iter_str_array(doc_col);
574        let row_id_col = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
575        let docs = doc_iter
576            .zip(row_id_col.values().iter())
577            .filter_map(|(doc, row_id)| doc.map(|doc| (doc, *row_id)));
578
579        let with_position = self.has_position();
580        for (doc, row_id) in docs {
581            let mut token_occurrences = HashMap::new();
582            let mut token_num = 0;
583            {
584                let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
585                while token_stream.advance() {
586                    let token = token_stream.token_mut();
587                    let token_text = std::mem::take(&mut token.text);
588                    let token_id = self.builder.tokens.add(token_text) as usize;
589                    token_occurrences
590                        .entry(token_id as u32)
591                        .or_insert_with(|| PositionRecorder::new(with_position))
592                        .push(token.position as u32);
593                    token_num += 1;
594                }
595            }
596            self.builder
597                .posting_lists
598                .resize_with(self.builder.tokens.len(), || {
599                    PostingListBuilder::new(with_position)
600                });
601            let doc_id = self.builder.docs.append(row_id, token_num);
602            self.total_doc_length += doc.len();
603
604            token_occurrences
605                .into_iter()
606                .for_each(|(token_id, term_positions)| {
607                    let posting_list = &mut self.builder.posting_lists[token_id as usize];
608
609                    let old_size = posting_list.size();
610                    posting_list.add(doc_id, term_positions);
611                    let new_size = posting_list.size();
612                    self.estimated_size += new_size - old_size;
613                });
614
615            if self.builder.docs.len() as u32 == u32::MAX
616                || self.estimated_size >= *LANCE_FTS_PARTITION_SIZE << 20
617            {
618                self.flush().await?;
619            }
620        }
621
622        Ok(())
623    }
624
625    #[instrument(level = "debug", skip_all)]
626    async fn flush(&mut self) -> Result<()> {
627        if self.builder.tokens.is_empty() {
628            return Ok(());
629        }
630
631        log::info!(
632            "flushing posting lists, estimated size: {} MiB",
633            self.estimated_size / (1024 * 1024)
634        );
635        self.estimated_size = 0;
636        let with_position = self.has_position();
637        let mut builder = std::mem::replace(
638            &mut self.builder,
639            InnerBuilder::new(
640                self.id_alloc
641                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
642                    | self.fragment_mask.unwrap_or(0),
643                with_position,
644                self.token_set_format,
645            ),
646        );
647        builder.write(self.store.as_ref()).await?;
648        self.partitions.push(builder.id());
649        Ok(())
650    }
651
652    async fn finish(mut self) -> Result<Vec<u64>> {
653        if !self.builder.tokens.is_empty() {
654            self.flush().await?;
655        }
656        Ok(self.partitions)
657    }
658}
659
660#[derive(Debug, Clone)]
661pub enum PositionRecorder {
662    Position(Vec<u32>),
663    Count(u32),
664}
665
666impl PositionRecorder {
667    fn new(with_position: bool) -> Self {
668        if with_position {
669            Self::Position(Vec::new())
670        } else {
671            Self::Count(0)
672        }
673    }
674
675    fn push(&mut self, position: u32) {
676        match self {
677            Self::Position(positions) => positions.push(position),
678            Self::Count(count) => *count += 1,
679        }
680    }
681
682    pub fn len(&self) -> u32 {
683        match self {
684            Self::Position(positions) => positions.len() as u32,
685            Self::Count(count) => *count,
686        }
687    }
688
689    pub fn is_empty(&self) -> bool {
690        self.len() == 0
691    }
692
693    pub fn into_vec(self) -> Vec<u32> {
694        match self {
695            Self::Position(positions) => positions,
696            Self::Count(_) => vec![0],
697        }
698    }
699}
700
701#[derive(Debug, Eq, PartialEq, Clone, DeepSizeOf)]
702pub struct ScoredDoc {
703    pub row_id: u64,
704    pub score: OrderedFloat,
705}
706
707impl ScoredDoc {
708    pub fn new(row_id: u64, score: f32) -> Self {
709        Self {
710            row_id,
711            score: OrderedFloat(score),
712        }
713    }
714}
715
716impl PartialOrd for ScoredDoc {
717    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
718        Some(self.cmp(other))
719    }
720}
721
722impl Ord for ScoredDoc {
723    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
724        self.score.cmp(&other.score)
725    }
726}
727
728pub fn legacy_inverted_list_schema(with_position: bool) -> SchemaRef {
729    let mut fields = vec![
730        arrow_schema::Field::new(ROW_ID, arrow_schema::DataType::UInt64, false),
731        arrow_schema::Field::new(FREQUENCY_COL, arrow_schema::DataType::Float32, false),
732    ];
733    if with_position {
734        fields.push(arrow_schema::Field::new(
735            POSITION_COL,
736            arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
737                "item",
738                arrow_schema::DataType::Int32,
739                true,
740            ))),
741            false,
742        ));
743    }
744    Arc::new(arrow_schema::Schema::new(fields))
745}
746
747pub fn inverted_list_schema(with_position: bool) -> SchemaRef {
748    let mut fields = vec![
749        // we compress the posting lists (including row ids and frequencies),
750        // and store the compressed posting lists, so it's a large binary array
751        arrow_schema::Field::new(
752            POSTING_COL,
753            datatypes::DataType::List(Arc::new(Field::new(
754                "item",
755                datatypes::DataType::LargeBinary,
756                true,
757            ))),
758            false,
759        ),
760        arrow_schema::Field::new(MAX_SCORE_COL, datatypes::DataType::Float32, false),
761        arrow_schema::Field::new(LENGTH_COL, datatypes::DataType::UInt32, false),
762    ];
763    if with_position {
764        fields.push(arrow_schema::Field::new(
765            POSITION_COL,
766            arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
767                "item",
768                arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
769                    "item",
770                    arrow_schema::DataType::LargeBinary,
771                    true,
772                ))),
773                true,
774            ))),
775            false,
776        ));
777    }
778    Arc::new(arrow_schema::Schema::new(fields))
779}
780
781/// Flatten the string list stream into a string stream
782pub struct FlattenStream {
783    /// Inner record batch stream with 2 columns:
784    /// 1. doc_col: List(Utf8) or List(LargeUtf8)
785    /// 2. row_id_col: UInt64
786    inner: SendableRecordBatchStream,
787    field_type: DataType,
788    data_type: DataType,
789}
790
791impl FlattenStream {
792    pub fn new(input: SendableRecordBatchStream) -> Self {
793        let schema = input.schema();
794        let field = schema.field(0);
795        let data_type = match field.data_type() {
796            DataType::List(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
797            DataType::List(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
798                DataType::LargeUtf8
799            }
800            DataType::LargeList(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
801            DataType::LargeList(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
802                DataType::LargeUtf8
803            }
804            _ => panic!(
805                "expect data type List(Utf8) or List(LargeUtf8) but got {:?}",
806                field.data_type()
807            ),
808        };
809        Self {
810            inner: input,
811            field_type: field.data_type().clone(),
812            data_type,
813        }
814    }
815}
816
817impl Stream for FlattenStream {
818    type Item = datafusion_common::Result<RecordBatch>;
819
820    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
821        match Pin::new(&mut self.inner).poll_next(cx) {
822            Poll::Ready(Some(Ok(batch))) => {
823                let doc_col = batch.column(0);
824                let batch = match self.field_type {
825                    DataType::List(_) => flatten_string_list::<i32>(&batch, doc_col).map_err(|e| {
826                        datafusion_common::error::DataFusionError::Execution(format!(
827                            "flatten string list error: {}",
828                            e
829                        ))
830                    }),
831                    DataType::LargeList(_) => {
832                        flatten_string_list::<i64>(&batch, doc_col).map_err(|e| {
833                            datafusion_common::error::DataFusionError::Execution(format!(
834                                "flatten string list error: {}",
835                                e
836                            ))
837                        })
838                    }
839                    _ => unreachable!(
840                        "expect data type List or LargeList but got {:?}",
841                        self.field_type
842                    ),
843                };
844                Poll::Ready(Some(batch))
845            }
846            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
847            Poll::Ready(None) => Poll::Ready(None),
848            Poll::Pending => Poll::Pending,
849        }
850    }
851}
852
853impl RecordBatchStream for FlattenStream {
854    fn schema(&self) -> SchemaRef {
855        let schema = Schema::new(vec![
856            Field::new(
857                self.inner.schema().field(0).name(),
858                self.data_type.clone(),
859                true,
860            ),
861            ROW_ID_FIELD.clone(),
862        ]);
863
864        Arc::new(schema)
865    }
866}
867
868fn flatten_string_list<Offset: arrow::array::OffsetSizeTrait>(
869    batch: &RecordBatch,
870    doc_col: &Arc<dyn Array>,
871) -> Result<RecordBatch> {
872    let docs = doc_col.as_list::<Offset>();
873    let row_ids = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
874
875    let row_ids = row_ids
876        .values()
877        .iter()
878        .zip(docs.iter())
879        .flat_map(|(row_id, doc)| std::iter::repeat_n(*row_id, doc.map(|d| d.len()).unwrap_or(0)));
880
881    let row_ids = Arc::new(UInt64Array::from_iter_values(row_ids));
882    let docs = match docs.value_type() {
883        datatypes::DataType::Utf8 | datatypes::DataType::LargeUtf8 => docs.values().clone(),
884        _ => {
885            return Err(Error::Index {
886                message: format!(
887                    "expect data type String or LargeString but got {}",
888                    docs.value_type()
889                ),
890                location: location!(),
891            });
892        }
893    };
894
895    let schema = Schema::new(vec![
896        Field::new(
897            batch.schema().field(0).name(),
898            docs.data_type().clone(),
899            true,
900        ),
901        ROW_ID_FIELD.clone(),
902    ]);
903    let batch = RecordBatch::try_new(Arc::new(schema), vec![docs, row_ids])?;
904    Ok(batch)
905}
906
907pub(crate) fn token_file_path(partition_id: u64) -> String {
908    format!("part_{}_{}", partition_id, TOKENS_FILE)
909}
910
911pub(crate) fn posting_file_path(partition_id: u64) -> String {
912    format!("part_{}_{}", partition_id, INVERT_LIST_FILE)
913}
914
915pub(crate) fn doc_file_path(partition_id: u64) -> String {
916    format!("part_{}_{}", partition_id, DOCS_FILE)
917}
918
919pub(crate) fn part_metadata_file_path(partition_id: u64) -> String {
920    format!("part_{}_{}", partition_id, METADATA_FILE)
921}
922
923pub async fn merge_index_files(
924    object_store: &ObjectStore,
925    index_dir: &Path,
926    store: Arc<dyn IndexStore>,
927) -> Result<()> {
928    // List all partition metadata files in the index directory
929    let part_metadata_files = list_metadata_files(object_store, index_dir).await?;
930
931    // Call merge_metadata_files function for inverted index
932    merge_metadata_files(store, &part_metadata_files).await
933}
934
935/// List and filter metadata files from the index directory
936/// Returns partition metadata files
937async fn list_metadata_files(object_store: &ObjectStore, index_dir: &Path) -> Result<Vec<String>> {
938    // List all partition metadata files in the index directory
939    let mut part_metadata_files = Vec::new();
940    let mut list_stream = object_store.list(Some(index_dir.clone()));
941
942    while let Some(item) = list_stream.next().await {
943        match item {
944            Ok(meta) => {
945                let file_name = meta.location.filename().unwrap_or_default();
946                // Filter files matching the pattern part_*_metadata.lance
947                if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
948                    part_metadata_files.push(file_name.to_string());
949                }
950            }
951            Err(_) => continue,
952        }
953    }
954
955    if part_metadata_files.is_empty() {
956        return Err(Error::InvalidInput {
957            source: format!(
958                "No partition metadata files found in index directory: {}",
959                index_dir
960            )
961            .into(),
962            location: location!(),
963        });
964    }
965
966    Ok(part_metadata_files)
967}
968
969/// Merge partition metadata files with partition ID remapping to sequential IDs starting from 0
970async fn merge_metadata_files(
971    store: Arc<dyn IndexStore>,
972    part_metadata_files: &[String],
973) -> Result<()> {
974    // Collect all partition IDs and params
975    let mut all_partitions = Vec::new();
976    let mut params = None;
977    let mut token_set_format = None;
978
979    for file_name in part_metadata_files {
980        let reader = store.open_index_file(file_name).await?;
981        let metadata = &reader.schema().metadata;
982
983        let partitions_str = metadata.get("partitions").ok_or(Error::Index {
984            message: format!("partitions not found in {}", file_name),
985            location: location!(),
986        })?;
987
988        let partition_ids: Vec<u64> =
989            serde_json::from_str(partitions_str).map_err(|e| Error::Index {
990                message: format!("Failed to parse partitions: {}", e),
991                location: location!(),
992            })?;
993
994        all_partitions.extend(partition_ids);
995
996        if params.is_none() {
997            let params_str = metadata.get("params").ok_or(Error::Index {
998                message: format!("params not found in {}", file_name),
999                location: location!(),
1000            })?;
1001            params = Some(
1002                serde_json::from_str::<InvertedIndexParams>(params_str).map_err(|e| {
1003                    Error::Index {
1004                        message: format!("Failed to parse params: {}", e),
1005                        location: location!(),
1006                    }
1007                })?,
1008            );
1009        }
1010
1011        if token_set_format.is_none() {
1012            if let Some(name) = metadata.get(TOKEN_SET_FORMAT_KEY) {
1013                token_set_format = Some(TokenSetFormat::from_str(name)?);
1014            }
1015        }
1016    }
1017
1018    // Create ID mapping: sorted original IDs -> 0,1,2...
1019    let mut sorted_ids = all_partitions.clone();
1020    sorted_ids.sort();
1021    sorted_ids.dedup();
1022
1023    let id_mapping: HashMap<u64, u64> = sorted_ids
1024        .iter()
1025        .enumerate()
1026        .map(|(new_id, &old_id)| (old_id, new_id as u64))
1027        .collect();
1028
1029    // Safe rename partition files using temporary files to avoid overwrite
1030    let timestamp = std::time::SystemTime::now()
1031        .duration_since(std::time::UNIX_EPOCH)
1032        .unwrap()
1033        .as_secs();
1034
1035    // Phase 1: Move files to temporary locations
1036    let mut temp_files: Vec<(String, String, String)> = Vec::new(); // (temp_path, old_path, final_path)
1037
1038    for (&old_id, &new_id) in &id_mapping {
1039        if old_id != new_id {
1040            for suffix in [TOKENS_FILE, INVERT_LIST_FILE, DOCS_FILE] {
1041                let old_path = format!("part_{}_{}", old_id, suffix);
1042                let new_path = format!("part_{}_{}", new_id, suffix);
1043                let temp_path = format!("temp_{}_{}", timestamp, old_path);
1044
1045                // Move to temporary location first to avoid overwrite
1046                if let Err(e) = store.rename_index_file(&old_path, &temp_path).await {
1047                    // Rollback phase 1: restore files from temp locations
1048                    for (temp_name, old_name, _) in temp_files.iter().rev() {
1049                        let _ = store.rename_index_file(temp_name, old_name).await;
1050                    }
1051                    return Err(Error::Index {
1052                        message: format!(
1053                            "Failed to move {} to temp {}: {}",
1054                            old_path, temp_path, e
1055                        ),
1056                        location: location!(),
1057                    });
1058                }
1059                temp_files.push((temp_path, old_path, new_path));
1060            }
1061        }
1062    }
1063
1064    // Phase 2: Move from temporary to final locations
1065    let mut completed_renames: Vec<(String, String)> = Vec::new(); // (final_path, temp_path)
1066
1067    for (temp_path, _old_path, final_path) in &temp_files {
1068        if let Err(e) = store.rename_index_file(temp_path, final_path).await {
1069            // Rollback phase 2: restore completed renames and remaining temps
1070            for (final_name, temp_name) in completed_renames.iter().rev() {
1071                let _ = store.rename_index_file(final_name, temp_name).await;
1072            }
1073            // Restore remaining temp files to original locations
1074            for (temp_name, orig_name, _) in temp_files.iter() {
1075                if !completed_renames.iter().any(|(_, t)| t == temp_name) {
1076                    let _ = store.rename_index_file(temp_name, orig_name).await;
1077                }
1078            }
1079            return Err(Error::Index {
1080                message: format!("Failed to rename {} to {}: {}", temp_path, final_path, e),
1081                location: location!(),
1082            });
1083        }
1084        completed_renames.push((final_path.clone(), temp_path.clone()));
1085    }
1086
1087    // Write merged metadata with remapped IDs
1088    let remapped_partitions: Vec<u64> = (0..id_mapping.len() as u64).collect();
1089    let params = params.unwrap_or_default();
1090    let token_set_format = token_set_format.unwrap_or(TokenSetFormat::Arrow);
1091    let builder = InvertedIndexBuilder::from_existing_index(
1092        params,
1093        None,
1094        remapped_partitions.clone(),
1095        token_set_format,
1096        None,
1097    );
1098    builder
1099        .write_metadata(&*store, &remapped_partitions)
1100        .await?;
1101
1102    // Cleanup partition metadata files
1103    for file_name in part_metadata_files {
1104        if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
1105            let _ = store.delete_index_file(file_name).await;
1106        }
1107    }
1108
1109    Ok(())
1110}
1111
1112/// Convert input stream into a stream of documents.
1113///
1114/// The input stream must be one of:
1115/// 1. Document in Utf8 or LargeUtf8 format.
1116/// 2. Document in List(Utf8) or List(LargeUtf8) format.
1117/// 3. Json document in LargeBinary format.
1118pub fn document_input(
1119    input: SendableRecordBatchStream,
1120    column: &str,
1121) -> Result<SendableRecordBatchStream> {
1122    let schema = input.schema();
1123    let field = schema.column_with_name(column).expect_ok()?.1;
1124    match field.data_type() {
1125        DataType::Utf8 | DataType::LargeUtf8 => Ok(input),
1126        DataType::List(field) | DataType::LargeList(field)
1127            if matches!(field.data_type(), DataType::Utf8 | DataType::LargeUtf8) =>
1128        {
1129            Ok(Box::pin(FlattenStream::new(input)))
1130        }
1131        DataType::LargeBinary => match field.metadata().get(ARROW_EXT_NAME_KEY) {
1132            Some(name) if name.as_str() == JSON_EXT_NAME => {
1133                Ok(Box::pin(JsonTextStream::new(input, column.to_string())))
1134            }
1135            _ => Err(Error::InvalidInput {
1136                source: format!("column {} is not json", column).into(),
1137                location: location!(),
1138            }),
1139        },
1140        _ => Err(Error::InvalidInput {
1141            source: format!(
1142                "column {} has type {}, is not utf8, large utf8 type/list, or large binary",
1143                column,
1144                field.data_type()
1145            )
1146            .into(),
1147            location: location!(),
1148        }),
1149    }
1150}