use super::{
    index::*,
    merger::{Merger, SizeBasedMerger},
    InvertedIndexParams,
};
use crate::scalar::inverted::json::JsonTextStream;
use crate::scalar::inverted::lance_tokenizer::DocType;
use crate::scalar::inverted::tokenizer::lance_tokenizer::LanceTokenizer;
use crate::scalar::lance_format::LanceIndexStore;
use crate::scalar::IndexStore;
use crate::vector::graph::OrderedFloat;
use arrow::datatypes;
use arrow::{array::AsArray, compute::concat_batches};
use arrow_array::{Array, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use bitpacking::{BitPacker, BitPacker4x};
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream};
use deepsize::DeepSizeOf;
use futures::{stream, Stream, StreamExt, TryStreamExt};
use lance_arrow::json::JSON_EXT_NAME;
use lance_arrow::{iter_str_array, ARROW_EXT_NAME_KEY};
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_core::{cache::LanceCache, utils::tokio::spawn_cpu};
use lance_core::{error::LanceOptionExt, utils::tempfile::TempDir};
use lance_core::{Error, Result, ROW_ID, ROW_ID_FIELD};
use lance_io::object_store::ObjectStore;
use object_store::path::Path;
use snafu::location;
use std::collections::HashMap;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::task::{Context, Poll};
use std::{fmt::Debug, sync::atomic::AtomicU64};
use tracing::instrument;

pub const BLOCK_SIZE: usize = BitPacker4x::BLOCK_LEN;

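// Tuning knobs for FTS index building, read from the environment on first use.
// The three size values are interpreted as MiB (their call sites shift them
// left by 20 bits into bytes); LANCE_FTS_NUM_SHARDS controls the number of
// tokenization workers and defaults to the number of compute-intensive CPUs.
// The string literals below are the defaults when the variable is unset.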
static LANCE_FTS_FLUSH_SIZE: LazyLock<usize> = LazyLock::new(|| {
    std::env::var("LANCE_FTS_FLUSH_SIZE")
        .unwrap_or_else(|_| "16".to_string())
        .parse()
        .expect("failed to parse LANCE_FTS_FLUSH_SIZE")
});

pub static LANCE_FTS_NUM_SHARDS: LazyLock<usize> = LazyLock::new(|| {
    std::env::var("LANCE_FTS_NUM_SHARDS")
        .unwrap_or_else(|_| get_num_compute_intensive_cpus().to_string())
        .parse()
        .expect("failed to parse LANCE_FTS_NUM_SHARDS")
});

pub static LANCE_FTS_PARTITION_SIZE: LazyLock<u64> = LazyLock::new(|| {
    std::env::var("LANCE_FTS_PARTITION_SIZE")
        .unwrap_or_else(|_| "256".to_string())
        .parse()
        .expect("failed to parse LANCE_FTS_PARTITION_SIZE")
});

pub static LANCE_FTS_TARGET_SIZE: LazyLock<u64> = LazyLock::new(|| {
    std::env::var("LANCE_FTS_TARGET_SIZE")
        .unwrap_or_else(|_| "4096".to_string())
        .parse()
        .expect("failed to parse LANCE_FTS_TARGET_SIZE")
});

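/// Builder for the inverted (full-text search) index.
///
/// New documents are tokenized by a pool of workers into partitions staged in
/// a local temporary store; `write` then merges those with any pre-existing
/// partitions from `src_store` and persists the result to the destination
/// `IndexStore`.
///
/// A minimal usage sketch (hypothetical `doc_stream` and `dest_store`, a
/// document stream with a row-id column and a destination store):
///
/// ```ignore
/// let mut builder = InvertedIndexBuilder::new(InvertedIndexParams::default());
/// builder.update(doc_stream, dest_store.as_ref()).await?;
/// ```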
#[derive(Debug)]
pub struct InvertedIndexBuilder {
    params: InvertedIndexParams,
    pub(crate) partitions: Vec<u64>,
    new_partitions: Vec<u64>,
    fragment_mask: Option<u64>,
    token_set_format: TokenSetFormat,
    _tmpdir: TempDir,
    local_store: Arc<dyn IndexStore>,
    src_store: Arc<dyn IndexStore>,
}

impl InvertedIndexBuilder {
    pub fn new(params: InvertedIndexParams) -> Self {
        Self::new_with_fragment_mask(params, None)
    }

    pub fn new_with_fragment_mask(params: InvertedIndexParams, fragment_mask: Option<u64>) -> Self {
        Self::from_existing_index(
            params,
            None,
            Vec::new(),
            TokenSetFormat::default(),
            fragment_mask,
        )
    }

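    /// Create a builder that extends an existing index: `store` holds the
    /// already-written partitions listed in `partitions`, while newly indexed
    /// data is staged in a fresh local temporary store until `write` merges
    /// both sets into the destination.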
    pub fn from_existing_index(
        params: InvertedIndexParams,
        store: Option<Arc<dyn IndexStore>>,
        partitions: Vec<u64>,
        token_set_format: TokenSetFormat,
        fragment_mask: Option<u64>,
    ) -> Self {
        let tmpdir = TempDir::default();
        let local_store = Arc::new(LanceIndexStore::new(
            ObjectStore::local().into(),
            tmpdir.obj_path(),
            Arc::new(LanceCache::no_cache()),
        ));
        let src_store = store.unwrap_or_else(|| local_store.clone());
        Self {
            params,
            partitions,
            new_partitions: Vec::new(),
            _tmpdir: tmpdir,
            local_store,
            src_store,
            token_set_format,
            fragment_mask,
        }
    }

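    /// Tokenize `new_data` and merge the result into the index at `dest_store`.
    ///
    /// The first column of the stream is treated as the document column; if no
    /// tokenizer was configured, one is inferred from that column's type.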
    pub async fn update(
        &mut self,
        new_data: SendableRecordBatchStream,
        dest_store: &dyn IndexStore,
    ) -> Result<()> {
        let schema = new_data.schema();
        let doc_col = schema.field(0).name();

        if self.params.lance_tokenizer.is_none() {
            let schema = new_data.schema();
            let field = schema.column_with_name(doc_col).expect_ok()?.1;
            let doc_type = DocType::try_from(field)?;
            self.params.lance_tokenizer = Some(doc_type.as_ref().to_string());
        }

        let new_data = document_input(new_data, doc_col)?;

        self.update_index(new_data).await?;
        self.write(dest_store).await?;
        Ok(())
    }

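    /// Fan the input stream out to `LANCE_FTS_NUM_SHARDS` worker tasks over a
    /// bounded channel; each worker tokenizes batches and flushes partitions
    /// to the local store, and the resulting partition ids are collected into
    /// `new_partitions`.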
    #[instrument(level = "debug", skip_all)]
    async fn update_index(&mut self, stream: SendableRecordBatchStream) -> Result<()> {
        let num_workers = *LANCE_FTS_NUM_SHARDS;
        let tokenizer = self.params.build()?;
        let with_position = self.params.with_position;
        let next_id = self.partitions.iter().map(|id| id + 1).max().unwrap_or(0);
        let id_alloc = Arc::new(AtomicU64::new(next_id));
        let (sender, receiver) = async_channel::bounded(num_workers);
        let mut index_tasks = Vec::with_capacity(num_workers);
        for _ in 0..num_workers {
            let store = self.local_store.clone();
            let tokenizer = tokenizer.clone();
            let receiver = receiver.clone();
            let id_alloc = id_alloc.clone();
            let fragment_mask = self.fragment_mask;
            let token_set_format = self.token_set_format;
            let task = tokio::task::spawn(async move {
                let mut worker = IndexWorker::new(
                    store,
                    tokenizer,
                    with_position,
                    id_alloc,
                    fragment_mask,
                    token_set_format,
                )
                .await?;
                while let Ok(batch) = receiver.recv().await {
                    worker.process_batch(batch).await?;
                }
                let partitions = worker.finish().await?;
                Result::Ok(partitions)
            });
            index_tasks.push(task);
        }

        let sender = Arc::new(sender);

        let mut stream = Box::pin(stream.then({
            |batch_result| {
                let sender = sender.clone();
                async move {
                    let sender = sender.clone();
                    let batch = batch_result?;
                    let num_rows = batch.num_rows();
                    sender.send(batch).await.expect("failed to send batch");
                    Result::Ok(num_rows)
                }
            }
        }));
        log::info!("indexing FTS with {} workers", num_workers);

        let mut last_num_rows = 0;
        let mut total_num_rows = 0;
        let start = std::time::Instant::now();
        while let Some(num_rows) = stream.try_next().await? {
            total_num_rows += num_rows;
            if total_num_rows >= last_num_rows + 1_000_000 {
                log::debug!(
                    "indexed {} documents, elapsed: {:?}, speed: {} rows/s",
                    total_num_rows,
                    start.elapsed(),
                    total_num_rows as f32 / start.elapsed().as_secs_f32()
                );
                last_num_rows = total_num_rows;
            }
        }
        drop(stream);
        debug_assert_eq!(sender.sender_count(), 1);
        drop(sender);
        log::info!("dispatching elapsed: {:?}", start.elapsed());

        let start = std::time::Instant::now();
        for index_task in index_tasks {
            self.new_partitions.extend(index_task.await??);
        }
        log::info!("workers finished indexing, elapsed: {:?}", start.elapsed());
        Ok(())
    }

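    /// Rewrite every existing partition with row ids remapped according to
    /// `mapping` (a `None` target drops the row), then rewrite the metadata in
    /// `dest_store`.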
    pub async fn remap(
        &mut self,
        mapping: &HashMap<u64, Option<u64>>,
        src_store: Arc<dyn IndexStore>,
        dest_store: &dyn IndexStore,
    ) -> Result<()> {
        for part in self.partitions.iter() {
            let part = InvertedPartition::load(
                src_store.clone(),
                *part,
                None,
                &LanceCache::no_cache(),
                self.token_set_format,
            )
            .await?;
            let mut builder = part.into_builder().await?;
            builder.remap(mapping).await?;
            builder.write(dest_store).await?;
        }
        if self.fragment_mask.is_none() {
            self.write_metadata(dest_store, &self.partitions).await?;
        } else {
            for &partition_id in &self.partitions {
                self.write_part_metadata(dest_store, partition_id).await?;
            }
        }
        Ok(())
    }

    async fn write_metadata(&self, dest_store: &dyn IndexStore, partitions: &[u64]) -> Result<()> {
        let metadata = HashMap::from_iter(vec![
            ("partitions".to_owned(), serde_json::to_string(&partitions)?),
            ("params".to_owned(), serde_json::to_string(&self.params)?),
            (
                TOKEN_SET_FORMAT_KEY.to_owned(),
                self.token_set_format.to_string(),
            ),
        ]);
        let mut writer = dest_store
            .new_index_file(METADATA_FILE, Arc::new(Schema::empty()))
            .await?;
        writer.finish_with_metadata(metadata).await?;
        Ok(())
    }

    pub(crate) async fn write_part_metadata(
        &self,
        dest_store: &dyn IndexStore,
        partition: u64,
    ) -> Result<()> {
        let partitions = vec![partition];
        let metadata = HashMap::from_iter(vec![
            ("partitions".to_owned(), serde_json::to_string(&partitions)?),
            ("params".to_owned(), serde_json::to_string(&self.params)?),
            (
                TOKEN_SET_FORMAT_KEY.to_owned(),
                self.token_set_format.to_string(),
            ),
        ]);
        let file_name = part_metadata_file_path(partition);
        let mut writer = dest_store
            .new_index_file(&file_name, Arc::new(Schema::empty()))
            .await?;
        writer.finish_with_metadata(metadata).await?;
        Ok(())
    }

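    /// Load all partitions (existing and newly built), merge them with a
    /// size-based merger targeting `LANCE_FTS_TARGET_SIZE` MiB per partition,
    /// and write the merged partitions plus metadata to `dest_store`.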
    async fn write(&self, dest_store: &dyn IndexStore) -> Result<()> {
        let no_cache = LanceCache::no_cache();
        let partitions = futures::future::try_join_all(
            self.partitions
                .iter()
                .map(|part| {
                    InvertedPartition::load(
                        self.src_store.clone(),
                        *part,
                        None,
                        &no_cache,
                        self.token_set_format,
                    )
                })
                .chain(self.new_partitions.iter().map(|part| {
                    InvertedPartition::load(
                        self.local_store.clone(),
                        *part,
                        None,
                        &no_cache,
                        self.token_set_format,
                    )
                })),
        )
        .await?;
        let mut merger = SizeBasedMerger::new(
            dest_store,
            partitions,
            *LANCE_FTS_TARGET_SIZE << 20,
            self.token_set_format,
        );
        let partitions = merger.merge().await?;

        if self.fragment_mask.is_none() {
            self.write_metadata(dest_store, &partitions).await?;
        } else {
            for &partition_id in &partitions {
                self.write_part_metadata(dest_store, partition_id).await?;
            }
        }
        Ok(())
    }
}

impl Default for InvertedIndexBuilder {
    fn default() -> Self {
        let params = InvertedIndexParams::default();
        Self::new(params)
    }
}

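/// In-memory accumulator for a single index partition: the token dictionary,
/// one posting-list builder per token, and the document set. It is flushed to
/// disk as one partition by `write`.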
#[derive(Debug)]
pub struct InnerBuilder {
    id: u64,
    with_position: bool,
    token_set_format: TokenSetFormat,
    pub(crate) tokens: TokenSet,
    pub(crate) posting_lists: Vec<PostingListBuilder>,
    pub(crate) docs: DocSet,
}

impl InnerBuilder {
    pub fn new(id: u64, with_position: bool, token_set_format: TokenSetFormat) -> Self {
        Self {
            id,
            with_position,
            token_set_format,
            tokens: TokenSet::default(),
            posting_lists: Vec::new(),
            docs: DocSet::default(),
        }
    }

    pub fn id(&self) -> u64 {
        self.id
    }

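    /// Remap row ids in the doc set, drop posting lists that become empty, and
    /// remove the corresponding tokens from the token set.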
    pub async fn remap(&mut self, mapping: &HashMap<u64, Option<u64>>) -> Result<()> {
        let removed = self.docs.remap(mapping);

        let mut token_id = 0;
        let mut removed_token_ids = Vec::new();
        self.posting_lists.retain_mut(|posting_list| {
            posting_list.remap(&removed);
            let keep = !posting_list.is_empty();
            if !keep {
                removed_token_ids.push(token_id as u32);
            }
            token_id += 1;
            keep
        });

        self.tokens.remap(&removed_token_ids);

        Ok(())
    }

    pub async fn write(&mut self, store: &dyn IndexStore) -> Result<()> {
        let docs = Arc::new(std::mem::take(&mut self.docs));
        self.write_posting_lists(store, docs.clone()).await?;
        self.write_tokens(store).await?;
        self.write_docs(store, docs).await?;
        Ok(())
    }

    #[instrument(level = "debug", skip_all)]
    async fn write_posting_lists(
        &mut self,
        store: &dyn IndexStore,
        docs: Arc<DocSet>,
    ) -> Result<()> {
        let id = self.id;
        let mut writer = store
            .new_index_file(
                &posting_file_path(self.id),
                inverted_list_schema(self.with_position),
            )
            .await?;
        let posting_lists = std::mem::take(&mut self.posting_lists);

        log::info!(
            "writing {} posting lists of partition {}, with position {}",
            posting_lists.len(),
            id,
            self.with_position
        );
        let schema = inverted_list_schema(self.with_position);

        let mut batches = stream::iter(posting_lists)
            .map(|posting_list| {
                let block_max_scores = docs.calculate_block_max_scores(
                    posting_list.doc_ids.iter(),
                    posting_list.frequencies.iter(),
                );
                spawn_cpu(move || posting_list.to_batch(block_max_scores))
            })
            .buffered(get_num_compute_intensive_cpus());

        let mut write_duration = std::time::Duration::ZERO;
        let mut num_posting_lists = 0;
        let mut buffer = Vec::new();
        let mut size_sum = 0;
        while let Some(batch) = batches.try_next().await? {
            num_posting_lists += 1;
            size_sum += batch.get_array_memory_size();
            buffer.push(batch);
            if size_sum >= *LANCE_FTS_FLUSH_SIZE << 20 {
                let batch = concat_batches(&schema, buffer.iter())?;
                buffer.clear();
                size_sum = 0;
                let start = std::time::Instant::now();
                writer.write_record_batch(batch).await?;
                write_duration += start.elapsed();
            }

            if num_posting_lists % 500_000 == 0 {
                log::info!(
                    "wrote {} posting lists of partition {}, writing elapsed: {:?}",
                    num_posting_lists,
                    id,
                    write_duration,
                );
            }
        }
        if !buffer.is_empty() {
            let batch = concat_batches(&schema, buffer.iter())?;
            writer.write_record_batch(batch).await?;
        }

        writer.finish().await?;
        Ok(())
    }

    #[instrument(level = "debug", skip_all)]
    async fn write_tokens(&mut self, store: &dyn IndexStore) -> Result<()> {
        log::info!("writing tokens of partition {}", self.id);
        let tokens = std::mem::take(&mut self.tokens);
        let batch = tokens.to_batch(self.token_set_format)?;
        let mut writer = store
            .new_index_file(&token_file_path(self.id), batch.schema())
            .await?;
        writer.write_record_batch(batch).await?;
        writer.finish().await?;
        Ok(())
    }

    #[instrument(level = "debug", skip_all)]
    async fn write_docs(&mut self, store: &dyn IndexStore, docs: Arc<DocSet>) -> Result<()> {
        log::info!("writing docs of partition {}", self.id);
        let batch = docs.to_batch()?;
        let mut writer = store
            .new_index_file(&doc_file_path(self.id), batch.schema())
            .await?;
        writer.write_record_batch(batch).await?;
        writer.finish().await?;
        Ok(())
    }
}

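/// One tokenization worker. Each worker owns an `InnerBuilder`; when the
/// builder grows past `LANCE_FTS_PARTITION_SIZE` MiB (or the doc-id space is
/// exhausted) it is flushed to the store as a new partition and a fresh
/// builder is started.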
struct IndexWorker {
    store: Arc<dyn IndexStore>,
    tokenizer: Box<dyn LanceTokenizer>,
    id_alloc: Arc<AtomicU64>,
    builder: InnerBuilder,
    partitions: Vec<u64>,
    schema: SchemaRef,
    estimated_size: u64,
    total_doc_length: usize,
    fragment_mask: Option<u64>,
    token_set_format: TokenSetFormat,
}

impl IndexWorker {
    async fn new(
        store: Arc<dyn IndexStore>,
        tokenizer: Box<dyn LanceTokenizer>,
        with_position: bool,
        id_alloc: Arc<AtomicU64>,
        fragment_mask: Option<u64>,
        token_set_format: TokenSetFormat,
    ) -> Result<Self> {
        let schema = inverted_list_schema(with_position);

        Ok(Self {
            store,
            tokenizer,
            builder: InnerBuilder::new(
                id_alloc.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                    | fragment_mask.unwrap_or(0),
                with_position,
                token_set_format,
            ),
            partitions: Vec::new(),
            id_alloc,
            schema,
            estimated_size: 0,
            total_doc_length: 0,
            fragment_mask,
            token_set_format,
        })
    }

    fn has_position(&self) -> bool {
        self.schema.column_with_name(POSITION_COL).is_some()
    }

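    /// Tokenize every document in the batch, accumulate term frequencies (and
    /// positions when enabled) into the current builder, and flush it as a
    /// partition once it exceeds the configured size.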
    async fn process_batch(&mut self, batch: RecordBatch) -> Result<()> {
        let doc_col = batch.column(0);
        let doc_iter = iter_str_array(doc_col);
        let row_id_col = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();
        let docs = doc_iter
            .zip(row_id_col.values().iter())
            .filter_map(|(doc, row_id)| doc.map(|doc| (doc, *row_id)));

        let with_position = self.has_position();
        for (doc, row_id) in docs {
            let mut token_occurrences = HashMap::new();
            let mut token_num = 0;
            {
                let mut token_stream = self.tokenizer.token_stream_for_doc(doc);
                while token_stream.advance() {
                    let token = token_stream.token_mut();
                    let token_text = std::mem::take(&mut token.text);
                    let token_id = self.builder.tokens.add(token_text) as usize;
                    token_occurrences
                        .entry(token_id as u32)
                        .or_insert_with(|| PositionRecorder::new(with_position))
                        .push(token.position as u32);
                    token_num += 1;
                }
            }
            self.builder
                .posting_lists
                .resize_with(self.builder.tokens.len(), || {
                    PostingListBuilder::new(with_position)
                });
            let doc_id = self.builder.docs.append(row_id, token_num);
            self.total_doc_length += doc.len();

            token_occurrences
                .into_iter()
                .for_each(|(token_id, term_positions)| {
                    let posting_list = &mut self.builder.posting_lists[token_id as usize];

                    let old_size = posting_list.size();
                    posting_list.add(doc_id, term_positions);
                    let new_size = posting_list.size();
                    self.estimated_size += new_size - old_size;
                });

            if self.builder.docs.len() as u32 == u32::MAX
                || self.estimated_size >= *LANCE_FTS_PARTITION_SIZE << 20
            {
                self.flush().await?;
            }
        }

        Ok(())
    }

    #[instrument(level = "debug", skip_all)]
    async fn flush(&mut self) -> Result<()> {
        if self.builder.tokens.is_empty() {
            return Ok(());
        }

        log::info!(
            "flushing posting lists, estimated size: {} MiB",
            self.estimated_size / (1024 * 1024)
        );
        self.estimated_size = 0;
        let with_position = self.has_position();
        let mut builder = std::mem::replace(
            &mut self.builder,
            InnerBuilder::new(
                self.id_alloc
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
                    | self.fragment_mask.unwrap_or(0),
                with_position,
                self.token_set_format,
            ),
        );
        builder.write(self.store.as_ref()).await?;
        self.partitions.push(builder.id());
        Ok(())
    }

    async fn finish(mut self) -> Result<Vec<u64>> {
        if !self.builder.tokens.is_empty() {
            self.flush().await?;
        }
        Ok(self.partitions)
    }
}

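/// Per-token occurrence recorder for a single document: either the full list
/// of term positions (when positions are indexed) or just an occurrence count.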
#[derive(Debug, Clone)]
pub enum PositionRecorder {
    Position(Vec<u32>),
    Count(u32),
}

impl PositionRecorder {
    fn new(with_position: bool) -> Self {
        if with_position {
            Self::Position(Vec::new())
        } else {
            Self::Count(0)
        }
    }

    fn push(&mut self, position: u32) {
        match self {
            Self::Position(positions) => positions.push(position),
            Self::Count(count) => *count += 1,
        }
    }

    pub fn len(&self) -> u32 {
        match self {
            Self::Position(positions) => positions.len() as u32,
            Self::Count(count) => *count,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn into_vec(self) -> Vec<u32> {
        match self {
            Self::Position(positions) => positions,
            Self::Count(_) => vec![0],
        }
    }
}

#[derive(Debug, Eq, PartialEq, Clone, DeepSizeOf)]
pub struct ScoredDoc {
    pub row_id: u64,
    pub score: OrderedFloat,
}

impl ScoredDoc {
    pub fn new(row_id: u64, score: f32) -> Self {
        Self {
            row_id,
            score: OrderedFloat(score),
        }
    }
}

impl PartialOrd for ScoredDoc {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for ScoredDoc {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.score.cmp(&other.score)
    }
}

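/// Schema of the legacy posting-list layout: one row per posting with a row
/// id, a frequency, and optionally a list of term positions.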
pub fn legacy_inverted_list_schema(with_position: bool) -> SchemaRef {
    let mut fields = vec![
        arrow_schema::Field::new(ROW_ID, arrow_schema::DataType::UInt64, false),
        arrow_schema::Field::new(FREQUENCY_COL, arrow_schema::DataType::Float32, false),
    ];
    if with_position {
        fields.push(arrow_schema::Field::new(
            POSITION_COL,
            arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
                "item",
                arrow_schema::DataType::Int32,
                true,
            ))),
            false,
        ));
    }
    Arc::new(arrow_schema::Schema::new(fields))
}

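/// Schema of the current posting-list layout: one row per token holding
/// binary-encoded posting blocks, the maximum block score, the posting-list
/// length, and (optionally) binary-encoded position blocks.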
pub fn inverted_list_schema(with_position: bool) -> SchemaRef {
    let mut fields = vec![
        arrow_schema::Field::new(
            POSTING_COL,
            datatypes::DataType::List(Arc::new(Field::new(
                "item",
                datatypes::DataType::LargeBinary,
                true,
            ))),
            false,
        ),
        arrow_schema::Field::new(MAX_SCORE_COL, datatypes::DataType::Float32, false),
        arrow_schema::Field::new(LENGTH_COL, datatypes::DataType::UInt32, false),
    ];
    if with_position {
        fields.push(arrow_schema::Field::new(
            POSITION_COL,
            arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
                "item",
                arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
                    "item",
                    arrow_schema::DataType::LargeBinary,
                    true,
                ))),
                true,
            ))),
            false,
        ));
    }
    Arc::new(arrow_schema::Schema::new(fields))
}

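/// Stream adapter that flattens a `List<Utf8>`/`List<LargeUtf8>` (or
/// `LargeList` variant) document column into one row per string, repeating
/// the row id for every element of the original list.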
pub struct FlattenStream {
    inner: SendableRecordBatchStream,
    field_type: DataType,
    data_type: DataType,
}

impl FlattenStream {
    pub fn new(input: SendableRecordBatchStream) -> Self {
        let schema = input.schema();
        let field = schema.field(0);
        let data_type = match field.data_type() {
            DataType::List(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
            DataType::List(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
                DataType::LargeUtf8
            }
            DataType::LargeList(f) if matches!(f.data_type(), DataType::Utf8) => DataType::Utf8,
            DataType::LargeList(f) if matches!(f.data_type(), DataType::LargeUtf8) => {
                DataType::LargeUtf8
            }
            _ => panic!(
                "expected data type List(Utf8) or List(LargeUtf8) but got {:?}",
                field.data_type()
            ),
        };
        Self {
            inner: input,
            field_type: field.data_type().clone(),
            data_type,
        }
    }
}

impl Stream for FlattenStream {
    type Item = datafusion_common::Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match Pin::new(&mut self.inner).poll_next(cx) {
            Poll::Ready(Some(Ok(batch))) => {
                let doc_col = batch.column(0);
                let batch = match self.field_type {
                    DataType::List(_) => flatten_string_list::<i32>(&batch, doc_col).map_err(|e| {
                        datafusion_common::error::DataFusionError::Execution(format!(
                            "flatten string list error: {}",
                            e
                        ))
                    }),
                    DataType::LargeList(_) => {
                        flatten_string_list::<i64>(&batch, doc_col).map_err(|e| {
                            datafusion_common::error::DataFusionError::Execution(format!(
                                "flatten string list error: {}",
                                e
                            ))
                        })
                    }
                    _ => unreachable!(
                        "expected data type List or LargeList but got {:?}",
                        self.field_type
                    ),
                };
                Poll::Ready(Some(batch))
            }
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

impl RecordBatchStream for FlattenStream {
    fn schema(&self) -> SchemaRef {
        let schema = Schema::new(vec![
            Field::new(
                self.inner.schema().field(0).name(),
                self.data_type.clone(),
                true,
            ),
            ROW_ID_FIELD.clone(),
        ]);

        Arc::new(schema)
    }
}

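/// Flatten a list-of-strings document column into a plain string column,
/// duplicating each row id once per string in its list.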
fn flatten_string_list<Offset: arrow::array::OffsetSizeTrait>(
    batch: &RecordBatch,
    doc_col: &Arc<dyn Array>,
) -> Result<RecordBatch> {
    let docs = doc_col.as_list::<Offset>();
    let row_ids = batch[ROW_ID].as_primitive::<datatypes::UInt64Type>();

    let row_ids = row_ids
        .values()
        .iter()
        .zip(docs.iter())
        .flat_map(|(row_id, doc)| std::iter::repeat_n(*row_id, doc.map(|d| d.len()).unwrap_or(0)));

    let row_ids = Arc::new(UInt64Array::from_iter_values(row_ids));
    let docs = match docs.value_type() {
        datatypes::DataType::Utf8 | datatypes::DataType::LargeUtf8 => docs.values().clone(),
        _ => {
            return Err(Error::Index {
                message: format!(
                    "expected data type String or LargeString but got {}",
                    docs.value_type()
                ),
                location: location!(),
            });
        }
    };

    let schema = Schema::new(vec![
        Field::new(
            batch.schema().field(0).name(),
            docs.data_type().clone(),
            true,
        ),
        ROW_ID_FIELD.clone(),
    ]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![docs, row_ids])?;
    Ok(batch)
}

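// File name helpers: every partition's tokens, postings, docs, and metadata
// files share the `part_{id}_` prefix in the index directory.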
pub(crate) fn token_file_path(partition_id: u64) -> String {
    format!("part_{}_{}", partition_id, TOKENS_FILE)
}

pub(crate) fn posting_file_path(partition_id: u64) -> String {
    format!("part_{}_{}", partition_id, INVERT_LIST_FILE)
}

pub(crate) fn doc_file_path(partition_id: u64) -> String {
    format!("part_{}_{}", partition_id, DOCS_FILE)
}

pub(crate) fn part_metadata_file_path(partition_id: u64) -> String {
    format!("part_{}_{}", partition_id, METADATA_FILE)
}

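/// Merge per-partition metadata files (written when partitions are built and
/// committed separately) into a single index-level metadata file.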
pub async fn merge_index_files(
    object_store: &ObjectStore,
    index_dir: &Path,
    store: Arc<dyn IndexStore>,
) -> Result<()> {
    let part_metadata_files = list_metadata_files(object_store, index_dir).await?;

    merge_metadata_files(store, &part_metadata_files).await
}

async fn list_metadata_files(object_store: &ObjectStore, index_dir: &Path) -> Result<Vec<String>> {
    let mut part_metadata_files = Vec::new();
    let mut list_stream = object_store.list(Some(index_dir.clone()));

    while let Some(item) = list_stream.next().await {
        match item {
            Ok(meta) => {
                let file_name = meta.location.filename().unwrap_or_default();
                if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
                    part_metadata_files.push(file_name.to_string());
                }
            }
            Err(_) => continue,
        }
    }

    if part_metadata_files.is_empty() {
        return Err(Error::InvalidInput {
            source: format!(
                "No partition metadata files found in index directory: {}",
                index_dir
            )
            .into(),
            location: location!(),
        });
    }

    Ok(part_metadata_files)
}

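/// Combine the partition lists from all per-partition metadata files, renumber
/// the partitions to a dense `0..n` range (renaming their files via temporary
/// names so a failure can be rolled back), then write a single metadata file
/// and delete the per-partition ones.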
async fn merge_metadata_files(
    store: Arc<dyn IndexStore>,
    part_metadata_files: &[String],
) -> Result<()> {
    let mut all_partitions = Vec::new();
    let mut params = None;
    let mut token_set_format = None;

    for file_name in part_metadata_files {
        let reader = store.open_index_file(file_name).await?;
        let metadata = &reader.schema().metadata;

        let partitions_str = metadata.get("partitions").ok_or(Error::Index {
            message: format!("partitions not found in {}", file_name),
            location: location!(),
        })?;

        let partition_ids: Vec<u64> =
            serde_json::from_str(partitions_str).map_err(|e| Error::Index {
                message: format!("Failed to parse partitions: {}", e),
                location: location!(),
            })?;

        all_partitions.extend(partition_ids);

        if params.is_none() {
            let params_str = metadata.get("params").ok_or(Error::Index {
                message: format!("params not found in {}", file_name),
                location: location!(),
            })?;
            params = Some(
                serde_json::from_str::<InvertedIndexParams>(params_str).map_err(|e| {
                    Error::Index {
                        message: format!("Failed to parse params: {}", e),
                        location: location!(),
                    }
                })?,
            );
        }

        if token_set_format.is_none() {
            if let Some(name) = metadata.get(TOKEN_SET_FORMAT_KEY) {
                token_set_format = Some(TokenSetFormat::from_str(name)?);
            }
        }
    }

    let mut sorted_ids = all_partitions.clone();
    sorted_ids.sort();
    sorted_ids.dedup();

    let id_mapping: HashMap<u64, u64> = sorted_ids
        .iter()
        .enumerate()
        .map(|(new_id, &old_id)| (old_id, new_id as u64))
        .collect();

    let timestamp = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs();

    // Move each renumbered partition's files to temporary names first so a
    // failure part-way through can be rolled back.
    let mut temp_files: Vec<(String, String, String)> = Vec::new();
    for (&old_id, &new_id) in &id_mapping {
        if old_id != new_id {
            for suffix in [TOKENS_FILE, INVERT_LIST_FILE, DOCS_FILE] {
                let old_path = format!("part_{}_{}", old_id, suffix);
                let new_path = format!("part_{}_{}", new_id, suffix);
                let temp_path = format!("temp_{}_{}", timestamp, old_path);

                if let Err(e) = store.rename_index_file(&old_path, &temp_path).await {
                    for (temp_name, old_name, _) in temp_files.iter().rev() {
                        let _ = store.rename_index_file(temp_name, old_name).await;
                    }
                    return Err(Error::Index {
                        message: format!(
                            "Failed to move {} to temp {}: {}",
                            old_path, temp_path, e
                        ),
                        location: location!(),
                    });
                }
                temp_files.push((temp_path, old_path, new_path));
            }
        }
    }

    // Rename the temporary files to their final ids; on failure, undo the
    // completed renames and restore the remaining originals.
    let mut completed_renames: Vec<(String, String)> = Vec::new();
    for (temp_path, _old_path, final_path) in &temp_files {
        if let Err(e) = store.rename_index_file(temp_path, final_path).await {
            for (final_name, temp_name) in completed_renames.iter().rev() {
                let _ = store.rename_index_file(final_name, temp_name).await;
            }
            for (temp_name, orig_name, _) in temp_files.iter() {
                if !completed_renames.iter().any(|(_, t)| t == temp_name) {
                    let _ = store.rename_index_file(temp_name, orig_name).await;
                }
            }
            return Err(Error::Index {
                message: format!("Failed to rename {} to {}: {}", temp_path, final_path, e),
                location: location!(),
            });
        }
        completed_renames.push((final_path.clone(), temp_path.clone()));
    }

    let remapped_partitions: Vec<u64> = (0..id_mapping.len() as u64).collect();
    let params = params.unwrap_or_default();
    let token_set_format = token_set_format.unwrap_or(TokenSetFormat::Arrow);
    let builder = InvertedIndexBuilder::from_existing_index(
        params,
        None,
        remapped_partitions.clone(),
        token_set_format,
        None,
    );
    builder
        .write_metadata(&*store, &remapped_partitions)
        .await?;

    for file_name in part_metadata_files {
        if file_name.starts_with("part_") && file_name.ends_with("_metadata.lance") {
            let _ = store.delete_index_file(file_name).await;
        }
    }

    Ok(())
}

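/// Normalize the input stream so that the document column is a plain string
/// column: string columns pass through, list-of-string columns are flattened,
/// and JSON (large binary with the JSON extension name) is converted to text.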
pub fn document_input(
    input: SendableRecordBatchStream,
    column: &str,
) -> Result<SendableRecordBatchStream> {
    let schema = input.schema();
    let field = schema.column_with_name(column).expect_ok()?.1;
    match field.data_type() {
        DataType::Utf8 | DataType::LargeUtf8 => Ok(input),
        DataType::List(field) | DataType::LargeList(field)
            if matches!(field.data_type(), DataType::Utf8 | DataType::LargeUtf8) =>
        {
            Ok(Box::pin(FlattenStream::new(input)))
        }
        DataType::LargeBinary => match field.metadata().get(ARROW_EXT_NAME_KEY) {
            Some(name) if name.as_str() == JSON_EXT_NAME => {
                Ok(Box::pin(JsonTextStream::new(input, column.to_string())))
            }
            _ => Err(Error::InvalidInput {
                source: format!("column {} is not json", column).into(),
                location: location!(),
            }),
        },
        _ => Err(Error::InvalidInput {
            source: format!(
                "column {} has type {}, which is not utf8, large utf8, a list of them, or large binary",
                column,
                field.data_type()
            )
            .into(),
            location: location!(),
        }),
    }
}