Skip to main content

uni_store/storage/
index_manager.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Index lifecycle management: creation, rebuild, and incremental updates for all index types.
5
6#[cfg(feature = "lance-backend")]
7use crate::storage::inverted_index::InvertedIndex;
8use crate::storage::vertex::VertexDataset;
9use anyhow::{Result, anyhow};
10use arrow_array::UInt64Array;
11use chrono::{DateTime, Utc};
12#[cfg(feature = "lance-backend")]
13use lance::index::vector::VectorIndexParams;
14#[cfg(feature = "lance-backend")]
15use lance_index::progress::IndexBuildProgress;
16#[cfg(feature = "lance-backend")]
17use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
18#[cfg(feature = "lance-backend")]
19use lance_index::vector::bq::RQBuildParams;
20#[cfg(feature = "lance-backend")]
21use lance_index::vector::hnsw::builder::HnswBuildParams;
22#[cfg(feature = "lance-backend")]
23use lance_index::vector::ivf::IvfBuildParams;
24#[cfg(feature = "lance-backend")]
25use lance_index::vector::pq::PQBuildParams;
26#[cfg(feature = "lance-backend")]
27use lance_index::vector::sq::builder::SQBuildParams;
28#[cfg(feature = "lance-backend")]
29use lance_index::{DatasetIndexExt, IndexType};
30#[cfg(feature = "lance-backend")]
31use lance_linalg::distance::MetricType;
32use serde::{Deserialize, Serialize};
33use serde_json::Value;
34use std::collections::HashMap;
35#[cfg(feature = "lance-backend")]
36use std::collections::HashSet;
37use std::sync::Arc;
38#[cfg(feature = "lance-backend")]
39use tracing::{debug, info, instrument, warn};
40use uni_common::core::id::Vid;
41#[cfg(feature = "lance-backend")]
42use uni_common::core::schema::IndexDefinition;
43use uni_common::core::schema::SchemaManager;
44#[cfg(feature = "lance-backend")]
45use uni_common::core::schema::{
46    DistanceMetric, FullTextIndexConfig, InvertedIndexConfig, JsonFtsIndexConfig,
47    ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
48};
49
/// Reports whether `name` is safe to splice into a SQL filter as a column identifier.
///
/// Issue #8: column names are interpolated into SQL text, so any character
/// other than a letter, a digit or an underscore causes rejection. The empty
/// string is rejected as well.
///
/// The character test is [`char::is_alphanumeric`], which is Unicode-aware,
/// so non-ASCII letters and digits are accepted too.
fn is_valid_column_name(name: &str) -> bool {
    if name.is_empty() {
        return false;
    }
    name.chars().all(|ch| ch == '_' || ch.is_alphanumeric())
}
57
/// Progress sink for Lance index builds that reports through `tracing`.
///
/// Every event emitted by this reporter is tagged with the name of the index
/// being built, which makes build stages easy to correlate in logs.
#[cfg(feature = "lance-backend")]
#[derive(Debug)]
pub struct TracingIndexProgress {
    /// Name of the index, attached to every emitted event.
    index_name: String,
}

#[cfg(feature = "lance-backend")]
impl TracingIndexProgress {
    /// Build a reporter for `index_name` and hand it back as a shared
    /// `IndexBuildProgress` trait object.
    pub fn arc(index_name: &str) -> Arc<dyn IndexBuildProgress> {
        let reporter = Self {
            index_name: index_name.to_owned(),
        };
        Arc::new(reporter)
    }
}
76
#[cfg(feature = "lance-backend")]
#[async_trait::async_trait]
impl IndexBuildProgress for TracingIndexProgress {
    /// Logs at `info` level that `stage` has begun, together with the
    /// optional `total` and the `unit` it is reported in. Always succeeds.
    async fn stage_start(&self, stage: &str, total: Option<u64>, unit: &str) -> lance::Result<()> {
        let index = &self.index_name;
        info!(
            index = %index,
            stage = stage,
            total = ?total,
            unit = unit,
            "Index build stage started"
        );
        Ok(())
    }

