1use crate::backend::StorageBackend;
7use crate::backend::table_names;
8use crate::backend::types::ScanRequest;
9#[cfg(feature = "lance-backend")]
10use crate::storage::inverted_index::InvertedIndex;
11#[cfg(feature = "lance-backend")]
12use crate::storage::sparse_index::SparseVectorIndex;
13use crate::storage::vertex::VertexDataset;
14use anyhow::{Result, anyhow};
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18#[cfg(feature = "lance-backend")]
19use std::collections::HashSet;
20use std::sync::Arc;
21#[cfg(feature = "lance-backend")]
22use tracing::{debug, info, instrument, warn};
23use uni_common::core::id::Vid;
24#[cfg(feature = "lance-backend")]
25use uni_common::core::schema::IndexDefinition;
26use uni_common::core::schema::SchemaManager;
27#[cfg(feature = "lance-backend")]
28use uni_common::core::schema::{
29 DistanceMetric, FullTextIndexConfig, InvertedIndexConfig, JsonFtsIndexConfig,
30 ScalarIndexConfig, ScalarIndexType, SparseVectorIndexConfig, VectorIndexConfig,
31 VectorIndexType,
32};
33
/// Status of an [`IndexRebuildTask`].
///
/// The variant descriptions below are inferred from the variant names; the code that
/// drives these transitions is not part of this section — TODO confirm against it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexRebuildStatus {
    /// Presumably: the task exists but has not been started yet.
    Pending,
    /// Presumably: the rebuild is currently running.
    InProgress,
    /// Presumably: the rebuild finished successfully.
    Completed,
    /// Presumably: the rebuild ended with an error (see `IndexRebuildTask::error`).
    Failed,
}
46
/// Serializable record describing one index-rebuild task.
///
/// Field descriptions marked "presumably" are inferred from names and types only; the
/// code that creates and updates tasks is not part of this section.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexRebuildTask {
    /// Identifier of the task.
    pub id: String,
    /// Presumably the label whose indexes are rebuilt — TODO confirm.
    pub label: String,
    /// Current status of the task.
    pub status: IndexRebuildStatus,
    /// UTC timestamp; presumably when the task was created.
    pub created_at: DateTime<Utc>,
    /// UTC timestamp; presumably when work started (`None` until then).
    pub started_at: Option<DateTime<Utc>>,
    /// UTC timestamp; presumably when work finished (`None` until then).
    pub completed_at: Option<DateTime<Utc>>,
    /// Optional error text; presumably set when the task fails.
    pub error: Option<String>,
    /// Retry counter; presumably the number of retries attempted so far.
    pub retry_count: u32,
}
67
68fn resolve_vector_dim(t: &uni_common::DataType) -> Option<usize> {
73 match t {
74 uni_common::DataType::Vector { dimensions } => Some(*dimensions),
75 uni_common::DataType::List(inner) => resolve_vector_dim(inner),
76 _ => None,
77 }
78}
79
/// Translates a schema-level distance metric and vector index type into the backend's
/// `VectorIndexParams`.
///
/// # Errors
///
/// Returns an error when the metric is not one of `L2`, `Cosine` or `Dot`, when the
/// index type is `Muvera` (it has to be replaced by its inner index type first), or
/// when the index type is not one of the variants handled here. The metric is checked
/// before the index type.
#[cfg(feature = "lance-backend")]
fn to_backend_vector_params(
    metric: DistanceMetric,
    index_type: &VectorIndexType,
) -> Result<crate::backend::types::VectorIndexParams> {
    /// Maps the schema metric onto the backend metric.
    fn convert_metric(metric: DistanceMetric) -> Result<crate::backend::types::DistanceMetric> {
        use crate::backend::types::DistanceMetric as BackendMetric;
        match metric {
            DistanceMetric::L2 => Ok(BackendMetric::L2),
            DistanceMetric::Cosine => Ok(BackendMetric::Cosine),
            DistanceMetric::Dot => Ok(BackendMetric::Dot),
            other => Err(anyhow!("Unsupported vector index metric: {:?}", other)),
        }
    }

    /// Maps the schema index type onto the backend index kind.
    fn convert_kind(
        index_type: &VectorIndexType,
    ) -> Result<crate::backend::types::VectorIndexKind> {
        use crate::backend::types::VectorIndexKind;
        let kind = match index_type {
            VectorIndexType::Flat => VectorIndexKind::Flat,
            VectorIndexType::IvfFlat { num_partitions } => VectorIndexKind::IvfFlat {
                num_partitions: *num_partitions,
            },
            VectorIndexType::IvfPq {
                num_partitions,
                num_sub_vectors,
                bits_per_subvector,
            } => VectorIndexKind::IvfPq {
                num_partitions: *num_partitions,
                num_sub_vectors: *num_sub_vectors,
                num_bits: *bits_per_subvector,
            },
            VectorIndexType::IvfSq { num_partitions } => VectorIndexKind::IvfSq {
                num_partitions: *num_partitions,
            },
            VectorIndexType::IvfRq {
                num_partitions,
                num_bits,
            } => VectorIndexKind::IvfRq {
                num_partitions: *num_partitions,
                num_bits: *num_bits,
            },
            // For the HNSW variants an unspecified partition count becomes 1.
            VectorIndexType::HnswFlat {
                m,
                ef_construction,
                num_partitions,
            } => VectorIndexKind::HnswFlat {
                m: *m,
                ef_construction: *ef_construction,
                num_partitions: num_partitions.unwrap_or(1),
            },
            VectorIndexType::HnswSq {
                m,
                ef_construction,
                num_partitions,
            } => VectorIndexKind::HnswSq {
                m: *m,
                ef_construction: *ef_construction,
                num_partitions: num_partitions.unwrap_or(1),
            },
            VectorIndexType::HnswPq {
                m,
                ef_construction,
                num_sub_vectors,
                num_partitions,
            } => VectorIndexKind::HnswPq {
                m: *m,
                ef_construction: *ef_construction,
                num_sub_vectors: *num_sub_vectors,
                num_partitions: num_partitions.unwrap_or(1),
            },
            VectorIndexType::Muvera { .. } => {
                return Err(anyhow!(
                    "MUVERA must be resolved to its inner index type before the physical build"
                ));
            }
            other => return Err(anyhow!("Unsupported vector index type: {:?}", other)),
        };
        Ok(kind)
    }

    // Metric first, then kind: an invalid metric is reported even if the kind is
    // invalid as well.
    let metric = convert_metric(metric)?;
    let kind = convert_kind(index_type)?;
    Ok(crate::backend::types::VectorIndexParams { metric, kind })
}
168
/// Creates, rebuilds and drops indexes and records them through the schema manager.
pub struct IndexManager {
    /// Base URI of the store; postings paths are formed as `{base_uri}/indexes/...`.
    base_uri: String,
    /// Used to validate labels and to register, remove and persist index definitions.
    schema_manager: Arc<SchemaManager>,
    /// Optional storage backend. When it is `None`, the methods of this type skip the
    /// physical index work and only log that it was skipped.
    backend: Option<Arc<dyn StorageBackend>>,
}
178
179impl std::fmt::Debug for IndexManager {
180 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181 f.debug_struct("IndexManager")
182 .field("base_uri", &self.base_uri)
183 .finish_non_exhaustive()
184 }
185}
186
187impl IndexManager {
188 pub fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Self {
191 Self {
192 base_uri: base_uri.to_string(),
193 schema_manager,
194 backend: None,
195 }
196 }
197
198 pub fn with_backend(mut self, backend: Arc<dyn StorageBackend>) -> Self {
200 self.backend = Some(backend);
201 self
202 }
203
204 #[cfg(feature = "lance-backend")]
214 async fn postings_write_guard(
215 &self,
216 postings_path: &str,
217 ) -> Option<crate::backend::traits::TableWriteGuard> {
218 match self.backend.as_ref() {
219 Some(backend) => Some(backend.lock_table_for_write(postings_path).await),
220 None => None,
221 }
222 }
223
224 #[cfg(feature = "lance-backend")]
226 #[instrument(skip(self), level = "info")]
227 pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
228 let label = &config.label;
229 let property = &config.property;
230 info!(
231 "Creating Inverted Index '{}' on {}.{}",
232 config.name, label, property
233 );
234
235 let schema = self.schema_manager.schema();
236 if !schema.labels.contains_key(label) {
237 return Err(anyhow!("Label '{}' not found", label));
238 }
239
240 let postings_path = format!("{}/indexes/{}/{}_inverted", self.base_uri, label, property);
243 let _postings_guard = self.postings_write_guard(&postings_path).await;
244
245 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
246
247 let table = table_names::vertex_table_name(label);
253 if let Some(backend) = self.backend.as_ref() {
254 if backend.table_exists(&table).await? {
255 let batches = backend.scan(ScanRequest::all(&table)).await?;
256 index
257 .build_from_batches(&batches, |n| info!("Indexed {} terms", n))
258 .await?;
259 } else {
260 debug!(
261 "Table '{}' not flushed yet; creating empty inverted index (populated on flush)",
262 table
263 );
264 }
265 } else {
266 warn!(
267 "No storage backend available; inverted index '{}' left empty (populated on flush)",
268 config.name
269 );
270 }
271
272 self.schema_manager
273 .add_index(IndexDefinition::Inverted(config))?;
274 self.schema_manager.save().await?;
275
276 Ok(())
277 }
278
279 #[cfg(feature = "lance-backend")]
289 #[instrument(skip(self), level = "info")]
290 pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
291 self.create_vector_index_inner(config, false).await
292 }
293
294 #[cfg(feature = "lance-backend")]
306 async fn create_vector_index_inner(
307 &self,
308 config: VectorIndexConfig,
309 force_backfill: bool,
310 ) -> Result<()> {
311 if let VectorIndexType::Muvera { inner, .. } = &config.index_type {
312 self.prepare_muvera_fde(&config, force_backfill).await?;
314 let inner_cfg = VectorIndexConfig {
315 name: config.name.clone(),
316 label: config.label.clone(),
317 property: crate::storage::muvera_index::fde_derived_column(&config.name),
318 index_type: (**inner).clone(),
319 metric: DistanceMetric::Dot,
320 embedding_config: None,
321 metadata: config.metadata.clone(),
322 };
323 self.build_physical_vector_index(&inner_cfg).await?;
324 } else {
325 self.build_physical_vector_index(&config).await?;
326 }
327 self.schema_manager
328 .add_index(IndexDefinition::Vector(config))?;
329 self.schema_manager.save().await?;
330 Ok(())
331 }
332
333 #[cfg(feature = "lance-backend")]
348 async fn prepare_muvera_fde(
349 &self,
350 config: &VectorIndexConfig,
351 force_backfill: bool,
352 ) -> Result<()> {
353 use crate::storage::muvera_index::fde_spec_for_config;
354
355 let schema = self.schema_manager.schema();
356 let Some(spec) = fde_spec_for_config(&schema, config) else {
357 return Ok(());
358 };
359 spec.params.validate()?;
360
361 let newly_added = self.schema_manager.add_internal_property(
365 &spec.label,
366 &spec.derived_col,
367 uni_common::DataType::Vector {
368 dimensions: spec.params.fde_dim(),
369 },
370 true,
371 )?;
372
373 if !newly_added && !force_backfill {
377 return Ok(());
378 }
379
380 if let Err(e) = self.backfill_fde_column(&spec).await {
392 if newly_added {
393 let _ = self
394 .schema_manager
395 .drop_property(&spec.label, &spec.derived_col);
396 }
397 return Err(e);
398 }
399 Ok(())
400 }
401
402 #[cfg(feature = "lance-backend")]
409 async fn backfill_fde_column(
410 &self,
411 spec: &crate::storage::muvera_index::FdeSpec,
412 ) -> Result<()> {
413 use crate::storage::muvera_index::splice_fde_batch;
414
415 let Some(backend) = self.backend.as_ref() else {
416 return Ok(());
417 };
418 let table = table_names::vertex_table_name(&spec.label);
419 if !backend.table_exists(&table).await? {
423 return Ok(());
424 }
425
426 let schema = self.schema_manager.schema();
427 let label_id = schema
428 .label_id_by_name(&spec.label)
429 .ok_or_else(|| anyhow!("MUVERA: label '{}' not found", spec.label))?;
430 let target_schema =
433 VertexDataset::new(&self.base_uri, &spec.label, label_id).get_arrow_schema(&schema)?;
434 let source_dt = schema
435 .properties
436 .get(&spec.label)
437 .and_then(|p| p.get(&spec.source_prop))
438 .map(|m| m.r#type.clone());
439 let encoder = uni_common::muvera::FdeEncoder::new(&spec.params)?;
440
441 let _table_guard = backend.lock_table_for_write(&table).await;
450
451 let batches = backend.scan(ScanRequest::all(&table)).await?;
452 let mut new_batches = Vec::with_capacity(batches.len());
453 for batch in &batches {
454 new_batches.push(splice_fde_batch(
455 batch,
456 &target_schema,
457 spec,
458 &encoder,
459 source_dt.as_ref(),
460 )?);
461 }
462 backend
463 .replace_table_atomic(&table, new_batches, target_schema)
464 .await?;
465 Ok(())
466 }
467
468 #[cfg(feature = "lance-backend")]
473 async fn build_physical_vector_index(&self, config: &VectorIndexConfig) -> Result<()> {
474 let label = &config.label;
475 let property = &config.property;
476 info!(
477 "Creating vector index '{}' on {}.{}",
478 config.name, label, property
479 );
480
481 let schema = self.schema_manager.schema();
482 if !schema.labels.contains_key(label) {
483 return Err(anyhow!("Label '{}' not found", label));
484 }
485
486 let prop_dim = schema
491 .properties
492 .get(label)
493 .and_then(|props| props.get(property))
494 .and_then(|meta| resolve_vector_dim(&meta.r#type));
495 let pq_sub = match &config.index_type {
496 VectorIndexType::IvfPq {
497 num_sub_vectors, ..
498 }
499 | VectorIndexType::HnswPq {
500 num_sub_vectors, ..
501 } => Some(*num_sub_vectors as usize),
502 _ => None,
503 };
504 if let (Some(dim), Some(sub)) = (prop_dim, pq_sub)
510 && sub != 0
511 && dim >= sub
512 && dim % sub != 0
513 {
514 return Err(anyhow!(
515 "Vector index '{}': PQ num_sub_vectors ({}) must divide the embedding dimension ({})",
516 config.name,
517 sub,
518 dim
519 ));
520 }
521
522 let params = to_backend_vector_params(config.metric.clone(), &config.index_type)?;
523 let table = table_names::vertex_table_name(label);
524
525 let Some(backend) = self.backend.as_ref() else {
526 warn!(
527 "No storage backend; physical vector index '{}' deferred until a flush",
528 config.name
529 );
530 return Ok(());
531 };
532
533 if backend.table_exists(&table).await? {
538 info!(
539 "Building physical vector index '{}' on '{}'",
540 config.name, table
541 );
542 if let Err(e) = backend
543 .create_vector_index(&table, property, &config.name, params)
544 .await
545 {
546 warn!(
547 "Failed to build physical vector index '{}' (column may be empty): {}",
548 config.name, e
549 );
550 } else {
551 info!("Vector index '{}' created", config.name);
552 }
553 } else {
554 debug!(
555 "Label '{}' not flushed yet; physical vector index '{}' built on next flush",
556 label, config.name
557 );
558 }
559
560 Ok(())
561 }
562
563 #[cfg(feature = "lance-backend")]
565 #[instrument(skip(self), level = "info")]
566 pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
567 let label = &config.label;
568 let properties = &config.properties;
569 info!(
570 "Creating scalar index '{}' on {}.{:?}",
571 config.name, label, properties
572 );
573
574 let schema = self.schema_manager.schema();
575 if !schema.labels.contains_key(label) {
576 return Err(anyhow!("Label '{}' not found", label));
577 }
578
579 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
580 let backend_idx_type = match config.index_type {
584 ScalarIndexType::Bitmap => crate::backend::types::ScalarIndexType::Bitmap,
585 ScalarIndexType::LabelList => crate::backend::types::ScalarIndexType::LabelList,
586 _ => crate::backend::types::ScalarIndexType::BTree,
587 };
588 let table = table_names::vertex_table_name(label);
589
590 if let Some(backend) = self.backend.as_ref() {
591 if backend.table_exists(&table).await? {
592 info!(
593 "Building physical scalar index '{}' on '{}'",
594 config.name, table
595 );
596 if let Err(e) = backend
597 .create_scalar_index(&table, &columns, backend_idx_type, Some(&config.name))
598 .await
599 {
600 warn!(
601 "Failed to build physical scalar index '{}' (table may be empty): {}",
602 config.name, e
603 );
604 } else {
605 info!("Scalar index '{}' created", config.name);
606 }
607 } else {
608 debug!(
609 "Label '{}' not flushed yet; physical scalar index '{}' built on next flush",
610 label, config.name
611 );
612 }
613 } else {
614 warn!(
615 "No storage backend; physical scalar index '{}' deferred until a flush",
616 config.name
617 );
618 }
619
620 self.schema_manager
621 .add_index(IndexDefinition::Scalar(config))?;
622 self.schema_manager.save().await?;
623
624 Ok(())
625 }
626
627 #[cfg(feature = "lance-backend")]
629 #[instrument(skip(self), level = "info")]
630 pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
631 let label = &config.label;
632 info!(
633 "Creating FTS index '{}' on {}.{:?}",
634 config.name, label, config.properties
635 );
636
637 let schema = self.schema_manager.schema();
638 if !schema.labels.contains_key(label) {
639 return Err(anyhow!("Label '{}' not found", label));
640 }
641
642 let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
643 let table = table_names::vertex_table_name(label);
644
645 if let Some(backend) = self.backend.as_ref() {
646 if backend.table_exists(&table).await? {
647 info!(
648 "Building physical FTS index '{}' on '{}'",
649 config.name, table
650 );
651 if let Err(e) = backend
652 .create_fts_index(&table, &columns, Some(&config.name), config.with_positions)
653 .await
654 {
655 warn!(
656 "Failed to build physical FTS index '{}' (table may be empty): {}",
657 config.name, e
658 );
659 } else {
660 info!("FTS index '{}' created", config.name);
661 }
662 } else {
663 debug!(
664 "Label '{}' not flushed yet; physical FTS index '{}' built on next flush",
665 label, config.name
666 );
667 }
668 } else {
669 warn!(
670 "No storage backend; physical FTS index '{}' deferred until a flush",
671 config.name
672 );
673 }
674
675 self.schema_manager
676 .add_index(IndexDefinition::FullText(config))?;
677 self.schema_manager.save().await?;
678
679 Ok(())
680 }
681
682 #[cfg(feature = "lance-backend")]
687 #[instrument(skip(self), level = "info")]
688 pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
689 let label = &config.label;
690 let column = &config.column;
691 info!(
692 "Creating JSON FTS index '{}' on {}.{}",
693 config.name, label, column
694 );
695
696 let schema = self.schema_manager.schema();
697 if !schema.labels.contains_key(label) {
698 return Err(anyhow!("Label '{}' not found", label));
699 }
700
701 let table = table_names::vertex_table_name(label);
702
703 if let Some(backend) = self.backend.as_ref() {
704 if backend.table_exists(&table).await? {
705 info!(
706 "Building physical JSON FTS index '{}' on '{}'",
707 config.name, table
708 );
709 if let Err(e) = backend
710 .create_fts_index(
711 &table,
712 &[column.as_str()],
713 Some(&config.name),
714 config.with_positions,
715 )
716 .await
717 {
718 warn!(
719 "Failed to build physical JSON FTS index '{}' (table may be empty): {}",
720 config.name, e
721 );
722 } else {
723 info!("JSON FTS index '{}' created", config.name);
724 }
725 } else {
726 debug!(
727 "Label '{}' not flushed yet; physical JSON FTS index '{}' built on next flush",
728 label, config.name
729 );
730 }
731 } else {
732 warn!(
733 "No storage backend; physical JSON FTS index '{}' deferred until a flush",
734 config.name
735 );
736 }
737
738 self.schema_manager
739 .add_index(IndexDefinition::JsonFullText(config))?;
740 self.schema_manager.save().await?;
741
742 Ok(())
743 }
744
745 #[cfg(feature = "lance-backend")]
747 #[instrument(skip(self), level = "info")]
748 pub async fn drop_index(&self, name: &str) -> Result<()> {
749 info!("Dropping index '{}'", name);
750
751 let idx_def = self
752 .schema_manager
753 .get_index(name)
754 .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
755
756 let label = idx_def.label();
760 let table = table_names::vertex_table_name(label);
761 if let Some(backend) = self.backend.as_ref() {
762 if let Err(e) = backend.drop_index(&table, name).await {
763 warn!(
764 "Physical index drop for '{}' returned error (non-fatal): {}",
765 name, e
766 );
767 } else {
768 info!("Physical index '{}' dropped from '{}'", name, table);
769 }
770 }
771
772 self.schema_manager.remove_index(name)?;
773 self.schema_manager.save().await?;
774 Ok(())
775 }
776
777 #[cfg(feature = "lance-backend")]
779 #[instrument(skip(self), level = "info")]
780 pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
781 info!("Rebuilding all indexes for label '{}'", label);
782 let schema = self.schema_manager.schema();
783
784 let indexes: Vec<_> = schema
786 .indexes
787 .iter()
788 .filter(|idx| idx.label() == label)
789 .cloned()
790 .collect();
791
792 for index in indexes {
793 match index {
794 IndexDefinition::Vector(cfg) => self.create_vector_index_inner(cfg, true).await?,
798 IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
799 IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
800 IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
801 IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
802 IndexDefinition::Sparse(cfg) => self.create_sparse_vector_index(cfg).await?,
803 _ => warn!("Unknown index type encountered during rebuild, skipping"),
804 }
805 }
806 Ok(())
807 }
808
809 #[cfg(feature = "lance-backend")]
811 pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
812 let schema = self.schema_manager.schema();
813 if !schema.labels.contains_key(label) {
814 return Err(anyhow!("Label '{}' not found", label));
815 }
816
817 let index_name = format!("{}_{}_composite", label, properties.join("_"));
819 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
820 let table = table_names::vertex_table_name(label);
821
822 if let Some(backend) = self.backend.as_ref() {
823 if backend.table_exists(&table).await? {
824 info!("Building composite index '{}' on '{}'", index_name, table);
825 if let Err(e) = backend
826 .create_scalar_index(
827 &table,
828 &columns,
829 crate::backend::types::ScalarIndexType::BTree,
830 Some(&index_name),
831 )
832 .await
833 {
834 warn!(
835 "Failed to build composite index '{}' (table may be empty): {}",
836 index_name, e
837 );
838 } else {
839 info!("Composite index '{}' created", index_name);
840 }
841
842 let config = ScalarIndexConfig {
843 name: index_name,
844 label: label.to_string(),
845 properties: properties.to_vec(),
846 index_type: uni_common::core::schema::ScalarIndexType::BTree,
847 where_clause: None,
848 metadata: Default::default(),
849 };
850 self.schema_manager
851 .add_index(IndexDefinition::Scalar(config))?;
852 self.schema_manager.save().await?;
853 } else {
854 debug!(
855 "Label '{}' not flushed yet; composite index for {:?} built on next flush",
856 label, properties
857 );
858 }
859 } else {
860 warn!(
861 "No storage backend; composite index for {:?} deferred until a flush",
862 properties
863 );
864 }
865
866 Ok(())
867 }
868
869 #[cfg(feature = "lance-backend")]
878 #[instrument(skip(self, added, removed), level = "info", fields(
879 label = %config.label,
880 property = %config.property
881 ))]
882 pub async fn update_inverted_index_incremental(
883 &self,
884 config: &InvertedIndexConfig,
885 added: &HashMap<Vid, Vec<String>>,
886 removed: &HashSet<Vid>,
887 ) -> Result<()> {
888 info!(
889 added = added.len(),
890 removed = removed.len(),
891 "Incrementally updating inverted index"
892 );
893
894 let postings_path = format!(
897 "{}/indexes/{}/{}_inverted",
898 self.base_uri, config.label, config.property
899 );
900 let _postings_guard = self.postings_write_guard(&postings_path).await;
901
902 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
903 index.apply_incremental_updates(added, removed).await
904 }
905
906 #[cfg(feature = "lance-backend")]
910 #[instrument(skip(self), level = "info")]
911 pub async fn create_sparse_vector_index(&self, config: SparseVectorIndexConfig) -> Result<()> {
912 let label = &config.label;
913 let property = &config.property;
914 info!(
915 "Creating Sparse Vector Index '{}' on {}.{}",
916 config.name, label, property
917 );
918
919 let schema = self.schema_manager.schema();
920 if !schema.labels.contains_key(label) {
921 return Err(anyhow!("Label '{}' not found", label));
922 }
923
924 let postings_path = format!("{}/indexes/{}/{}_sparse", self.base_uri, label, property);
927 let _postings_guard = self.postings_write_guard(&postings_path).await;
928
929 let mut index = SparseVectorIndex::new(&self.base_uri, config.clone()).await?;
930
931 let table = table_names::vertex_table_name(label);
935 if let Some(backend) = self.backend.as_ref() {
936 if backend.table_exists(&table).await? {
939 let batches = backend.scan(ScanRequest::all(&table)).await?;
940 index
941 .build_from_batches(&batches, |n| debug!("Indexed {} sparse docs", n))
942 .await?;
943 } else {
944 debug!(
945 "Table '{}' not flushed yet; creating empty sparse index (populated on flush)",
946 table
947 );
948 }
949 } else {
950 warn!(
951 "No storage backend available; sparse index '{}' left empty (populated on flush)",
952 config.name
953 );
954 }
955
956 self.schema_manager
957 .add_index(IndexDefinition::Sparse(config))?;
958 self.schema_manager.save().await?;
959
960 Ok(())
961 }
962
963 #[cfg(feature = "lance-backend")]
966 #[instrument(skip(self, added, removed), level = "info", fields(
967 label = %config.label,
968 property = %config.property
969 ))]
970 pub async fn update_sparse_vector_index_incremental(
971 &self,
972 config: &SparseVectorIndexConfig,
973 added: &HashMap<Vid, Vec<(u32, f32)>>,
974 removed: &HashSet<Vid>,
975 ) -> Result<()> {
976 info!(
977 added = added.len(),
978 removed = removed.len(),
979 "Incrementally updating sparse vector index"
980 );
981 let postings_path = format!(
984 "{}/indexes/{}/{}_sparse",
985 self.base_uri, config.label, config.property
986 );
987 let _postings_guard = self.postings_write_guard(&postings_path).await;
988
989 let mut index = SparseVectorIndex::new(&self.base_uri, config.clone()).await?;
990 index.apply_incremental_updates(added, removed).await
991 }
992
993 #[cfg(feature = "lance-backend")]
996 pub async fn sparse_vector_index(
997 &self,
998 label: &str,
999 property: &str,
1000 ) -> Result<SparseVectorIndex> {
1001 let schema = self.schema_manager.schema();
1002 let config = schema
1003 .indexes
1004 .iter()
1005 .find_map(|idx| match idx {
1006 IndexDefinition::Sparse(cfg) if cfg.label == label && cfg.property == property => {
1007 Some(cfg.clone())
1008 }
1009 _ => None,
1010 })
1011 .ok_or_else(|| anyhow!("No sparse vector index found for {}.{}", label, property))?;
1012 SparseVectorIndex::new(&self.base_uri, config).await
1013 }
1014}