Skip to main content

uni_store/storage/
index_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::storage::inverted_index::InvertedIndex;
5use crate::storage::vertex::VertexDataset;
6use anyhow::{Result, anyhow};
7use arrow_array::UInt64Array;
8use chrono::{DateTime, Utc};
9use futures::TryStreamExt;
10use lance::index::vector::VectorIndexParams;
11use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
12use lance_index::vector::hnsw::builder::HnswBuildParams;
13use lance_index::vector::ivf::IvfBuildParams;
14use lance_index::vector::pq::PQBuildParams;
15use lance_index::vector::sq::builder::SQBuildParams;
16use lance_index::{DatasetIndexExt, IndexType};
17use lance_linalg::distance::MetricType;
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use tracing::{info, instrument, warn};
23use uni_common::core::id::Vid;
24use uni_common::core::schema::{
25    DistanceMetric, FullTextIndexConfig, IndexDefinition, InvertedIndexConfig, JsonFtsIndexConfig,
26    ScalarIndexConfig, SchemaManager, VectorIndexConfig, VectorIndexType,
27};
28
/// Reports whether `name` may be interpolated into a SQL filter as a column identifier.
///
/// Issue #8: column names are spliced into query strings, so they are restricted to a
/// conservative character set before use. A name is accepted only when it is non-empty
/// and every character is either an underscore or alphanumeric as defined by
/// [`char::is_alphanumeric`] (which also admits non-ASCII letters and digits).
fn is_valid_column_name(name: &str) -> bool {
    if name.is_empty() {
        return false;
    }
    name.chars().all(|ch| ch == '_' || ch.is_alphanumeric())
}
36
37/// Status of an index rebuild task.
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub enum IndexRebuildStatus {
40    /// Task is waiting to be processed.
41    Pending,
42    /// Task is currently being processed.
43    InProgress,
44    /// Task completed successfully.
45    Completed,
46    /// Task failed with an error.
47    Failed,
48}
49
50/// A task representing an index rebuild operation.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct IndexRebuildTask {
53    /// Unique identifier for this task.
54    pub id: String,
55    /// The label for which indexes are being rebuilt.
56    pub label: String,
57    /// Current status of the task.
58    pub status: IndexRebuildStatus,
59    /// When the task was created.
60    pub created_at: DateTime<Utc>,
61    /// When the task started processing.
62    pub started_at: Option<DateTime<Utc>>,
63    /// When the task completed (successfully or with failure).
64    pub completed_at: Option<DateTime<Utc>>,
65    /// Error message if the task failed.
66    pub error: Option<String>,
67    /// Number of retry attempts.
68    pub retry_count: u32,
69}
70
71pub struct IndexManager {
72    base_uri: String,
73    schema_manager: Arc<SchemaManager>,
74    lancedb_store: Arc<crate::lancedb::LanceDbStore>,
75}
76
77impl IndexManager {
78    pub fn new(
79        base_uri: &str,
80        schema_manager: Arc<SchemaManager>,
81        lancedb_store: Arc<crate::lancedb::LanceDbStore>,
82    ) -> Self {
83        Self {
84            base_uri: base_uri.to_string(),
85            schema_manager,
86            lancedb_store,
87        }
88    }
89
90    #[instrument(skip(self), level = "info")]
91    pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
92        let label = &config.label;
93        let property = &config.property;
94        info!(
95            "Creating Inverted Index '{}' on {}.{}",
96            config.name, label, property
97        );
98
99        let schema = self.schema_manager.schema();
100        let label_meta = schema
101            .labels
102            .get(label)
103            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
104
105        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
106        index.create_if_missing().await?;
107
108        let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
109
110        // Check if dataset exists
111        if ds.open_raw().await.is_ok() {
112            index
113                .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
114                .await?;
115        } else {
116            warn!(
117                "Dataset for label '{}' not found, creating empty inverted index",
118                label
119            );
120        }
121
122        self.schema_manager
123            .add_index(IndexDefinition::Inverted(config))?;
124        self.schema_manager.save().await?;
125
126        Ok(())
127    }
128
129    #[instrument(skip(self), level = "info")]
130    pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
131        let label = &config.label;
132        let property = &config.property;
133        info!(
134            "Creating vector index '{}' on {}.{}",
135            config.name, label, property
136        );
137
138        let schema = self.schema_manager.schema();
139        let label_meta = schema
140            .labels
141            .get(label)
142            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
143
144        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
145
146        match ds_wrapper.open_raw().await {
147            Ok(mut lance_ds) => {
148                let metric_type = match config.metric {
149                    DistanceMetric::L2 => MetricType::L2,
150                    DistanceMetric::Cosine => MetricType::Cosine,
151                    DistanceMetric::Dot => MetricType::Dot,
152                    _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
153                };
154
155                let params = match config.index_type {
156                    VectorIndexType::IvfPq {
157                        num_partitions,
158                        num_sub_vectors,
159                        bits_per_subvector,
160                    } => {
161                        let ivf = IvfBuildParams::new(num_partitions as usize);
162                        let pq = PQBuildParams::new(
163                            num_sub_vectors as usize,
164                            bits_per_subvector as usize,
165                        );
166                        VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
167                    }
168                    VectorIndexType::Hnsw {
169                        m,
170                        ef_construction,
171                        ef_search: _,
172                    } => {
173                        let ivf = IvfBuildParams::new(1);
174                        let hnsw = HnswBuildParams::default()
175                            .num_edges(m as usize)
176                            .ef_construction(ef_construction as usize);
177                        let sq = SQBuildParams::default();
178                        VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
179                    }
180                    VectorIndexType::Flat => {
181                        // Fallback to basic IVF-PQ
182                        let ivf = IvfBuildParams::new(1);
183                        let pq = PQBuildParams::default();
184                        VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
185                    }
186                    _ => {
187                        return Err(anyhow!(
188                            "Unsupported vector index type: {:?}",
189                            config.index_type
190                        ));
191                    }
192                };
193
194                // Ignore errors during creation if dataset is empty or similar, but try
195                if let Err(e) = lance_ds
196                    .create_index(
197                        &[property],
198                        IndexType::Vector,
199                        Some(config.name.clone()),
200                        &params,
201                        true,
202                    )
203                    .await
204                {
205                    warn!(
206                        "Failed to create physical vector index (dataset might be empty): {}",
207                        e
208                    );
209                }
210            }
211            Err(e) => {
212                warn!(
213                    "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
214                    label, e
215                );
216            }
217        }
218
219        self.schema_manager
220            .add_index(IndexDefinition::Vector(config))?;
221        self.schema_manager.save().await?;
222
223        Ok(())
224    }
225
226    #[instrument(skip(self), level = "info")]
227    pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
228        let label = &config.label;
229        let properties = &config.properties;
230        info!(
231            "Creating scalar index '{}' on {}.{:?}",
232            config.name, label, properties
233        );
234
235        let schema = self.schema_manager.schema();
236        let label_meta = schema
237            .labels
238            .get(label)
239            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
240
241        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
242
243        match ds_wrapper.open_raw().await {
244            Ok(mut lance_ds) => {
245                let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
246
247                if let Err(e) = lance_ds
248                    .create_index(
249                        &columns,
250                        IndexType::Scalar,
251                        Some(config.name.clone()),
252                        &ScalarIndexParams::default(),
253                        true,
254                    )
255                    .await
256                {
257                    warn!(
258                        "Failed to create physical scalar index (dataset might be empty): {}",
259                        e
260                    );
261                }
262            }
263            Err(e) => {
264                warn!(
265                    "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
266                    label, e
267                );
268            }
269        }
270
271        self.schema_manager
272            .add_index(IndexDefinition::Scalar(config))?;
273        self.schema_manager.save().await?;
274
275        Ok(())
276    }
277
278    #[instrument(skip(self), level = "info")]
279    pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
280        let label = &config.label;
281        info!(
282            "Creating FTS index '{}' on {}.{:?}",
283            config.name, label, config.properties
284        );
285
286        let schema = self.schema_manager.schema();
287        let label_meta = schema
288            .labels
289            .get(label)
290            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
291
292        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
293
294        match ds_wrapper.open_raw().await {
295            Ok(mut lance_ds) => {
296                let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
297
298                let fts_params =
299                    InvertedIndexParams::default().with_position(config.with_positions);
300
301                if let Err(e) = lance_ds
302                    .create_index(
303                        &columns,
304                        IndexType::Inverted,
305                        Some(config.name.clone()),
306                        &fts_params,
307                        true,
308                    )
309                    .await
310                {
311                    warn!(
312                        "Failed to create physical FTS index (dataset might be empty): {}",
313                        e
314                    );
315                }
316            }
317            Err(e) => {
318                warn!(
319                    "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
320                    label, e
321                );
322            }
323        }
324
325        self.schema_manager
326            .add_index(IndexDefinition::FullText(config))?;
327        self.schema_manager.save().await?;
328
329        Ok(())
330    }
331
332    /// Creates a JSON Full-Text Search index on a column.
333    ///
334    /// This creates a Lance inverted index on the specified column,
335    /// enabling BM25-based full-text search with optional phrase matching.
336    #[instrument(skip(self), level = "info")]
337    pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
338        let label = &config.label;
339        let column = &config.column;
340        info!(
341            "Creating JSON FTS index '{}' on {}.{}",
342            config.name, label, column
343        );
344
345        let schema = self.schema_manager.schema();
346        let label_meta = schema
347            .labels
348            .get(label)
349            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
350
351        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
352
353        match ds_wrapper.open_raw().await {
354            Ok(mut lance_ds) => {
355                let fts_params =
356                    InvertedIndexParams::default().with_position(config.with_positions);
357
358                if let Err(e) = lance_ds
359                    .create_index(
360                        &[column],
361                        IndexType::Inverted,
362                        Some(config.name.clone()),
363                        &fts_params,
364                        true,
365                    )
366                    .await
367                {
368                    warn!(
369                        "Failed to create physical JSON FTS index (dataset might be empty): {}",
370                        e
371                    );
372                }
373            }
374            Err(e) => {
375                warn!(
376                    "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
377                    label, e
378                );
379            }
380        }
381
382        self.schema_manager
383            .add_index(IndexDefinition::JsonFullText(config))?;
384        self.schema_manager.save().await?;
385
386        Ok(())
387    }
388
389    #[instrument(skip(self), level = "info")]
390    pub async fn drop_index(&self, name: &str) -> Result<()> {
391        info!("Dropping index '{}'", name);
392
393        let idx_def = self
394            .schema_manager
395            .get_index(name)
396            .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
397
398        let label = idx_def.label();
399
400        if let Some(label_meta) = self.schema_manager.schema().labels.get(label) {
401            let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
402            if let Ok(_lance_ds) = ds_wrapper.open_raw().await {
403                // Attempt physical drop
404                // if let Err(e) = lance_ds.drop_index(name).await {
405                //     warn!("Failed to drop physical index (might not exist): {}", e);
406                // }
407                warn!(
408                    "Physical index drop not supported by current Lance version, removing from schema only."
409                );
410            }
411        }
412
413        self.schema_manager.remove_index(name)?;
414        self.schema_manager.save().await?;
415        Ok(())
416    }
417
418    #[instrument(skip(self), level = "info")]
419    pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
420        info!("Rebuilding all indexes for label '{}'", label);
421        let schema = self.schema_manager.schema();
422
423        // Clone definitions to avoid holding lock while async awaiting
424        let indexes = schema.indexes.clone();
425
426        for index in indexes {
427            match index {
428                IndexDefinition::Vector(cfg) => {
429                    if cfg.label == label {
430                        self.create_vector_index(cfg).await?;
431                    }
432                }
433                IndexDefinition::Scalar(cfg) => {
434                    if cfg.label == label {
435                        self.create_scalar_index(cfg).await?;
436                    }
437                }
438                IndexDefinition::FullText(cfg) => {
439                    if cfg.label == label {
440                        self.create_fts_index(cfg).await?;
441                    }
442                }
443                IndexDefinition::Inverted(cfg) => {
444                    if cfg.label == label {
445                        self.create_inverted_index(cfg).await?;
446                    }
447                }
448                IndexDefinition::JsonFullText(cfg) => {
449                    if cfg.label == label {
450                        self.create_json_fts_index(cfg).await?;
451                    }
452                }
453                _ => {
454                    log::warn!("Unknown index type encountered during rebuild, skipping");
455                }
456            }
457        }
458        Ok(())
459    }
460
461    /// Create composite index for unique constraint
462    pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
463        let schema = self.schema_manager.schema();
464        let label_meta = schema
465            .labels
466            .get(label)
467            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
468
469        // Lance supports multi-column indexes
470        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
471
472        // We need to verify dataset exists
473        if let Ok(mut ds) = ds_wrapper.open_raw().await {
474            // Create composite BTree index
475            let index_name = format!("{}_{}_composite", label, properties.join("_"));
476
477            // Convert properties to slice of &str
478            let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
479
480            if let Err(e) = ds
481                .create_index(
482                    &columns,
483                    IndexType::Scalar,
484                    Some(index_name.clone()),
485                    &ScalarIndexParams::default(),
486                    true,
487                )
488                .await
489            {
490                warn!("Failed to create physical composite index: {}", e);
491            }
492
493            let config = ScalarIndexConfig {
494                name: index_name,
495                label: label.to_string(),
496                properties: properties.to_vec(),
497                index_type: uni_common::core::schema::ScalarIndexType::BTree,
498                where_clause: None,
499            };
500
501            self.schema_manager
502                .add_index(IndexDefinition::Scalar(config))?;
503            self.schema_manager.save().await?;
504        }
505
506        Ok(())
507    }
508
509    /// Lookup by composite key
510    pub async fn composite_lookup(
511        &self,
512        label: &str,
513        key_values: &HashMap<String, Value>,
514    ) -> Result<Option<Vid>> {
515        use lancedb::query::{ExecutableQuery, QueryBase, Select};
516
517        let schema = self.schema_manager.schema();
518        let label_meta = schema
519            .labels
520            .get(label)
521            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
522
523        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
524        let table = match ds_wrapper.open_lancedb(&self.lancedb_store).await {
525            Ok(t) => t,
526            Err(_) => return Ok(None),
527        };
528
529        // Build filter from key values
530        let filter = key_values
531            .iter()
532            .map(|(k, v)| {
533                // Issue #8: Validate column name to prevent SQL injection
534                if !is_valid_column_name(k) {
535                    anyhow::bail!("Invalid column name '{}': must contain only alphanumeric characters and underscores", k);
536                }
537
538                let val_str = match v {
539                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
540                    Value::Number(n) => n.to_string(),
541                    Value::Bool(b) => b.to_string(),
542                    Value::Null => "null".to_string(),
543                    _ => v.to_string(),
544                };
545                // Quote column name for case sensitivity
546                Ok(format!("\"{}\" = {}", k, val_str))
547            })
548            .collect::<Result<Vec<_>>>()?
549            .join(" AND ");
550
551        let query = table
552            .query()
553            .only_if(&filter)
554            .limit(1)
555            .select(Select::Columns(vec!["_vid".to_string()]));
556
557        let stream = match query.execute().await {
558            Ok(s) => s,
559            Err(_) => return Ok(None),
560        };
561
562        let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await.unwrap_or_default();
563        for batch in batches {
564            if batch.num_rows() > 0 {
565                let vid_col = batch
566                    .column_by_name("_vid")
567                    .ok_or_else(|| anyhow!("Missing _vid column"))?
568                    .as_any()
569                    .downcast_ref::<UInt64Array>()
570                    .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
571
572                let vid = vid_col.value(0);
573                return Ok(Some(Vid::from(vid)));
574            }
575        }
576
577        Ok(None)
578    }
579
580    /// Applies incremental updates to an inverted index.
581    ///
582    /// Instead of rebuilding the entire index, this method updates only the
583    /// changed entries, making it much faster for small mutations.
584    ///
585    /// # Errors
586    ///
587    /// Returns an error if the index doesn't exist or the update fails.
588    #[instrument(skip(self, added, removed), level = "info", fields(
589        label = %config.label,
590        property = %config.property
591    ))]
592    pub async fn update_inverted_index_incremental(
593        &self,
594        config: &InvertedIndexConfig,
595        added: &HashMap<Vid, Vec<String>>,
596        removed: &HashSet<Vid>,
597    ) -> Result<()> {
598        info!(
599            added = added.len(),
600            removed = removed.len(),
601            "Incrementally updating inverted index"
602        );
603
604        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
605        index.apply_incremental_updates(added, removed).await
606    }
607}