    /// Logs at `debug` level the `completed` count reported for `stage`.
    /// Always succeeds.
    async fn stage_progress(&self, stage: &str, completed: u64) -> lance::Result<()> {
        let index = &self.index_name;
        debug!(
            index = %index,
            stage = stage,
            completed = completed,
            "Index build progress"
        );
        Ok(())
    }

    /// Logs at `info` level that `stage` has finished. Always succeeds.
    async fn stage_complete(&self, stage: &str) -> lance::Result<()> {
        let index = &self.index_name;
        info!(
            index = %index,
            stage = stage,
            "Index build stage complete"
        );
        Ok(())
    }
}
110
/// Status of an index rebuild task.
///
/// Stored in [`IndexRebuildTask::status`]. The type derives `Serialize` and
/// `Deserialize`, so it can be persisted together with the task.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexRebuildStatus {
    /// Task is waiting to be processed.
    Pending,
    /// Task is currently being processed.
    InProgress,
    /// Task completed successfully.
    Completed,
    /// Task failed with an error.
    Failed,
}
123
/// A task representing an index rebuild operation.
///
/// The record is plain data: it derives `Serialize`/`Deserialize` and carries
/// the task's identity, target label, status and timestamps.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexRebuildTask {
    /// Unique identifier for this task.
    pub id: String,
    /// The label for which indexes are being rebuilt.
    pub label: String,
    /// Current status of the task.
    pub status: IndexRebuildStatus,
    /// When the task was created (UTC).
    pub created_at: DateTime<Utc>,
    /// When the task started processing; `None` if no start time is recorded.
    pub started_at: Option<DateTime<Utc>>,
    /// When the task completed (successfully or with failure); `None` if no
    /// completion time is recorded.
    pub completed_at: Option<DateTime<Utc>>,
    /// Error message if the task failed.
    pub error: Option<String>,
    /// Number of retry attempts.
    pub retry_count: u32,
}
144
145/// Manages physical and logical indexes across all vertex datasets.
146pub struct IndexManager {
147    base_uri: String,
148    schema_manager: Arc<SchemaManager>,
149    backend: Arc<dyn crate::backend::StorageBackend>,
150}
151
152impl std::fmt::Debug for IndexManager {
153    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154        f.debug_struct("IndexManager")
155            .field("base_uri", &self.base_uri)
156            .finish_non_exhaustive()
157    }
158}
159
160impl IndexManager {
161    /// Create a new `IndexManager` bound to `base_uri` and the given schema and backend.
162    pub fn new(
163        base_uri: &str,
164        schema_manager: Arc<SchemaManager>,
165        backend: Arc<dyn crate::backend::StorageBackend>,
166    ) -> Self {
167        Self {
168            base_uri: base_uri.to_string(),
169            schema_manager,
170            backend,
171        }
172    }
173
174    /// Build and persist an inverted index for set-membership queries.
175    #[cfg(feature = "lance-backend")]
176    #[instrument(skip(self), level = "info")]
177    pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
178        let label = &config.label;
179        let property = &config.property;
180        info!(
181            "Creating Inverted Index '{}' on {}.{}",
182            config.name, label, property
183        );
184
185        let schema = self.schema_manager.schema();
186        let label_meta = schema
187            .labels
188            .get(label)
189            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
190
191        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
192
193        let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
194
195        // Check if dataset exists
196        if ds.open_raw().await.is_ok() {
197            index
198                .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
199                .await?;
200        } else {
201            warn!(
202                "Dataset for label '{}' not found, creating empty inverted index",
203                label
204            );
205        }
206
207        self.schema_manager
208            .add_index(IndexDefinition::Inverted(config))?;
209        self.schema_manager.save().await?;
210
211        Ok(())
212    }
213
214    /// Build and persist a vector (ANN) index on an embedding column.
215    #[cfg(feature = "lance-backend")]
216    #[instrument(skip(self), level = "info")]
217    pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
218        let label = &config.label;
219        let property = &config.property;
220        info!(
221            "Creating vector index '{}' on {}.{}",
222            config.name, label, property
223        );
224
225        let schema = self.schema_manager.schema();
226        let label_meta = schema
227            .labels
228            .get(label)
229            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
230
231        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
232
233        match ds_wrapper.open_raw().await {
234            Ok(mut lance_ds) => {
235                let metric_type = match config.metric {
236                    DistanceMetric::L2 => MetricType::L2,
237                    DistanceMetric::Cosine => MetricType::Cosine,
238                    DistanceMetric::Dot => MetricType::Dot,
239                    _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
240                };
241
242                let params = match config.index_type {
243                    VectorIndexType::Flat => {
244                        let ivf = IvfBuildParams::new(1);
245                        VectorIndexParams::with_ivf_flat_params(metric_type, ivf)
246                    }
247                    VectorIndexType::IvfFlat { num_partitions } => {
248                        let ivf = IvfBuildParams::new(num_partitions as usize);
249                        VectorIndexParams::with_ivf_flat_params(metric_type, ivf)
250                    }
251                    VectorIndexType::IvfPq {
252                        num_partitions,
253                        num_sub_vectors,
254                        bits_per_subvector,
255                    } => {
256                        let ivf = IvfBuildParams::new(num_partitions as usize);
257                        let pq = PQBuildParams::new(
258                            num_sub_vectors as usize,
259                            bits_per_subvector as usize,
260                        );
261                        VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
262                    }
263                    VectorIndexType::IvfSq { num_partitions } => {
264                        let ivf = IvfBuildParams::new(num_partitions as usize);
265                        let sq = SQBuildParams::default();
266                        VectorIndexParams::with_ivf_sq_params(metric_type, ivf, sq)
267                    }
268                    VectorIndexType::IvfRq {
269                        num_partitions,
270                        num_bits,
271                    } => {
272                        let ivf = IvfBuildParams::new(num_partitions as usize);
273                        let mut rq = RQBuildParams::default();
274                        if let Some(bits) = num_bits {
275                            rq.num_bits = bits;
276                        }
277                        VectorIndexParams::with_ivf_rq_params(metric_type, ivf, rq)
278                    }
279                    VectorIndexType::HnswFlat {
280                        m,
281                        ef_construction,
282                        num_partitions,
283                    } => {
284                        let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
285                        let hnsw = HnswBuildParams::default()
286                            .num_edges(m as usize)
287                            .ef_construction(ef_construction as usize);
288                        VectorIndexParams::ivf_hnsw(metric_type, ivf, hnsw)
289                    }
290                    VectorIndexType::HnswSq {
291                        m,
292                        ef_construction,
293                        num_partitions,
294                    } => {
295                        let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
296                        let hnsw = HnswBuildParams::default()
297                            .num_edges(m as usize)
298                            .ef_construction(ef_construction as usize);
299                        let sq = SQBuildParams::default();
300                        VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
301                    }
302                    VectorIndexType::HnswPq {
303                        m,
304                        ef_construction,
305                        num_sub_vectors,
306                        num_partitions,
307                    } => {
308                        let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
309                        let hnsw = HnswBuildParams::default()
310                            .num_edges(m as usize)
311                            .ef_construction(ef_construction as usize);
312                        let pq = PQBuildParams::new(num_sub_vectors as usize, 8);
313                        VectorIndexParams::with_ivf_hnsw_pq_params(metric_type, ivf, hnsw, pq)
314                    }
315                    _ => {
316                        return Err(anyhow!(
317                            "Unsupported vector index type: {:?}",
318                            config.index_type
319                        ));
320                    }
321                };
322
323                // Ignore errors during creation if dataset is empty or similar, but try
324                let progress = TracingIndexProgress::arc(&config.name);
325                match lance_ds
326                    .create_index_builder(&[property], IndexType::Vector, &params)
327                    .name(config.name.clone())
328                    .replace(true)
329                    .progress(progress)
330                    .await
331                {
332                    Ok(metadata) => {
333                        info!(
334                            index_name = %metadata.name,
335                            index_uuid = %metadata.uuid,
336                            dataset_version = metadata.dataset_version,
337                            "Vector index created"
338                        );
339                    }
340                    Err(e) => {
341                        warn!(
342                            "Failed to create physical vector index (dataset might be empty): {}",
343                            e
344                        );
345                    }
346                }
347            }
348            Err(e) => {
349                warn!(
350                    "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
351                    label, e
352                );
353            }
354        }
355
356        self.schema_manager
357            .add_index(IndexDefinition::Vector(config))?;
358        self.schema_manager.save().await?;
359
360        Ok(())
361    }
362
363    /// Build and persist a scalar (BTree) index for exact-match and range queries.
364    #[cfg(feature = "lance-backend")]
365    #[instrument(skip(self), level = "info")]
366    pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
367        let label = &config.label;
368        let properties = &config.properties;
369        info!(
370            "Creating scalar index '{}' on {}.{:?}",
371            config.name, label, properties
372        );
373
374        let schema = self.schema_manager.schema();
375        let label_meta = schema
376            .labels
377            .get(label)
378            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
379
380        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
381
382        match ds_wrapper.open_raw().await {
383            Ok(mut lance_ds) => {
384                let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
385
386                let progress = TracingIndexProgress::arc(&config.name);
387                let scalar_params = match config.index_type {
388                    ScalarIndexType::Bitmap => {
389                        ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap)
390                    }
391                    ScalarIndexType::LabelList => {
392                        ScalarIndexParams::for_builtin(BuiltinIndexType::LabelList)
393                    }
394                    _ => ScalarIndexParams::default(),
395                };
396                match lance_ds
397                    .create_index_builder(&columns, IndexType::Scalar, &scalar_params)
398                    .name(config.name.clone())
399                    .replace(true)
400                    .progress(progress)
401                    .await
402                {
403                    Ok(metadata) => {
404                        info!(
405                            index_name = %metadata.name,
406                            index_uuid = %metadata.uuid,
407                            dataset_version = metadata.dataset_version,
408                            "Scalar index created"
409                        );
410                    }
411                    Err(e) => {
412                        warn!(
413                            "Failed to create physical scalar index (dataset might be empty): {}",
414                            e
415                        );
416                    }
417                }
418            }
419            Err(e) => {
420                warn!(
421                    "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
422                    label, e
423                );
424            }
425        }
426
427        self.schema_manager
428            .add_index(IndexDefinition::Scalar(config))?;
429        self.schema_manager.save().await?;
430
431        Ok(())
432    }
433
434    /// Build and persist a full-text search (Lance inverted) index.
435    #[cfg(feature = "lance-backend")]
436    #[instrument(skip(self), level = "info")]
437    pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
438        let label = &config.label;
439        info!(
440            "Creating FTS index '{}' on {}.{:?}",
441            config.name, label, config.properties
442        );
443
444        let schema = self.schema_manager.schema();
445        let label_meta = schema
446            .labels
447            .get(label)
448            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
449
450        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
451
452        match ds_wrapper.open_raw().await {
453            Ok(mut lance_ds) => {
454                let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
455
456                let fts_params =
457                    InvertedIndexParams::default().with_position(config.with_positions);
458
459                let progress = TracingIndexProgress::arc(&config.name);
460                match lance_ds
461                    .create_index_builder(&columns, IndexType::Inverted, &fts_params)
462                    .name(config.name.clone())
463                    .replace(true)
464                    .progress(progress)
465                    .await
466                {
467                    Ok(metadata) => {
468                        info!(
469                            index_name = %metadata.name,
470                            index_uuid = %metadata.uuid,
471                            dataset_version = metadata.dataset_version,
472                            "FTS index created"
473                        );
474                    }
475                    Err(e) => {
476                        warn!(
477                            "Failed to create physical FTS index (dataset might be empty): {}",
478                            e
479                        );
480                    }
481                }
482            }
483            Err(e) => {
484                warn!(
485                    "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
486                    label, e
487                );
488            }
489        }
490
491        self.schema_manager
492            .add_index(IndexDefinition::FullText(config))?;
493        self.schema_manager.save().await?;
494
495        Ok(())
496    }
497
498    /// Creates a JSON Full-Text Search index on a column.
499    ///
500    /// This creates a Lance inverted index on the specified column,
501    /// enabling BM25-based full-text search with optional phrase matching.
502    #[cfg(feature = "lance-backend")]
503    #[instrument(skip(self), level = "info")]
504    pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
505        let label = &config.label;
506        let column = &config.column;
507        info!(
508            "Creating JSON FTS index '{}' on {}.{}",
509            config.name, label, column
510        );
511
512        let schema = self.schema_manager.schema();
513        let label_meta = schema
514            .labels
515            .get(label)
516            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
517
518        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
519
520        match ds_wrapper.open_raw().await {
521            Ok(mut lance_ds) => {
522                let fts_params =
523                    InvertedIndexParams::default().with_position(config.with_positions);
524
525                let progress = TracingIndexProgress::arc(&config.name);
526                match lance_ds
527                    .create_index_builder(&[column.as_str()], IndexType::Inverted, &fts_params)
528                    .name(config.name.clone())
529                    .replace(true)
530                    .progress(progress)
531                    .await
532                {
533                    Ok(metadata) => {
534                        info!(
535                            index_name = %metadata.name,
536                            index_uuid = %metadata.uuid,
537                            dataset_version = metadata.dataset_version,
538                            "JSON FTS index created"
539                        );
540                    }
541                    Err(e) => {
542                        warn!(
543                            "Failed to create physical JSON FTS index (dataset might be empty): {}",
544                            e
545                        );
546                    }
547                }
548            }
549            Err(e) => {
550                warn!(
551                    "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
552                    label, e
553                );
554            }
555        }
556
557        self.schema_manager
558            .add_index(IndexDefinition::JsonFullText(config))?;
559        self.schema_manager.save().await?;
560
561        Ok(())
562    }
563
564    /// Remove an index both physically from the Lance dataset and from the schema.
565    #[cfg(feature = "lance-backend")]
566    #[instrument(skip(self), level = "info")]
567    pub async fn drop_index(&self, name: &str) -> Result<()> {
568        info!("Dropping index '{}'", name);
569
570        let idx_def = self
571            .schema_manager
572            .get_index(name)
573            .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
574
575        // Attempt physical index drop on the underlying Lance dataset.
576        let label = idx_def.label();
577        let schema = self.schema_manager.schema();
578        if let Some(label_meta) = schema.labels.get(label) {
579            let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
580            match ds_wrapper.open_raw().await {
581                Ok(mut lance_ds) => {
582                    if let Err(e) = lance_ds.drop_index(name).await {
583                        // Log but don't fail — the index may never have been
584                        // physically built (e.g. empty dataset at creation time).
585                        warn!(
586                            "Physical index drop for '{}' returned error (non-fatal): {}",
587                            name, e
588                        );
589                    } else {
590                        info!("Physical index '{}' dropped from Lance dataset", name);
591                    }
592                }
593                Err(e) => {
594                    debug!(
595                        "Could not open dataset for label '{}' to drop physical index: {}",
596                        label, e
597                    );
598                }
599            }
600        }
601
602        self.schema_manager.remove_index(name)?;
603        self.schema_manager.save().await?;
604        Ok(())
605    }
606
607    /// Rebuild all indexes registered for `label` from scratch.
608    #[cfg(feature = "lance-backend")]
609    #[instrument(skip(self), level = "info")]
610    pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
611        info!("Rebuilding all indexes for label '{}'", label);
612        let schema = self.schema_manager.schema();
613
614        // Clone and filter to avoid holding lock while async awaiting
615        let indexes: Vec<_> = schema
616            .indexes
617            .iter()
618            .filter(|idx| idx.label() == label)
619            .cloned()
620            .collect();
621
622        for index in indexes {
623            match index {
624                IndexDefinition::Vector(cfg) => self.create_vector_index(cfg).await?,
625                IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
626                IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
627                IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
628                IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
629                _ => warn!("Unknown index type encountered during rebuild, skipping"),
630            }
631        }
632        Ok(())
633    }
634
635    /// Create composite index for unique constraint
636    #[cfg(feature = "lance-backend")]
637    pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
638        let schema = self.schema_manager.schema();
639        let label_meta = schema
640            .labels
641            .get(label)
642            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
643
644        // Lance supports multi-column indexes
645        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
646
647        // We need to verify dataset exists
648        if let Ok(mut ds) = ds_wrapper.open_raw().await {
649            // Create composite BTree index
650            let index_name = format!("{}_{}_composite", label, properties.join("_"));
651
652            // Convert properties to slice of &str
653            let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
654
655            let progress = TracingIndexProgress::arc(&index_name);
656            match ds
657                .create_index_builder(&columns, IndexType::Scalar, &ScalarIndexParams::default())
658                .name(index_name.clone())
659                .replace(true)
660                .progress(progress)
661                .await
662            {
663                Ok(metadata) => {
664                    info!(
665                        index_name = %metadata.name,
666                        index_uuid = %metadata.uuid,
667                        dataset_version = metadata.dataset_version,
668                        "Composite index created"
669                    );
670                }
671                Err(e) => {
672                    warn!("Failed to create physical composite index: {}", e);
673                }
674            }
675
676            let config = ScalarIndexConfig {
677                name: index_name,
678                label: label.to_string(),
679                properties: properties.to_vec(),
680                index_type: uni_common::core::schema::ScalarIndexType::BTree,
681                where_clause: None,
682                metadata: Default::default(),
683            };
684
685            self.schema_manager
686                .add_index(IndexDefinition::Scalar(config))?;
687            self.schema_manager.save().await?;
688        }
689
690        Ok(())
691    }
692
693    /// Lookup by composite key
694    pub async fn composite_lookup(
695        &self,
696        label: &str,
697        key_values: &HashMap<String, Value>,
698    ) -> Result<Option<Vid>> {
699        use crate::backend::types::ScanRequest;
700
701        let schema = self.schema_manager.schema();
702        let label_meta = schema
703            .labels
704            .get(label)
705            .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
706
707        let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
708        let table_name = ds_wrapper.table_name();
709        let backend = self.backend.as_ref();
710
711        if !backend.table_exists(&table_name).await.unwrap_or(false) {
712            return Ok(None);
713        }
714
715        // Build filter from key values
716        let filter = key_values
717            .iter()
718            .map(|(k, v)| {
719                // Issue #8: Validate column name to prevent SQL injection
720                if !is_valid_column_name(k) {
721                    anyhow::bail!("Invalid column name '{}': must contain only alphanumeric characters and underscores", k);
722                }
723
724                let val_str = match v {
725                    Value::String(s) => format!("'{}'", s.replace('\'', "''")),
726                    Value::Number(n) => n.to_string(),
727                    Value::Bool(b) => b.to_string(),
728                    Value::Null => "null".to_string(),
729                    _ => v.to_string(),
730                };
731                // Quote column name for case sensitivity
732                Ok(format!("\"{}\" = {}", k, val_str))
733            })
734            .collect::<Result<Vec<_>>>()?
735            .join(" AND ");
736
737        let request = ScanRequest::all(&table_name)
738            .with_filter(filter)
739            .with_limit(1)
740            .with_columns(vec!["_vid".to_string()]);
741
742        let batches = match backend.scan(request).await {
743            Ok(b) => b,
744            Err(_) => return Ok(None),
745        };
746
747        for batch in batches {
748            if batch.num_rows() > 0 {
749                let vid_col = batch
750                    .column_by_name("_vid")
751                    .ok_or_else(|| anyhow!("Missing _vid column"))?
752                    .as_any()
753                    .downcast_ref::<UInt64Array>()
754                    .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
755
756                let vid = vid_col.value(0);
757                return Ok(Some(Vid::from(vid)));
758            }
759        }
760
761        Ok(None)
762    }
763
764    /// Applies incremental updates to an inverted index.
765    ///
766    /// Instead of rebuilding the entire index, this method updates only the
767    /// changed entries, making it much faster for small mutations.
768    ///
769    /// # Errors
770    ///
771    /// Returns an error if the index doesn't exist or the update fails.
772    #[cfg(feature = "lance-backend")]
773    #[instrument(skip(self, added, removed), level = "info", fields(
774        label = %config.label,
775        property = %config.property
776    ))]
777    pub async fn update_inverted_index_incremental(
778        &self,
779        config: &InvertedIndexConfig,
780        added: &HashMap<Vid, Vec<String>>,
781        removed: &HashSet<Vid>,
782    ) -> Result<()> {
783        info!(
784            added = added.len(),
785            removed = removed.len(),
786            "Incrementally updating inverted index"
787        );
788
789        let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
790        index.apply_incremental_updates(added, removed).await
791    }
792}