1#[cfg(feature = "lance-backend")]
7use crate::storage::inverted_index::InvertedIndex;
8use crate::storage::vertex::VertexDataset;
9use anyhow::{Result, anyhow};
10use arrow_array::UInt64Array;
11use chrono::{DateTime, Utc};
12#[cfg(feature = "lance-backend")]
13use lance::index::vector::VectorIndexParams;
14#[cfg(feature = "lance-backend")]
15use lance_index::progress::IndexBuildProgress;
16#[cfg(feature = "lance-backend")]
17use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
18#[cfg(feature = "lance-backend")]
19use lance_index::vector::bq::RQBuildParams;
20#[cfg(feature = "lance-backend")]
21use lance_index::vector::hnsw::builder::HnswBuildParams;
22#[cfg(feature = "lance-backend")]
23use lance_index::vector::ivf::IvfBuildParams;
24#[cfg(feature = "lance-backend")]
25use lance_index::vector::pq::PQBuildParams;
26#[cfg(feature = "lance-backend")]
27use lance_index::vector::sq::builder::SQBuildParams;
28#[cfg(feature = "lance-backend")]
29use lance_index::{DatasetIndexExt, IndexType};
30#[cfg(feature = "lance-backend")]
31use lance_linalg::distance::MetricType;
32use serde::{Deserialize, Serialize};
33use serde_json::Value;
34use std::collections::HashMap;
35#[cfg(feature = "lance-backend")]
36use std::collections::HashSet;
37use std::sync::Arc;
38#[cfg(feature = "lance-backend")]
39use tracing::{debug, info, instrument, warn};
40use uni_common::core::id::Vid;
41#[cfg(feature = "lance-backend")]
42use uni_common::core::schema::IndexDefinition;
43use uni_common::core::schema::SchemaManager;
44#[cfg(feature = "lance-backend")]
45use uni_common::core::schema::{
46 DistanceMetric, FullTextIndexConfig, InvertedIndexConfig, JsonFtsIndexConfig,
47 ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
48};
49
/// Returns `true` when `name` is non-empty and made up only of alphanumeric
/// characters (per [`char::is_alphanumeric`], so Unicode letters and digits are
/// accepted) and underscores.
///
/// Used to vet column names before they are interpolated, double-quoted, into
/// a filter expression; rejecting `"` and other punctuation keeps the quoted
/// identifier from being broken out of.
fn is_valid_column_name(name: &str) -> bool {
    let is_allowed = |ch: char| ch == '_' || ch.is_alphanumeric();
    match name {
        "" => false,
        _ => name.chars().all(is_allowed),
    }
}
57
#[cfg(feature = "lance-backend")]
/// Adapter that forwards Lance index-build progress callbacks to `tracing`.
///
/// Stage starts and completions are emitted at `info` level and per-stage
/// progress updates at `debug` level; every event carries the index name in
/// the `index` field.
#[derive(Debug)]
pub struct TracingIndexProgress {
    /// Name of the index being built, attached to every emitted event.
    index_name: String,
}

#[cfg(feature = "lance-backend")]
impl TracingIndexProgress {
    /// Builds a reporter for `index_name`, already wrapped as the shared trait
    /// object handed to the index builder's `.progress(...)` call.
    pub fn arc(index_name: &str) -> Arc<dyn IndexBuildProgress> {
        let reporter = Self {
            index_name: String::from(index_name),
        };
        Arc::new(reporter)
    }
}

#[cfg(feature = "lance-backend")]
#[async_trait::async_trait]
impl IndexBuildProgress for TracingIndexProgress {
    /// Logs the start of `stage` with its expected `total` (if known) and `unit`.
    async fn stage_start(&self, stage: &str, total: Option<u64>, unit: &str) -> lance::Result<()> {
        let index_name = &self.index_name;
        info!(
            index = %index_name,
            stage = stage,
            total = ?total,
            unit = unit,
            "Index build stage started"
        );
        Ok(())
    }

