1use crate::backend::StorageBackend;
7use crate::backend::table_names;
8use crate::backend::types::ScanRequest;
9#[cfg(feature = "lance-backend")]
10use crate::storage::inverted_index::InvertedIndex;
11use crate::storage::vertex::VertexDataset;
12use anyhow::{Result, anyhow};
13use chrono::{DateTime, Utc};
14#[cfg(feature = "lance-backend")]
15use lance::index::DatasetIndexExt;
16#[cfg(feature = "lance-backend")]
17use lance::index::vector::VectorIndexParams;
18#[cfg(feature = "lance-backend")]
19use lance_index::IndexType;
20#[cfg(feature = "lance-backend")]
21use lance_index::progress::IndexBuildProgress;
22#[cfg(feature = "lance-backend")]
23use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
24#[cfg(feature = "lance-backend")]
25use lance_index::vector::bq::RQBuildParams;
26#[cfg(feature = "lance-backend")]
27use lance_index::vector::hnsw::builder::HnswBuildParams;
28#[cfg(feature = "lance-backend")]
29use lance_index::vector::ivf::IvfBuildParams;
30#[cfg(feature = "lance-backend")]
31use lance_index::vector::pq::PQBuildParams;
32#[cfg(feature = "lance-backend")]
33use lance_index::vector::sq::builder::SQBuildParams;
34#[cfg(feature = "lance-backend")]
35use lance_linalg::distance::MetricType;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38#[cfg(feature = "lance-backend")]
39use std::collections::HashSet;
40use std::sync::Arc;
41#[cfg(feature = "lance-backend")]
42use tracing::{debug, info, instrument, warn};
43use uni_common::core::id::Vid;
44#[cfg(feature = "lance-backend")]
45use uni_common::core::schema::IndexDefinition;
46use uni_common::core::schema::SchemaManager;
47#[cfg(feature = "lance-backend")]
48use uni_common::core::schema::{
49 DistanceMetric, FullTextIndexConfig, InvertedIndexConfig, JsonFtsIndexConfig,
50 ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
51};
52
#[cfg(feature = "lance-backend")]
/// Index-build progress reporter that forwards Lance build-stage callbacks to
/// `tracing`.
///
/// Every event is tagged with the index name: stage start/completion are
/// logged at `info`, intermediate progress at `debug`.
#[derive(Debug)]
pub struct TracingIndexProgress {
    // Emitted as the `index` field on every event.
    index_name: String,
}
62
#[cfg(feature = "lance-backend")]
impl TracingIndexProgress {
    /// Builds a shared, type-erased reporter for the index called
    /// `index_name`, suitable for a Lance index builder's `progress` hook.
    pub fn arc(index_name: &str) -> Arc<dyn IndexBuildProgress> {
        let reporter = TracingIndexProgress {
            index_name: index_name.to_owned(),
        };
        Arc::new(reporter)
    }
}
71
#[cfg(feature = "lance-backend")]
#[async_trait::async_trait]
impl IndexBuildProgress for TracingIndexProgress {
    /// Emits an `info` event when Lance enters `stage`, including the
    /// reported `total` (if any) and its `unit`.
    async fn stage_start(&self, stage: &str, total: Option<u64>, unit: &str) -> lance::Result<()> {
        let index = &self.index_name;
        info!(
            index = %index,
            stage = stage,
            total = ?total,
            unit = unit,
            "Index build stage started"
        );
        Ok(())
    }

    /// Emits a `debug` event carrying the `completed` count Lance reports
    /// for `stage`.
    async fn stage_progress(&self, stage: &str, completed: u64) -> lance::Result<()> {
        let index = &self.index_name;
        debug!(
            index = %index,
            stage = stage,
            completed = completed,
            "Index build progress"
        );
        Ok(())
    }

