Skip to main content

uni_store/storage/
index_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Index lifecycle management: creation, rebuild, and incremental updates for all index types.
5
6#[cfg(feature = "lance-backend")]
7use crate::storage::inverted_index::InvertedIndex;
8use crate::storage::vertex::VertexDataset;
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11#[cfg(feature = "lance-backend")]
12use lance::index::DatasetIndexExt;
13#[cfg(feature = "lance-backend")]
14use lance::index::vector::VectorIndexParams;
15#[cfg(feature = "lance-backend")]
16use lance_index::IndexType;
17#[cfg(feature = "lance-backend")]
18use lance_index::progress::IndexBuildProgress;
19#[cfg(feature = "lance-backend")]
20use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
21#[cfg(feature = "lance-backend")]
22use lance_index::vector::bq::RQBuildParams;
23#[cfg(feature = "lance-backend")]
24use lance_index::vector::hnsw::builder::HnswBuildParams;
25#[cfg(feature = "lance-backend")]
26use lance_index::vector::ivf::IvfBuildParams;
27#[cfg(feature = "lance-backend")]
28use lance_index::vector::pq::PQBuildParams;
29#[cfg(feature = "lance-backend")]
30use lance_index::vector::sq::builder::SQBuildParams;
31#[cfg(feature = "lance-backend")]
32use lance_linalg::distance::MetricType;
33use serde::{Deserialize, Serialize};
34use std::collections::HashMap;
35#[cfg(feature = "lance-backend")]
36use std::collections::HashSet;
37use std::sync::Arc;
38#[cfg(feature = "lance-backend")]
39use tracing::{debug, info, instrument, warn};
40use uni_common::core::id::Vid;
41#[cfg(feature = "lance-backend")]
42use uni_common::core::schema::IndexDefinition;
43use uni_common::core::schema::SchemaManager;
44#[cfg(feature = "lance-backend")]
45use uni_common::core::schema::{
46    DistanceMetric, FullTextIndexConfig, InvertedIndexConfig, JsonFtsIndexConfig,
47    ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
48};
49
/// Tracing-based progress reporter for Lance index builds.
///
/// Emits structured log events at each stage boundary, enabling
/// observability into index build duration and progress.
#[cfg(feature = "lance-backend")]
#[derive(Debug)]
pub struct TracingIndexProgress {
    /// Name of the index being built; recorded as the `index` field on every event.
    index_name: String,
}
59
#[cfg(feature = "lance-backend")]
impl TracingIndexProgress {
    /// Build a reporter for `index_name` and return it as a shared
    /// `IndexBuildProgress` trait object.
    pub fn arc(index_name: &str) -> Arc<dyn IndexBuildProgress> {
        let reporter = TracingIndexProgress {
            index_name: index_name.to_owned(),
        };
        Arc::new(reporter)
    }
}
68
/// Reports index-build progress as `tracing` events tagged with the index name.
///
/// Every callback only logs and then returns `Ok(())`.
#[cfg(feature = "lance-backend")]
#[async_trait::async_trait]
impl IndexBuildProgress for TracingIndexProgress {
    /// Logs an `info` event that `stage` has started, including the optional
    /// `total` (recorded via `Debug`) and its `unit`.
    async fn stage_start(&self, stage: &str, total: Option<u64>, unit: &str) -> lance::Result<()> {
        info!(
            index = %self.index_name,
            stage,
            ?total,
            unit,
            "Index build stage started"
        );
        Ok(())
    }

    /// Logs a `debug` event carrying the `completed` count reported for `stage`.
    async fn stage_progress(&self, stage: &str, completed: u64) -> lance::Result<()> {
        debug!(
            index = %self.index_name,
            stage,
            completed,
            "Index build progress"
        );
        Ok(())
    }

