// lance_index/scalar/inverted/builder.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use super::{
5    index::*,
6    merger::{Merger, PartitionSource, SizeBasedMerger},
7    InvertedIndexParams,
8};
9use crate::scalar::inverted::json::JsonTextStream;
10use crate::scalar::inverted::lance_tokenizer::DocType;
11use crate::scalar::inverted::tokenizer::lance_tokenizer::LanceTokenizer;
12use crate::scalar::lance_format::LanceIndexStore;
13use crate::scalar::IndexStore;
14use crate::vector::graph::OrderedFloat;
15use arrow::datatypes;
16use arrow::{array::AsArray, compute::concat_batches};
17use arrow_array::{Array, RecordBatch, UInt64Array};
18use arrow_schema::{DataType, Field, Schema, SchemaRef};
19use bitpacking::{BitPacker, BitPacker4x};
20use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream};
21use deepsize::DeepSizeOf;
22use futures::{stream, Stream, StreamExt, TryStreamExt};
23use lance_arrow::json::JSON_EXT_NAME;
24use lance_arrow::{iter_str_array, ARROW_EXT_NAME_KEY};
25use lance_core::utils::tokio::get_num_compute_intensive_cpus;
26use lance_core::{cache::LanceCache, utils::tokio::spawn_cpu};
27use lance_core::{error::LanceOptionExt, utils::tempfile::TempDir};
28use lance_core::{Error, Result, ROW_ID, ROW_ID_FIELD};
29use lance_io::object_store::ObjectStore;
30use object_store::path::Path;
31use smallvec::SmallVec;
32use snafu::location;
33use std::collections::HashMap;
34use std::pin::Pin;
35use std::str::FromStr;
36use std::sync::Arc;
37use std::sync::LazyLock;
38use std::task::{Context, Poll};
39use std::{fmt::Debug, sync::atomic::AtomicU64};
40use tracing::instrument;
41
// the number of elements in each block
// each block contains 128 row ids and 128 frequencies
// WARNING: changing this value will break the compatibility with existing indexes
pub const BLOCK_SIZE: usize = BitPacker4x::BLOCK_LEN;

// the (compressed) size of each flush for posting lists in MiB,
// when the accumulated batches reach `LANCE_FTS_FLUSH_SIZE` MiB, the flush will be triggered,
// higher for better indexing performance, but more memory usage,
// it's 16 MiB by default
static LANCE_FTS_FLUSH_SIZE: LazyLock<usize> = LazyLock::new(|| {
    std::env::var("LANCE_FTS_FLUSH_SIZE")
        .unwrap_or_else(|_| "16".to_string())
        .parse()
        .expect("failed to parse LANCE_FTS_FLUSH_SIZE")
});
// the number of shards to split the indexing work,
// the indexing process would spawn `LANCE_FTS_NUM_SHARDS` workers to build FTS,
// higher for faster indexing performance, but more memory usage,
// it's `the number of compute intensive CPUs` by default
pub static LANCE_FTS_NUM_SHARDS: LazyLock<usize> = LazyLock::new(|| {
    std::env::var("LANCE_FTS_NUM_SHARDS")
        .unwrap_or_else(|_| get_num_compute_intensive_cpus().to_string())
        .parse()
        .expect("failed to parse LANCE_FTS_NUM_SHARDS")
});
// the partition size limit in MiB (uncompressed format)
// higher for better indexing & query performance, but more memory usage,
// it's 256 MiB by default
pub static LANCE_FTS_PARTITION_SIZE: LazyLock<u64> = LazyLock::new(|| {
    std::env::var("LANCE_FTS_PARTITION_SIZE")
        .unwrap_or_else(|_| "256".to_string())
        .parse()
        .expect("failed to parse LANCE_FTS_PARTITION_SIZE")
});
// the target size of partition after merging in MiB (uncompressed format)
// it's 4096 MiB by default
pub static LANCE_FTS_TARGET_SIZE: LazyLock<u64> = LazyLock::new(|| {
    std::env::var("LANCE_FTS_TARGET_SIZE")
        .unwrap_or_else(|_| "4096".to_string())
        .parse()
        .expect("failed to parse LANCE_FTS_TARGET_SIZE")
});
82
/// Builder for the full-text-search (FTS) inverted index.
///
/// Newly built partitions are staged in a temp-dir-backed local store, then
/// copied or merged into the destination store by [`Self::write`].
#[derive(Debug)]
pub struct InvertedIndexBuilder {
    params: InvertedIndexParams,
    /// Ids of pre-existing partitions, read from `src_store`.
    pub(crate) partitions: Vec<u64>,
    /// Ids of partitions created by this builder, stored in `local_store`.
    new_partitions: Vec<u64>,
    /// Optional `(fragment_id as u64) << 32` mask OR-ed into new partition
    /// ids for distributed indexing — see `from_existing_index`.
    fragment_mask: Option<u64>,
    token_set_format: TokenSetFormat,
    /// Keeps the temporary directory backing `local_store` alive for the
    /// builder's lifetime.
    _tmpdir: TempDir,
    /// Local scratch store where new partitions are written before merging.
    local_store: Arc<dyn IndexStore>,
    /// Store holding the pre-existing partitions; falls back to `local_store`
    /// when no source store is given.
    src_store: Arc<dyn IndexStore>,
}
94
impl InvertedIndexBuilder {
    /// Creates a builder with the given params and no pre-existing partitions.
    pub fn new(params: InvertedIndexParams) -> Self {
        Self::new_with_fragment_mask(params, None)
    }

    /// Creates a fresh builder, optionally tagging newly allocated partition
    /// ids with a fragment mask (distributed indexing).
    pub fn new_with_fragment_mask(params: InvertedIndexParams, fragment_mask: Option<u64>) -> Self {
        Self::from_existing_index(
            params,
            None,
            Vec::new(),
            TokenSetFormat::default(),
            fragment_mask,
        )
    }

