Skip to main content

uni_store/storage/
index_manager.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Index lifecycle management: creation, rebuild, and incremental updates for all index types.
5
6use crate::backend::StorageBackend;
7use crate::backend::table_names;
8use crate::backend::types::ScanRequest;
9#[cfg(feature = "lance-backend")]
10use crate::storage::inverted_index::InvertedIndex;
11#[cfg(feature = "lance-backend")]
12use crate::storage::sparse_index::SparseVectorIndex;
13use crate::storage::vertex::VertexDataset;
14use anyhow::{Result, anyhow};
15use chrono::{DateTime, Utc};
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18#[cfg(feature = "lance-backend")]
19use std::collections::HashSet;
20use std::sync::Arc;
21#[cfg(feature = "lance-backend")]
22use tracing::{debug, info, instrument, warn};
23use uni_common::core::id::Vid;
24#[cfg(feature = "lance-backend")]
25use uni_common::core::schema::IndexDefinition;
26use uni_common::core::schema::SchemaManager;
27#[cfg(feature = "lance-backend")]
28use uni_common::core::schema::{
29    DistanceMetric, FullTextIndexConfig, InvertedIndexConfig, JsonFtsIndexConfig,
30    ScalarIndexConfig, ScalarIndexType, SparseVectorIndexConfig, VectorIndexConfig,
31    VectorIndexType,
32};
33
34/// Status of an index rebuild task.
35#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
36pub enum IndexRebuildStatus {
37    /// Task is waiting to be processed.
38    Pending,
39    /// Task is currently being processed.
40    InProgress,
41    /// Task completed successfully.
42    Completed,
43    /// Task failed with an error.
44    Failed,
45}
46
47/// A task representing an index rebuild operation.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct IndexRebuildTask {
50    /// Unique identifier for this task.
51    pub id: String,
52    /// The label for which indexes are being rebuilt.
53    pub label: String,
54    /// Current status of the task.
55    pub status: IndexRebuildStatus,
56    /// When the task was created.
57    pub created_at: DateTime<Utc>,
58    /// When the task started processing.
59    pub started_at: Option<DateTime<Utc>>,
60    /// When the task completed (successfully or with failure).
61    pub completed_at: Option<DateTime<Utc>>,
62    /// Error message if the task failed.
63    pub error: Option<String>,
64    /// Number of retry attempts.
65    pub retry_count: u32,
66}
67
68/// Resolves the embedding dimension of a vector or multi-vector property type.
69///
70/// Recurses through `List(Vector{dim})` (multi-vector / ColBERT) to the inner
71/// `Vector{dim}`; returns `None` for non-vector types.
72fn resolve_vector_dim(t: &uni_common::DataType) -> Option<usize> {
73    match t {
74        uni_common::DataType::Vector { dimensions } => Some(*dimensions),
75        uni_common::DataType::List(inner) => resolve_vector_dim(inner),
76        _ => None,
77    }
78}
79
/// Translates a schema-level metric and [`VectorIndexType`] into the backend's
/// [`VectorIndexParams`].
///
/// MUVERA is a logical index type: the caller (`create_vector_index_inner`) swaps it
/// for its `inner` shape before calling this function, so seeing `Muvera` here means a
/// programming error. For the HNSW variants an unset (`None`, i.e. "auto")
/// `num_partitions` becomes a single partition, matching the prior raw-`Dataset`
/// mapping.
///
/// # Errors
/// Returns an error when:
/// - the metric is not one of `L2`, `Cosine`, or `Dot`;
/// - a `Muvera` type reaches this physical-build mapping;
/// - the index type is otherwise not handled by this mapping.
#[cfg(feature = "lance-backend")]
fn to_backend_vector_params(
    metric: DistanceMetric,
    index_type: &VectorIndexType,
) -> Result<crate::backend::types::VectorIndexParams> {
    use crate::backend::types as backend_types;
    use crate::backend::types::{VectorIndexKind, VectorIndexParams};

    // The schema (`uni_common`) and the backend each define their own
    // `DistanceMetric`. Both enums are `#[non_exhaustive]`, hence the mandatory
    // catch-all arms below.
    let metric = match metric {
        DistanceMetric::L2 => backend_types::DistanceMetric::L2,
        DistanceMetric::Cosine => backend_types::DistanceMetric::Cosine,
        DistanceMetric::Dot => backend_types::DistanceMetric::Dot,
        other => return Err(anyhow!("Unsupported vector index metric: {:?}", other)),
    };

    let kind = match index_type {
        VectorIndexType::Flat => VectorIndexKind::Flat,
        VectorIndexType::IvfFlat {
            num_partitions: partitions,
        } => VectorIndexKind::IvfFlat {
            num_partitions: *partitions,
        },
        VectorIndexType::IvfPq {
            num_partitions: partitions,
            num_sub_vectors: sub_vectors,
            bits_per_subvector: bits,
        } => VectorIndexKind::IvfPq {
            num_partitions: *partitions,
            num_sub_vectors: *sub_vectors,
            num_bits: *bits,
        },
        VectorIndexType::IvfSq {
            num_partitions: partitions,
        } => VectorIndexKind::IvfSq {
            num_partitions: *partitions,
        },
        VectorIndexType::IvfRq {
            num_partitions: partitions,
            num_bits: bits,
        } => VectorIndexKind::IvfRq {
            num_partitions: *partitions,
            num_bits: *bits,
        },
        VectorIndexType::HnswFlat {
            m: degree,
            ef_construction: ef,
            num_partitions: partitions,
        } => VectorIndexKind::HnswFlat {
            m: *degree,
            ef_construction: *ef,
            // "auto" (None) resolves to one partition.
            num_partitions: partitions.unwrap_or(1),
        },
        VectorIndexType::HnswSq {
            m: degree,
            ef_construction: ef,
            num_partitions: partitions,
        } => VectorIndexKind::HnswSq {
            m: *degree,
            ef_construction: *ef,
            num_partitions: partitions.unwrap_or(1),
        },
        VectorIndexType::HnswPq {
            m: degree,
            ef_construction: ef,
            num_sub_vectors: sub_vectors,
            num_partitions: partitions,
        } => VectorIndexKind::HnswPq {
            m: *degree,
            ef_construction: *ef,
            num_sub_vectors: *sub_vectors,
            num_partitions: partitions.unwrap_or(1),
        },
        VectorIndexType::Muvera { .. } => {
            return Err(anyhow!(
                "MUVERA must be resolved to its inner index type before the physical build"
            ));
        }
        other => return Err(anyhow!("Unsupported vector index type: {:?}", other)),
    };

    Ok(VectorIndexParams { metric, kind })
}
168
169/// Manages physical and logical indexes across all vertex datasets.
170pub struct IndexManager {
171    base_uri: String,
172    schema_manager: Arc<SchemaManager>,
173    /// Storage backend, when available. Needed only for MUVERA FDE backfill (scan +
174    /// `replace_table_atomic`); `None` callers (e.g. some rebuild paths) still build
175    /// indexes over already-materialised columns.
176    backend: Option<Arc<dyn StorageBackend>>,
177}
178
179impl std::fmt::Debug for IndexManager {
180    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
181        f.debug_struct("IndexManager")
182            .field("base_uri", &self.base_uri)
183            .finish_non_exhaustive()
184    }
185}
186
187impl IndexManager {
188    /// Create a new `IndexManager` bound to `base_uri` and the given schema, without a
189    /// storage backend (MUVERA backfill over pre-existing rows is unavailable).
190    pub fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Self {
191        Self {
192            base_uri: base_uri.to_string(),
193            schema_manager,
194            backend: None,
195        }
196    }
197
198    /// Attach a storage backend, enabling MUVERA FDE backfill over already-flushed rows.
199    pub fn with_backend(mut self, backend: Arc<dyn StorageBackend>) -> Self {
200        self.backend = Some(backend);
201        self
202    }
203
204    /// Serialize writers to a single index's postings dataset.
205    ///
206    /// The sparse and set-membership inverted indexes persist their postings with an
207    /// unconditional `WriteMode::Overwrite`. A DDL `CREATE … INDEX` backfill and a
208    /// concurrent flush incremental update both load-modify-overwrite the *same* postings
209    /// path; without serialization the second overwrite clobbers the first (a silent lost
210    /// update → a vid's posting vanishes from search candidates — issue #95). Keyed by the
211    /// postings dataset path, reusing the backend's per-key write-lock map. Returns `None`
212    /// when no backend is attached (offline rebuild paths cannot race a flush).
213    #[cfg(feature = "lance-backend")]
214    async fn postings_write_guard(
215        &self,
216        postings_path: &str,
217    ) -> Option<crate::backend::traits::TableWriteGuard> {
218        match self.backend.as_ref() {
219            Some(backend) => Some(backend.lock_table_for_write(postings_path).await),
220            None => None,
221        }
222    }
223
224    /// Build and persist an inverted index for set-membership queries.
225    #[cfg(feature = "lance-backend")]
226    #[instrument(skip(self), level = "info")]
227    pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
228        let label = &config.label;
229        let property = &config.property;
230        info!(
231            "Creating Inverted Index '{}' on {}.{}",
232            config.name, label, property
233        );
234
235        let schema = self.schema_manager.schema();
236        if !schema.labels.contains_key(label) {
237            return Err(anyhow!("Label '{}' not found", label));
238        }
239
240        // Serialize this full-rebuild overwrite against a concurrent flush incremental
241        // update of the same postings dataset (issue #95).
242        let postings_path = format!("{}/indexes/{}/{}_inverted", self.base_uri, label, property);
243        let _postings_guard = self.postings_write_guard(&postings_path).await;
244
245        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
246
247        // Backfill from the flushed vertex table via the storage backend. The
248        // LanceDB-managed table is not at the raw `{base}/vertices_<label>`
249        // path a `VertexDataset` open would target, and the backend read is
250        // branch-aware. Mirrors the sparse-index backfill. A not-yet-flushed
251        // table legitimately yields an empty index, populated on the next flush.
252        let table = table_names::vertex_table_name(label);
253        if let Some(backend) = self.backend.as_ref() {
254            if backend.table_exists(&table).await? {
255                let batches = backend.scan(ScanRequest::all(&table)).await?;
256                index
257                    .build_from_batches(&batches, |n| info!("Indexed {} terms", n))
258                    .await?;
259            } else {
260                debug!(
261                    "Table '{}' not flushed yet; creating empty inverted index (populated on flush)",
262                    table
263                );
264            }
265        } else {
266            warn!(
267                "No storage backend available; inverted index '{}' left empty (populated on flush)",
268                config.name
269            );
270        }
271
272        self.schema_manager
273            .add_index(IndexDefinition::Inverted(config))?;
274        self.schema_manager.save().await?;
275
276        Ok(())
277    }
278
279    /// Build and persist a vector (ANN) index on an embedding column. This is the SINGLE
280    /// build path every creation surface converges on (Cypher DDL, the
281    /// `uni.schema.createIndex` procedure, and the Rust/Python schema builders via
282    /// `rebuild`), so dense, native-multivector, and MUVERA indexes behave identically.
283    ///
284    /// For a `Muvera` index it first prepares the derived FDE column (`prepare_muvera_fde`:
285    /// register + one-time backfill), then builds the physical single-vector ANN over that
286    /// `__fde_*` column with the **Dot** metric (its inner product approximates MaxSim),
287    /// while the persisted config stays the MUVERA one so query routing detects it.
288    #[cfg(feature = "lance-backend")]
289    #[instrument(skip(self), level = "info")]
290    pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
291        self.create_vector_index_inner(config, false).await
292    }
293
294    /// Like [`Self::create_vector_index`], but `force_backfill` re-materialises a MUVERA
295    /// index's derived FDE column over ALL current rows even if it was already registered.
296    ///
297    /// Full rebuilds ([`Self::rebuild_indexes_for_label`], hence `db.indexes().rebuild()`
298    /// and the bulk loader's sync index sync) set this. The flush-time FDE materializer
299    /// (`Writer::materialize_fde_columns`) only runs on the tx write path, so after a BULK
300    /// load — or any out-of-band table mutation — the newly written rows have no FDE. A
301    /// plain create would hit the "already registered" guard and skip the backfill, leaving
302    /// those rows invisible to the FDE ANN (silent empty results). A rebuild must therefore
303    /// force a fresh backfill; `splice_fde_batch` recomputes every row's FDE deterministically
304    /// and overwrites any stale column, so this is idempotent.
305    #[cfg(feature = "lance-backend")]
306    async fn create_vector_index_inner(
307        &self,
308        config: VectorIndexConfig,
309        force_backfill: bool,
310    ) -> Result<()> {
311        if let VectorIndexType::Muvera { inner, .. } = &config.index_type {
312            // Register + backfill the derived FDE column (forced on a full rebuild).
313            self.prepare_muvera_fde(&config, force_backfill).await?;
314            let inner_cfg = VectorIndexConfig {
315                name: config.name.clone(),
316                label: config.label.clone(),
317                property: crate::storage::muvera_index::fde_derived_column(&config.name),
318                index_type: (**inner).clone(),
319                metric: DistanceMetric::Dot,
320                embedding_config: None,
321                metadata: config.metadata.clone(),
322            };
323            self.build_physical_vector_index(&inner_cfg).await?;
324        } else {
325            self.build_physical_vector_index(&config).await?;
326        }
327        self.schema_manager
328            .add_index(IndexDefinition::Vector(config))?;
329        self.schema_manager.save().await?;
330        Ok(())
331    }
332
333    /// Prepare a MUVERA index's derived `__fde_*` column: register it as an internal
334    /// schema property and, the FIRST time (when it was not already registered), backfill
335    /// it over all already-flushed rows via a full table rewrite (scan → splice the FDE
336    /// column into the `get_arrow_schema`-sorted position → `replace_table_atomic`),
337    /// mirroring the inverted-index "scan all rows at create time" precedent.
338    ///
339    /// The "already registered" guard makes this cheap on incremental creates: a plain
340    /// `create_vector_index` (e.g. when another index on the label is added, or on schema
341    /// re-apply) skips the rewrite — on the tx write path the column is kept current by the
342    /// flush-time materializer (`Writer::materialize_fde_columns`). `force_backfill` bypasses
343    /// that guard for full rebuilds, where the materializer assumption does not hold (e.g.
344    /// after a bulk load); see [`Self::create_vector_index_inner`]. No-op for a non-MUVERA
345    /// config, an unresolved source dimension, a label with nothing flushed yet, or when no
346    /// backend is attached.
347    #[cfg(feature = "lance-backend")]
348    async fn prepare_muvera_fde(
349        &self,
350        config: &VectorIndexConfig,
351        force_backfill: bool,
352    ) -> Result<()> {
353        use crate::storage::muvera_index::fde_spec_for_config;
354
355        let schema = self.schema_manager.schema();
356        let Some(spec) = fde_spec_for_config(&schema, config) else {
357            return Ok(());
358        };
359        spec.params.validate()?;
360
361        // Register the derived column. `add_internal_property` is write-lock-guarded and
362        // reports whether THIS call inserted it, so two concurrent creates of the same MUVERA
363        // index can't both run the (expensive) full-table backfill — only the inserter does.
364        let newly_added = self.schema_manager.add_internal_property(
365            &spec.label,
366            &spec.derived_col,
367            uni_common::DataType::Vector {
368                dimensions: spec.params.fde_dim(),
369            },
370            true,
371        )?;
372
373        // Backfill when we just registered the column, or when a full rebuild forces it (the
374        // flush-time materializer doesn't cover bulk-loaded / out-of-band rows). A plain
375        // re-create that finds the column already present relies on that materializer.
376        if !newly_added && !force_backfill {
377            return Ok(());
378        }
379
380        // Run the backfill; if it FAILS after we just added the column, roll the registration
381        // back so the in-memory schema stays consistent with disk and a retry re-adds +
382        // re-backfills. Otherwise the retry would see the column registered, skip the
383        // backfill, and build the index over an unpopulated FDE column.
384        //
385        // Crash-window note: the on-disk order is backfill (`replace_table_atomic`) THEN
386        // schema save (in `create_vector_index_inner`). A crash in between leaves an orphan
387        // `__fde_*` column with no persisted schema entry, which the next create's idempotent
388        // rewrite overwrites — self-healing. Persisting the schema first would be worse (a
389        // registered column with no data errors reads), and a `Building` marker is not
390        // auto-recovered (`labels_needing_rebuild` skips `Building`/`Failed`).
391        if let Err(e) = self.backfill_fde_column(&spec).await {
392            if newly_added {
393                let _ = self
394                    .schema_manager
395                    .drop_property(&spec.label, &spec.derived_col);
396            }
397            return Err(e);
398        }
399        Ok(())
400    }
401
402    /// Materialize the MUVERA derived FDE column over all currently-flushed rows via a full
403    /// table rewrite (scan → recompute each row's FDE → splice into the
404    /// `get_arrow_schema`-sorted position → `replace_table_atomic`). No-op (kept registration)
405    /// when no backend is attached or the label has nothing flushed yet — create-before-ingest,
406    /// where the flush path materializes the column. The caller must have already registered
407    /// the derived column in the schema.
408    #[cfg(feature = "lance-backend")]
409    async fn backfill_fde_column(
410        &self,
411        spec: &crate::storage::muvera_index::FdeSpec,
412    ) -> Result<()> {
413        use crate::storage::muvera_index::splice_fde_batch;
414
415        let Some(backend) = self.backend.as_ref() else {
416            return Ok(());
417        };
418        let table = table_names::vertex_table_name(&spec.label);
419        // Err propagates (a backend fault must not silently skip the backfill and
420        // leave the FDE column NULL); Ok(false) is the create-before-ingest case
421        // where the flush path will materialize the column.
422        if !backend.table_exists(&table).await? {
423            return Ok(());
424        }
425
426        let schema = self.schema_manager.schema();
427        let label_id = schema
428            .label_id_by_name(&spec.label)
429            .ok_or_else(|| anyhow!("MUVERA: label '{}' not found", spec.label))?;
430        // Schema already carries the FDE column (registered by the caller) so it's in the
431        // arrow schema at the position future flush appends will use.
432        let target_schema =
433            VertexDataset::new(&self.base_uri, &spec.label, label_id).get_arrow_schema(&schema)?;
434        let source_dt = schema
435            .properties
436            .get(&spec.label)
437            .and_then(|p| p.get(&spec.source_prop))
438            .map(|m| m.r#type.clone());
439        let encoder = uni_common::muvera::FdeEncoder::new(&spec.params)?;
440
441        // Serialize the scan → splice → overwrite against concurrent flush appends.
442        // `replace_table_atomic` overwrites the WHOLE vertex table, so a row a flush
443        // appends between our scan and our overwrite would be silently dropped —
444        // durable loss of committed data (issue #96). The flush append takes this same
445        // per-table write lock inside `StorageBackend::write`, so holding it across the
446        // read-modify-write makes the two mutually exclusive and guarantees we scan the
447        // post-append state. The lock must wrap BOTH the scan and the replace (not just
448        // the replace) to close the read→overwrite TOCTOU window.
449        let _table_guard = backend.lock_table_for_write(&table).await;
450
451        let batches = backend.scan(ScanRequest::all(&table)).await?;
452        let mut new_batches = Vec::with_capacity(batches.len());
453        for batch in &batches {
454            new_batches.push(splice_fde_batch(
455                batch,
456                &target_schema,
457                spec,
458                &encoder,
459                source_dt.as_ref(),
460            )?);
461        }
462        backend
463            .replace_table_atomic(&table, new_batches, target_schema)
464            .await?;
465        Ok(())
466    }
467
468    /// Build the physical Lance ANN index described by `config` over `config.property`
469    /// with `config.metric`. Does NOT persist the schema index definition — the caller
470    /// does, possibly under a different logical config (see MUVERA in
471    /// [`Self::create_vector_index`]).
472    #[cfg(feature = "lance-backend")]
473    async fn build_physical_vector_index(&self, config: &VectorIndexConfig) -> Result<()> {
474        let label = &config.label;
475        let property = &config.property;
476        info!(
477            "Creating vector index '{}' on {}.{}",
478            config.name, label, property
479        );
480
481        let schema = self.schema_manager.schema();
482        if !schema.labels.contains_key(label) {
483            return Err(anyhow!("Label '{}' not found", label));
484        }
485
486        // Fail fast on an invalid PQ configuration before touching Lance (which
487        // would otherwise error opaquely at build time). The embedding dimension
488        // comes from the schema property type, recursing `List(Vector{dim})` for
489        // multi-vector (ColBERT) columns.
490        let prop_dim = schema
491            .properties
492            .get(label)
493            .and_then(|props| props.get(property))
494            .and_then(|meta| resolve_vector_dim(&meta.r#type));
495        let pq_sub = match &config.index_type {
496            VectorIndexType::IvfPq {
497                num_sub_vectors, ..
498            }
499            | VectorIndexType::HnswPq {
500                num_sub_vectors, ..
501            } => Some(*num_sub_vectors as usize),
502            _ => None,
503        };
504        // Only the realistic misconfiguration (sub-vectors that don't divide a
505        // dimension at least as large) is rejected up front. The degenerate
506        // `sub > dim` case (e.g. the default 16 on a dim-2 column) is left to Lance,
507        // which clamps/defers it — notably so an index can be declared on an empty
508        // table before any rows exist.
509        if let (Some(dim), Some(sub)) = (prop_dim, pq_sub)
510            && sub != 0
511            && dim >= sub
512            && dim % sub != 0
513        {
514            return Err(anyhow!(
515                "Vector index '{}': PQ num_sub_vectors ({}) must divide the embedding dimension ({})",
516                config.name,
517                sub,
518                dim
519            ));
520        }
521
522        let params = to_backend_vector_params(config.metric.clone(), &config.index_type)?;
523        let table = table_names::vertex_table_name(label);
524
525        let Some(backend) = self.backend.as_ref() else {
526            warn!(
527                "No storage backend; physical vector index '{}' deferred until a flush",
528                config.name
529            );
530            return Ok(());
531        };
532
533        // Build only once the table is flushed; create-before-flush is a no-op
534        // here and is materialized by the next flush's rebuild. A build failure
535        // on a tiny/degenerate column is tolerated (Lance may clamp or defer ANN
536        // training) — the schema definition is still persisted by the caller.
537        if backend.table_exists(&table).await? {
538            info!(
539                "Building physical vector index '{}' on '{}'",
540                config.name, table
541            );
542            if let Err(e) = backend
543                .create_vector_index(&table, property, &config.name, params)
544                .await
545            {
546                warn!(
547                    "Failed to build physical vector index '{}' (column may be empty): {}",
548                    config.name, e
549                );
550            } else {
551                info!("Vector index '{}' created", config.name);
552            }
553        } else {
554            debug!(
555                "Label '{}' not flushed yet; physical vector index '{}' built on next flush",
556                label, config.name
557            );
558        }
559
560        Ok(())
561    }
562
563    /// Build and persist a scalar (BTree) index for exact-match and range queries.
564    #[cfg(feature = "lance-backend")]
565    #[instrument(skip(self), level = "info")]
566    pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
567        let label = &config.label;
568        let properties = &config.properties;
569        info!(
570            "Creating scalar index '{}' on {}.{:?}",
571            config.name, label, properties
572        );
573
574        let schema = self.schema_manager.schema();
575        if !schema.labels.contains_key(label) {
576            return Err(anyhow!("Label '{}' not found", label));
577        }
578
579        let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
580        // Map the schema scalar type to the backend's; anything other than the
581        // explicit Bitmap/LabelList falls back to BTree (matching the prior
582        // `ScalarIndexParams::default()`).
583        let backend_idx_type = match config.index_type {
584            ScalarIndexType::Bitmap => crate::backend::types::ScalarIndexType::Bitmap,
585            ScalarIndexType::LabelList => crate::backend::types::ScalarIndexType::LabelList,
586            _ => crate::backend::types::ScalarIndexType::BTree,
587        };
588        let table = table_names::vertex_table_name(label);
589
590        if let Some(backend) = self.backend.as_ref() {
591            if backend.table_exists(&table).await? {
592                info!(
593                    "Building physical scalar index '{}' on '{}'",
594                    config.name, table
595                );
596                if let Err(e) = backend
597                    .create_scalar_index(&table, &columns, backend_idx_type, Some(&config.name))
598                    .await
599                {
600                    warn!(
601                        "Failed to build physical scalar index '{}' (table may be empty): {}",
602                        config.name, e
603                    );
604                } else {
605                    info!("Scalar index '{}' created", config.name);
606                }
607            } else {
608                debug!(
609                    "Label '{}' not flushed yet; physical scalar index '{}' built on next flush",
610                    label, config.name
611                );
612            }
613        } else {
614            warn!(
615                "No storage backend; physical scalar index '{}' deferred until a flush",
616                config.name
617            );
618        }
619
620        self.schema_manager
621            .add_index(IndexDefinition::Scalar(config))?;
622        self.schema_manager.save().await?;
623
624        Ok(())
625    }
626
627    /// Build and persist a full-text search (Lance inverted) index.
628    #[cfg(feature = "lance-backend")]
629    #[instrument(skip(self), level = "info")]
630    pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
631        let label = &config.label;
632        info!(
633            "Creating FTS index '{}' on {}.{:?}",
634            config.name, label, config.properties
635        );
636
637        let schema = self.schema_manager.schema();
638        if !schema.labels.contains_key(label) {
639            return Err(anyhow!("Label '{}' not found", label));
640        }
641
642        let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
643        let table = table_names::vertex_table_name(label);
644
645        if let Some(backend) = self.backend.as_ref() {
646            if backend.table_exists(&table).await? {
647                info!(
648                    "Building physical FTS index '{}' on '{}'",
649                    config.name, table
650                );
651                if let Err(e) = backend
652                    .create_fts_index(&table, &columns, Some(&config.name), config.with_positions)
653                    .await
654                {
655                    warn!(
656                        "Failed to build physical FTS index '{}' (table may be empty): {}",
657                        config.name, e
658                    );
659                } else {
660                    info!("FTS index '{}' created", config.name);
661                }
662            } else {
663                debug!(
664                    "Label '{}' not flushed yet; physical FTS index '{}' built on next flush",
665                    label, config.name
666                );
667            }
668        } else {
669            warn!(
670                "No storage backend; physical FTS index '{}' deferred until a flush",
671                config.name
672            );
673        }
674
675        self.schema_manager
676            .add_index(IndexDefinition::FullText(config))?;
677        self.schema_manager.save().await?;
678
679        Ok(())
680    }
681
682    /// Creates a JSON Full-Text Search index on a column.
683    ///
684    /// This creates a Lance inverted index on the specified column,
685    /// enabling BM25-based full-text search with optional phrase matching.
686    #[cfg(feature = "lance-backend")]
687    #[instrument(skip(self), level = "info")]
688    pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
689        let label = &config.label;
690        let column = &config.column;
691        info!(
692            "Creating JSON FTS index '{}' on {}.{}",
693            config.name, label, column
694        );
695
696        let schema = self.schema_manager.schema();
697        if !schema.labels.contains_key(label) {
698            return Err(anyhow!("Label '{}' not found", label));
699        }
700
701        let table = table_names::vertex_table_name(label);
702
703        if let Some(backend) = self.backend.as_ref() {
704            if backend.table_exists(&table).await? {
705                info!(
706                    "Building physical JSON FTS index '{}' on '{}'",
707                    config.name, table
708                );
709                if let Err(e) = backend
710                    .create_fts_index(
711                        &table,
712                        &[column.as_str()],
713                        Some(&config.name),
714                        config.with_positions,
715                    )
716                    .await
717                {
718                    warn!(
719                        "Failed to build physical JSON FTS index '{}' (table may be empty): {}",
720                        config.name, e
721                    );
722                } else {
723                    info!("JSON FTS index '{}' created", config.name);
724                }
725            } else {
726                debug!(
727                    "Label '{}' not flushed yet; physical JSON FTS index '{}' built on next flush",
728                    label, config.name
729                );
730            }
731        } else {
732            warn!(
733                "No storage backend; physical JSON FTS index '{}' deferred until a flush",
734                config.name
735            );
736        }
737
738        self.schema_manager
739            .add_index(IndexDefinition::JsonFullText(config))?;
740        self.schema_manager.save().await?;
741
742        Ok(())
743    }
744
745    /// Remove an index both physically from the Lance dataset and from the schema.
746    #[cfg(feature = "lance-backend")]
747    #[instrument(skip(self), level = "info")]
748    pub async fn drop_index(&self, name: &str) -> Result<()> {
749        info!("Dropping index '{}'", name);
750
751        let idx_def = self
752            .schema_manager
753            .get_index(name)
754            .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
755
756        // Drop the physical index through the backend. Best-effort: the index
757        // may never have been physically built (e.g. created before any flush),
758        // so a failure here is non-fatal.
759        let label = idx_def.label();
760        let table = table_names::vertex_table_name(label);
761        if let Some(backend) = self.backend.as_ref() {
762            if let Err(e) = backend.drop_index(&table, name).await {
763                warn!(
764                    "Physical index drop for '{}' returned error (non-fatal): {}",
765                    name, e
766                );
767            } else {
768                info!("Physical index '{}' dropped from '{}'", name, table);
769            }
770        }
771
772        self.schema_manager.remove_index(name)?;
773        self.schema_manager.save().await?;
774        Ok(())
775    }
776
777    /// Rebuild all indexes registered for `label` from scratch.
778    #[cfg(feature = "lance-backend")]
779    #[instrument(skip(self), level = "info")]
780    pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
781        info!("Rebuilding all indexes for label '{}'", label);
782        let schema = self.schema_manager.schema();
783
784        // Clone and filter to avoid holding lock while async awaiting
785        let indexes: Vec<_> = schema
786            .indexes
787            .iter()
788            .filter(|idx| idx.label() == label)
789            .cloned()
790            .collect();
791
792        for index in indexes {
793            match index {
794                // A full rebuild must force the MUVERA FDE backfill: bulk-loaded / reopened
795                // rows aren't covered by the flush-time materializer (see
796                // `create_vector_index_inner`).
797                IndexDefinition::Vector(cfg) => self.create_vector_index_inner(cfg, true).await?,
798                IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
799                IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
800                IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
801                IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
802                IndexDefinition::Sparse(cfg) => self.create_sparse_vector_index(cfg).await?,
803                _ => warn!("Unknown index type encountered during rebuild, skipping"),
804            }
805        }
806        Ok(())
807    }
808
809    /// Create composite index for unique constraint
810    #[cfg(feature = "lance-backend")]
811    pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
812        let schema = self.schema_manager.schema();
813        if !schema.labels.contains_key(label) {
814            return Err(anyhow!("Label '{}' not found", label));
815        }
816
817        // Lance supports multi-column indexes.
818        let index_name = format!("{}_{}_composite", label, properties.join("_"));
819        let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
820        let table = table_names::vertex_table_name(label);
821
822        if let Some(backend) = self.backend.as_ref() {
823            if backend.table_exists(&table).await? {
824                info!("Building composite index '{}' on '{}'", index_name, table);
825                if let Err(e) = backend
826                    .create_scalar_index(
827                        &table,
828                        &columns,
829                        crate::backend::types::ScalarIndexType::BTree,
830                        Some(&index_name),
831                    )
832                    .await
833                {
834                    warn!(
835                        "Failed to build composite index '{}' (table may be empty): {}",
836                        index_name, e
837                    );
838                } else {
839                    info!("Composite index '{}' created", index_name);
840                }
841
842                let config = ScalarIndexConfig {
843                    name: index_name,
844                    label: label.to_string(),
845                    properties: properties.to_vec(),
846                    index_type: uni_common::core::schema::ScalarIndexType::BTree,
847                    where_clause: None,
848                    metadata: Default::default(),
849                };
850                self.schema_manager
851                    .add_index(IndexDefinition::Scalar(config))?;
852                self.schema_manager.save().await?;
853            } else {
854                debug!(
855                    "Label '{}' not flushed yet; composite index for {:?} built on next flush",
856                    label, properties
857                );
858            }
859        } else {
860            warn!(
861                "No storage backend; composite index for {:?} deferred until a flush",
862                properties
863            );
864        }
865
866        Ok(())
867    }
868
869    /// Applies incremental updates to an inverted index.
870    ///
871    /// Instead of rebuilding the entire index, this method updates only the
872    /// changed entries, making it much faster for small mutations.
873    ///
874    /// # Errors
875    ///
876    /// Returns an error if the index doesn't exist or the update fails.
877    #[cfg(feature = "lance-backend")]
878    #[instrument(skip(self, added, removed), level = "info", fields(
879        label = %config.label,
880        property = %config.property
881    ))]
882    pub async fn update_inverted_index_incremental(
883        &self,
884        config: &InvertedIndexConfig,
885        added: &HashMap<Vid, Vec<String>>,
886        removed: &HashSet<Vid>,
887    ) -> Result<()> {
888        info!(
889            added = added.len(),
890            removed = removed.len(),
891            "Incrementally updating inverted index"
892        );
893
894        // Serialize this load-modify-overwrite against a concurrent DDL backfill of the
895        // same postings dataset (issue #95).
896        let postings_path = format!(
897            "{}/indexes/{}/{}_inverted",
898            self.base_uri, config.label, config.property
899        );
900        let _postings_guard = self.postings_write_guard(&postings_path).await;
901
902        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
903        index.apply_incremental_updates(added, removed).await
904    }
905
906    /// Create (and backfill) a scored sparse-vector index. Mirrors
907    /// `create_inverted_index`: build from the flushed vertex dataset if it
908    /// exists, then register + persist the config.
909    #[cfg(feature = "lance-backend")]
910    #[instrument(skip(self), level = "info")]
911    pub async fn create_sparse_vector_index(&self, config: SparseVectorIndexConfig) -> Result<()> {
912        let label = &config.label;
913        let property = &config.property;
914        info!(
915            "Creating Sparse Vector Index '{}' on {}.{}",
916            config.name, label, property
917        );
918
919        let schema = self.schema_manager.schema();
920        if !schema.labels.contains_key(label) {
921            return Err(anyhow!("Label '{}' not found", label));
922        }
923
924        // Serialize this full-rebuild overwrite against a concurrent flush incremental
925        // update of the same postings dataset (issue #95).
926        let postings_path = format!("{}/indexes/{}/{}_sparse", self.base_uri, label, property);
927        let _postings_guard = self.postings_write_guard(&postings_path).await;
928
929        let mut index = SparseVectorIndex::new(&self.base_uri, config.clone()).await?;
930
931        // Backfill from the flushed vertex table via the storage backend (the
932        // LanceDB-managed table is not at the raw `{base}/vertices_<label>`
933        // path a `VertexDataset::open` expects). Mirrors the MUVERA backfill.
934        let table = table_names::vertex_table_name(label);
935        if let Some(backend) = self.backend.as_ref() {
936            // Err propagates (a backend fault must surface, not silently build an empty
937            // index); Ok(false) is the not-yet-flushed create-before-ingest case.
938            if backend.table_exists(&table).await? {
939                let batches = backend.scan(ScanRequest::all(&table)).await?;
940                index
941                    .build_from_batches(&batches, |n| debug!("Indexed {} sparse docs", n))
942                    .await?;
943            } else {
944                debug!(
945                    "Table '{}' not flushed yet; creating empty sparse index (populated on flush)",
946                    table
947                );
948            }
949        } else {
950            warn!(
951                "No storage backend available; sparse index '{}' left empty (populated on flush)",
952                config.name
953            );
954        }
955
956        self.schema_manager
957            .add_index(IndexDefinition::Sparse(config))?;
958        self.schema_manager.save().await?;
959
960        Ok(())
961    }
962
963    /// Applies incremental updates to a sparse-vector index (load-modify-write,
964    /// same semantics as the set-membership inverted index).
965    #[cfg(feature = "lance-backend")]
966    #[instrument(skip(self, added, removed), level = "info", fields(
967        label = %config.label,
968        property = %config.property
969    ))]
970    pub async fn update_sparse_vector_index_incremental(
971        &self,
972        config: &SparseVectorIndexConfig,
973        added: &HashMap<Vid, Vec<(u32, f32)>>,
974        removed: &HashSet<Vid>,
975    ) -> Result<()> {
976        info!(
977            added = added.len(),
978            removed = removed.len(),
979            "Incrementally updating sparse vector index"
980        );
981        // Serialize this load-modify-overwrite against a concurrent DDL backfill of the
982        // same postings dataset (issue #95).
983        let postings_path = format!(
984            "{}/indexes/{}/{}_sparse",
985            self.base_uri, config.label, config.property
986        );
987        let _postings_guard = self.postings_write_guard(&postings_path).await;
988
989        let mut index = SparseVectorIndex::new(&self.base_uri, config.clone()).await?;
990        index.apply_incremental_updates(added, removed).await
991    }
992
993    /// Open a sparse-vector index for querying, given its label + property.
994    /// Errors if no `IndexDefinition::Sparse` is registered for that pair.
995    #[cfg(feature = "lance-backend")]
996    pub async fn sparse_vector_index(
997        &self,
998        label: &str,
999        property: &str,
1000    ) -> Result<SparseVectorIndex> {
1001        let schema = self.schema_manager.schema();
1002        let config = schema
1003            .indexes
1004            .iter()
1005            .find_map(|idx| match idx {
1006                IndexDefinition::Sparse(cfg) if cfg.label == label && cfg.property == property => {
1007                    Some(cfg.clone())
1008                }
1009                _ => None,
1010            })
1011            .ok_or_else(|| anyhow!("No sparse vector index found for {}.{}", label, property))?;
1012        SparseVectorIndex::new(&self.base_uri, config).await
1013    }
1014}