Skip to main content

uni_store/storage/
index_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Index lifecycle management: creation, rebuild, and incremental updates for all index types.
5
6use crate::backend::StorageBackend;
7use crate::backend::table_names;
8use crate::backend::types::ScanRequest;
9#[cfg(feature = "lance-backend")]
10use crate::storage::inverted_index::InvertedIndex;
11use crate::storage::vertex::VertexDataset;
12use anyhow::{Result, anyhow};
13use chrono::{DateTime, Utc};
14#[cfg(feature = "lance-backend")]
15use lance::index::DatasetIndexExt;
16#[cfg(feature = "lance-backend")]
17use lance::index::vector::VectorIndexParams;
18#[cfg(feature = "lance-backend")]
19use lance_index::IndexType;
20#[cfg(feature = "lance-backend")]
21use lance_index::progress::IndexBuildProgress;
22#[cfg(feature = "lance-backend")]
23use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
24#[cfg(feature = "lance-backend")]
25use lance_index::vector::bq::RQBuildParams;
26#[cfg(feature = "lance-backend")]
27use lance_index::vector::hnsw::builder::HnswBuildParams;
28#[cfg(feature = "lance-backend")]
29use lance_index::vector::ivf::IvfBuildParams;
30#[cfg(feature = "lance-backend")]
31use lance_index::vector::pq::PQBuildParams;
32#[cfg(feature = "lance-backend")]
33use lance_index::vector::sq::builder::SQBuildParams;
34#[cfg(feature = "lance-backend")]
35use lance_linalg::distance::MetricType;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38#[cfg(feature = "lance-backend")]
39use std::collections::HashSet;
40use std::sync::Arc;
41#[cfg(feature = "lance-backend")]
42use tracing::{debug, info, instrument, warn};
43use uni_common::core::id::Vid;
44#[cfg(feature = "lance-backend")]
45use uni_common::core::schema::IndexDefinition;
46use uni_common::core::schema::SchemaManager;
47#[cfg(feature = "lance-backend")]
48use uni_common::core::schema::{
49    DistanceMetric, FullTextIndexConfig, InvertedIndexConfig, JsonFtsIndexConfig,
50    ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
51};
52
/// Progress sink for Lance index builds that forwards each build event to `tracing`.
///
/// Every event is tagged with the name of the index being built, so the duration and
/// progress of a build can be followed in the logs.
#[cfg(feature = "lance-backend")]
#[derive(Debug)]
pub struct TracingIndexProgress {
    /// Name of the index under construction; attached to every emitted event.
    index_name: String,
}

#[cfg(feature = "lance-backend")]
impl TracingIndexProgress {
    /// Create a reporter for `index_name`, already wrapped as the shared trait object
    /// that Lance's index builder takes via `.progress(...)`.
    pub fn arc(index_name: &str) -> Arc<dyn IndexBuildProgress> {
        let reporter = Self {
            index_name: String::from(index_name),
        };
        Arc::new(reporter)
    }
}
71
#[cfg(feature = "lance-backend")]
#[async_trait::async_trait]
impl IndexBuildProgress for TracingIndexProgress {
    /// Log (at `info`) that a build stage began, with its expected total and unit.
    async fn stage_start(&self, stage: &str, total: Option<u64>, unit: &str) -> lance::Result<()> {
        info!(
            index = %self.index_name,
            stage = stage,
            total = ?total,
            unit = unit,
            "Index build stage started"
        );
        Ok(())
    }

    /// Log (at `debug`) how much of a stage has been completed so far.
    async fn stage_progress(&self, stage: &str, completed: u64) -> lance::Result<()> {
        debug!(
            index = %self.index_name,
            stage = stage,
            completed = completed,
            "Index build progress"
        );
        Ok(())
    }