    /// Creates an InvertedIndexBuilder from existing index with fragment filtering.
    /// This method is used to create a builder from an existing index while applying
    /// fragment-based filtering for distributed indexing scenarios.
    /// fragment_mask Optional mask with fragment_id in high 32 bits for filtering.
    /// Constructed as `(fragment_id as u64) << 32`.
    /// When provided, ensures that generated IDs belong to the specified fragment.
    pub fn from_existing_index(
        params: InvertedIndexParams,
        store: Option<Arc<dyn IndexStore>>,
        partitions: Vec<u64>,
        token_set_format: TokenSetFormat,
        fragment_mask: Option<u64>,
    ) -> Self {
        // Scratch space for newly built partitions; the TempDir is kept in
        // the struct so the directory outlives the builder.
        let tmpdir = TempDir::default();
        let local_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            tmpdir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));
        // With no existing store, existing partitions are read back from the
        // local scratch store.
        let src_store = store.unwrap_or_else(|| local_store.clone());
        Self {
            params,
            partitions,
            new_partitions: Vec::new(),
            _tmpdir: tmpdir,
            local_store,
            src_store,
            token_set_format,
            fragment_mask,
        }
    }

    /// Tokenizes `new_data` (document column first, `_rowid` column present),
    /// builds new partitions from it, and writes the updated index to
    /// `dest_store`.
    pub async fn update(
        &mut self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()> {
        let schema = new_data.schema();
        // The document column is expected to be the first column of the stream.
        let doc_col = schema.field(0).name();

        // infer lance_tokenizer based on document type
        if self.params.lance_tokenizer.is_none() {
            let schema = new_data.schema();
            let field = schema.column_with_name(doc_col).expect_ok()?.1;
            let doc_type = DocType::try_from(field)?;
            self.params.lance_tokenizer = Some(doc_type.as_ref().to_string());
        }

        let new_data = document_input(new_data, doc_col)?;

        self.update_index(new_data).await?;
        self.write(dest_store).await?;
        Ok(())
    }

    /// Fans the input stream out to `LANCE_FTS_NUM_SHARDS` worker tasks over
    /// a bounded channel; each worker tokenizes batches and flushes finished
    /// partitions to the local store. New partition ids are collected into
    /// `self.new_partitions`.
    #[instrument(level = "debug", skip_all)]
    async fn update_index(&mut self, stream: SendableRecordBatchStream) -> Result<()> {
        let num_workers = *LANCE_FTS_NUM_SHARDS;
        let tokenizer = self.params.build()?;
        let with_position = self.params.with_position;
        // Continue id allocation after the highest existing partition id.
        let next_id = self.partitions.iter().map(|id| id + 1).max().unwrap_or(0);
        let id_alloc = Arc::new(AtomicU64::new(next_id));
        // Bounded to the worker count so the producer cannot run far ahead
        // of the consumers.
        let (sender, receiver) = async_channel::bounded(num_workers);
        let mut index_tasks = Vec::with_capacity(num_workers);
        for _ in 0..num_workers {
            let store = self.local_store.clone();
            let tokenizer = tokenizer.clone();
            let receiver = receiver.clone();
            let id_alloc = id_alloc.clone();
            let fragment_mask = self.fragment_mask;
            let token_set_format = self.token_set_format;
            let task = tokio::task::spawn(async move {
                let mut worker = IndexWorker::new(
                    store,
                    tokenizer,
                    with_position,
                    id_alloc,
                    fragment_mask,
                    token_set_format,
                )
                .await?;
                // Drain batches until the channel is closed by the producer.
                while let Ok(batch) = receiver.recv().await {
                    worker.process_batch(batch).await?;
                }
                let partitions = worker.finish().await?;
                Result::Ok(partitions)
            });
            index_tasks.push(task);
        }

        let sender = Arc::new(sender);

        // Forward each batch to the workers, yielding its row count so
        // progress can be tracked below.
        let mut stream = Box::pin(stream.then({
            |batch_result| {
                let sender = sender.clone();
                async move {
                    let sender = sender.clone();
                    let batch = batch_result?;
                    let num_rows = batch.num_rows();
                    // NOTE(review): send fails only when all receivers are
                    // gone, i.e. every worker exited early (e.g. on error);
                    // this panics instead of surfacing the worker error,
                    // which is reported at join time below — confirm intended.
                    sender.send(batch).await.expect("failed to send batch");
                    Result::Ok(num_rows)
                }
            }
        }));
        log::info!("indexing FTS with {} workers", num_workers);

        let mut last_num_rows = 0;
        let mut total_num_rows = 0;
        let start = std::time::Instant::now();
        while let Some(num_rows) = stream.try_next().await? {
            total_num_rows += num_rows;
            // Log progress roughly every million rows.
            if total_num_rows >= last_num_rows + 1_000_000 {
                log::debug!(
                    "indexed {} documents, elapsed: {:?}, speed: {}rows/s",
                    total_num_rows,
                    start.elapsed(),
                    total_num_rows as f32 / start.elapsed().as_secs_f32()
                );
                last_num_rows = total_num_rows;
            }
        }
        // drop the sender to stop receivers
        drop(stream);
        debug_assert_eq!(sender.sender_count(), 1);
        drop(sender);
        log::info!("dispatching elapsed: {:?}", start.elapsed());

        // wait for the workers to finish
        let start = std::time::Instant::now();
        for index_task in index_tasks {
            // First `?` is the tokio JoinError, second is the worker's Result.
            self.new_partitions.extend(index_task.await??);
        }
        log::info!("wait workers indexing elapsed: {:?}", start.elapsed());
        Ok(())
    }

    /// Applies a row-id remapping (e.g. after compaction) to every existing
    /// partition and writes the remapped partitions plus metadata to
    /// `dest_store`. `None` in the mapping means the row was deleted.
    pub async fn remap(
        &mut self,
        mapping: &HashMap<u64, Option<u64>>,
        src_store: Arc<dyn IndexStore>,
        dest_store: &dyn IndexStore,
    ) -> Result<()> {
        for part in self.partitions.iter() {
            let part = InvertedPartition::load(
                src_store.clone(),
                *part,
                None,
                &LanceCache::no_cache(),
                self.token_set_format,
            )
            .await?;
            let mut builder = part.into_builder().await?;
            builder.remap(mapping).await?;
            builder.write(dest_store).await?;
        }
        if self.fragment_mask.is_none() {
            self.write_metadata(dest_store, &self.partitions).await?;
        } else {
            // in distributed mode, the part_temp_metadata is written by the worker
            for &partition_id in &self.partitions {
                self.write_part_metadata(dest_store, partition_id).await?;
            }
        }
        Ok(())
    }

    /// Writes the index-level metadata (partition list, params, token-set
    /// format) as file metadata on an otherwise empty index file.
    async fn write_metadata(&self, dest_store: &dyn IndexStore, partitions: &[u64]) -> Result<()> {
        let metadata = HashMap::from_iter(vec![
            ("partitions".to_owned(), serde_json::to_string(&partitions)?),
            ("params".to_owned(), serde_json::to_string(&self.params)?),
            (
                TOKEN_SET_FORMAT_KEY.to_owned(),
                self.token_set_format.to_string(),
            ),
        ]);
        let mut writer = dest_store
            .new_index_file(METADATA_FILE, Arc::new(Schema::empty()))
            .await?;
        writer.finish_with_metadata(metadata).await?;
        Ok(())
    }

    /// Write partition metadata file for a single partition
    ///
    /// In a distributed environment, each worker node can write partition metadata files for the partitions it processes,
    /// which are then merged into a final metadata file using the `merge_metadata_files` function.
    pub(crate) async fn write_part_metadata(
        &self,
        dest_store: &dyn IndexStore,
        partition: u64,
    ) -> Result<()> {
        // Same payload as `write_metadata`, but listing only this partition.
        let partitions = vec![partition];
        let metadata = HashMap::from_iter(vec![
            ("partitions".to_owned(), serde_json::to_string(&partitions)?),
            ("params".to_owned(), serde_json::to_string(&self.params)?),
            (
                TOKEN_SET_FORMAT_KEY.to_owned(),
                self.token_set_format.to_string(),
            ),
        ]);
        // Use partition ID to generate a unique temporary filename
        let file_name = part_metadata_file_path(partition);
        let mut writer = dest_store
            .new_index_file(&file_name, Arc::new(Schema::empty()))
            .await?;
        writer.finish_with_metadata(metadata).await?;
        Ok(())
    }

    /// Writes the final index to `dest_store`: either copies partitions
    /// verbatim (`skip_merge`) or merges them towards
    /// `LANCE_FTS_TARGET_SIZE` MiB, then writes the metadata file(s).
    async fn write(&self, dest_store: &dyn IndexStore) -> Result<()> {
        if self.params.skip_merge {
            let mut partitions =
                Vec::with_capacity(self.partitions.len() + self.new_partitions.len());
            partitions.extend_from_slice(&self.partitions);
            partitions.extend_from_slice(&self.new_partitions);
            partitions.sort_unstable();

            // Existing partitions are copied from the source store ...
            for part in self.partitions.iter() {
                self.src_store
                    .copy_index_file(&token_file_path(*part), dest_store)
                    .await?;
                self.src_store
                    .copy_index_file(&posting_file_path(*part), dest_store)
                    .await?;
                self.src_store
                    .copy_index_file(&doc_file_path(*part), dest_store)
                    .await?;
            }
            // ... newly built ones from the local scratch store.
            for part in self.new_partitions.iter() {
                self.local_store
                    .copy_index_file(&token_file_path(*part), dest_store)
                    .await?;
                self.local_store
                    .copy_index_file(&posting_file_path(*part), dest_store)
                    .await?;
                self.local_store
                    .copy_index_file(&doc_file_path(*part), dest_store)
                    .await?;
            }

            if self.fragment_mask.is_none() {
                self.write_metadata(dest_store, &partitions).await?;
            } else {
                // Distributed mode: one metadata file per partition, merged
                // later by the coordinator.
                for &partition_id in &partitions {
                    self.write_part_metadata(dest_store, partition_id).await?;
                }
            }
            return Ok(());
        }

        // Merge existing + new partitions into partitions of roughly
        // `LANCE_FTS_TARGET_SIZE` MiB (uncompressed).
        let partitions = self
            .partitions
            .iter()
            .map(|part| PartitionSource::new(self.src_store.clone(), *part))
            .chain(
                self.new_partitions
                    .iter()
                    .map(|part| PartitionSource::new(self.local_store.clone(), *part)),
            )
            .collect::<Vec<_>>();
        let mut merger = SizeBasedMerger::new(
            dest_store,
            partitions,
            *LANCE_FTS_TARGET_SIZE << 20,
            self.token_set_format,
        );
        let partitions = merger.merge().await?;

        if self.fragment_mask.is_none() {
            self.write_metadata(dest_store, &partitions).await?;
        } else {
            for &partition_id in &partitions {
                self.write_part_metadata(dest_store, partition_id).await?;
            }
        }
        Ok(())
    }
}
388
389impl Default for InvertedIndexBuilder {
390    fn default() -> Self {
391        let params = InvertedIndexParams::default();
392        Self::new(params)
393    }
394}
395
// builder for single partition
#[derive(Debug)]
pub struct InnerBuilder {
    /// Partition id (may carry a fragment mask in the high 32 bits).
    id: u64,
    /// Whether posting lists also record token positions.
    with_position: bool,
    token_set_format: TokenSetFormat,
    /// Token text -> token id mapping for this partition.
    pub(crate) tokens: TokenSet,
    /// One posting list per token, indexed by token id.
    pub(crate) posting_lists: Vec<PostingListBuilder>,
    /// Per-document data (row ids and token counts) for this partition.
    pub(crate) docs: DocSet,
}
406
impl InnerBuilder {
    /// Creates an empty partition builder with the given id.
    pub fn new(id: u64, with_position: bool, token_set_format: TokenSetFormat) -> Self {
        Self {
            id,
            with_position,
            token_set_format,
            tokens: TokenSet::default(),
            posting_lists: Vec::new(),
            docs: DocSet::default(),
        }
    }