    /// Logs the number of units `completed` so far within `stage`.
    async fn stage_progress(&self, stage: &str, completed: u64) -> lance::Result<()> {
        let index_name = &self.index_name;
        debug!(
            index = %index_name,
            stage = stage,
            completed = completed,
            "Index build progress"
        );
        Ok(())
    }

    /// Logs that `stage` has finished.
    async fn stage_complete(&self, stage: &str) -> lance::Result<()> {
        let index_name = &self.index_name;
        info!(
            index = %index_name,
            stage = stage,
            "Index build stage complete"
        );
        Ok(())
    }
}
110
/// Lifecycle state of an [`IndexRebuildTask`].
///
/// NOTE(review): nothing in this section drives transitions between these
/// states; the per-variant descriptions below are inferred from the variant
/// names — confirm against the code that schedules and updates rebuild tasks.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexRebuildStatus {
    /// Presumably queued and not yet started.
    Pending,
    /// Presumably currently being rebuilt.
    InProgress,
    /// Presumably finished successfully.
    Completed,
    /// Presumably finished unsuccessfully (see `IndexRebuildTask::error`).
    Failed,
}
123
/// A serializable record describing one index-rebuild job and its progress.
///
/// NOTE(review): this section neither creates nor updates these records; the
/// field descriptions below are inferred from names and types — confirm with
/// the code that schedules rebuilds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexRebuildTask {
    /// Identifier of the task.
    pub id: String,
    /// Label whose indexes the task targets (presumably the argument later
    /// passed to a per-label rebuild; verify against the caller).
    pub label: String,
    /// Current lifecycle state of the task.
    pub status: IndexRebuildStatus,
    /// Creation time of the task, in UTC.
    pub created_at: DateTime<Utc>,
    /// Time execution started, in UTC; `None` if it has not started.
    pub started_at: Option<DateTime<Utc>>,
    /// Time execution finished, in UTC; `None` if it has not finished.
    pub completed_at: Option<DateTime<Utc>>,
    /// Error message, presumably from the most recent failed attempt.
    pub error: Option<String>,
    /// Number of retries attempted so far.
    pub retry_count: u32,
}
144
/// Creates, drops and rebuilds the indexes declared in the schema, and performs
/// composite-key lookups against vertex tables.
pub struct IndexManager {
    /// Base URI passed to `VertexDataset` and `InvertedIndex` when opening label
    /// datasets and inverted indexes.
    base_uri: String,
    /// Source of label metadata and owner of the persisted index definitions.
    schema_manager: Arc<SchemaManager>,
    /// Storage backend used for table-existence checks and scans in
    /// `composite_lookup`.
    backend: Arc<dyn crate::backend::StorageBackend>,
}
151
152impl std::fmt::Debug for IndexManager {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 f.debug_struct("IndexManager")
155 .field("base_uri", &self.base_uri)
156 .finish_non_exhaustive()
157 }
158}
159
160impl IndexManager {
161 pub fn new(
163 base_uri: &str,
164 schema_manager: Arc<SchemaManager>,
165 backend: Arc<dyn crate::backend::StorageBackend>,
166 ) -> Self {
167 Self {
168 base_uri: base_uri.to_string(),
169 schema_manager,
170 backend,
171 }
172 }
173
174 #[cfg(feature = "lance-backend")]
176 #[instrument(skip(self), level = "info")]
177 pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
178 let label = &config.label;
179 let property = &config.property;
180 info!(
181 "Creating Inverted Index '{}' on {}.{}",
182 config.name, label, property
183 );
184
185 let schema = self.schema_manager.schema();
186 let label_meta = schema
187 .labels
188 .get(label)
189 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
190
191 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
192
193 let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
194
195 if ds.open_raw().await.is_ok() {
197 index
198 .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
199 .await?;
200 } else {
201 warn!(
202 "Dataset for label '{}' not found, creating empty inverted index",
203 label
204 );
205 }
206
207 self.schema_manager
208 .add_index(IndexDefinition::Inverted(config))?;
209 self.schema_manager.save().await?;
210
211 Ok(())
212 }
213
214 #[cfg(feature = "lance-backend")]
216 #[instrument(skip(self), level = "info")]
217 pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
218 let label = &config.label;
219 let property = &config.property;
220 info!(
221 "Creating vector index '{}' on {}.{}",
222 config.name, label, property
223 );
224
225 let schema = self.schema_manager.schema();
226 let label_meta = schema
227 .labels
228 .get(label)
229 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
230
231 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
232
233 match ds_wrapper.open_raw().await {
234 Ok(mut lance_ds) => {
235 let metric_type = match config.metric {
236 DistanceMetric::L2 => MetricType::L2,
237 DistanceMetric::Cosine => MetricType::Cosine,
238 DistanceMetric::Dot => MetricType::Dot,
239 _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
240 };
241
242 let params = match config.index_type {
243 VectorIndexType::Flat => {
244 let ivf = IvfBuildParams::new(1);
245 VectorIndexParams::with_ivf_flat_params(metric_type, ivf)
246 }
247 VectorIndexType::IvfFlat { num_partitions } => {
248 let ivf = IvfBuildParams::new(num_partitions as usize);
249 VectorIndexParams::with_ivf_flat_params(metric_type, ivf)
250 }
251 VectorIndexType::IvfPq {
252 num_partitions,
253 num_sub_vectors,
254 bits_per_subvector,
255 } => {
256 let ivf = IvfBuildParams::new(num_partitions as usize);
257 let pq = PQBuildParams::new(
258 num_sub_vectors as usize,
259 bits_per_subvector as usize,
260 );
261 VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
262 }
263 VectorIndexType::IvfSq { num_partitions } => {
264 let ivf = IvfBuildParams::new(num_partitions as usize);
265 let sq = SQBuildParams::default();
266 VectorIndexParams::with_ivf_sq_params(metric_type, ivf, sq)
267 }
268 VectorIndexType::IvfRq {
269 num_partitions,
270 num_bits,
271 } => {
272 let ivf = IvfBuildParams::new(num_partitions as usize);
273 let mut rq = RQBuildParams::default();
274 if let Some(bits) = num_bits {
275 rq.num_bits = bits;
276 }
277 VectorIndexParams::with_ivf_rq_params(metric_type, ivf, rq)
278 }
279 VectorIndexType::HnswFlat {
280 m,
281 ef_construction,
282 num_partitions,
283 } => {
284 let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
285 let hnsw = HnswBuildParams::default()
286 .num_edges(m as usize)
287 .ef_construction(ef_construction as usize);
288 VectorIndexParams::ivf_hnsw(metric_type, ivf, hnsw)
289 }
290 VectorIndexType::HnswSq {
291 m,
292 ef_construction,
293 num_partitions,
294 } => {
295 let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
296 let hnsw = HnswBuildParams::default()
297 .num_edges(m as usize)
298 .ef_construction(ef_construction as usize);
299 let sq = SQBuildParams::default();
300 VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
301 }
302 VectorIndexType::HnswPq {
303 m,
304 ef_construction,
305 num_sub_vectors,
306 num_partitions,
307 } => {
308 let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
309 let hnsw = HnswBuildParams::default()
310 .num_edges(m as usize)
311 .ef_construction(ef_construction as usize);
312 let pq = PQBuildParams::new(num_sub_vectors as usize, 8);
313 VectorIndexParams::with_ivf_hnsw_pq_params(metric_type, ivf, hnsw, pq)
314 }
315 _ => {
316 return Err(anyhow!(
317 "Unsupported vector index type: {:?}",
318 config.index_type
319 ));
320 }
321 };
322
323 let progress = TracingIndexProgress::arc(&config.name);
325 match lance_ds
326 .create_index_builder(&[property], IndexType::Vector, ¶ms)
327 .name(config.name.clone())
328 .replace(true)
329 .progress(progress)
330 .await
331 {
332 Ok(metadata) => {
333 info!(
334 index_name = %metadata.name,
335 index_uuid = %metadata.uuid,
336 dataset_version = metadata.dataset_version,
337 "Vector index created"
338 );
339 }
340 Err(e) => {
341 warn!(
342 "Failed to create physical vector index (dataset might be empty): {}",
343 e
344 );
345 }
346 }
347 }
348 Err(e) => {
349 warn!(
350 "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
351 label, e
352 );
353 }
354 }
355
356 self.schema_manager
357 .add_index(IndexDefinition::Vector(config))?;
358 self.schema_manager.save().await?;
359
360 Ok(())
361 }
362
363 #[cfg(feature = "lance-backend")]
365 #[instrument(skip(self), level = "info")]
366 pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
367 let label = &config.label;
368 let properties = &config.properties;
369 info!(
370 "Creating scalar index '{}' on {}.{:?}",
371 config.name, label, properties
372 );
373
374 let schema = self.schema_manager.schema();
375 let label_meta = schema
376 .labels
377 .get(label)
378 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
379
380 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
381
382 match ds_wrapper.open_raw().await {
383 Ok(mut lance_ds) => {
384 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
385
386 let progress = TracingIndexProgress::arc(&config.name);
387 let scalar_params = match config.index_type {
388 ScalarIndexType::Bitmap => {
389 ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap)
390 }
391 ScalarIndexType::LabelList => {
392 ScalarIndexParams::for_builtin(BuiltinIndexType::LabelList)
393 }
394 _ => ScalarIndexParams::default(),
395 };
396 match lance_ds
397 .create_index_builder(&columns, IndexType::Scalar, &scalar_params)
398 .name(config.name.clone())
399 .replace(true)
400 .progress(progress)
401 .await
402 {
403 Ok(metadata) => {
404 info!(
405 index_name = %metadata.name,
406 index_uuid = %metadata.uuid,
407 dataset_version = metadata.dataset_version,
408 "Scalar index created"
409 );
410 }
411 Err(e) => {
412 warn!(
413 "Failed to create physical scalar index (dataset might be empty): {}",
414 e
415 );
416 }
417 }
418 }
419 Err(e) => {
420 warn!(
421 "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
422 label, e
423 );
424 }
425 }
426
427 self.schema_manager
428 .add_index(IndexDefinition::Scalar(config))?;
429 self.schema_manager.save().await?;
430
431 Ok(())
432 }
433
434 #[cfg(feature = "lance-backend")]
436 #[instrument(skip(self), level = "info")]
437 pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
438 let label = &config.label;
439 info!(
440 "Creating FTS index '{}' on {}.{:?}",
441 config.name, label, config.properties
442 );
443
444 let schema = self.schema_manager.schema();
445 let label_meta = schema
446 .labels
447 .get(label)
448 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
449
450 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
451
452 match ds_wrapper.open_raw().await {
453 Ok(mut lance_ds) => {
454 let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
455
456 let fts_params =
457 InvertedIndexParams::default().with_position(config.with_positions);
458
459 let progress = TracingIndexProgress::arc(&config.name);
460 match lance_ds
461 .create_index_builder(&columns, IndexType::Inverted, &fts_params)
462 .name(config.name.clone())
463 .replace(true)
464 .progress(progress)
465 .await
466 {
467 Ok(metadata) => {
468 info!(
469 index_name = %metadata.name,
470 index_uuid = %metadata.uuid,
471 dataset_version = metadata.dataset_version,
472 "FTS index created"
473 );
474 }
475 Err(e) => {
476 warn!(
477 "Failed to create physical FTS index (dataset might be empty): {}",
478 e
479 );
480 }
481 }
482 }
483 Err(e) => {
484 warn!(
485 "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
486 label, e
487 );
488 }
489 }
490
491 self.schema_manager
492 .add_index(IndexDefinition::FullText(config))?;
493 self.schema_manager.save().await?;
494
495 Ok(())
496 }
497
498 #[cfg(feature = "lance-backend")]
503 #[instrument(skip(self), level = "info")]
504 pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
505 let label = &config.label;
506 let column = &config.column;
507 info!(
508 "Creating JSON FTS index '{}' on {}.{}",
509 config.name, label, column
510 );
511
512 let schema = self.schema_manager.schema();
513 let label_meta = schema
514 .labels
515 .get(label)
516 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
517
518 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
519
520 match ds_wrapper.open_raw().await {
521 Ok(mut lance_ds) => {
522 let fts_params =
523 InvertedIndexParams::default().with_position(config.with_positions);
524
525 let progress = TracingIndexProgress::arc(&config.name);
526 match lance_ds
527 .create_index_builder(&[column.as_str()], IndexType::Inverted, &fts_params)
528 .name(config.name.clone())
529 .replace(true)
530 .progress(progress)
531 .await
532 {
533 Ok(metadata) => {
534 info!(
535 index_name = %metadata.name,
536 index_uuid = %metadata.uuid,
537 dataset_version = metadata.dataset_version,
538 "JSON FTS index created"
539 );
540 }
541 Err(e) => {
542 warn!(
543 "Failed to create physical JSON FTS index (dataset might be empty): {}",
544 e
545 );
546 }
547 }
548 }
549 Err(e) => {
550 warn!(
551 "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
552 label, e
553 );
554 }
555 }
556
557 self.schema_manager
558 .add_index(IndexDefinition::JsonFullText(config))?;
559 self.schema_manager.save().await?;
560
561 Ok(())
562 }
563
564 #[cfg(feature = "lance-backend")]
566 #[instrument(skip(self), level = "info")]
567 pub async fn drop_index(&self, name: &str) -> Result<()> {
568 info!("Dropping index '{}'", name);
569
570 let idx_def = self
571 .schema_manager
572 .get_index(name)
573 .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
574
575 let label = idx_def.label();
577 let schema = self.schema_manager.schema();
578 if let Some(label_meta) = schema.labels.get(label) {
579 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
580 match ds_wrapper.open_raw().await {
581 Ok(mut lance_ds) => {
582 if let Err(e) = lance_ds.drop_index(name).await {
583 warn!(
586 "Physical index drop for '{}' returned error (non-fatal): {}",
587 name, e
588 );
589 } else {
590 info!("Physical index '{}' dropped from Lance dataset", name);
591 }
592 }
593 Err(e) => {
594 debug!(
595 "Could not open dataset for label '{}' to drop physical index: {}",
596 label, e
597 );
598 }
599 }
600 }
601
602 self.schema_manager.remove_index(name)?;
603 self.schema_manager.save().await?;
604 Ok(())
605 }
606
607 #[cfg(feature = "lance-backend")]
609 #[instrument(skip(self), level = "info")]
610 pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
611 info!("Rebuilding all indexes for label '{}'", label);
612 let schema = self.schema_manager.schema();
613
614 let indexes: Vec<_> = schema
616 .indexes
617 .iter()
618 .filter(|idx| idx.label() == label)
619 .cloned()
620 .collect();
621
622 for index in indexes {
623 match index {
624 IndexDefinition::Vector(cfg) => self.create_vector_index(cfg).await?,
625 IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
626 IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
627 IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
628 IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
629 _ => warn!("Unknown index type encountered during rebuild, skipping"),
630 }
631 }
632 Ok(())
633 }
634
635 #[cfg(feature = "lance-backend")]
637 pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
638 let schema = self.schema_manager.schema();
639 let label_meta = schema
640 .labels
641 .get(label)
642 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
643
644 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
646
647 if let Ok(mut ds) = ds_wrapper.open_raw().await {
649 let index_name = format!("{}_{}_composite", label, properties.join("_"));
651
652 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
654
655 let progress = TracingIndexProgress::arc(&index_name);
656 match ds
657 .create_index_builder(&columns, IndexType::Scalar, &ScalarIndexParams::default())
658 .name(index_name.clone())
659 .replace(true)
660 .progress(progress)
661 .await
662 {
663 Ok(metadata) => {
664 info!(
665 index_name = %metadata.name,
666 index_uuid = %metadata.uuid,
667 dataset_version = metadata.dataset_version,
668 "Composite index created"
669 );
670 }
671 Err(e) => {
672 warn!("Failed to create physical composite index: {}", e);
673 }
674 }
675
676 let config = ScalarIndexConfig {
677 name: index_name,
678 label: label.to_string(),
679 properties: properties.to_vec(),
680 index_type: uni_common::core::schema::ScalarIndexType::BTree,
681 where_clause: None,
682 metadata: Default::default(),
683 };
684
685 self.schema_manager
686 .add_index(IndexDefinition::Scalar(config))?;
687 self.schema_manager.save().await?;
688 }
689
690 Ok(())
691 }
692
693 pub async fn composite_lookup(
695 &self,
696 label: &str,
697 key_values: &HashMap<String, Value>,
698 ) -> Result<Option<Vid>> {
699 use crate::backend::types::ScanRequest;
700
701 let schema = self.schema_manager.schema();
702 let label_meta = schema
703 .labels
704 .get(label)
705 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
706
707 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
708 let table_name = ds_wrapper.table_name();
709 let backend = self.backend.as_ref();
710
711 if !backend.table_exists(&table_name).await.unwrap_or(false) {
712 return Ok(None);
713 }
714
715 let filter = key_values
717 .iter()
718 .map(|(k, v)| {
719 if !is_valid_column_name(k) {
721 anyhow::bail!("Invalid column name '{}': must contain only alphanumeric characters and underscores", k);
722 }
723
724 let val_str = match v {
725 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
726 Value::Number(n) => n.to_string(),
727 Value::Bool(b) => b.to_string(),
728 Value::Null => "null".to_string(),
729 _ => v.to_string(),
730 };
731 Ok(format!("\"{}\" = {}", k, val_str))
733 })
734 .collect::<Result<Vec<_>>>()?
735 .join(" AND ");
736
737 let request = ScanRequest::all(&table_name)
738 .with_filter(filter)
739 .with_limit(1)
740 .with_columns(vec!["_vid".to_string()]);
741
742 let batches = match backend.scan(request).await {
743 Ok(b) => b,
744 Err(_) => return Ok(None),
745 };
746
747 for batch in batches {
748 if batch.num_rows() > 0 {
749 let vid_col = batch
750 .column_by_name("_vid")
751 .ok_or_else(|| anyhow!("Missing _vid column"))?
752 .as_any()
753 .downcast_ref::<UInt64Array>()
754 .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
755
756 let vid = vid_col.value(0);
757 return Ok(Some(Vid::from(vid)));
758 }
759 }
760
761 Ok(None)
762 }
763
764 #[cfg(feature = "lance-backend")]
773 #[instrument(skip(self, added, removed), level = "info", fields(
774 label = %config.label,
775 property = %config.property
776 ))]
777 pub async fn update_inverted_index_incremental(
778 &self,
779 config: &InvertedIndexConfig,
780 added: &HashMap<Vid, Vec<String>>,
781 removed: &HashSet<Vid>,
782 ) -> Result<()> {
783 info!(
784 added = added.len(),
785 removed = removed.len(),
786 "Incrementally updating inverted index"
787 );
788
789 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
790 index.apply_incremental_updates(added, removed).await
791 }
792}