    /// Log (at `info`) that a build stage finished.
    async fn stage_complete(&self, stage: &str) -> lance::Result<()> {
        info!(
            index = %self.index_name,
            stage = stage,
            "Index build stage complete"
        );
        Ok(())
    }
}
105
106/// Status of an index rebuild task.
107#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
108pub enum IndexRebuildStatus {
109    /// Task is waiting to be processed.
110    Pending,
111    /// Task is currently being processed.
112    InProgress,
113    /// Task completed successfully.
114    Completed,
115    /// Task failed with an error.
116    Failed,
117}
118
119/// A task representing an index rebuild operation.
120#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct IndexRebuildTask {
122    /// Unique identifier for this task.
123    pub id: String,
124    /// The label for which indexes are being rebuilt.
125    pub label: String,
126    /// Current status of the task.
127    pub status: IndexRebuildStatus,
128    /// When the task was created.
129    pub created_at: DateTime<Utc>,
130    /// When the task started processing.
131    pub started_at: Option<DateTime<Utc>>,
132    /// When the task completed (successfully or with failure).
133    pub completed_at: Option<DateTime<Utc>>,
134    /// Error message if the task failed.
135    pub error: Option<String>,
136    /// Number of retry attempts.
137    pub retry_count: u32,
138}
139
140/// Resolves the embedding dimension of a vector or multi-vector property type.
141///
142/// Recurses through `List(Vector{dim})` (multi-vector / ColBERT) to the inner
143/// `Vector{dim}`; returns `None` for non-vector types.
144fn resolve_vector_dim(t: &uni_common::DataType) -> Option<usize> {
145    match t {
146        uni_common::DataType::Vector { dimensions } => Some(*dimensions),
147        uni_common::DataType::List(inner) => resolve_vector_dim(inner),
148        _ => None,
149    }
150}
151
152/// Manages physical and logical indexes across all vertex datasets.
153pub struct IndexManager {
154    base_uri: String,
155    schema_manager: Arc<SchemaManager>,
156    /// Storage backend, when available. Needed only for MUVERA FDE backfill (scan +
157    /// `replace_table_atomic`); `None` callers (e.g. some rebuild paths) still build
158    /// indexes over already-materialised columns.
159    backend: Option<Arc<dyn StorageBackend>>,
160}
161
162impl std::fmt::Debug for IndexManager {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        f.debug_struct("IndexManager")
165            .field("base_uri", &self.base_uri)
166            .finish_non_exhaustive()
167    }
168}
169
170impl IndexManager {
171    /// Create a new `IndexManager` bound to `base_uri` and the given schema, without a
172    /// storage backend (MUVERA backfill over pre-existing rows is unavailable).
173    pub fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Self {
174        Self {
175            base_uri: base_uri.to_string(),
176            schema_manager,
177            backend: None,
178        }
179    }
180
181    /// Attach a storage backend, enabling MUVERA FDE backfill over already-flushed rows.
182    pub fn with_backend(mut self, backend: Arc<dyn StorageBackend>) -> Self {
183        self.backend = Some(backend);
184        self
185    }
186
187    /// Build and persist an inverted index for set-membership queries.
188    #[cfg(feature = "lance-backend")]
189    #[instrument(skip(self), level = "info")]
190    pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
191        let label = &config.label;
192        let property = &config.property;
193        info!(
194            "Creating Inverted Index '{}' on {}.{}",
195            config.name, label, property
196        );
197
198        let schema = self.schema_manager.schema();
199        let label_meta = schema
200            .labels
201            .get(label)
202            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
203
204        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
205
206        let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
207
208        // Check if dataset exists
209        if ds.open_raw().await.is_ok() {
210            index
211                .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
212                .await?;
213        } else {
214            warn!(
215                "Dataset for label '{}' not found, creating empty inverted index",
216                label
217            );
218        }
219
220        self.schema_manager
221            .add_index(IndexDefinition::Inverted(config))?;
222        self.schema_manager.save().await?;
223
224        Ok(())
225    }
226
227    /// Build and persist a vector (ANN) index on an embedding column. This is the SINGLE
228    /// build path every creation surface converges on (Cypher DDL, the
229    /// `uni.schema.createIndex` procedure, and the Rust/Python schema builders via
230    /// `rebuild`), so dense, native-multivector, and MUVERA indexes behave identically.
231    ///
232    /// For a `Muvera` index it first prepares the derived FDE column (`prepare_muvera_fde`:
233    /// register + one-time backfill), then builds the physical single-vector ANN over that
234    /// `__fde_*` column with the **Dot** metric (its inner product approximates MaxSim),
235    /// while the persisted config stays the MUVERA one so query routing detects it.
236    #[cfg(feature = "lance-backend")]
237    #[instrument(skip(self), level = "info")]
238    pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
239        self.create_vector_index_inner(config, false).await
240    }
241
242    /// Like [`Self::create_vector_index`], but `force_backfill` re-materialises a MUVERA
243    /// index's derived FDE column over ALL current rows even if it was already registered.
244    ///
245    /// Full rebuilds ([`Self::rebuild_indexes_for_label`], hence `db.indexes().rebuild()`
246    /// and the bulk loader's sync index sync) set this. The flush-time FDE materializer
247    /// (`Writer::materialize_fde_columns`) only runs on the tx write path, so after a BULK
248    /// load — or any out-of-band table mutation — the newly written rows have no FDE. A
249    /// plain create would hit the "already registered" guard and skip the backfill, leaving
250    /// those rows invisible to the FDE ANN (silent empty results). A rebuild must therefore
251    /// force a fresh backfill; `splice_fde_batch` recomputes every row's FDE deterministically
252    /// and overwrites any stale column, so this is idempotent.
253    #[cfg(feature = "lance-backend")]
254    async fn create_vector_index_inner(
255        &self,
256        config: VectorIndexConfig,
257        force_backfill: bool,
258    ) -> Result<()> {
259        if let VectorIndexType::Muvera { inner, .. } = &config.index_type {
260            // Register + backfill the derived FDE column (forced on a full rebuild).
261            self.prepare_muvera_fde(&config, force_backfill).await?;
262            let inner_cfg = VectorIndexConfig {
263                name: config.name.clone(),
264                label: config.label.clone(),
265                property: crate::storage::muvera_index::fde_derived_column(&config.name),
266                index_type: (**inner).clone(),
267                metric: DistanceMetric::Dot,
268                embedding_config: None,
269                metadata: config.metadata.clone(),
270            };
271            self.build_physical_vector_index(&inner_cfg).await?;
272        } else {
273            self.build_physical_vector_index(&config).await?;
274        }
275        self.schema_manager
276            .add_index(IndexDefinition::Vector(config))?;
277        self.schema_manager.save().await?;
278        Ok(())
279    }
280
281    /// Prepare a MUVERA index's derived `__fde_*` column: register it as an internal
282    /// schema property and, the FIRST time (when it was not already registered), backfill
283    /// it over all already-flushed rows via a full table rewrite (scan → splice the FDE
284    /// column into the `get_arrow_schema`-sorted position → `replace_table_atomic`),
285    /// mirroring the inverted-index "scan all rows at create time" precedent.
286    ///
287    /// The "already registered" guard makes this cheap on incremental creates: a plain
288    /// `create_vector_index` (e.g. when another index on the label is added, or on schema
289    /// re-apply) skips the rewrite — on the tx write path the column is kept current by the
290    /// flush-time materializer (`Writer::materialize_fde_columns`). `force_backfill` bypasses
291    /// that guard for full rebuilds, where the materializer assumption does not hold (e.g.
292    /// after a bulk load); see [`Self::create_vector_index_inner`]. No-op for a non-MUVERA
293    /// config, an unresolved source dimension, a label with nothing flushed yet, or when no
294    /// backend is attached.
295    #[cfg(feature = "lance-backend")]
296    async fn prepare_muvera_fde(
297        &self,
298        config: &VectorIndexConfig,
299        force_backfill: bool,
300    ) -> Result<()> {
301        use crate::storage::muvera_index::fde_spec_for_config;
302
303        let schema = self.schema_manager.schema();
304        let Some(spec) = fde_spec_for_config(&schema, config) else {
305            return Ok(());
306        };
307        spec.params.validate()?;
308
309        // Register the derived column. `add_internal_property` is write-lock-guarded and
310        // reports whether THIS call inserted it, so two concurrent creates of the same MUVERA
311        // index can't both run the (expensive) full-table backfill — only the inserter does.
312        let newly_added = self.schema_manager.add_internal_property(
313            &spec.label,
314            &spec.derived_col,
315            uni_common::DataType::Vector {
316                dimensions: spec.params.fde_dim(),
317            },
318            true,
319        )?;
320
321        // Backfill when we just registered the column, or when a full rebuild forces it (the
322        // flush-time materializer doesn't cover bulk-loaded / out-of-band rows). A plain
323        // re-create that finds the column already present relies on that materializer.
324        if !newly_added && !force_backfill {
325            return Ok(());
326        }
327
328        // Run the backfill; if it FAILS after we just added the column, roll the registration
329        // back so the in-memory schema stays consistent with disk and a retry re-adds +
330        // re-backfills. Otherwise the retry would see the column registered, skip the
331        // backfill, and build the index over an unpopulated FDE column.
332        //
333        // Crash-window note: the on-disk order is backfill (`replace_table_atomic`) THEN
334        // schema save (in `create_vector_index_inner`). A crash in between leaves an orphan
335        // `__fde_*` column with no persisted schema entry, which the next create's idempotent
336        // rewrite overwrites — self-healing. Persisting the schema first would be worse (a
337        // registered column with no data errors reads), and a `Building` marker is not
338        // auto-recovered (`labels_needing_rebuild` skips `Building`/`Failed`).
339        if let Err(e) = self.backfill_fde_column(&spec).await {
340            if newly_added {
341                let _ = self
342                    .schema_manager
343                    .drop_property(&spec.label, &spec.derived_col);
344            }
345            return Err(e);
346        }
347        Ok(())
348    }
349
350    /// Materialize the MUVERA derived FDE column over all currently-flushed rows via a full
351    /// table rewrite (scan → recompute each row's FDE → splice into the
352    /// `get_arrow_schema`-sorted position → `replace_table_atomic`). No-op (kept registration)
353    /// when no backend is attached or the label has nothing flushed yet — create-before-ingest,
354    /// where the flush path materializes the column. The caller must have already registered
355    /// the derived column in the schema.
356    #[cfg(feature = "lance-backend")]
357    async fn backfill_fde_column(
358        &self,
359        spec: &crate::storage::muvera_index::FdeSpec,
360    ) -> Result<()> {
361        use crate::storage::muvera_index::splice_fde_batch;
362
363        let Some(backend) = self.backend.as_ref() else {
364            return Ok(());
365        };
366        let table = table_names::vertex_table_name(&spec.label);
367        if !backend.table_exists(&table).await.unwrap_or(false) {
368            return Ok(());
369        }
370
371        let schema = self.schema_manager.schema();
372        let label_id = schema
373            .label_id_by_name(&spec.label)
374            .ok_or_else(|| anyhow!("MUVERA: label '{}' not found", spec.label))?;
375        // Schema already carries the FDE column (registered by the caller) so it's in the
376        // arrow schema at the position future flush appends will use.
377        let target_schema =
378            VertexDataset::new(&self.base_uri, &spec.label, label_id).get_arrow_schema(&schema)?;
379        let source_dt = schema
380            .properties
381            .get(&spec.label)
382            .and_then(|p| p.get(&spec.source_prop))
383            .map(|m| m.r#type.clone());
384        let encoder = uni_common::muvera::FdeEncoder::new(&spec.params)?;
385
386        let batches = backend.scan(ScanRequest::all(&table)).await?;
387        let mut new_batches = Vec::with_capacity(batches.len());
388        for batch in &batches {
389            new_batches.push(splice_fde_batch(
390                batch,
391                &target_schema,
392                spec,
393                &encoder,
394                source_dt.as_ref(),
395            )?);
396        }
397        backend
398            .replace_table_atomic(&table, new_batches, target_schema)
399            .await?;
400        Ok(())
401    }
402
403    /// Build the physical Lance ANN index described by `config` over `config.property`
404    /// with `config.metric`. Does NOT persist the schema index definition — the caller
405    /// does, possibly under a different logical config (see MUVERA in
406    /// [`Self::create_vector_index`]).
407    #[cfg(feature = "lance-backend")]
408    async fn build_physical_vector_index(&self, config: &VectorIndexConfig) -> Result<()> {
409        let label = &config.label;
410        let property = &config.property;
411        info!(
412            "Creating vector index '{}' on {}.{}",
413            config.name, label, property
414        );
415
416        let schema = self.schema_manager.schema();
417        let label_meta = schema
418            .labels
419            .get(label)
420            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
421
422        // Fail fast on an invalid PQ configuration before touching Lance (which
423        // would otherwise error opaquely at build time). The embedding dimension
424        // comes from the schema property type, recursing `List(Vector{dim})` for
425        // multi-vector (ColBERT) columns.
426        let prop_dim = schema
427            .properties
428            .get(label)
429            .and_then(|props| props.get(property))
430            .and_then(|meta| resolve_vector_dim(&meta.r#type));
431        let pq_sub = match &config.index_type {
432            VectorIndexType::IvfPq {
433                num_sub_vectors, ..
434            }
435            | VectorIndexType::HnswPq {
436                num_sub_vectors, ..
437            } => Some(*num_sub_vectors as usize),
438            _ => None,
439        };
440        // Only the realistic misconfiguration (sub-vectors that don't divide a
441        // dimension at least as large) is rejected up front. The degenerate
442        // `sub > dim` case (e.g. the default 16 on a dim-2 column) is left to Lance,
443        // which clamps/defers it — notably so an index can be declared on an empty
444        // table before any rows exist.
445        if let (Some(dim), Some(sub)) = (prop_dim, pq_sub)
446            && sub != 0
447            && dim >= sub
448            && dim % sub != 0
449        {
450            return Err(anyhow!(
451                "Vector index '{}': PQ num_sub_vectors ({}) must divide the embedding dimension ({})",
452                config.name,
453                sub,
454                dim
455            ));
456        }
457
458        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
459
460        match ds_wrapper.open_raw().await {
461            Ok(mut lance_ds) => {
462                let metric_type = match &config.metric {
463                    DistanceMetric::L2 => MetricType::L2,
464                    DistanceMetric::Cosine => MetricType::Cosine,
465                    DistanceMetric::Dot => MetricType::Dot,
466                    _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
467                };
468
469                let params = match config.index_type.clone() {
470                    VectorIndexType::Flat => {
471                        let ivf = IvfBuildParams::new(1);
472                        VectorIndexParams::with_ivf_flat_params(metric_type, ivf)
473                    }
474                    VectorIndexType::IvfFlat { num_partitions } => {
475                        let ivf = IvfBuildParams::new(num_partitions as usize);
476                        VectorIndexParams::with_ivf_flat_params(metric_type, ivf)
477                    }
478                    VectorIndexType::IvfPq {
479                        num_partitions,
480                        num_sub_vectors,
481                        bits_per_subvector,
482                    } => {
483                        let ivf = IvfBuildParams::new(num_partitions as usize);
484                        let pq = PQBuildParams::new(
485                            num_sub_vectors as usize,
486                            bits_per_subvector as usize,
487                        );
488                        VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
489                    }
490                    VectorIndexType::IvfSq { num_partitions } => {
491                        let ivf = IvfBuildParams::new(num_partitions as usize);
492                        let sq = SQBuildParams::default();
493                        VectorIndexParams::with_ivf_sq_params(metric_type, ivf, sq)
494                    }
495                    VectorIndexType::IvfRq {
496                        num_partitions,
497                        num_bits,
498                    } => {
499                        let ivf = IvfBuildParams::new(num_partitions as usize);
500                        let mut rq = RQBuildParams::default();
501                        if let Some(bits) = num_bits {
502                            rq.num_bits = bits;
503                        }
504                        VectorIndexParams::with_ivf_rq_params(metric_type, ivf, rq)
505                    }
506                    VectorIndexType::HnswFlat {
507                        m,
508                        ef_construction,
509                        num_partitions,
510                    } => {
511                        let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
512                        let hnsw = HnswBuildParams::default()
513                            .num_edges(m as usize)
514                            .ef_construction(ef_construction as usize);
515                        VectorIndexParams::ivf_hnsw(metric_type, ivf, hnsw)
516                    }
517                    VectorIndexType::HnswSq {
518                        m,
519                        ef_construction,
520                        num_partitions,
521                    } => {
522                        let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
523                        let hnsw = HnswBuildParams::default()
524                            .num_edges(m as usize)
525                            .ef_construction(ef_construction as usize);
526                        let sq = SQBuildParams::default();
527                        VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
528                    }
529                    VectorIndexType::HnswPq {
530                        m,
531                        ef_construction,
532                        num_sub_vectors,
533                        num_partitions,
534                    } => {
535                        let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
536                        let hnsw = HnswBuildParams::default()
537                            .num_edges(m as usize)
538                            .ef_construction(ef_construction as usize);
539                        let pq = PQBuildParams::new(num_sub_vectors as usize, 8);
540                        VectorIndexParams::with_ivf_hnsw_pq_params(metric_type, ivf, hnsw, pq)
541                    }
542                    _ => {
543                        return Err(anyhow!(
544                            "Unsupported vector index type: {:?}",
545                            config.index_type
546                        ));
547                    }
548                };
549
550                // Ignore errors during creation if dataset is empty or similar, but try
551                let progress = TracingIndexProgress::arc(&config.name);
552                match lance_ds
553                    .create_index_builder(&[property], IndexType::Vector, &params)
554                    .name(config.name.clone())
555                    .replace(true)
556                    .progress(progress)
557                    .await
558                {
559                    Ok(metadata) => {
560                        info!(
561                            index_name = %metadata.name,
562                            index_uuid = %metadata.uuid,
563                            dataset_version = metadata.dataset_version,
564                            "Vector index created"
565                        );
566                    }
567                    Err(e) => {
568                        warn!(
569                            "Failed to create physical vector index (dataset might be empty): {}",
570                            e
571                        );
572                    }
573                }
574            }
575            Err(e) => {
576                warn!(
577                    "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
578                    label, e
579                );
580            }
581        }
582
583        Ok(())
584    }
585
586    /// Build and persist a scalar (BTree) index for exact-match and range queries.
587    #[cfg(feature = "lance-backend")]
588    #[instrument(skip(self), level = "info")]
589    pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
590        let label = &config.label;
591        let properties = &config.properties;
592        info!(
593            "Creating scalar index '{}' on {}.{:?}",
594            config.name, label, properties
595        );
596
597        let schema = self.schema_manager.schema();
598        let label_meta = schema
599            .labels
600            .get(label)
601            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
602
603        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
604
605        match ds_wrapper.open_raw().await {
606            Ok(mut lance_ds) => {
607                let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
608
609                let progress = TracingIndexProgress::arc(&config.name);
610                let scalar_params = match config.index_type {
611                    ScalarIndexType::Bitmap => {
612                        ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap)
613                    }
614                    ScalarIndexType::LabelList => {
615                        ScalarIndexParams::for_builtin(BuiltinIndexType::LabelList)
616                    }
617                    _ => ScalarIndexParams::default(),
618                };
619                match lance_ds
620                    .create_index_builder(&columns, IndexType::Scalar, &scalar_params)
621                    .name(config.name.clone())
622                    .replace(true)
623                    .progress(progress)
624                    .await
625                {
626                    Ok(metadata) => {
627                        info!(
628                            index_name = %metadata.name,
629                            index_uuid = %metadata.uuid,
630                            dataset_version = metadata.dataset_version,
631                            "Scalar index created"
632                        );
633                    }
634                    Err(e) => {
635                        warn!(
636                            "Failed to create physical scalar index (dataset might be empty): {}",
637                            e
638                        );
639                    }
640                }
641            }
642            Err(e) => {
643                warn!(
644                    "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
645                    label, e
646                );
647            }
648        }
649
650        self.schema_manager
651            .add_index(IndexDefinition::Scalar(config))?;
652        self.schema_manager.save().await?;
653
654        Ok(())
655    }
656
657    /// Build and persist a full-text search (Lance inverted) index.
658    #[cfg(feature = "lance-backend")]
659    #[instrument(skip(self), level = "info")]
660    pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
661        let label = &config.label;
662        info!(
663            "Creating FTS index '{}' on {}.{:?}",
664            config.name, label, config.properties
665        );
666
667        let schema = self.schema_manager.schema();
668        let label_meta = schema
669            .labels
670            .get(label)
671            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
672
673        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
674
675        match ds_wrapper.open_raw().await {
676            Ok(mut lance_ds) => {
677                let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
678
679                let fts_params =
680                    InvertedIndexParams::default().with_position(config.with_positions);
681
682                let progress = TracingIndexProgress::arc(&config.name);
683                match lance_ds
684                    .create_index_builder(&columns, IndexType::Inverted, &fts_params)
685                    .name(config.name.clone())
686                    .replace(true)
687                    .progress(progress)
688                    .await
689                {
690                    Ok(metadata) => {
691                        info!(
692                            index_name = %metadata.name,
693                            index_uuid = %metadata.uuid,
694                            dataset_version = metadata.dataset_version,
695                            "FTS index created"
696                        );
697                    }
698                    Err(e) => {
699                        warn!(
700                            "Failed to create physical FTS index (dataset might be empty): {}",
701                            e
702                        );
703                    }
704                }
705            }
706            Err(e) => {
707                warn!(
708                    "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
709                    label, e
710                );
711            }
712        }
713
714        self.schema_manager
715            .add_index(IndexDefinition::FullText(config))?;
716        self.schema_manager.save().await?;
717
718        Ok(())
719    }
720
721    /// Creates a JSON Full-Text Search index on a column.
722    ///
723    /// This creates a Lance inverted index on the specified column,
724    /// enabling BM25-based full-text search with optional phrase matching.
725    #[cfg(feature = "lance-backend")]
726    #[instrument(skip(self), level = "info")]
727    pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
728        let label = &config.label;
729        let column = &config.column;
730        info!(
731            "Creating JSON FTS index '{}' on {}.{}",
732            config.name, label, column
733        );
734
735        let schema = self.schema_manager.schema();
736        let label_meta = schema
737            .labels
738            .get(label)
739            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
740
741        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
742
743        match ds_wrapper.open_raw().await {
744            Ok(mut lance_ds) => {
745                let fts_params =
746                    InvertedIndexParams::default().with_position(config.with_positions);
747
748                let progress = TracingIndexProgress::arc(&config.name);
749                match lance_ds
750                    .create_index_builder(&[column.as_str()], IndexType::Inverted, &fts_params)
751                    .name(config.name.clone())
752                    .replace(true)
753                    .progress(progress)
754                    .await
755                {
756                    Ok(metadata) => {
757                        info!(
758                            index_name = %metadata.name,
759                            index_uuid = %metadata.uuid,
760                            dataset_version = metadata.dataset_version,
761                            "JSON FTS index created"
762                        );
763                    }
764                    Err(e) => {
765                        warn!(
766                            "Failed to create physical JSON FTS index (dataset might be empty): {}",
767                            e
768                        );
769                    }
770                }
771            }
772            Err(e) => {
773                warn!(
774                    "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
775                    label, e
776                );
777            }
778        }
779
780        self.schema_manager
781            .add_index(IndexDefinition::JsonFullText(config))?;
782        self.schema_manager.save().await?;
783
784        Ok(())
785    }
786
787    /// Remove an index both physically from the Lance dataset and from the schema.
788    #[cfg(feature = "lance-backend")]
789    #[instrument(skip(self), level = "info")]
790    pub async fn drop_index(&self, name: &str) -> Result<()> {
791        info!("Dropping index '{}'", name);
792
793        let idx_def = self
794            .schema_manager
795            .get_index(name)
796            .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
797
798        // Attempt physical index drop on the underlying Lance dataset.
799        let label = idx_def.label();
800        let schema = self.schema_manager.schema();
801        if let Some(label_meta) = schema.labels.get(label) {
802            let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
803            match ds_wrapper.open_raw().await {
804                Ok(mut lance_ds) => {
805                    if let Err(e) = lance_ds.drop_index(name).await {
806                        // Log but don't fail — the index may never have been
807                        // physically built (e.g. empty dataset at creation time).
808                        warn!(
809                            "Physical index drop for '{}' returned error (non-fatal): {}",
810                            name, e
811                        );
812                    } else {
813                        info!("Physical index '{}' dropped from Lance dataset", name);
814                    }
815                }
816                Err(e) => {
817                    debug!(
818                        "Could not open dataset for label '{}' to drop physical index: {}",
819                        label, e
820                    );
821                }
822            }
823        }
824
825        self.schema_manager.remove_index(name)?;
826        self.schema_manager.save().await?;
827        Ok(())
828    }
829
830    /// Rebuild all indexes registered for `label` from scratch.
831    #[cfg(feature = "lance-backend")]
832    #[instrument(skip(self), level = "info")]
833    pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
834        info!("Rebuilding all indexes for label '{}'", label);
835        let schema = self.schema_manager.schema();
836
837        // Clone and filter to avoid holding lock while async awaiting
838        let indexes: Vec<_> = schema
839            .indexes
840            .iter()
841            .filter(|idx| idx.label() == label)
842            .cloned()
843            .collect();
844
845        for index in indexes {
846            match index {
847                // A full rebuild must force the MUVERA FDE backfill: bulk-loaded / reopened
848                // rows aren't covered by the flush-time materializer (see
849                // `create_vector_index_inner`).
850                IndexDefinition::Vector(cfg) => self.create_vector_index_inner(cfg, true).await?,
851                IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
852                IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
853                IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
854                IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
855                _ => warn!("Unknown index type encountered during rebuild, skipping"),
856            }
857        }
858        Ok(())
859    }
860
861    /// Create composite index for unique constraint
862    #[cfg(feature = "lance-backend")]
863    pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
864        let schema = self.schema_manager.schema();
865        let label_meta = schema
866            .labels
867            .get(label)
868            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
869
870        // Lance supports multi-column indexes
871        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
872
873        // We need to verify dataset exists
874        if let Ok(mut ds) = ds_wrapper.open_raw().await {
875            // Create composite BTree index
876            let index_name = format!("{}_{}_composite", label, properties.join("_"));
877
878            // Convert properties to slice of &str
879            let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
880
881            let progress = TracingIndexProgress::arc(&index_name);
882            match ds
883                .create_index_builder(&columns, IndexType::Scalar, &ScalarIndexParams::default())
884                .name(index_name.clone())
885                .replace(true)
886                .progress(progress)
887                .await
888            {
889                Ok(metadata) => {
890                    info!(
891                        index_name = %metadata.name,
892                        index_uuid = %metadata.uuid,
893                        dataset_version = metadata.dataset_version,
894                        "Composite index created"
895                    );
896                }
897                Err(e) => {
898                    warn!("Failed to create physical composite index: {}", e);
899                }
900            }
901
902            let config = ScalarIndexConfig {
903                name: index_name,
904                label: label.to_string(),
905                properties: properties.to_vec(),
906                index_type: uni_common::core::schema::ScalarIndexType::BTree,
907                where_clause: None,
908                metadata: Default::default(),
909            };
910
911            self.schema_manager
912                .add_index(IndexDefinition::Scalar(config))?;
913            self.schema_manager.save().await?;
914        }
915
916        Ok(())
917    }
918
919    /// Applies incremental updates to an inverted index.
920    ///
921    /// Instead of rebuilding the entire index, this method updates only the
922    /// changed entries, making it much faster for small mutations.
923    ///
924    /// # Errors
925    ///
926    /// Returns an error if the index doesn't exist or the update fails.
927    #[cfg(feature = "lance-backend")]
928    #[instrument(skip(self, added, removed), level = "info", fields(
929        label = %config.label,
930        property = %config.property
931    ))]
932    pub async fn update_inverted_index_incremental(
933        &self,
934        config: &InvertedIndexConfig,
935        added: &HashMap<Vid, Vec<String>>,
936        removed: &HashSet<Vid>,
937    ) -> Result<()> {
938        info!(
939            added = added.len(),
940            removed = removed.len(),
941            "Incrementally updating inverted index"
942        );
943
944        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
945        index.apply_incremental_updates(added, removed).await
946    }
947}