    /// Partition id of this builder.
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Applies a row-id remapping to this partition in place; `None` in the
    /// mapping means the row was deleted.
    pub async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
        // for the docs, we need to remove the rows that are removed from the doc set,
        // and update the row ids of the rows that are updated
        let removed = self.docs.remap(mapping);

        // for the posting lists, we need to remap the doc ids:
        // - if a row is removed, we need to shift the doc ids of the following rows
        // - if a row is updated (assigned a new row id), we don't need to do anything with the posting lists
        let mut token_id = 0;
        let mut removed_token_ids = Vec::new();
        // `retain_mut` visits lists in order, so `token_id` tracks each
        // posting list's original token id.
        self.posting_lists.retain_mut(|posting_list| {
            posting_list.remap(&removed);
            let keep = !posting_list.is_empty();
            if !keep {
                removed_token_ids.push(token_id as u32);
            }
            token_id += 1;
            keep
        });

        // for the tokens, remap the token ids if any posting list is empty
        self.tokens.remap(&removed_token_ids);

        Ok(())
    }

    /// Writes the posting-list, token, and doc files for this partition,
    /// consuming the builder's contents.
    pub async fn write(&mut self, store: &dyn IndexStore) -> Result<()> {
        // Take the doc set up front: posting-list writing needs it (shared)
        // to compute block-max scores.
        let docs = Arc::new(std::mem::take(&mut self.docs));
        self.write_posting_lists(store, docs.clone()).await?;
        self.write_tokens(store).await?;
        self.write_docs(store, docs).await?;
        Ok(())
    }

    /// Converts every posting list to a record batch (on the CPU pool) and
    /// writes them to the posting file, flushing in chunks of roughly
    /// `LANCE_FTS_FLUSH_SIZE` MiB.
    #[instrument(level = "debug", skip_all)]
    async fn write_posting_lists(
        &mut self,
        store: &dyn IndexStore,
        docs: Arc<DocSet>,
    ) -> Result<()> {
        let id = self.id;
        let mut writer = store
            .new_index_file(
                &posting_file_path(self.id),
                inverted_list_schema(self.with_position),
            )
            .await?;
        let posting_lists = std::mem::take(&mut self.posting_lists);

        log::info!(
            "writing {} posting lists of partition {}, with position {}",
            posting_lists.len(),
            id,
            self.with_position
        );
        let schema = inverted_list_schema(self.with_position);

        // Up to `num_cpus` conversions run in flight; `buffered` preserves
        // the original (token id) order of the posting lists.
        let mut batches = stream::iter(posting_lists)
            .map(|posting_list| {
                let block_max_scores = docs.calculate_block_max_scores(
                    posting_list.doc_ids.iter(),
                    posting_list.frequencies.iter(),
                );
                spawn_cpu(move || posting_list.to_batch(block_max_scores))
            })
            .buffered(get_num_compute_intensive_cpus());

        let mut write_duration = std::time::Duration::ZERO;
        let mut num_posting_lists = 0;
        let mut buffer = Vec::new();
        let mut size_sum = 0;
        while let Some(batch) = batches.try_next().await? {
            num_posting_lists += 1;
            size_sum += batch.get_array_memory_size();
            buffer.push(batch);
            // Flush buffered batches once they exceed the flush size.
            if size_sum >= *LANCE_FTS_FLUSH_SIZE << 20 {
                let batch = concat_batches(&schema, buffer.iter())?;
                buffer.clear();
                size_sum = 0;
                let start = std::time::Instant::now();
                writer.write_record_batch(batch).await?;
                write_duration += start.elapsed();
            }

            // Progress log every 500k posting lists.
            if num_posting_lists % 500_000 == 0 {
                log::info!(
                    "wrote {} posting lists of partition {}, writing elapsed: {:?}",
                    num_posting_lists,
                    id,
                    write_duration,
                );
            }
        }
        // Write whatever remains below the flush threshold.
        if !buffer.is_empty() {
            let batch = concat_batches(&schema, buffer.iter())?;
            writer.write_record_batch(batch).await?;
        }

        writer.finish().await?;
        Ok(())
    }

    /// Writes the token set (text -> id mapping) file for this partition.
    #[instrument(level = "debug", skip_all)]
    async fn write_tokens(&mut self, store: &dyn IndexStore) -> Result<()> {
        log::info!("writing tokens of partition {}", self.id);
        let tokens = std::mem::take(&mut self.tokens);
        let batch = tokens.to_batch(self.token_set_format)?;
        let mut writer = store
            .new_index_file(&token_file_path(self.id), batch.schema())
            .await?;
        writer.write_record_batch(batch).await?;
        writer.finish().await?;
        Ok(())
    }

    /// Writes the doc set file for this partition.
    #[instrument(level = "debug", skip_all)]
    async fn write_docs(&mut self, store: &dyn IndexStore, docs: Arc<DocSet>) -> Result<()> {
        log::info!("writing docs of partition {}", self.id);
        let batch = docs.to_batch()?;
        let mut writer = store
            .new_index_file(&doc_file_path(self.id), batch.schema())
            .await?;
        writer.write_record_batch(batch).await?;
        writer.finish().await?;
        Ok(())
    }
}
550
/// Per-shard worker that tokenizes document batches and builds partitions,
/// flushing each partition to `store` once it reaches the size limit.
struct IndexWorker {
    store: Arc<dyn IndexStore>,
    tokenizer: Box<dyn LanceTokenizer>,
    /// Shared counter for allocating globally unique partition ids.
    id_alloc: Arc<AtomicU64>,
    /// Builder for the partition currently being filled.
    builder: InnerBuilder,
    /// Ids of partitions already flushed by this worker.
    partitions: Vec<u64>,
    /// Posting-list schema; the presence of POSITION_COL encodes whether
    /// positions are tracked (see `has_position`).
    schema: SchemaRef,
    /// Estimated size of the current partition's posting lists (as reported
    /// by `PostingListBuilder::size`), used for the flush threshold.
    estimated_size: u64,
    /// Total byte length of all documents processed so far.
    total_doc_length: usize,
    fragment_mask: Option<u64>,
    token_set_format: TokenSetFormat,
    /// Scratch: token id -> positions of the current document (position mode).
    token_occurrences: HashMap<u32, PositionRecorder>,
    /// Scratch: token ids of the current document (count-only mode).
    token_ids: Vec<u32>,
    /// Token count of the previous document, used to pre-size `token_ids`.
    last_token_count: usize,
    /// Unique-token count of the previous document, used to pre-size
    /// `token_occurrences`.
    last_unique_token_count: usize,
}
567
impl IndexWorker {
    /// Creates a worker whose first partition id is drawn from the shared
    /// allocator and tagged with the fragment mask (high 32 bits) when
    /// running distributed.
    async fn new(
        store: Arc<dyn IndexStore>,
        tokenizer: Box<dyn LanceTokenizer>,
        with_position: bool,
        id_alloc: Arc<AtomicU64>,
        fragment_mask: Option<u64>,
        token_set_format: TokenSetFormat,
    ) -> Result<Self> {
        let schema = inverted_list_schema(with_position);

        Ok(Self {
            store,
            tokenizer,
            builder: InnerBuilder::new(
                id_alloc.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                    | fragment_mask.unwrap_or(0),
                with_position,
                token_set_format,
            ),
            partitions: Vec::new(),
            id_alloc,
            schema,
            estimated_size: 0,
            total_doc_length: 0,
            fragment_mask,
            token_set_format,
            token_occurrences: HashMap::new(),
            token_ids: Vec::new(),
            last_token_count: 0,
            last_unique_token_count: 0,
        })
    }