    /// Emits an `info` event when Lance reports that `stage` has finished.
    async fn stage_complete(&self, stage: &str) -> lance::Result<()> {
        let index = &self.index_name;
        info!(index = %index, stage = stage, "Index build stage complete");
        Ok(())
    }
}
105
106#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
108pub enum IndexRebuildStatus {
109 Pending,
111 InProgress,
113 Completed,
115 Failed,
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct IndexRebuildTask {
122 pub id: String,
124 pub label: String,
126 pub status: IndexRebuildStatus,
128 pub created_at: DateTime<Utc>,
130 pub started_at: Option<DateTime<Utc>>,
132 pub completed_at: Option<DateTime<Utc>>,
134 pub error: Option<String>,
136 pub retry_count: u32,
138}
139
140fn resolve_vector_dim(t: &uni_common::DataType) -> Option<usize> {
145 match t {
146 uni_common::DataType::Vector { dimensions } => Some(*dimensions),
147 uni_common::DataType::List(inner) => resolve_vector_dim(inner),
148 _ => None,
149 }
150}
151
/// Creates, drops and rebuilds the indexes declared in the graph schema.
///
/// Physical indexes are built on the per-label vertex datasets under
/// `base_uri`; index definitions are recorded and persisted through the
/// [`SchemaManager`].
pub struct IndexManager {
    // Root URI under which the per-label vertex datasets are opened.
    base_uri: String,
    // Schema owner into which index definitions are added, removed and saved.
    schema_manager: Arc<SchemaManager>,
    // Optional generic storage backend, set through `with_backend`.
    backend: Option<Arc<dyn StorageBackend>>,
}
161
162impl std::fmt::Debug for IndexManager {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 f.debug_struct("IndexManager")
165 .field("base_uri", &self.base_uri)
166 .finish_non_exhaustive()
167 }
168}
169
170impl IndexManager {
171 pub fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Self {
174 Self {
175 base_uri: base_uri.to_string(),
176 schema_manager,
177 backend: None,
178 }
179 }
180
181 pub fn with_backend(mut self, backend: Arc<dyn StorageBackend>) -> Self {
183 self.backend = Some(backend);
184 self
185 }
186
187 #[cfg(feature = "lance-backend")]
189 #[instrument(skip(self), level = "info")]
190 pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
191 let label = &config.label;
192 let property = &config.property;
193 info!(
194 "Creating Inverted Index '{}' on {}.{}",
195 config.name, label, property
196 );
197
198 let schema = self.schema_manager.schema();
199 let label_meta = schema
200 .labels
201 .get(label)
202 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
203
204 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
205
206 let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
207
208 if ds.open_raw().await.is_ok() {
210 index
211 .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
212 .await?;
213 } else {
214 warn!(
215 "Dataset for label '{}' not found, creating empty inverted index",
216 label
217 );
218 }
219
220 self.schema_manager
221 .add_index(IndexDefinition::Inverted(config))?;
222 self.schema_manager.save().await?;
223
224 Ok(())
225 }
226
227 #[cfg(feature = "lance-backend")]
237 #[instrument(skip(self), level = "info")]
238 pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
239 self.create_vector_index_inner(config, false).await
240 }
241
242 #[cfg(feature = "lance-backend")]
254 async fn create_vector_index_inner(
255 &self,
256 config: VectorIndexConfig,
257 force_backfill: bool,
258 ) -> Result<()> {
259 if let VectorIndexType::Muvera { inner, .. } = &config.index_type {
260 self.prepare_muvera_fde(&config, force_backfill).await?;
262 let inner_cfg = VectorIndexConfig {
263 name: config.name.clone(),
264 label: config.label.clone(),
265 property: crate::storage::muvera_index::fde_derived_column(&config.name),
266 index_type: (**inner).clone(),
267 metric: DistanceMetric::Dot,
268 embedding_config: None,
269 metadata: config.metadata.clone(),
270 };
271 self.build_physical_vector_index(&inner_cfg).await?;
272 } else {
273 self.build_physical_vector_index(&config).await?;
274 }
275 self.schema_manager
276 .add_index(IndexDefinition::Vector(config))?;
277 self.schema_manager.save().await?;
278 Ok(())
279 }
280
281 #[cfg(feature = "lance-backend")]
296 async fn prepare_muvera_fde(
297 &self,
298 config: &VectorIndexConfig,
299 force_backfill: bool,
300 ) -> Result<()> {
301 use crate::storage::muvera_index::fde_spec_for_config;
302
303 let schema = self.schema_manager.schema();
304 let Some(spec) = fde_spec_for_config(&schema, config) else {
305 return Ok(());
306 };
307 spec.params.validate()?;
308
309 let newly_added = self.schema_manager.add_internal_property(
313 &spec.label,
314 &spec.derived_col,
315 uni_common::DataType::Vector {
316 dimensions: spec.params.fde_dim(),
317 },
318 true,
319 )?;
320
321 if !newly_added && !force_backfill {
325 return Ok(());
326 }
327
328 if let Err(e) = self.backfill_fde_column(&spec).await {
340 if newly_added {
341 let _ = self
342 .schema_manager
343 .drop_property(&spec.label, &spec.derived_col);
344 }
345 return Err(e);
346 }
347 Ok(())
348 }
349
350 #[cfg(feature = "lance-backend")]
357 async fn backfill_fde_column(
358 &self,
359 spec: &crate::storage::muvera_index::FdeSpec,
360 ) -> Result<()> {
361 use crate::storage::muvera_index::splice_fde_batch;
362
363 let Some(backend) = self.backend.as_ref() else {
364 return Ok(());
365 };
366 let table = table_names::vertex_table_name(&spec.label);
367 if !backend.table_exists(&table).await.unwrap_or(false) {
368 return Ok(());
369 }
370
371 let schema = self.schema_manager.schema();
372 let label_id = schema
373 .label_id_by_name(&spec.label)
374 .ok_or_else(|| anyhow!("MUVERA: label '{}' not found", spec.label))?;
375 let target_schema =
378 VertexDataset::new(&self.base_uri, &spec.label, label_id).get_arrow_schema(&schema)?;
379 let source_dt = schema
380 .properties
381 .get(&spec.label)
382 .and_then(|p| p.get(&spec.source_prop))
383 .map(|m| m.r#type.clone());
384 let encoder = uni_common::muvera::FdeEncoder::new(&spec.params)?;
385
386 let batches = backend.scan(ScanRequest::all(&table)).await?;
387 let mut new_batches = Vec::with_capacity(batches.len());
388 for batch in &batches {
389 new_batches.push(splice_fde_batch(
390 batch,
391 &target_schema,
392 spec,
393 &encoder,
394 source_dt.as_ref(),
395 )?);
396 }
397 backend
398 .replace_table_atomic(&table, new_batches, target_schema)
399 .await?;
400 Ok(())
401 }
402
403 #[cfg(feature = "lance-backend")]
408 async fn build_physical_vector_index(&self, config: &VectorIndexConfig) -> Result<()> {
409 let label = &config.label;
410 let property = &config.property;
411 info!(
412 "Creating vector index '{}' on {}.{}",
413 config.name, label, property
414 );
415
416 let schema = self.schema_manager.schema();
417 let label_meta = schema
418 .labels
419 .get(label)
420 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
421
422 let prop_dim = schema
427 .properties
428 .get(label)
429 .and_then(|props| props.get(property))
430 .and_then(|meta| resolve_vector_dim(&meta.r#type));
431 let pq_sub = match &config.index_type {
432 VectorIndexType::IvfPq {
433 num_sub_vectors, ..
434 }
435 | VectorIndexType::HnswPq {
436 num_sub_vectors, ..
437 } => Some(*num_sub_vectors as usize),
438 _ => None,
439 };
440 if let (Some(dim), Some(sub)) = (prop_dim, pq_sub)
446 && sub != 0
447 && dim >= sub
448 && dim % sub != 0
449 {
450 return Err(anyhow!(
451 "Vector index '{}': PQ num_sub_vectors ({}) must divide the embedding dimension ({})",
452 config.name,
453 sub,
454 dim
455 ));
456 }
457
458 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
459
460 match ds_wrapper.open_raw().await {
461 Ok(mut lance_ds) => {
462 let metric_type = match &config.metric {
463 DistanceMetric::L2 => MetricType::L2,
464 DistanceMetric::Cosine => MetricType::Cosine,
465 DistanceMetric::Dot => MetricType::Dot,
466 _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
467 };
468
469 let params = match config.index_type.clone() {
470 VectorIndexType::Flat => {
471 let ivf = IvfBuildParams::new(1);
472 VectorIndexParams::with_ivf_flat_params(metric_type, ivf)
473 }
474 VectorIndexType::IvfFlat { num_partitions } => {
475 let ivf = IvfBuildParams::new(num_partitions as usize);
476 VectorIndexParams::with_ivf_flat_params(metric_type, ivf)
477 }
478 VectorIndexType::IvfPq {
479 num_partitions,
480 num_sub_vectors,
481 bits_per_subvector,
482 } => {
483 let ivf = IvfBuildParams::new(num_partitions as usize);
484 let pq = PQBuildParams::new(
485 num_sub_vectors as usize,
486 bits_per_subvector as usize,
487 );
488 VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
489 }
490 VectorIndexType::IvfSq { num_partitions } => {
491 let ivf = IvfBuildParams::new(num_partitions as usize);
492 let sq = SQBuildParams::default();
493 VectorIndexParams::with_ivf_sq_params(metric_type, ivf, sq)
494 }
495 VectorIndexType::IvfRq {
496 num_partitions,
497 num_bits,
498 } => {
499 let ivf = IvfBuildParams::new(num_partitions as usize);
500 let mut rq = RQBuildParams::default();
501 if let Some(bits) = num_bits {
502 rq.num_bits = bits;
503 }
504 VectorIndexParams::with_ivf_rq_params(metric_type, ivf, rq)
505 }
506 VectorIndexType::HnswFlat {
507 m,
508 ef_construction,
509 num_partitions,
510 } => {
511 let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
512 let hnsw = HnswBuildParams::default()
513 .num_edges(m as usize)
514 .ef_construction(ef_construction as usize);
515 VectorIndexParams::ivf_hnsw(metric_type, ivf, hnsw)
516 }
517 VectorIndexType::HnswSq {
518 m,
519 ef_construction,
520 num_partitions,
521 } => {
522 let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
523 let hnsw = HnswBuildParams::default()
524 .num_edges(m as usize)
525 .ef_construction(ef_construction as usize);
526 let sq = SQBuildParams::default();
527 VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
528 }
529 VectorIndexType::HnswPq {
530 m,
531 ef_construction,
532 num_sub_vectors,
533 num_partitions,
534 } => {
535 let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
536 let hnsw = HnswBuildParams::default()
537 .num_edges(m as usize)
538 .ef_construction(ef_construction as usize);
539 let pq = PQBuildParams::new(num_sub_vectors as usize, 8);
540 VectorIndexParams::with_ivf_hnsw_pq_params(metric_type, ivf, hnsw, pq)
541 }
542 _ => {
543 return Err(anyhow!(
544 "Unsupported vector index type: {:?}",
545 config.index_type
546 ));
547 }
548 };
549
550 let progress = TracingIndexProgress::arc(&config.name);
552 match lance_ds
553 .create_index_builder(&[property], IndexType::Vector, ¶ms)
554 .name(config.name.clone())
555 .replace(true)
556 .progress(progress)
557 .await
558 {
559 Ok(metadata) => {
560 info!(
561 index_name = %metadata.name,
562 index_uuid = %metadata.uuid,
563 dataset_version = metadata.dataset_version,
564 "Vector index created"
565 );
566 }
567 Err(e) => {
568 warn!(
569 "Failed to create physical vector index (dataset might be empty): {}",
570 e
571 );
572 }
573 }
574 }
575 Err(e) => {
576 warn!(
577 "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
578 label, e
579 );
580 }
581 }
582
583 Ok(())
584 }
585
586 #[cfg(feature = "lance-backend")]
588 #[instrument(skip(self), level = "info")]
589 pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
590 let label = &config.label;
591 let properties = &config.properties;
592 info!(
593 "Creating scalar index '{}' on {}.{:?}",
594 config.name, label, properties
595 );
596
597 let schema = self.schema_manager.schema();
598 let label_meta = schema
599 .labels
600 .get(label)
601 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
602
603 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
604
605 match ds_wrapper.open_raw().await {
606 Ok(mut lance_ds) => {
607 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
608
609 let progress = TracingIndexProgress::arc(&config.name);
610 let scalar_params = match config.index_type {
611 ScalarIndexType::Bitmap => {
612 ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap)
613 }
614 ScalarIndexType::LabelList => {
615 ScalarIndexParams::for_builtin(BuiltinIndexType::LabelList)
616 }
617 _ => ScalarIndexParams::default(),
618 };
619 match lance_ds
620 .create_index_builder(&columns, IndexType::Scalar, &scalar_params)
621 .name(config.name.clone())
622 .replace(true)
623 .progress(progress)
624 .await
625 {
626 Ok(metadata) => {
627 info!(
628 index_name = %metadata.name,
629 index_uuid = %metadata.uuid,
630 dataset_version = metadata.dataset_version,
631 "Scalar index created"
632 );
633 }
634 Err(e) => {
635 warn!(
636 "Failed to create physical scalar index (dataset might be empty): {}",
637 e
638 );
639 }
640 }
641 }
642 Err(e) => {
643 warn!(
644 "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
645 label, e
646 );
647 }
648 }
649
650 self.schema_manager
651 .add_index(IndexDefinition::Scalar(config))?;
652 self.schema_manager.save().await?;
653
654 Ok(())
655 }
656
657 #[cfg(feature = "lance-backend")]
659 #[instrument(skip(self), level = "info")]
660 pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
661 let label = &config.label;
662 info!(
663 "Creating FTS index '{}' on {}.{:?}",
664 config.name, label, config.properties
665 );
666
667 let schema = self.schema_manager.schema();
668 let label_meta = schema
669 .labels
670 .get(label)
671 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
672
673 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
674
675 match ds_wrapper.open_raw().await {
676 Ok(mut lance_ds) => {
677 let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
678
679 let fts_params =
680 InvertedIndexParams::default().with_position(config.with_positions);
681
682 let progress = TracingIndexProgress::arc(&config.name);
683 match lance_ds
684 .create_index_builder(&columns, IndexType::Inverted, &fts_params)
685 .name(config.name.clone())
686 .replace(true)
687 .progress(progress)
688 .await
689 {
690 Ok(metadata) => {
691 info!(
692 index_name = %metadata.name,
693 index_uuid = %metadata.uuid,
694 dataset_version = metadata.dataset_version,
695 "FTS index created"
696 );
697 }
698 Err(e) => {
699 warn!(
700 "Failed to create physical FTS index (dataset might be empty): {}",
701 e
702 );
703 }
704 }
705 }
706 Err(e) => {
707 warn!(
708 "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
709 label, e
710 );
711 }
712 }
713
714 self.schema_manager
715 .add_index(IndexDefinition::FullText(config))?;
716 self.schema_manager.save().await?;
717
718 Ok(())
719 }
720
721 #[cfg(feature = "lance-backend")]
726 #[instrument(skip(self), level = "info")]
727 pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
728 let label = &config.label;
729 let column = &config.column;
730 info!(
731 "Creating JSON FTS index '{}' on {}.{}",
732 config.name, label, column
733 );
734
735 let schema = self.schema_manager.schema();
736 let label_meta = schema
737 .labels
738 .get(label)
739 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
740
741 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
742
743 match ds_wrapper.open_raw().await {
744 Ok(mut lance_ds) => {
745 let fts_params =
746 InvertedIndexParams::default().with_position(config.with_positions);
747
748 let progress = TracingIndexProgress::arc(&config.name);
749 match lance_ds
750 .create_index_builder(&[column.as_str()], IndexType::Inverted, &fts_params)
751 .name(config.name.clone())
752 .replace(true)
753 .progress(progress)
754 .await
755 {
756 Ok(metadata) => {
757 info!(
758 index_name = %metadata.name,
759 index_uuid = %metadata.uuid,
760 dataset_version = metadata.dataset_version,
761 "JSON FTS index created"
762 );
763 }
764 Err(e) => {
765 warn!(
766 "Failed to create physical JSON FTS index (dataset might be empty): {}",
767 e
768 );
769 }
770 }
771 }
772 Err(e) => {
773 warn!(
774 "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
775 label, e
776 );
777 }
778 }
779
780 self.schema_manager
781 .add_index(IndexDefinition::JsonFullText(config))?;
782 self.schema_manager.save().await?;
783
784 Ok(())
785 }
786
787 #[cfg(feature = "lance-backend")]
789 #[instrument(skip(self), level = "info")]
790 pub async fn drop_index(&self, name: &str) -> Result<()> {
791 info!("Dropping index '{}'", name);
792
793 let idx_def = self
794 .schema_manager
795 .get_index(name)
796 .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
797
798 let label = idx_def.label();
800 let schema = self.schema_manager.schema();
801 if let Some(label_meta) = schema.labels.get(label) {
802 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
803 match ds_wrapper.open_raw().await {
804 Ok(mut lance_ds) => {
805 if let Err(e) = lance_ds.drop_index(name).await {
806 warn!(
809 "Physical index drop for '{}' returned error (non-fatal): {}",
810 name, e
811 );
812 } else {
813 info!("Physical index '{}' dropped from Lance dataset", name);
814 }
815 }
816 Err(e) => {
817 debug!(
818 "Could not open dataset for label '{}' to drop physical index: {}",
819 label, e
820 );
821 }
822 }
823 }
824
825 self.schema_manager.remove_index(name)?;
826 self.schema_manager.save().await?;
827 Ok(())
828 }
829
830 #[cfg(feature = "lance-backend")]
832 #[instrument(skip(self), level = "info")]
833 pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
834 info!("Rebuilding all indexes for label '{}'", label);
835 let schema = self.schema_manager.schema();
836
837 let indexes: Vec<_> = schema
839 .indexes
840 .iter()
841 .filter(|idx| idx.label() == label)
842 .cloned()
843 .collect();
844
845 for index in indexes {
846 match index {
847 IndexDefinition::Vector(cfg) => self.create_vector_index_inner(cfg, true).await?,
851 IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
852 IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
853 IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
854 IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
855 _ => warn!("Unknown index type encountered during rebuild, skipping"),
856 }
857 }
858 Ok(())
859 }
860
861 #[cfg(feature = "lance-backend")]
863 pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
864 let schema = self.schema_manager.schema();
865 let label_meta = schema
866 .labels
867 .get(label)
868 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
869
870 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
872
873 if let Ok(mut ds) = ds_wrapper.open_raw().await {
875 let index_name = format!("{}_{}_composite", label, properties.join("_"));
877
878 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
880
881 let progress = TracingIndexProgress::arc(&index_name);
882 match ds
883 .create_index_builder(&columns, IndexType::Scalar, &ScalarIndexParams::default())
884 .name(index_name.clone())
885 .replace(true)
886 .progress(progress)
887 .await
888 {
889 Ok(metadata) => {
890 info!(
891 index_name = %metadata.name,
892 index_uuid = %metadata.uuid,
893 dataset_version = metadata.dataset_version,
894 "Composite index created"
895 );
896 }
897 Err(e) => {
898 warn!("Failed to create physical composite index: {}", e);
899 }
900 }
901
902 let config = ScalarIndexConfig {
903 name: index_name,
904 label: label.to_string(),
905 properties: properties.to_vec(),
906 index_type: uni_common::core::schema::ScalarIndexType::BTree,
907 where_clause: None,
908 metadata: Default::default(),
909 };
910
911 self.schema_manager
912 .add_index(IndexDefinition::Scalar(config))?;
913 self.schema_manager.save().await?;
914 }
915
916 Ok(())
917 }
918
919 #[cfg(feature = "lance-backend")]
928 #[instrument(skip(self, added, removed), level = "info", fields(
929 label = %config.label,
930 property = %config.property
931 ))]
932 pub async fn update_inverted_index_incremental(
933 &self,
934 config: &InvertedIndexConfig,
935 added: &HashMap<Vid, Vec<String>>,
936 removed: &HashSet<Vid>,
937 ) -> Result<()> {
938 info!(
939 added = added.len(),
940 removed = removed.len(),
941 "Incrementally updating inverted index"
942 );
943
944 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
945 index.apply_incremental_updates(added, removed).await
946 }
947}