    /// Logs an `info` event that `stage` has finished.
    async fn stage_complete(&self, stage: &str) -> lance::Result<()> {
        info!(
            index = %self.index_name,
            stage,
            "Index build stage complete"
        );
        Ok(())
    }
}
102
/// Status of an index rebuild task.
///
/// Stored in [`IndexRebuildTask::status`]; derives serde `Serialize`/`Deserialize`
/// so it can be persisted together with the task record.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexRebuildStatus {
    /// Task is waiting to be processed.
    Pending,
    /// Task is currently being processed.
    InProgress,
    /// Task completed successfully.
    Completed,
    /// Task failed with an error.
    Failed,
}
115
/// A task representing an index rebuild operation.
///
/// A plain data record: it derives `Clone` and serde `Serialize`/`Deserialize`
/// and carries no behavior of its own.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexRebuildTask {
    /// Unique identifier for this task.
    pub id: String,
    /// The label for which indexes are being rebuilt.
    pub label: String,
    /// Current status of the task.
    pub status: IndexRebuildStatus,
    /// When the task was created (UTC).
    pub created_at: DateTime<Utc>,
    /// When the task started processing (UTC), if it has.
    pub started_at: Option<DateTime<Utc>>,
    /// When the task completed (successfully or with failure), if it has (UTC).
    pub completed_at: Option<DateTime<Utc>>,
    /// Error message if the task failed.
    pub error: Option<String>,
    /// Number of retry attempts.
    pub retry_count: u32,
}
136
/// Manages physical and logical indexes across all vertex datasets.
pub struct IndexManager {
    /// Base URI handed to `VertexDataset::new` and `InvertedIndex::new` when
    /// locating the storage for a label.
    base_uri: String,
    /// Shared schema handle used to resolve labels and to add, remove, and
    /// save index definitions.
    schema_manager: Arc<SchemaManager>,
}
142
143impl std::fmt::Debug for IndexManager {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        f.debug_struct("IndexManager")
146            .field("base_uri", &self.base_uri)
147            .finish_non_exhaustive()
148    }
149}
150
151impl IndexManager {
152    /// Create a new `IndexManager` bound to `base_uri` and the given schema.
153    pub fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Self {
154        Self {
155            base_uri: base_uri.to_string(),
156            schema_manager,
157        }
158    }
159
160    /// Build and persist an inverted index for set-membership queries.
161    #[cfg(feature = "lance-backend")]
162    #[instrument(skip(self), level = "info")]
163    pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
164        let label = &config.label;
165        let property = &config.property;
166        info!(
167            "Creating Inverted Index '{}' on {}.{}",
168            config.name, label, property
169        );
170
171        let schema = self.schema_manager.schema();
172        let label_meta = schema
173            .labels
174            .get(label)
175            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
176
177        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
178
179        let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
180
181        // Check if dataset exists
182        if ds.open_raw().await.is_ok() {
183            index
184                .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
185                .await?;
186        } else {
187            warn!(
188                "Dataset for label '{}' not found, creating empty inverted index",
189                label
190            );
191        }
192
193        self.schema_manager
194            .add_index(IndexDefinition::Inverted(config))?;
195        self.schema_manager.save().await?;
196
197        Ok(())
198    }
199
200    /// Build and persist a vector (ANN) index on an embedding column.
201    #[cfg(feature = "lance-backend")]
202    #[instrument(skip(self), level = "info")]
203    pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
204        let label = &config.label;
205        let property = &config.property;
206        info!(
207            "Creating vector index '{}' on {}.{}",
208            config.name, label, property
209        );
210
211        let schema = self.schema_manager.schema();
212        let label_meta = schema
213            .labels
214            .get(label)
215            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
216
217        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
218
219        match ds_wrapper.open_raw().await {
220            Ok(mut lance_ds) => {
221                let metric_type = match config.metric {
222                    DistanceMetric::L2 => MetricType::L2,
223                    DistanceMetric::Cosine => MetricType::Cosine,
224                    DistanceMetric::Dot => MetricType::Dot,
225                    _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
226                };
227
228                let params = match config.index_type {
229                    VectorIndexType::Flat => {
230                        let ivf = IvfBuildParams::new(1);
231                        VectorIndexParams::with_ivf_flat_params(metric_type, ivf)
232                    }
233                    VectorIndexType::IvfFlat { num_partitions } => {
234                        let ivf = IvfBuildParams::new(num_partitions as usize);
235                        VectorIndexParams::with_ivf_flat_params(metric_type, ivf)
236                    }
237                    VectorIndexType::IvfPq {
238                        num_partitions,
239                        num_sub_vectors,
240                        bits_per_subvector,
241                    } => {
242                        let ivf = IvfBuildParams::new(num_partitions as usize);
243                        let pq = PQBuildParams::new(
244                            num_sub_vectors as usize,
245                            bits_per_subvector as usize,
246                        );
247                        VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
248                    }
249                    VectorIndexType::IvfSq { num_partitions } => {
250                        let ivf = IvfBuildParams::new(num_partitions as usize);
251                        let sq = SQBuildParams::default();
252                        VectorIndexParams::with_ivf_sq_params(metric_type, ivf, sq)
253                    }
254                    VectorIndexType::IvfRq {
255                        num_partitions,
256                        num_bits,
257                    } => {
258                        let ivf = IvfBuildParams::new(num_partitions as usize);
259                        let mut rq = RQBuildParams::default();
260                        if let Some(bits) = num_bits {
261                            rq.num_bits = bits;
262                        }
263                        VectorIndexParams::with_ivf_rq_params(metric_type, ivf, rq)
264                    }
265                    VectorIndexType::HnswFlat {
266                        m,
267                        ef_construction,
268                        num_partitions,
269                    } => {
270                        let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
271                        let hnsw = HnswBuildParams::default()
272                            .num_edges(m as usize)
273                            .ef_construction(ef_construction as usize);
274                        VectorIndexParams::ivf_hnsw(metric_type, ivf, hnsw)
275                    }
276                    VectorIndexType::HnswSq {
277                        m,
278                        ef_construction,
279                        num_partitions,
280                    } => {
281                        let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
282                        let hnsw = HnswBuildParams::default()
283                            .num_edges(m as usize)
284                            .ef_construction(ef_construction as usize);
285                        let sq = SQBuildParams::default();
286                        VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
287                    }
288                    VectorIndexType::HnswPq {
289                        m,
290                        ef_construction,
291                        num_sub_vectors,
292                        num_partitions,
293                    } => {
294                        let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
295                        let hnsw = HnswBuildParams::default()
296                            .num_edges(m as usize)
297                            .ef_construction(ef_construction as usize);
298                        let pq = PQBuildParams::new(num_sub_vectors as usize, 8);
299                        VectorIndexParams::with_ivf_hnsw_pq_params(metric_type, ivf, hnsw, pq)
300                    }
301                    _ => {
302                        return Err(anyhow!(
303                            "Unsupported vector index type: {:?}",
304                            config.index_type
305                        ));
306                    }
307                };
308
309                // Ignore errors during creation if dataset is empty or similar, but try
310                let progress = TracingIndexProgress::arc(&config.name);
311                match lance_ds
312                    .create_index_builder(&[property], IndexType::Vector, &params)
313                    .name(config.name.clone())
314                    .replace(true)
315                    .progress(progress)
316                    .await
317                {
318                    Ok(metadata) => {
319                        info!(
320                            index_name = %metadata.name,
321                            index_uuid = %metadata.uuid,
322                            dataset_version = metadata.dataset_version,
323                            "Vector index created"
324                        );
325                    }
326                    Err(e) => {
327                        warn!(
328                            "Failed to create physical vector index (dataset might be empty): {}",
329                            e
330                        );
331                    }
332                }
333            }
334            Err(e) => {
335                warn!(
336                    "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
337                    label, e
338                );
339            }
340        }
341
342        self.schema_manager
343            .add_index(IndexDefinition::Vector(config))?;
344        self.schema_manager.save().await?;
345
346        Ok(())
347    }
348
349    /// Build and persist a scalar (BTree) index for exact-match and range queries.
350    #[cfg(feature = "lance-backend")]
351    #[instrument(skip(self), level = "info")]
352    pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
353        let label = &config.label;
354        let properties = &config.properties;
355        info!(
356            "Creating scalar index '{}' on {}.{:?}",
357            config.name, label, properties
358        );
359
360        let schema = self.schema_manager.schema();
361        let label_meta = schema
362            .labels
363            .get(label)
364            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
365
366        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
367
368        match ds_wrapper.open_raw().await {
369            Ok(mut lance_ds) => {
370                let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
371
372                let progress = TracingIndexProgress::arc(&config.name);
373                let scalar_params = match config.index_type {
374                    ScalarIndexType::Bitmap => {
375                        ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap)
376                    }
377                    ScalarIndexType::LabelList => {
378                        ScalarIndexParams::for_builtin(BuiltinIndexType::LabelList)
379                    }
380                    _ => ScalarIndexParams::default(),
381                };
382                match lance_ds
383                    .create_index_builder(&columns, IndexType::Scalar, &scalar_params)
384                    .name(config.name.clone())
385                    .replace(true)
386                    .progress(progress)
387                    .await
388                {
389                    Ok(metadata) => {
390                        info!(
391                            index_name = %metadata.name,
392                            index_uuid = %metadata.uuid,
393                            dataset_version = metadata.dataset_version,
394                            "Scalar index created"
395                        );
396                    }
397                    Err(e) => {
398                        warn!(
399                            "Failed to create physical scalar index (dataset might be empty): {}",
400                            e
401                        );
402                    }
403                }
404            }
405            Err(e) => {
406                warn!(
407                    "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
408                    label, e
409                );
410            }
411        }
412
413        self.schema_manager
414            .add_index(IndexDefinition::Scalar(config))?;
415        self.schema_manager.save().await?;
416
417        Ok(())
418    }
419
420    /// Build and persist a full-text search (Lance inverted) index.
421    #[cfg(feature = "lance-backend")]
422    #[instrument(skip(self), level = "info")]
423    pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
424        let label = &config.label;
425        info!(
426            "Creating FTS index '{}' on {}.{:?}",
427            config.name, label, config.properties
428        );
429
430        let schema = self.schema_manager.schema();
431        let label_meta = schema
432            .labels
433            .get(label)
434            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
435
436        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
437
438        match ds_wrapper.open_raw().await {
439            Ok(mut lance_ds) => {
440                let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
441
442                let fts_params =
443                    InvertedIndexParams::default().with_position(config.with_positions);
444
445                let progress = TracingIndexProgress::arc(&config.name);
446                match lance_ds
447                    .create_index_builder(&columns, IndexType::Inverted, &fts_params)
448                    .name(config.name.clone())
449                    .replace(true)
450                    .progress(progress)
451                    .await
452                {
453                    Ok(metadata) => {
454                        info!(
455                            index_name = %metadata.name,
456                            index_uuid = %metadata.uuid,
457                            dataset_version = metadata.dataset_version,
458                            "FTS index created"
459                        );
460                    }
461                    Err(e) => {
462                        warn!(
463                            "Failed to create physical FTS index (dataset might be empty): {}",
464                            e
465                        );
466                    }
467                }
468            }
469            Err(e) => {
470                warn!(
471                    "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
472                    label, e
473                );
474            }
475        }
476
477        self.schema_manager
478            .add_index(IndexDefinition::FullText(config))?;
479        self.schema_manager.save().await?;
480
481        Ok(())
482    }
483
484    /// Creates a JSON Full-Text Search index on a column.
485    ///
486    /// This creates a Lance inverted index on the specified column,
487    /// enabling BM25-based full-text search with optional phrase matching.
488    #[cfg(feature = "lance-backend")]
489    #[instrument(skip(self), level = "info")]
490    pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
491        let label = &config.label;
492        let column = &config.column;
493        info!(
494            "Creating JSON FTS index '{}' on {}.{}",
495            config.name, label, column
496        );
497
498        let schema = self.schema_manager.schema();
499        let label_meta = schema
500            .labels
501            .get(label)
502            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
503
504        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
505
506        match ds_wrapper.open_raw().await {
507            Ok(mut lance_ds) => {
508                let fts_params =
509                    InvertedIndexParams::default().with_position(config.with_positions);
510
511                let progress = TracingIndexProgress::arc(&config.name);
512                match lance_ds
513                    .create_index_builder(&[column.as_str()], IndexType::Inverted, &fts_params)
514                    .name(config.name.clone())
515                    .replace(true)
516                    .progress(progress)
517                    .await
518                {
519                    Ok(metadata) => {
520                        info!(
521                            index_name = %metadata.name,
522                            index_uuid = %metadata.uuid,
523                            dataset_version = metadata.dataset_version,
524                            "JSON FTS index created"
525                        );
526                    }
527                    Err(e) => {
528                        warn!(
529                            "Failed to create physical JSON FTS index (dataset might be empty): {}",
530                            e
531                        );
532                    }
533                }
534            }
535            Err(e) => {
536                warn!(
537                    "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
538                    label, e
539                );
540            }
541        }
542
543        self.schema_manager
544            .add_index(IndexDefinition::JsonFullText(config))?;
545        self.schema_manager.save().await?;
546
547        Ok(())
548    }
549
550    /// Remove an index both physically from the Lance dataset and from the schema.
551    #[cfg(feature = "lance-backend")]
552    #[instrument(skip(self), level = "info")]
553    pub async fn drop_index(&self, name: &str) -> Result<()> {
554        info!("Dropping index '{}'", name);
555
556        let idx_def = self
557            .schema_manager
558            .get_index(name)
559            .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
560
561        // Attempt physical index drop on the underlying Lance dataset.
562        let label = idx_def.label();
563        let schema = self.schema_manager.schema();
564        if let Some(label_meta) = schema.labels.get(label) {
565            let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
566            match ds_wrapper.open_raw().await {
567                Ok(mut lance_ds) => {
568                    if let Err(e) = lance_ds.drop_index(name).await {
569                        // Log but don't fail — the index may never have been
570                        // physically built (e.g. empty dataset at creation time).
571                        warn!(
572                            "Physical index drop for '{}' returned error (non-fatal): {}",
573                            name, e
574                        );
575                    } else {
576                        info!("Physical index '{}' dropped from Lance dataset", name);
577                    }
578                }
579                Err(e) => {
580                    debug!(
581                        "Could not open dataset for label '{}' to drop physical index: {}",
582                        label, e
583                    );
584                }
585            }
586        }
587
588        self.schema_manager.remove_index(name)?;
589        self.schema_manager.save().await?;
590        Ok(())
591    }
592
593    /// Rebuild all indexes registered for `label` from scratch.
594    #[cfg(feature = "lance-backend")]
595    #[instrument(skip(self), level = "info")]
596    pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
597        info!("Rebuilding all indexes for label '{}'", label);
598        let schema = self.schema_manager.schema();
599
600        // Clone and filter to avoid holding lock while async awaiting
601        let indexes: Vec<_> = schema
602            .indexes
603            .iter()
604            .filter(|idx| idx.label() == label)
605            .cloned()
606            .collect();
607
608        for index in indexes {
609            match index {
610                IndexDefinition::Vector(cfg) => self.create_vector_index(cfg).await?,
611                IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
612                IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
613                IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
614                IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
615                _ => warn!("Unknown index type encountered during rebuild, skipping"),
616            }
617        }
618        Ok(())
619    }
620
621    /// Create composite index for unique constraint
622    #[cfg(feature = "lance-backend")]
623    pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
624        let schema = self.schema_manager.schema();
625        let label_meta = schema
626            .labels
627            .get(label)
628            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
629
630        // Lance supports multi-column indexes
631        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
632
633        // We need to verify dataset exists
634        if let Ok(mut ds) = ds_wrapper.open_raw().await {
635            // Create composite BTree index
636            let index_name = format!("{}_{}_composite", label, properties.join("_"));
637
638            // Convert properties to slice of &str
639            let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
640
641            let progress = TracingIndexProgress::arc(&index_name);
642            match ds
643                .create_index_builder(&columns, IndexType::Scalar, &ScalarIndexParams::default())
644                .name(index_name.clone())
645                .replace(true)
646                .progress(progress)
647                .await
648            {
649                Ok(metadata) => {
650                    info!(
651                        index_name = %metadata.name,
652                        index_uuid = %metadata.uuid,
653                        dataset_version = metadata.dataset_version,
654                        "Composite index created"
655                    );
656                }
657                Err(e) => {
658                    warn!("Failed to create physical composite index: {}", e);
659                }
660            }
661
662            let config = ScalarIndexConfig {
663                name: index_name,
664                label: label.to_string(),
665                properties: properties.to_vec(),
666                index_type: uni_common::core::schema::ScalarIndexType::BTree,
667                where_clause: None,
668                metadata: Default::default(),
669            };
670
671            self.schema_manager
672                .add_index(IndexDefinition::Scalar(config))?;
673            self.schema_manager.save().await?;
674        }
675
676        Ok(())
677    }
678
679    /// Applies incremental updates to an inverted index.
680    ///
681    /// Instead of rebuilding the entire index, this method updates only the
682    /// changed entries, making it much faster for small mutations.
683    ///
684    /// # Errors
685    ///
686    /// Returns an error if the index doesn't exist or the update fails.
687    #[cfg(feature = "lance-backend")]
688    #[instrument(skip(self, added, removed), level = "info", fields(
689        label = %config.label,
690        property = %config.property
691    ))]
692    pub async fn update_inverted_index_incremental(
693        &self,
694        config: &InvertedIndexConfig,
695        added: &HashMap<Vid, Vec<String>>,
696        removed: &HashSet<Vid>,
697    ) -> Result<()> {
698        info!(
699            added = added.len(),
700            removed = removed.len(),
701            "Incrementally updating inverted index"
702        );
703
704        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
705        index.apply_incremental_updates(added, removed).await
706    }
707}