    // Position tracking is encoded as the presence of the position column in
    // the posting-list schema.
    fn has_position(&self) -> bool {
        self.schema.column_with_name(POSITION_COL).is_some()
    }

    /// Tokenizes every non-null document in `batch` (doc column first,
    /// `_rowid` column present) and appends tokens to the current partition,
    /// flushing when it becomes full.
    async fn process_batch(&mut self, batch: RecordBatch) -> Result<()> {
        let doc_col = batch.column(0);
        let doc_iter = iter_str_array(doc_col);
        let row_id_col = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
        // Skip null documents.
        let docs = doc_iter
            .zip(row_id_col.values().iter())
            .filter_map(|(doc, row_id)| doc.map(|doc| (doc, *row_id)));

        let with_position = self.has_position();
        for (doc, row_id) in docs {
            let mut token_num: u32 = 0;
            if with_position {
                // Pre-size the per-doc map from the previous document's
                // unique token count to limit rehashing.
                if self.token_occurrences.capacity() < self.last_unique_token_count {
                    self.token_occurrences
                        .reserve(self.last_unique_token_count - self.token_occurrences.capacity());
                }
                self.token_occurrences.clear();

                let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
                while token_stream.advance() {
                    let token = token_stream.token_mut();
                    // Take the text out of the token to avoid a copy.
                    let token_text = std::mem::take(&mut token.text);
                    let token_id = self.builder.tokens.add(token_text);
                    self.token_occurrences
                        .entry(token_id)
                        .or_insert_with(|| PositionRecorder::new(true))
                        .push(token.position as u32);
                    token_num += 1;
                }
            } else {
                // Count-only mode: collect raw token ids, pre-sizing the
                // buffer from the previous document's token count.
                if self.token_ids.capacity() < self.last_token_count {
                    self.token_ids
                        .reserve(self.last_token_count - self.token_ids.capacity());
                }
                self.token_ids.clear();

                let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
                while token_stream.advance() {
                    let token = token_stream.token_mut();
                    let token_text = std::mem::take(&mut token.text);
                    let token_id = self.builder.tokens.add(token_text);
                    self.token_ids.push(token_id);
                    token_num += 1;
                }
            }
            // Ensure there is a posting-list slot for every known token id.
            self.builder
                .posting_lists
                .resize_with(self.builder.tokens.len(), || {
                    PostingListBuilder::new(with_position)
                });
            let doc_id = self.builder.docs.append(row_id, token_num);
            self.total_doc_length += doc.len();

            if with_position {
                let unique_tokens = self.token_occurrences.len();
                for (token_id, term_positions) in self.token_occurrences.drain() {
                    let posting_list = &mut self.builder.posting_lists[token_id as usize];

                    // Track posting-list growth for the flush threshold.
                    let old_size = posting_list.size();
                    posting_list.add(doc_id, term_positions);
                    let new_size = posting_list.size();
                    self.estimated_size += new_size - old_size;
                }
                self.last_unique_token_count = unique_tokens;
            } else if token_num > 0 {
                // Without positions, sort the token ids and run-length scan
                // them to get each token's frequency within this document.
                self.token_ids.sort_unstable();
                let mut iter = self.token_ids.iter();
                let mut current = *iter.next().unwrap();
                let mut count = 1u32;
                for &token_id in iter {
                    if token_id == current {
                        count += 1;
                        continue;
                    }

                    let posting_list = &mut self.builder.posting_lists[current as usize];
                    let old_size = posting_list.size();
                    posting_list.add(doc_id, PositionRecorder::Count(count));
                    let new_size = posting_list.size();
                    self.estimated_size += new_size - old_size;

                    current = token_id;
                    count = 1;
                }
                // Record the final run, which the loop above never emits.
                let posting_list = &mut self.builder.posting_lists[current as usize];
                let old_size = posting_list.size();
                posting_list.add(doc_id, PositionRecorder::Count(count));
                let new_size = posting_list.size();
                self.estimated_size += new_size - old_size;
            }
            self.last_token_count = token_num as usize;

            // Start a new partition when doc ids would overflow u32 or the
            // partition exceeds the configured size limit.
            if self.builder.docs.len() as u32 == u32::MAX
                || self.estimated_size >= *LANCE_FTS_PARTITION_SIZE << 20
            {
                self.flush().await?;
            }
        }

        Ok(())
    }

