Skip to main content

uni_store/storage/
index_manager.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Index lifecycle management: creation, rebuild, and incremental updates for all index types.
5
6use crate::storage::inverted_index::InvertedIndex;
7use crate::storage::vertex::VertexDataset;
8use anyhow::{Result, anyhow};
9use arrow_array::UInt64Array;
10use chrono::{DateTime, Utc};
11use futures::TryStreamExt;
12use lance::index::vector::VectorIndexParams;
13use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
14use lance_index::vector::hnsw::builder::HnswBuildParams;
15use lance_index::vector::ivf::IvfBuildParams;
16use lance_index::vector::pq::PQBuildParams;
17use lance_index::vector::sq::builder::SQBuildParams;
18use lance_index::{DatasetIndexExt, IndexType};
19use lance_linalg::distance::MetricType;
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22use std::collections::{HashMap, HashSet};
23use std::sync::Arc;
24use tracing::{info, instrument, warn};
25use uni_common::core::id::Vid;
26use uni_common::core::schema::{
27    DistanceMetric, FullTextIndexConfig, IndexDefinition, InvertedIndexConfig, JsonFtsIndexConfig,
28    ScalarIndexConfig, SchemaManager, VectorIndexConfig, VectorIndexType,
29};
30
/// Returns `true` when `name` is safe to splice into a SQL filter as a column name.
///
/// Issue #8: column names are interpolated into query text, so they are restricted to a
/// non-empty sequence of alphanumeric characters and underscores. Anything else (spaces,
/// quotes, operators, punctuation) is rejected.
fn is_valid_column_name(name: &str) -> bool {
    let is_allowed = |ch: char| ch == '_' || ch.is_alphanumeric();
    match name {
        "" => false,
        _ => !name.chars().any(|ch| !is_allowed(ch)),
    }
}
38
39/// Status of an index rebuild task.
40#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
41pub enum IndexRebuildStatus {
42    /// Task is waiting to be processed.
43    Pending,
44    /// Task is currently being processed.
45    InProgress,
46    /// Task completed successfully.
47    Completed,
48    /// Task failed with an error.
49    Failed,
50}
51
52/// A task representing an index rebuild operation.
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct IndexRebuildTask {
55    /// Unique identifier for this task.
56    pub id: String,
57    /// The label for which indexes are being rebuilt.
58    pub label: String,
59    /// Current status of the task.
60    pub status: IndexRebuildStatus,
61    /// When the task was created.
62    pub created_at: DateTime<Utc>,
63    /// When the task started processing.
64    pub started_at: Option<DateTime<Utc>>,
65    /// When the task completed (successfully or with failure).
66    pub completed_at: Option<DateTime<Utc>>,
67    /// Error message if the task failed.
68    pub error: Option<String>,
69    /// Number of retry attempts.
70    pub retry_count: u32,
71}
72
73/// Manages physical and logical indexes across all vertex datasets.
74pub struct IndexManager {
75    base_uri: String,
76    schema_manager: Arc<SchemaManager>,
77    lancedb_store: Arc<crate::lancedb::LanceDbStore>,
78}
79
80impl std::fmt::Debug for IndexManager {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        f.debug_struct("IndexManager")
83            .field("base_uri", &self.base_uri)
84            .finish_non_exhaustive()
85    }
86}
87
88impl IndexManager {
89    /// Create a new `IndexManager` bound to `base_uri` and the given schema and LanceDB store.
90    pub fn new(
91        base_uri: &str,
92        schema_manager: Arc<SchemaManager>,
93        lancedb_store: Arc<crate::lancedb::LanceDbStore>,
94    ) -> Self {
95        Self {
96            base_uri: base_uri.to_string(),
97            schema_manager,
98            lancedb_store,
99        }
100    }
101
102    /// Build and persist an inverted index for set-membership queries.
103    #[instrument(skip(self), level = "info")]
104    pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
105        let label = &config.label;
106        let property = &config.property;
107        info!(
108            "Creating Inverted Index '{}' on {}.{}",
109            config.name, label, property
110        );
111
112        let schema = self.schema_manager.schema();
113        let label_meta = schema
114            .labels
115            .get(label)
116            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
117
118        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
119
120        let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
121
122        // Check if dataset exists
123        if ds.open_raw().await.is_ok() {
124            index
125                .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
126                .await?;
127        } else {
128            warn!(
129                "Dataset for label '{}' not found, creating empty inverted index",
130                label
131            );
132        }
133
134        self.schema_manager
135            .add_index(IndexDefinition::Inverted(config))?;
136        self.schema_manager.save().await?;
137
138        Ok(())
139    }
140
141    /// Build and persist a vector (ANN) index on an embedding column.
142    #[instrument(skip(self), level = "info")]
143    pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
144        let label = &config.label;
145        let property = &config.property;
146        info!(
147            "Creating vector index '{}' on {}.{}",
148            config.name, label, property
149        );
150
151        let schema = self.schema_manager.schema();
152        let label_meta = schema
153            .labels
154            .get(label)
155            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
156
157        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
158
159        match ds_wrapper.open_raw().await {
160            Ok(mut lance_ds) => {
161                let metric_type = match config.metric {
162                    DistanceMetric::L2 => MetricType::L2,
163                    DistanceMetric::Cosine => MetricType::Cosine,
164                    DistanceMetric::Dot => MetricType::Dot,
165                    _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
166                };
167
168                let params = match config.index_type {
169                    VectorIndexType::IvfPq {
170                        num_partitions,
171                        num_sub_vectors,
172                        bits_per_subvector,
173                    } => {
174                        let ivf = IvfBuildParams::new(num_partitions as usize);
175                        let pq = PQBuildParams::new(
176                            num_sub_vectors as usize,
177                            bits_per_subvector as usize,
178                        );
179                        VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
180                    }
181                    VectorIndexType::Hnsw {
182                        m,
183                        ef_construction,
184                        ef_search: _,
185                    } => {
186                        let ivf = IvfBuildParams::new(1);
187                        let hnsw = HnswBuildParams::default()
188                            .num_edges(m as usize)
189                            .ef_construction(ef_construction as usize);
190                        let sq = SQBuildParams::default();
191                        VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
192                    }
193                    VectorIndexType::Flat => {
194                        // Fallback to basic IVF-PQ
195                        let ivf = IvfBuildParams::new(1);
196                        let pq = PQBuildParams::default();
197                        VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
198                    }
199                    _ => {
200                        return Err(anyhow!(
201                            "Unsupported vector index type: {:?}",
202                            config.index_type
203                        ));
204                    }
205                };
206
207                // Ignore errors during creation if dataset is empty or similar, but try
208                if let Err(e) = lance_ds
209                    .create_index(
210                        &[property],
211                        IndexType::Vector,
212                        Some(config.name.clone()),
213                        &params,
214                        true,
215                    )
216                    .await
217                {
218                    warn!(
219                        "Failed to create physical vector index (dataset might be empty): {}",
220                        e
221                    );
222                }
223            }
224            Err(e) => {
225                warn!(
226                    "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
227                    label, e
228                );
229            }
230        }
231
232        self.schema_manager
233            .add_index(IndexDefinition::Vector(config))?;
234        self.schema_manager.save().await?;
235
236        Ok(())
237    }
238
239    /// Build and persist a scalar (BTree) index for exact-match and range queries.
240    #[instrument(skip(self), level = "info")]
241    pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
242        let label = &config.label;
243        let properties = &config.properties;
244        info!(
245            "Creating scalar index '{}' on {}.{:?}",
246            config.name, label, properties
247        );
248
249        let schema = self.schema_manager.schema();
250        let label_meta = schema
251            .labels
252            .get(label)
253            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
254
255        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
256
257        match ds_wrapper.open_raw().await {
258            Ok(mut lance_ds) => {
259                let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
260
261                if let Err(e) = lance_ds
262                    .create_index(
263                        &columns,
264                        IndexType::Scalar,
265                        Some(config.name.clone()),
266                        &ScalarIndexParams::default(),
267                        true,
268                    )
269                    .await
270                {
271                    warn!(
272                        "Failed to create physical scalar index (dataset might be empty): {}",
273                        e
274                    );
275                }
276            }
277            Err(e) => {
278                warn!(
279                    "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
280                    label, e
281                );
282            }
283        }
284
285        self.schema_manager
286            .add_index(IndexDefinition::Scalar(config))?;
287        self.schema_manager.save().await?;
288
289        Ok(())
290    }
291
292    /// Build and persist a full-text search (Lance inverted) index.
293    #[instrument(skip(self), level = "info")]
294    pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
295        let label = &config.label;
296        info!(
297            "Creating FTS index '{}' on {}.{:?}",
298            config.name, label, config.properties
299        );
300
301        let schema = self.schema_manager.schema();
302        let label_meta = schema
303            .labels
304            .get(label)
305            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
306
307        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
308
309        match ds_wrapper.open_raw().await {
310            Ok(mut lance_ds) => {
311                let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
312
313                let fts_params =
314                    InvertedIndexParams::default().with_position(config.with_positions);
315
316                if let Err(e) = lance_ds
317                    .create_index(
318                        &columns,
319                        IndexType::Inverted,
320                        Some(config.name.clone()),
321                        &fts_params,
322                        true,
323                    )
324                    .await
325                {
326                    warn!(
327                        "Failed to create physical FTS index (dataset might be empty): {}",
328                        e
329                    );
330                }
331            }
332            Err(e) => {
333                warn!(
334                    "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
335                    label, e
336                );
337            }
338        }
339
340        self.schema_manager
341            .add_index(IndexDefinition::FullText(config))?;
342        self.schema_manager.save().await?;
343
344        Ok(())
345    }
346
347    /// Creates a JSON Full-Text Search index on a column.
348    ///
349    /// This creates a Lance inverted index on the specified column,
350    /// enabling BM25-based full-text search with optional phrase matching.
351    #[instrument(skip(self), level = "info")]
352    pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
353        let label = &config.label;
354        let column = &config.column;
355        info!(
356            "Creating JSON FTS index '{}' on {}.{}",
357            config.name, label, column
358        );
359
360        let schema = self.schema_manager.schema();
361        let label_meta = schema
362            .labels
363            .get(label)
364            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
365
366        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
367
368        match ds_wrapper.open_raw().await {
369            Ok(mut lance_ds) => {
370                let fts_params =
371                    InvertedIndexParams::default().with_position(config.with_positions);
372
373                if let Err(e) = lance_ds
374                    .create_index(
375                        &[column],
376                        IndexType::Inverted,
377                        Some(config.name.clone()),
378                        &fts_params,
379                        true,
380                    )
381                    .await
382                {
383                    warn!(
384                        "Failed to create physical JSON FTS index (dataset might be empty): {}",
385                        e
386                    );
387                }
388            }
389            Err(e) => {
390                warn!(
391                    "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
392                    label, e
393                );
394            }
395        }
396
397        self.schema_manager
398            .add_index(IndexDefinition::JsonFullText(config))?;
399        self.schema_manager.save().await?;
400
401        Ok(())
402    }
403
404    /// Remove an index definition from the schema (physical drop is not yet supported).
405    #[instrument(skip(self), level = "info")]
406    pub async fn drop_index(&self, name: &str) -> Result<()> {
407        info!("Dropping index '{}'", name);
408
409        // Verify the index exists before removing
410        let _idx_def = self
411            .schema_manager
412            .get_index(name)
413            .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
414
415        // Physical index drop is not supported by the current Lance version,
416        // so we only remove the definition from the schema.
417        warn!("Physical index drop not yet supported, removing from schema only.");
418
419        self.schema_manager.remove_index(name)?;
420        self.schema_manager.save().await?;
421        Ok(())
422    }
423
424    /// Rebuild all indexes registered for `label` from scratch.
425    #[instrument(skip(self), level = "info")]
426    pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
427        info!("Rebuilding all indexes for label '{}'", label);
428        let schema = self.schema_manager.schema();
429
430        // Clone and filter to avoid holding lock while async awaiting
431        let indexes: Vec<_> = schema
432            .indexes
433            .iter()
434            .filter(|idx| idx.label() == label)
435            .cloned()
436            .collect();
437
438        for index in indexes {
439            match index {
440                IndexDefinition::Vector(cfg) => self.create_vector_index(cfg).await?,
441                IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
442                IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
443                IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
444                IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
445                _ => warn!("Unknown index type encountered during rebuild, skipping"),
446            }
447        }
448        Ok(())
449    }
450
451    /// Create composite index for unique constraint
452    pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
453        let schema = self.schema_manager.schema();
454        let label_meta = schema
455            .labels
456            .get(label)
457            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
458
459        // Lance supports multi-column indexes
460        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
461
462        // We need to verify dataset exists
463        if let Ok(mut ds) = ds_wrapper.open_raw().await {
464            // Create composite BTree index
465            let index_name = format!("{}_{}_composite", label, properties.join("_"));
466
467            // Convert properties to slice of &str
468            let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
469
470            if let Err(e) = ds
471                .create_index(
472                    &columns,
473                    IndexType::Scalar,
474                    Some(index_name.clone()),
475                    &ScalarIndexParams::default(),
476                    true,
477                )
478                .await
479            {
480                warn!("Failed to create physical composite index: {}", e);
481            }
482
483            let config = ScalarIndexConfig {
484                name: index_name,
485                label: label.to_string(),
486                properties: properties.to_vec(),
487                index_type: uni_common::core::schema::ScalarIndexType::BTree,
488                where_clause: None,
489                metadata: Default::default(),
490            };
491
492            self.schema_manager
493                .add_index(IndexDefinition::Scalar(config))?;
494            self.schema_manager.save().await?;
495        }
496
497        Ok(())
498    }
499
500    /// Lookup by composite key
501    pub async fn composite_lookup(
502        &self,
503        label: &str,
504        key_values: &HashMap<String, Value>,
505    ) -> Result<Option<Vid>> {
506        use lancedb::query::{ExecutableQuery, QueryBase, Select};
507
508        let schema = self.schema_manager.schema();
509        let label_meta = schema
510            .labels
511            .get(label)
512            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
513
514        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
515        let table = match ds_wrapper.open_lancedb(&self.lancedb_store).await {
516            Ok(t) => t,
517            Err(_) => return Ok(None),
518        };
519
520        // Build filter from key values
521        let filter = key_values
522            .iter()
523            .map(|(k, v)| {
524                // Issue #8: Validate column name to prevent SQL injection
525                if !is_valid_column_name(k) {
526                    anyhow::bail!("Invalid column name '{}': must contain only alphanumeric characters and underscores", k);
527                }
528
529                let val_str = match v {
530                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
531                    Value::Number(n) => n.to_string(),
532                    Value::Bool(b) => b.to_string(),
533                    Value::Null => "null".to_string(),
534                    _ => v.to_string(),
535                };
536                // Quote column name for case sensitivity
537                Ok(format!("\"{}\" = {}", k, val_str))
538            })
539            .collect::<Result<Vec<_>>>()?
540            .join(" AND ");
541
542        let query = table
543            .query()
544            .only_if(&filter)
545            .limit(1)
546            .select(Select::Columns(vec!["_vid".to_string()]));
547
548        let stream = match query.execute().await {
549            Ok(s) => s,
550            Err(_) => return Ok(None),
551        };
552
553        let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await.unwrap_or_default();
554        for batch in batches {
555            if batch.num_rows() > 0 {
556                let vid_col = batch
557                    .column_by_name("_vid")
558                    .ok_or_else(|| anyhow!("Missing _vid column"))?
559                    .as_any()
560                    .downcast_ref::<UInt64Array>()
561                    .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
562
563                let vid = vid_col.value(0);
564                return Ok(Some(Vid::from(vid)));
565            }
566        }
567
568        Ok(None)
569    }
570
571    /// Applies incremental updates to an inverted index.
572    ///
573    /// Instead of rebuilding the entire index, this method updates only the
574    /// changed entries, making it much faster for small mutations.
575    ///
576    /// # Errors
577    ///
578    /// Returns an error if the index doesn't exist or the update fails.
579    #[instrument(skip(self, added, removed), level = "info", fields(
580        label = %config.label,
581        property = %config.property
582    ))]
583    pub async fn update_inverted_index_incremental(
584        &self,
585        config: &InvertedIndexConfig,
586        added: &HashMap<Vid, Vec<String>>,
587        removed: &HashSet<Vid>,
588    ) -> Result<()> {
589        info!(
590            added = added.len(),
591            removed = removed.len(),
592            "Incrementally updating inverted index"
593        );
594
595        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
596        index.apply_incremental_updates(added, removed).await
597    }
598}