Skip to main content

uni_store/storage/
index_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::storage::inverted_index::InvertedIndex;
5use crate::storage::vertex::VertexDataset;
6use anyhow::{Result, anyhow};
7use arrow_array::UInt64Array;
8use chrono::{DateTime, Utc};
9use futures::TryStreamExt;
10use lance::index::vector::VectorIndexParams;
11use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
12use lance_index::vector::hnsw::builder::HnswBuildParams;
13use lance_index::vector::ivf::IvfBuildParams;
14use lance_index::vector::pq::PQBuildParams;
15use lance_index::vector::sq::builder::SQBuildParams;
16use lance_index::{DatasetIndexExt, IndexType};
17use lance_linalg::distance::MetricType;
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use tracing::{info, instrument, warn};
23use uni_common::core::id::Vid;
24use uni_common::core::schema::{
25    DistanceMetric, FullTextIndexConfig, IndexDefinition, InvertedIndexConfig, JsonFtsIndexConfig,
26    ScalarIndexConfig, SchemaManager, VectorIndexConfig, VectorIndexType,
27};
28
/// Reports whether `name` is safe to splice into a SQL filter as a column identifier.
///
/// Issue #8: column names are interpolated into query strings, so every character
/// must be alphanumeric (as decided by `char::is_alphanumeric`) or an underscore.
/// The empty string is rejected.
fn is_valid_column_name(name: &str) -> bool {
    if name.is_empty() {
        return false;
    }
    // Valid exactly when no character falls outside the allowed set.
    !name
        .chars()
        .any(|ch| ch != '_' && !ch.is_alphanumeric())
}
36
37/// Status of an index rebuild task.
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub enum IndexRebuildStatus {
40    /// Task is waiting to be processed.
41    Pending,
42    /// Task is currently being processed.
43    InProgress,
44    /// Task completed successfully.
45    Completed,
46    /// Task failed with an error.
47    Failed,
48}
49
50/// A task representing an index rebuild operation.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct IndexRebuildTask {
53    /// Unique identifier for this task.
54    pub id: String,
55    /// The label for which indexes are being rebuilt.
56    pub label: String,
57    /// Current status of the task.
58    pub status: IndexRebuildStatus,
59    /// When the task was created.
60    pub created_at: DateTime<Utc>,
61    /// When the task started processing.
62    pub started_at: Option<DateTime<Utc>>,
63    /// When the task completed (successfully or with failure).
64    pub completed_at: Option<DateTime<Utc>>,
65    /// Error message if the task failed.
66    pub error: Option<String>,
67    /// Number of retry attempts.
68    pub retry_count: u32,
69}
70
71pub struct IndexManager {
72    base_uri: String,
73    schema_manager: Arc<SchemaManager>,
74    lancedb_store: Arc<crate::lancedb::LanceDbStore>,
75}
76
77impl IndexManager {
78    pub fn new(
79        base_uri: &str,
80        schema_manager: Arc<SchemaManager>,
81        lancedb_store: Arc<crate::lancedb::LanceDbStore>,
82    ) -> Self {
83        Self {
84            base_uri: base_uri.to_string(),
85            schema_manager,
86            lancedb_store,
87        }
88    }
89
90    #[instrument(skip(self), level = "info")]
91    pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
92        let label = &config.label;
93        let property = &config.property;
94        info!(
95            "Creating Inverted Index '{}' on {}.{}",
96            config.name, label, property
97        );
98
99        let schema = self.schema_manager.schema();
100        let label_meta = schema
101            .labels
102            .get(label)
103            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
104
105        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
106        index.create_if_missing().await?;
107
108        let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
109
110        // Check if dataset exists
111        if ds.open_raw().await.is_ok() {
112            index
113                .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
114                .await?;
115        } else {
116            warn!(
117                "Dataset for label '{}' not found, creating empty inverted index",
118                label
119            );
120        }
121
122        self.schema_manager
123            .add_index(IndexDefinition::Inverted(config))?;
124        self.schema_manager.save().await?;
125
126        Ok(())
127    }
128
129    #[instrument(skip(self), level = "info")]
130    pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
131        let label = &config.label;
132        let property = &config.property;
133        info!(
134            "Creating vector index '{}' on {}.{}",
135            config.name, label, property
136        );
137
138        let schema = self.schema_manager.schema();
139        let label_meta = schema
140            .labels
141            .get(label)
142            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
143
144        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
145
146        match ds_wrapper.open_raw().await {
147            Ok(mut lance_ds) => {
148                let metric_type = match config.metric {
149                    DistanceMetric::L2 => MetricType::L2,
150                    DistanceMetric::Cosine => MetricType::Cosine,
151                    DistanceMetric::Dot => MetricType::Dot,
152                    _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
153                };
154
155                let params = match config.index_type {
156                    VectorIndexType::IvfPq {
157                        num_partitions,
158                        num_sub_vectors,
159                        bits_per_subvector,
160                    } => {
161                        let ivf = IvfBuildParams::new(num_partitions as usize);
162                        let pq = PQBuildParams::new(
163                            num_sub_vectors as usize,
164                            bits_per_subvector as usize,
165                        );
166                        VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
167                    }
168                    VectorIndexType::Hnsw {
169                        m,
170                        ef_construction,
171                        ef_search: _,
172                    } => {
173                        let ivf = IvfBuildParams::new(1);
174                        let hnsw = HnswBuildParams::default()
175                            .num_edges(m as usize)
176                            .ef_construction(ef_construction as usize);
177                        let sq = SQBuildParams::default();
178                        VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
179                    }
180                    VectorIndexType::Flat => {
181                        // Fallback to basic IVF-PQ
182                        let ivf = IvfBuildParams::new(1);
183                        let pq = PQBuildParams::default();
184                        VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
185                    }
186                    _ => {
187                        return Err(anyhow!(
188                            "Unsupported vector index type: {:?}",
189                            config.index_type
190                        ));
191                    }
192                };
193
194                // Ignore errors during creation if dataset is empty or similar, but try
195                if let Err(e) = lance_ds
196                    .create_index(
197                        &[property],
198                        IndexType::Vector,
199                        Some(config.name.clone()),
200                        &params,
201                        true,
202                    )
203                    .await
204                {
205                    warn!(
206                        "Failed to create physical vector index (dataset might be empty): {}",
207                        e
208                    );
209                }
210            }
211            Err(e) => {
212                warn!(
213                    "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
214                    label, e
215                );
216            }
217        }
218
219        self.schema_manager
220            .add_index(IndexDefinition::Vector(config))?;
221        self.schema_manager.save().await?;
222
223        Ok(())
224    }
225
226    #[instrument(skip(self), level = "info")]
227    pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
228        let label = &config.label;
229        let properties = &config.properties;
230        info!(
231            "Creating scalar index '{}' on {}.{:?}",
232            config.name, label, properties
233        );
234
235        let schema = self.schema_manager.schema();
236        let label_meta = schema
237            .labels
238            .get(label)
239            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
240
241        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
242
243        match ds_wrapper.open_raw().await {
244            Ok(mut lance_ds) => {
245                let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
246
247                if let Err(e) = lance_ds
248                    .create_index(
249                        &columns,
250                        IndexType::Scalar,
251                        Some(config.name.clone()),
252                        &ScalarIndexParams::default(),
253                        true,
254                    )
255                    .await
256                {
257                    warn!(
258                        "Failed to create physical scalar index (dataset might be empty): {}",
259                        e
260                    );
261                }
262            }
263            Err(e) => {
264                warn!(
265                    "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
266                    label, e
267                );
268            }
269        }
270
271        self.schema_manager
272            .add_index(IndexDefinition::Scalar(config))?;
273        self.schema_manager.save().await?;
274
275        Ok(())
276    }
277
278    #[instrument(skip(self), level = "info")]
279    pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
280        let label = &config.label;
281        info!(
282            "Creating FTS index '{}' on {}.{:?}",
283            config.name, label, config.properties
284        );
285
286        let schema = self.schema_manager.schema();
287        let label_meta = schema
288            .labels
289            .get(label)
290            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
291
292        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
293
294        match ds_wrapper.open_raw().await {
295            Ok(mut lance_ds) => {
296                let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
297
298                let fts_params =
299                    InvertedIndexParams::default().with_position(config.with_positions);
300
301                if let Err(e) = lance_ds
302                    .create_index(
303                        &columns,
304                        IndexType::Inverted,
305                        Some(config.name.clone()),
306                        &fts_params,
307                        true,
308                    )
309                    .await
310                {
311                    warn!(
312                        "Failed to create physical FTS index (dataset might be empty): {}",
313                        e
314                    );
315                }
316            }
317            Err(e) => {
318                warn!(
319                    "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
320                    label, e
321                );
322            }
323        }
324
325        self.schema_manager
326            .add_index(IndexDefinition::FullText(config))?;
327        self.schema_manager.save().await?;
328
329        Ok(())
330    }
331
332    /// Creates a JSON Full-Text Search index on a column.
333    ///
334    /// This creates a Lance inverted index on the specified column,
335    /// enabling BM25-based full-text search with optional phrase matching.
336    #[instrument(skip(self), level = "info")]
337    pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
338        let label = &config.label;
339        let column = &config.column;
340        info!(
341            "Creating JSON FTS index '{}' on {}.{}",
342            config.name, label, column
343        );
344
345        let schema = self.schema_manager.schema();
346        let label_meta = schema
347            .labels
348            .get(label)
349            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
350
351        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
352
353        match ds_wrapper.open_raw().await {
354            Ok(mut lance_ds) => {
355                let fts_params =
356                    InvertedIndexParams::default().with_position(config.with_positions);
357
358                if let Err(e) = lance_ds
359                    .create_index(
360                        &[column],
361                        IndexType::Inverted,
362                        Some(config.name.clone()),
363                        &fts_params,
364                        true,
365                    )
366                    .await
367                {
368                    warn!(
369                        "Failed to create physical JSON FTS index (dataset might be empty): {}",
370                        e
371                    );
372                }
373            }
374            Err(e) => {
375                warn!(
376                    "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
377                    label, e
378                );
379            }
380        }
381
382        self.schema_manager
383            .add_index(IndexDefinition::JsonFullText(config))?;
384        self.schema_manager.save().await?;
385
386        Ok(())
387    }
388
389    #[instrument(skip(self), level = "info")]
390    pub async fn drop_index(&self, name: &str) -> Result<()> {
391        info!("Dropping index '{}'", name);
392
393        // Verify the index exists before removing
394        let _idx_def = self
395            .schema_manager
396            .get_index(name)
397            .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
398
399        // Physical index drop is not supported by the current Lance version,
400        // so we only remove the definition from the schema.
401        warn!("Physical index drop not yet supported, removing from schema only.");
402
403        self.schema_manager.remove_index(name)?;
404        self.schema_manager.save().await?;
405        Ok(())
406    }
407
408    #[instrument(skip(self), level = "info")]
409    pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
410        info!("Rebuilding all indexes for label '{}'", label);
411        let schema = self.schema_manager.schema();
412
413        // Clone and filter to avoid holding lock while async awaiting
414        let indexes: Vec<_> = schema
415            .indexes
416            .iter()
417            .filter(|idx| idx.label() == label)
418            .cloned()
419            .collect();
420
421        for index in indexes {
422            match index {
423                IndexDefinition::Vector(cfg) => self.create_vector_index(cfg).await?,
424                IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
425                IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
426                IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
427                IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
428                _ => warn!("Unknown index type encountered during rebuild, skipping"),
429            }
430        }
431        Ok(())
432    }
433
434    /// Create composite index for unique constraint
435    pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
436        let schema = self.schema_manager.schema();
437        let label_meta = schema
438            .labels
439            .get(label)
440            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
441
442        // Lance supports multi-column indexes
443        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
444
445        // We need to verify dataset exists
446        if let Ok(mut ds) = ds_wrapper.open_raw().await {
447            // Create composite BTree index
448            let index_name = format!("{}_{}_composite", label, properties.join("_"));
449
450            // Convert properties to slice of &str
451            let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
452
453            if let Err(e) = ds
454                .create_index(
455                    &columns,
456                    IndexType::Scalar,
457                    Some(index_name.clone()),
458                    &ScalarIndexParams::default(),
459                    true,
460                )
461                .await
462            {
463                warn!("Failed to create physical composite index: {}", e);
464            }
465
466            let config = ScalarIndexConfig {
467                name: index_name,
468                label: label.to_string(),
469                properties: properties.to_vec(),
470                index_type: uni_common::core::schema::ScalarIndexType::BTree,
471                where_clause: None,
472                metadata: Default::default(),
473            };
474
475            self.schema_manager
476                .add_index(IndexDefinition::Scalar(config))?;
477            self.schema_manager.save().await?;
478        }
479
480        Ok(())
481    }
482
483    /// Lookup by composite key
484    pub async fn composite_lookup(
485        &self,
486        label: &str,
487        key_values: &HashMap<String, Value>,
488    ) -> Result<Option<Vid>> {
489        use lancedb::query::{ExecutableQuery, QueryBase, Select};
490
491        let schema = self.schema_manager.schema();
492        let label_meta = schema
493            .labels
494            .get(label)
495            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
496
497        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
498        let table = match ds_wrapper.open_lancedb(&self.lancedb_store).await {
499            Ok(t) => t,
500            Err(_) => return Ok(None),
501        };
502
503        // Build filter from key values
504        let filter = key_values
505            .iter()
506            .map(|(k, v)| {
507                // Issue #8: Validate column name to prevent SQL injection
508                if !is_valid_column_name(k) {
509                    anyhow::bail!("Invalid column name '{}': must contain only alphanumeric characters and underscores", k);
510                }
511
512                let val_str = match v {
513                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
514                    Value::Number(n) => n.to_string(),
515                    Value::Bool(b) => b.to_string(),
516                    Value::Null => "null".to_string(),
517                    _ => v.to_string(),
518                };
519                // Quote column name for case sensitivity
520                Ok(format!("\"{}\" = {}", k, val_str))
521            })
522            .collect::<Result<Vec<_>>>()?
523            .join(" AND ");
524
525        let query = table
526            .query()
527            .only_if(&filter)
528            .limit(1)
529            .select(Select::Columns(vec!["_vid".to_string()]));
530
531        let stream = match query.execute().await {
532            Ok(s) => s,
533            Err(_) => return Ok(None),
534        };
535
536        let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await.unwrap_or_default();
537        for batch in batches {
538            if batch.num_rows() > 0 {
539                let vid_col = batch
540                    .column_by_name("_vid")
541                    .ok_or_else(|| anyhow!("Missing _vid column"))?
542                    .as_any()
543                    .downcast_ref::<UInt64Array>()
544                    .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
545
546                let vid = vid_col.value(0);
547                return Ok(Some(Vid::from(vid)));
548            }
549        }
550
551        Ok(None)
552    }
553
554    /// Applies incremental updates to an inverted index.
555    ///
556    /// Instead of rebuilding the entire index, this method updates only the
557    /// changed entries, making it much faster for small mutations.
558    ///
559    /// # Errors
560    ///
561    /// Returns an error if the index doesn't exist or the update fails.
562    #[instrument(skip(self, added, removed), level = "info", fields(
563        label = %config.label,
564        property = %config.property
565    ))]
566    pub async fn update_inverted_index_incremental(
567        &self,
568        config: &InvertedIndexConfig,
569        added: &HashMap<Vid, Vec<String>>,
570        removed: &HashSet<Vid>,
571    ) -> Result<()> {
572        info!(
573            added = added.len(),
574            removed = removed.len(),
575            "Incrementally updating inverted index"
576        );
577
578        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
579        index.apply_incremental_updates(added, removed).await
580    }
581}