    /// Writes the current partition to the store and starts a new empty one
    /// with a freshly allocated id. No-op when the partition is empty.
    #[instrument(level = "debug", skip_all)]
    async fn flush(&mut self) -> Result<()> {
        if self.builder.tokens.is_empty() {
            return Ok(());
        }

        log::info!(
            "flushing posting lists, estimated size: {} MiB",
            self.estimated_size / (1024 * 1024)
        );
        self.estimated_size = 0;
        let with_position = self.has_position();
        // Swap in a fresh builder (next partition id), then write the full
        // one out.
        let mut builder = std::mem::replace(
            &mut self.builder,
            InnerBuilder::new(
                self.id_alloc
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                    | self.fragment_mask.unwrap_or(0),
                with_position,
                self.token_set_format,
            ),
        );
        builder.write(self.store.as_ref()).await?;
        self.partitions.push(builder.id());
        Ok(())
    }

    /// Flushes any remaining data and returns the ids of all partitions this
    /// worker produced.
    async fn finish(mut self) -> Result<Vec<u64>> {
        if !self.builder.tokens.is_empty() {
            self.flush().await?;
        }
        Ok(self.partitions)
    }
}
742
/// Records the occurrences of one token within one document: either the full
/// list of positions, or just an occurrence count when positions are not
/// tracked.
#[derive(Debug, Clone)]
pub enum PositionRecorder {
    /// Exact token positions (stored inline for up to 4 occurrences).
    Position(SmallVec<[u32; 4]>),
    /// Number of occurrences only.
    Count(u32),
}
748
749impl PositionRecorder {
750    fn new(with_position: bool) -> Self {
751        if with_position {
752            Self::Position(SmallVec::new())
753        } else {
754            Self::Count(0)
755        }
756    }
757
758    fn push(&mut self, position: u32) {
759        match self {
760            Self::Position(positions) => positions.push(position),
761            Self::Count(count) => *count += 1,
762        }
763    }
764
765    pub fn len(&self) -> u32 {
766        match self {
767            Self::Position(positions) => positions.len() as u32,
768            Self::Count(count) => *count,
769        }
770    }
771
772    pub fn is_empty(&self) -> bool {
773        self.len() == 0
774    }
775
776    pub fn into_vec(self) -> Vec<u32> {
777        match self {
778            Self::Position(positions) => positions.into_vec(),
779            Self::Count(_) => vec![0],
780        }
781    }
782}
783
/// A document row id paired with its relevance score.
#[derive(Debug, Eq, PartialEq, Clone, DeepSizeOf)]
pub struct ScoredDoc {
    pub row_id: u64,
    /// Score wrapped in `OrderedFloat` so docs can be totally ordered
    /// (e.g. kept in heaps / sorted).
    pub score: OrderedFloat,
}
789
790impl ScoredDoc {
791    pub fn new(row_id: u64, score: f32) -> Self {
792        Self {
793            row_id,
794            score: OrderedFloat(score),
795        }
796    }
797}
798
impl PartialOrd for ScoredDoc {
    // Delegates to `Ord` so the two orderings always agree.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
804
805impl Ord for ScoredDoc {
806    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
807        self.score.cmp(&other.score)
808    }
809}
810
811pub fn legacy_inverted_list_schema(with_position: bool) -> SchemaRef {
812    let mut fields = vec![
813        arrow_schema::Field::new(ROW_ID, arrow_schema::DataType::UInt64, false),
814        arrow_schema::Field::new(FREQUENCY_COL, arrow_schema::DataType::Float32, false),
815    ];
816    if with_position {
817        fields.push(arrow_schema::Field::new(
818            POSITION_COL,
819            arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
820                "item",
821                arrow_schema::DataType::Int32,
822                true,
823            ))),
824            false,
825        ));
826    }
827    Arc::new(arrow_schema::Schema::new(fields))
828}
829
830pub fn inverted_list_schema(with_position: bool) -> SchemaRef {
831    let mut fields = vec![
832        // we compress the posting lists (including row ids and frequencies),
833        // and store the compressed posting lists, so it's a large binary array
834        arrow_schema::Field::new(
835            POSTING_COL,
836            datatypes::DataType::List(Arc::new(Field::new(
837                "item",
838                datatypes::DataType::LargeBinary,
839                true,
840            ))),
841            false,
842        ),
843        arrow_schema::Field::new(MAX_SCORE_COL, datatypes::DataType::Float32, false),
844        arrow_schema::Field::new(LENGTH_COL, datatypes::DataType::UInt32, false),
845    ];
846    if with_position {
847        fields.push(arrow_schema::Field::new(
848            POSITION_COL,
849            arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
850                "item",
851                arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
852                    "item",
853                    arrow_schema::DataType::LargeBinary,
854                    true,
855                ))),
856                true,
857            ))),
858            false,
859        ));
860    }
861    Arc::new(arrow_schema::Schema::new(fields))
862}
863
/// Flatten the string list stream into a string stream
pub struct FlattenStream {
    /// Inner record batch stream with 2 columns:
    /// 1. doc_col: List(Utf8) or List(LargeUtf8)
    /// 2. row_id_col: UInt64
    inner: SendableRecordBatchStream,
    /// Original list type of the doc column (`List` or `LargeList`);
    /// decides the offset width `flatten_string_list` is invoked with.
    field_type: DataType,
    /// Element type of the flattened output column (`Utf8` or `LargeUtf8`).
    data_type: DataType,
}
873
874impl FlattenStream {
875    pub fn new(input: SendableRecordBatchStream) -> Self {
876        let schema = input.schema();
877        let field = schema.field(0);
878        let data_type = match field.data_type() {
879            DataType::List(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
880            DataType::List(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
881                DataType::LargeUtf8
882            }
883            DataType::LargeList(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
884            DataType::LargeList(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
885                DataType::LargeUtf8
886            }
887            _ => panic!(
888                "expect data type List(Utf8) or List(LargeUtf8) but got {:?}",
889                field.data_type()
890            ),
891        };
892        Self {
893            inner: input,
894            field_type: field.data_type().clone(),
895            data_type,
896        }
897    }
898}
899
900impl Stream for FlattenStream {
901    type Item = datafusion_common::Result<RecordBatch>;
902
903    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
904        match Pin::new(&mut self.inner).poll_next(cx) {
905            Poll::Ready(Some(Ok(batch))) => {
906                let doc_col = batch.column(0);
907                let batch = match self.field_type {
908                    DataType::List(_) => flatten_string_list::<i32>(&batch, doc_col).map_err(|e| {
909                        datafusion_common::error::DataFusionError::Execution(format!(
910                            "flatten string list error: {}",
911                            e
912                        ))
913                    }),
914                    DataType::LargeList(_) => {
915                        flatten_string_list::<i64>(&batch, doc_col).map_err(|e| {
916                            datafusion_common::error::DataFusionError::Execution(format!(
917                                "flatten string list error: {}",
918                                e
919                            ))
920                        })
921                    }
922                    _ => unreachable!(
923                        "expect data type List or LargeList but got {:?}",
924                        self.field_type
925                    ),
926                };
927                Poll::Ready(Some(batch))
928            }
929            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
930            Poll::Ready(None) => Poll::Ready(None),
931            Poll::Pending => Poll::Pending,
932        }
933    }
934}
935
936impl RecordBatchStream for FlattenStream {
937    fn schema(&self) -> SchemaRef {
938        let schema = Schema::new(vec![
939            Field::new(
940                self.inner.schema().field(0).name(),
941                self.data_type.clone(),
942                true,
943            ),
944            ROW_ID_FIELD.clone(),
945        ]);
946
947        Arc::new(schema)
948    }
949}
950
/// Flatten a `List`/`LargeList`-of-strings column into a plain string
/// column, repeating each row id once per list element.
///
/// `Offset` selects the list offset width: `i32` for `List`, `i64` for
/// `LargeList`. Returns a two-column batch: flattened docs and repeated
/// row ids. Errors if the list's value type is not a string type.
fn flatten_string_list<Offset: arrow::array::OffsetSizeTrait>(
    batch: &RecordBatch,
    doc_col: &Arc<dyn Array>,
) -> Result<RecordBatch> {
    let docs = doc_col.as_list::<Offset>();
    let row_ids = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();

    // Repeat each row id once per element of its list entry; a null list
    // entry contributes zero rows.
    let row_ids = row_ids
        .values()
        .iter()
        .zip(docs.iter())
        .flat_map(|(row_id, doc)| std::iter::repeat_n(*row_id, doc.map(|d| d.len()).unwrap_or(0)));

    let row_ids = Arc::new(UInt64Array::from_iter_values(row_ids));
    // Reuse the child values array directly instead of copying strings.
    // NOTE(review): this assumes the list array is not sliced and that
    // null entries span empty offset ranges; otherwise `values()` would
    // hold more strings than row ids were repeated for, and the
    // `RecordBatch::try_new` below would fail on a length mismatch —
    // TODO confirm against callers.
    let docs = match docs.value_type() {
        datatypes::DataType::Utf8 | datatypes::DataType::LargeUtf8 => docs.values().clone(),
        _ => {
            return Err(Error::Index {
                message: format!(
                    "expect data type String or LargeString but got {}",
                    docs.value_type()
                ),
                location: location!(),
            });
        }
    };

    // Keep the input's doc column name; the flattened column is nullable.
    let schema = Schema::new(vec![
        Field::new(
            batch.schema().field(0).name(),
            docs.data_type().clone(),
            true,
        ),
        ROW_ID_FIELD.clone(),
    ]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![docs, row_ids])?;
    Ok(batch)
}
989
/// File name of the token dictionary for `partition_id`.
pub(crate) fn token_file_path(partition_id: u64) -> String {
    format!("part_{}_{}", partition_id, TOKENS_FILE)
}

/// File name of the posting lists for `partition_id`.
pub(crate) fn posting_file_path(partition_id: u64) -> String {
    format!("part_{}_{}", partition_id, INVERT_LIST_FILE)
}

/// File name of the document file for `partition_id`.
pub(crate) fn doc_file_path(partition_id: u64) -> String {
    format!("part_{}_{}", partition_id, DOCS_FILE)
}

/// File name of the per-partition metadata for `partition_id`.
pub(crate) fn part_metadata_file_path(partition_id: u64) -> String {
    format!("part_{}_{}", partition_id, METADATA_FILE)
}
1005
/// Merge all per-partition index files under `index_dir` into a single
/// inverted index, writing the unified metadata through `store`.
///
/// Errors if no `part_*_metadata.lance` files are found.
pub async fn merge_index_files(
    object_store: &ObjectStore,
    index_dir: &Path,
    store: Arc<dyn IndexStore>,
) -> Result<()> {
    // List all partition metadata files in the index directory
    let part_metadata_files = list_metadata_files(object_store, index_dir).await?;

    // Call merge_metadata_files function for inverted index
    merge_metadata_files(store, &part_metadata_files).await
}
1017
1018/// List and filter metadata files from the index directory
1019/// Returns partition metadata files
1020async fn list_metadata_files(object_store: &ObjectStore, index_dir: &Path) -> Result<Vec<String>> {
1021    // List all partition metadata files in the index directory
1022    let mut part_metadata_files = Vec::new();
1023    let mut list_stream = object_store.list(Some(index_dir.clone()));
1024
1025    while let Some(item) = list_stream.next().await {
1026        match item {
1027            Ok(meta) => {
1028                let file_name = meta.location.filename().unwrap_or_default();
1029                // Filter files matching the pattern part_*_metadata.lance
1030                if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
1031                    part_metadata_files.push(file_name.to_string());
1032                }
1033            }
1034            Err(_) => continue,
1035        }
1036    }
1037
1038    if part_metadata_files.is_empty() {
1039        return Err(Error::InvalidInput {
1040            source: format!(
1041                "No partition metadata files found in index directory: {}",
1042                index_dir
1043            )
1044            .into(),
1045            location: location!(),
1046        });
1047    }
1048
1049    Ok(part_metadata_files)
1050}
1051
/// Merge partition metadata files with partition ID remapping to sequential IDs starting from 0
///
/// Reads every per-partition metadata file, remaps the union of partition
/// ids onto `0..n`, renames the partition data files accordingly using a
/// two-phase (via-temp) rename with best-effort rollback, writes one
/// merged metadata file, then deletes the per-partition metadata files.
async fn merge_metadata_files(
    store: Arc<dyn IndexStore>,
    part_metadata_files: &[String],
) -> Result<()> {
    // Collect all partition IDs and params
    let mut all_partitions = Vec::new();
    let mut params = None;
    let mut token_set_format = None;

    for file_name in part_metadata_files {
        let reader = store.open_index_file(file_name).await?;
        let metadata = &reader.schema().metadata;

        let partitions_str = metadata.get("partitions").ok_or(Error::Index {
            message: format!("partitions not found in {}", file_name),
            location: location!(),
        })?;

        let partition_ids: Vec<u64> =
            serde_json::from_str(partitions_str).map_err(|e| Error::Index {
                message: format!("Failed to parse partitions: {}", e),
                location: location!(),
            })?;

        all_partitions.extend(partition_ids);

        // Params are taken from the first file only — presumably all
        // partitions were built with identical params; TODO confirm.
        if params.is_none() {
            let params_str = metadata.get("params").ok_or(Error::Index {
                message: format!("params not found in {}", file_name),
                location: location!(),
            })?;
            params = Some(
                serde_json::from_str::<InvertedIndexParams>(params_str).map_err(|e| {
                    Error::Index {
                        message: format!("Failed to parse params: {}", e),
                        location: location!(),
                    }
                })?,
            );
        }

        // Token set format key may be absent in older metadata; first
        // file that carries it wins.
        if token_set_format.is_none() {
            if let Some(name) = metadata.get(TOKEN_SET_FORMAT_KEY) {
                token_set_format = Some(TokenSetFormat::from_str(name)?);
            }
        }
    }

    // Create ID mapping: sorted original IDs -> 0,1,2...
    let mut sorted_ids = all_partitions.clone();
    sorted_ids.sort();
    sorted_ids.dedup();

    let id_mapping: HashMap<u64, u64> = sorted_ids
        .iter()
        .enumerate()
        .map(|(new_id, &old_id)| (old_id, new_id as u64))
        .collect();

    // Safe rename partition files using temporary files to avoid overwrite
    // (a direct old->new rename could clobber another partition's files
    // when the id ranges overlap). The timestamp keeps temp names unique
    // across retries.
    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();

    // Phase 1: Move files to temporary locations
    let mut temp_files: Vec<(String, String, String)> = Vec::new(); // (temp_path, old_path, final_path)

    for (&old_id, &new_id) in &id_mapping {
        if old_id != new_id {
            for suffix in [TOKENS_FILE, INVERT_LIST_FILE, DOCS_FILE] {
                let old_path = format!("part_{}_{}", old_id, suffix);
                let new_path = format!("part_{}_{}", new_id, suffix);
                let temp_path = format!("temp_{}_{}", timestamp, old_path);

                // Move to temporary location first to avoid overwrite
                if let Err(e) = store.rename_index_file(&old_path, &temp_path).await {
                    // Rollback phase 1: restore files from temp locations
                    // in reverse order; rollback failures are ignored
                    // (best effort).
                    for (temp_name, old_name, _) in temp_files.iter().rev() {
                        let _ = store.rename_index_file(temp_name, old_name).await;
                    }
                    return Err(Error::Index {
                        message: format!(
                            "Failed to move {} to temp {}: {}",
                            old_path, temp_path, e
                        ),
                        location: location!(),
                    });
                }
                temp_files.push((temp_path, old_path, new_path));
            }
        }
    }

    // Phase 2: Move from temporary to final locations
    let mut completed_renames: Vec<(String, String)> = Vec::new(); // (final_path, temp_path)

    for (temp_path, _old_path, final_path) in &temp_files {
        if let Err(e) = store.rename_index_file(temp_path, final_path).await {
            // Rollback phase 2: restore completed renames and remaining temps
            for (final_name, temp_name) in completed_renames.iter().rev() {
                let _ = store.rename_index_file(final_name, temp_name).await;
            }
            // Restore remaining temp files to original locations
            for (temp_name, orig_name, _) in temp_files.iter() {
                if !completed_renames.iter().any(|(_, t)| t == temp_name) {
                    let _ = store.rename_index_file(temp_name, orig_name).await;
                }
            }
            return Err(Error::Index {
                message: format!("Failed to rename {} to {}: {}", temp_path, final_path, e),
                location: location!(),
            });
        }
        completed_renames.push((final_path.clone(), temp_path.clone()));
    }

    // Write merged metadata with remapped IDs
    let remapped_partitions: Vec<u64> = (0..id_mapping.len() as u64).collect();
    let params = params.unwrap_or_default();
    let token_set_format = token_set_format.unwrap_or(TokenSetFormat::Arrow);
    let builder = InvertedIndexBuilder::from_existing_index(
        params,
        None,
        remapped_partitions.clone(),
        token_set_format,
        None,
    );
    builder
        .write_metadata(&*store, &remapped_partitions)
        .await?;

    // Cleanup partition metadata files
    for file_name in part_metadata_files {
        if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
            let _ = store.delete_index_file(file_name).await;
        }
    }

    Ok(())
}
1194
1195/// Convert input stream into a stream of documents.
1196///
1197/// The input stream must be one of:
1198/// 1. Document in Utf8 or LargeUtf8 format.
1199/// 2. Document in List(Utf8) or List(LargeUtf8) format.
1200/// 3. Json document in LargeBinary format.
1201pub fn document_input(
1202    input: SendableRecordBatchStream,
1203    column: &str,
1204) -> Result<SendableRecordBatchStream> {
1205    let schema = input.schema();
1206    let field = schema.column_with_name(column).expect_ok()?.1;
1207    match field.data_type() {
1208        DataType::Utf8 | DataType::LargeUtf8 => Ok(input),
1209        DataType::List(field) | DataType::LargeList(field)
1210            if matches!(field.data_type(), DataType::Utf8 | DataType::LargeUtf8) =>
1211        {
1212            Ok(Box::pin(FlattenStream::new(input)))
1213        }
1214        DataType::LargeBinary => match field.metadata().get(ARROW_EXT_NAME_KEY) {
1215            Some(name) if name.as_str() == JSON_EXT_NAME => {
1216                Ok(Box::pin(JsonTextStream::new(input, column.to_string())))
1217            }
1218            _ => Err(Error::InvalidInput {
1219                source: format!("column {} is not json", column).into(),
1220                location: location!(),
1221            }),
1222        },
1223        _ => Err(Error::InvalidInput {
1224            source: format!(
1225                "column {} has type {}, is not utf8, large utf8 type/list, or large binary",
1226                column,
1227                field.data_type()
1228            )
1229            .into(),
1230            location: location!(),
1231        }),
1232    }
1233}
1234
#[cfg(test)]
mod tests {
    use super::*;
    use crate::metrics::NoOpMetricsCollector;
    use arrow_array::{RecordBatch, StringArray, UInt64Array};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
    use futures::stream;
    use lance_core::cache::LanceCache;
    use lance_core::utils::tempfile::TempDir;
    use lance_core::ROW_ID;
    use std::sync::atomic::AtomicU64;

    /// Build a single-row batch with one document string and its row id.
    fn make_doc_batch(doc: &str, row_id: u64) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("doc", DataType::Utf8, true),
            Field::new(ROW_ID, DataType::UInt64, false),
        ]));
        let docs = Arc::new(StringArray::from(vec![Some(doc)]));
        let row_ids = Arc::new(UInt64Array::from(vec![row_id]));
        RecordBatch::try_new(schema, vec![docs, row_ids]).unwrap()
    }

    /// With `skip_merge(true)`, `write` must copy every source partition
    /// to the destination store unchanged and record the original
    /// partition ids in the merged metadata.
    #[tokio::test]
    async fn test_skip_merge_writes_partitions_as_is() -> Result<()> {
        let src_dir = TempDir::default();
        let dest_dir = TempDir::default();
        let src_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            src_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));
        let dest_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            dest_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let params = InvertedIndexParams::default();
        let tokenizer = params.build()?;
        let token_set_format = TokenSetFormat::default();
        // Shared id allocator so the two workers get distinct partition ids.
        let id_alloc = Arc::new(AtomicU64::new(0));

        let mut worker1 = IndexWorker::new(
            src_store.clone(),
            tokenizer.clone(),
            params.with_position,
            id_alloc.clone(),
            None,
            token_set_format,
        )
        .await?;
        worker1
            .process_batch(make_doc_batch("hello world", 0))
            .await?;
        let mut partitions = worker1.finish().await?;

        let mut worker2 = IndexWorker::new(
            src_store.clone(),
            tokenizer.clone(),
            params.with_position,
            id_alloc.clone(),
            None,
            token_set_format,
        )
        .await?;
        worker2
            .process_batch(make_doc_batch("goodbye world", 1))
            .await?;
        partitions.extend(worker2.finish().await?);
        partitions.sort_unstable();
        // Two workers -> two distinct partitions.
        assert_eq!(partitions.len(), 2);
        assert_ne!(partitions[0], partitions[1]);

        let builder = InvertedIndexBuilder::from_existing_index(
            InvertedIndexParams::default().skip_merge(true),
            Some(src_store.clone()),
            partitions.clone(),
            token_set_format,
            None,
        );
        builder.write(dest_store.as_ref()).await?;

        // The merged metadata must list exactly the original partition ids.
        let metadata_reader = dest_store.open_index_file(METADATA_FILE).await?;
        let metadata = &metadata_reader.schema().metadata;
        let partitions_str = metadata
            .get("partitions")
            .expect("partitions missing from metadata");
        let written_partitions: Vec<u64> = serde_json::from_str(partitions_str).unwrap();
        assert_eq!(written_partitions, partitions);

        // Every partition's token/posting/doc files must exist unchanged
        // in the destination store.
        for id in &partitions {
            dest_store.open_index_file(&token_file_path(*id)).await?;
            dest_store.open_index_file(&posting_file_path(*id)).await?;
            dest_store.open_index_file(&doc_file_path(*id)).await?;
        }

        Ok(())
    }

    /// Without positions, the posting list must still track per-document
    /// term frequency ("hello" appears twice) while storing no positions.
    #[tokio::test]
    async fn test_inverted_index_without_positions_tracks_frequency() -> Result<()> {
        let index_dir = TempDir::default();
        let store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            index_dir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));

        let schema = Arc::new(Schema::new(vec![
            Field::new("doc", DataType::Utf8, true),
            Field::new(ROW_ID, DataType::UInt64, false),
        ]));
        let docs = Arc::new(StringArray::from(vec![Some("hello hello world")]));
        let row_ids = Arc::new(UInt64Array::from(vec![0u64]));
        let batch = RecordBatch::try_new(schema.clone(), vec![docs, row_ids])?;
        let stream = RecordBatchStreamAdapter::new(schema, stream::iter(vec![Ok(batch)]));
        let stream = Box::pin(stream);

        // Plain whitespace tokenization with all normalization disabled,
        // and positions explicitly turned off.
        let params = InvertedIndexParams::new(
            "whitespace".to_string(),
            tantivy::tokenizer::Language::English,
        )
        .with_position(false)
        .remove_stop_words(false)
        .stem(false)
        .max_token_length(None);

        let mut builder = InvertedIndexBuilder::new(params);
        builder.update(stream, store.as_ref()).await?;

        let index = InvertedIndex::load(store, None, &LanceCache::no_cache()).await?;
        assert_eq!(index.partitions.len(), 1);
        let partition = &index.partitions[0];
        let token_id = partition.tokens.get("hello").unwrap();
        let posting = partition
            .inverted_list
            .posting_list(token_id, false, &NoOpMetricsCollector)
            .await?;

        // "hello" occurs twice in the single document; no positions stored.
        let mut iter = posting.iter();
        let (doc_id, freq, positions) = iter.next().unwrap();
        assert_eq!(doc_id, 0);
        assert_eq!(freq, 2);
        assert!(positions.is_none());
        assert!(iter.next().is_none());

        Ok(())
    }
}