1#[cfg(feature = "lance-backend")]
7use crate::storage::inverted_index::InvertedIndex;
8use crate::storage::vertex::VertexDataset;
9use anyhow::{Result, anyhow};
10use arrow_array::UInt64Array;
11use chrono::{DateTime, Utc};
12#[cfg(feature = "lance-backend")]
13use lance::index::vector::VectorIndexParams;
14#[cfg(feature = "lance-backend")]
15use lance_index::progress::IndexBuildProgress;
16#[cfg(feature = "lance-backend")]
17use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
18#[cfg(feature = "lance-backend")]
19use lance_index::vector::hnsw::builder::HnswBuildParams;
20#[cfg(feature = "lance-backend")]
21use lance_index::vector::ivf::IvfBuildParams;
22#[cfg(feature = "lance-backend")]
23use lance_index::vector::pq::PQBuildParams;
24#[cfg(feature = "lance-backend")]
25use lance_index::vector::sq::builder::SQBuildParams;
26#[cfg(feature = "lance-backend")]
27use lance_index::{DatasetIndexExt, IndexType};
28#[cfg(feature = "lance-backend")]
29use lance_linalg::distance::MetricType;
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use std::collections::HashMap;
33#[cfg(feature = "lance-backend")]
34use std::collections::HashSet;
35use std::sync::Arc;
36#[cfg(feature = "lance-backend")]
37use tracing::{debug, info, instrument, warn};
38use uni_common::core::id::Vid;
39#[cfg(feature = "lance-backend")]
40use uni_common::core::schema::IndexDefinition;
41use uni_common::core::schema::SchemaManager;
42#[cfg(feature = "lance-backend")]
43use uni_common::core::schema::{
44 DistanceMetric, FullTextIndexConfig, InvertedIndexConfig, JsonFtsIndexConfig,
45 ScalarIndexConfig, VectorIndexConfig, VectorIndexType,
46};
47
/// Returns `true` when `name` is non-empty and consists solely of
/// alphanumeric characters (per [`char::is_alphanumeric`]) and underscores.
///
/// Anything else — whitespace, quotes, punctuation — makes the name invalid.
fn is_valid_column_name(name: &str) -> bool {
    if name.is_empty() {
        return false;
    }
    name.chars()
        .all(|ch| matches!(ch, '_') || ch.is_alphanumeric())
}
55
/// Index-build progress reporter that turns every progress callback into a
/// `tracing` event.
///
/// Only compiled when the `lance-backend` feature is enabled.
#[cfg(feature = "lance-backend")]
#[derive(Debug)]
pub struct TracingIndexProgress {
    /// Name of the index being built; attached to each event as the `index` field.
    index_name: String,
}
65
#[cfg(feature = "lance-backend")]
impl TracingIndexProgress {
    /// Builds a reporter for `index_name` and hands it back as a shared
    /// [`IndexBuildProgress`] trait object, ready to pass to an index builder.
    pub fn arc(index_name: &str) -> Arc<dyn IndexBuildProgress> {
        let reporter = Self {
            index_name: index_name.to_owned(),
        };
        Arc::new(reporter)
    }
}
74
// Each callback only emits a `tracing` event tagged with the index name and
// always returns `Ok(())`, so progress reporting can never fail a build.
#[cfg(feature = "lance-backend")]
#[async_trait::async_trait]
impl IndexBuildProgress for TracingIndexProgress {
    /// Emits an `info` event announcing that `stage` has started, including the
    /// optional `total` amount of work and the `unit` it is measured in.
    async fn stage_start(&self, stage: &str, total: Option<u64>, unit: &str) -> lance::Result<()> {
        let index = self.index_name.as_str();
        info!(
            index = %index,
            stage,
            total = ?total,
            unit,
            "Index build stage started"
        );
        Ok(())
    }

    /// Emits a `debug` event carrying the `completed` counter for `stage`.
    async fn stage_progress(&self, stage: &str, completed: u64) -> lance::Result<()> {
        let index = self.index_name.as_str();
        debug!(index = %index, stage, completed, "Index build progress");
        Ok(())
    }

    /// Emits an `info` event announcing that `stage` has finished.
    async fn stage_complete(&self, stage: &str) -> lance::Result<()> {
        let index = self.index_name.as_str();
        info!(index = %index, stage, "Index build stage complete");
        Ok(())
    }
}
108
109#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
111pub enum IndexRebuildStatus {
112 Pending,
114 InProgress,
116 Completed,
118 Failed,
120}
121
122#[derive(Debug, Clone, Serialize, Deserialize)]
124pub struct IndexRebuildTask {
125 pub id: String,
127 pub label: String,
129 pub status: IndexRebuildStatus,
131 pub created_at: DateTime<Utc>,
133 pub started_at: Option<DateTime<Utc>>,
135 pub completed_at: Option<DateTime<Utc>>,
137 pub error: Option<String>,
139 pub retry_count: u32,
141}
142
143pub struct IndexManager {
145 base_uri: String,
146 schema_manager: Arc<SchemaManager>,
147 backend: Arc<dyn crate::backend::StorageBackend>,
148}
149
150impl std::fmt::Debug for IndexManager {
151 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152 f.debug_struct("IndexManager")
153 .field("base_uri", &self.base_uri)
154 .finish_non_exhaustive()
155 }
156}
157
158impl IndexManager {
159 pub fn new(
161 base_uri: &str,
162 schema_manager: Arc<SchemaManager>,
163 backend: Arc<dyn crate::backend::StorageBackend>,
164 ) -> Self {
165 Self {
166 base_uri: base_uri.to_string(),
167 schema_manager,
168 backend,
169 }
170 }
171
/// Creates an inverted index for `config.label`.`config.property` and registers
/// it in the schema.
///
/// When the label's vertex dataset can be opened the index is populated from it;
/// otherwise a warning is logged and the index is left empty. In both cases the
/// definition is added to the schema and the schema is saved.
///
/// # Errors
/// Fails if the label is unknown, if the `InvertedIndex` cannot be constructed,
/// if building from the dataset fails, or if adding/saving the schema entry fails.
#[cfg(feature = "lance-backend")]
#[instrument(skip(self), level = "info")]
pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
    info!(
        "Creating Inverted Index '{}' on {}.{}",
        config.name, config.label, config.property
    );

    let schema = self.schema_manager.schema();
    let Some(label_meta) = schema.labels.get(&config.label) else {
        return Err(anyhow!("Label '{}' not found", config.label));
    };

    let mut inverted = InvertedIndex::new(&self.base_uri, config.clone()).await?;
    let vertex_ds = VertexDataset::new(&self.base_uri, &config.label, label_meta.id);

    // Opening is only an existence probe; the opened handle is dropped right away.
    let dataset_present = vertex_ds.open_raw().await.is_ok();
    if dataset_present {
        inverted
            .build_from_dataset(&vertex_ds, |n| info!("Indexed {} terms", n))
            .await?;
    } else {
        warn!(
            "Dataset for label '{}' not found, creating empty inverted index",
            config.label
        );
    }

    let definition = IndexDefinition::Inverted(config);
    self.schema_manager.add_index(definition)?;
    self.schema_manager.save().await?;
    Ok(())
}
211
/// Creates (or replaces) a Lance vector index on `config.label`.`config.property`
/// and registers the definition in the schema.
///
/// Index type mapping:
/// * `IvfPq` → IVF-PQ with the configured partitions, sub-vectors and bits.
/// * `Hnsw` → IVF-HNSW-SQ with a single IVF partition; `ef_search` is not used
///   at build time.
/// * `Flat` → IVF-PQ with a single partition and default PQ parameters.
///
/// Physical creation is best-effort: if the dataset cannot be opened, or Lance
/// rejects the build, a warning is logged and the schema definition is still
/// added and saved.
///
/// # Errors
/// Fails if the label is unknown, if the metric or index type is not one of the
/// supported variants, or if adding/saving the schema entry fails.
#[cfg(feature = "lance-backend")]
#[instrument(skip(self), level = "info")]
pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
    let label = &config.label;
    let property = &config.property;
    info!(
        "Creating vector index '{}' on {}.{}",
        config.name, label, property
    );

    let schema = self.schema_manager.schema();
    let label_meta = schema
        .labels
        .get(label)
        .ok_or_else(|| anyhow!("Label '{}' not found", label))?;

    let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);

    match ds_wrapper.open_raw().await {
        Ok(mut lance_ds) => {
            let metric_type = match config.metric {
                DistanceMetric::L2 => MetricType::L2,
                DistanceMetric::Cosine => MetricType::Cosine,
                DistanceMetric::Dot => MetricType::Dot,
                _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
            };

            let params = match config.index_type {
                VectorIndexType::IvfPq {
                    num_partitions,
                    num_sub_vectors,
                    bits_per_subvector,
                } => {
                    let ivf = IvfBuildParams::new(num_partitions as usize);
                    let pq = PQBuildParams::new(
                        num_sub_vectors as usize,
                        bits_per_subvector as usize,
                    );
                    VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
                }
                VectorIndexType::Hnsw {
                    m,
                    ef_construction,
                    ef_search: _,
                } => {
                    // HNSW is built inside a single IVF partition.
                    let ivf = IvfBuildParams::new(1);
                    let hnsw = HnswBuildParams::default()
                        .num_edges(m as usize)
                        .ef_construction(ef_construction as usize);
                    let sq = SQBuildParams::default();
                    VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
                }
                VectorIndexType::Flat => {
                    // A "flat" request is served by IVF-PQ with one partition and
                    // default PQ settings.
                    let ivf = IvfBuildParams::new(1);
                    let pq = PQBuildParams::default();
                    VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
                }
                _ => {
                    return Err(anyhow!(
                        "Unsupported vector index type: {:?}",
                        config.index_type
                    ));
                }
            };

            let progress = TracingIndexProgress::arc(&config.name);
            // `replace(true)` makes the call overwrite an existing index of the same name.
            match lance_ds
                .create_index_builder(&[property], IndexType::Vector, &params)
                .name(config.name.clone())
                .replace(true)
                .progress(progress)
                .await
            {
                Ok(metadata) => {
                    info!(
                        index_name = %metadata.name,
                        index_uuid = %metadata.uuid,
                        dataset_version = metadata.dataset_version,
                        "Vector index created"
                    );
                }
                Err(e) => {
                    // Deliberately non-fatal: the schema definition is still recorded below.
                    warn!(
                        "Failed to create physical vector index (dataset might be empty): {}",
                        e
                    );
                }
            }
        }
        Err(e) => {
            warn!(
                "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
                label, e
            );
        }
    }

    self.schema_manager
        .add_index(IndexDefinition::Vector(config))?;
    self.schema_manager.save().await?;

    Ok(())
}
318
/// Creates (or replaces) a Lance scalar index over `config.properties` of
/// `config.label` and registers the definition in the schema.
///
/// Physical creation is best-effort: a dataset that cannot be opened, or a
/// build that Lance rejects, only produces a warning. The schema definition is
/// added and saved regardless.
///
/// # Errors
/// Fails if the label is unknown or if adding/saving the schema entry fails.
#[cfg(feature = "lance-backend")]
#[instrument(skip(self), level = "info")]
pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
    info!(
        "Creating scalar index '{}' on {}.{:?}",
        config.name, config.label, config.properties
    );

    let schema = self.schema_manager.schema();
    let Some(label_meta) = schema.labels.get(&config.label) else {
        return Err(anyhow!("Label '{}' not found", config.label));
    };
    let vertex_ds = VertexDataset::new(&self.base_uri, &config.label, label_meta.id);

    match vertex_ds.open_raw().await {
        Err(open_err) => {
            warn!(
                "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
                config.label, open_err
            );
        }
        Ok(mut dataset) => {
            let column_refs: Vec<&str> = config.properties.iter().map(String::as_str).collect();
            let scalar_params = ScalarIndexParams::default();
            let reporter = TracingIndexProgress::arc(&config.name);

            let build_outcome = dataset
                .create_index_builder(&column_refs, IndexType::Scalar, &scalar_params)
                .name(config.name.clone())
                .replace(true)
                .progress(reporter)
                .await;

            match build_outcome {
                Ok(metadata) => {
                    info!(
                        index_name = %metadata.name,
                        index_uuid = %metadata.uuid,
                        dataset_version = metadata.dataset_version,
                        "Scalar index created"
                    );
                }
                Err(build_err) => {
                    warn!(
                        "Failed to create physical scalar index (dataset might be empty): {}",
                        build_err
                    );
                }
            }
        }
    }

    let definition = IndexDefinition::Scalar(config);
    self.schema_manager.add_index(definition)?;
    self.schema_manager.save().await?;
    Ok(())
}
384
/// Creates (or replaces) a Lance full-text (inverted) index over
/// `config.properties` of `config.label` and registers it in the schema.
///
/// `config.with_positions` is forwarded to the Lance inverted-index parameters.
/// Physical creation is best-effort: failures to open the dataset or to build
/// the index are logged as warnings, and the schema definition is still added
/// and saved.
///
/// # Errors
/// Fails if the label is unknown or if adding/saving the schema entry fails.
#[cfg(feature = "lance-backend")]
#[instrument(skip(self), level = "info")]
pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
    info!(
        "Creating FTS index '{}' on {}.{:?}",
        config.name, config.label, config.properties
    );

    let schema = self.schema_manager.schema();
    let Some(label_meta) = schema.labels.get(&config.label) else {
        return Err(anyhow!("Label '{}' not found", config.label));
    };
    let vertex_ds = VertexDataset::new(&self.base_uri, &config.label, label_meta.id);

    match vertex_ds.open_raw().await {
        Err(open_err) => {
            warn!(
                "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
                config.label, open_err
            );
        }
        Ok(mut dataset) => {
            let column_refs: Vec<&str> = config.properties.iter().map(String::as_str).collect();
            let text_params = InvertedIndexParams::default().with_position(config.with_positions);
            let reporter = TracingIndexProgress::arc(&config.name);

            let build_outcome = dataset
                .create_index_builder(&column_refs, IndexType::Inverted, &text_params)
                .name(config.name.clone())
                .replace(true)
                .progress(reporter)
                .await;

            match build_outcome {
                Ok(metadata) => {
                    info!(
                        index_name = %metadata.name,
                        index_uuid = %metadata.uuid,
                        dataset_version = metadata.dataset_version,
                        "FTS index created"
                    );
                }
                Err(build_err) => {
                    warn!(
                        "Failed to create physical FTS index (dataset might be empty): {}",
                        build_err
                    );
                }
            }
        }
    }

    let definition = IndexDefinition::FullText(config);
    self.schema_manager.add_index(definition)?;
    self.schema_manager.save().await?;
    Ok(())
}
448
/// Creates (or replaces) a Lance full-text (inverted) index on the single
/// column `config.column` of `config.label` and registers it in the schema as a
/// JSON full-text index.
///
/// `config.with_positions` is forwarded to the Lance inverted-index parameters.
/// Physical creation is best-effort: failures to open the dataset or to build
/// the index are logged as warnings, and the schema definition is still added
/// and saved.
///
/// # Errors
/// Fails if the label is unknown or if adding/saving the schema entry fails.
#[cfg(feature = "lance-backend")]
#[instrument(skip(self), level = "info")]
pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
    info!(
        "Creating JSON FTS index '{}' on {}.{}",
        config.name, config.label, config.column
    );

    let schema = self.schema_manager.schema();
    let Some(label_meta) = schema.labels.get(&config.label) else {
        return Err(anyhow!("Label '{}' not found", config.label));
    };
    let vertex_ds = VertexDataset::new(&self.base_uri, &config.label, label_meta.id);

    match vertex_ds.open_raw().await {
        Err(open_err) => {
            warn!(
                "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
                config.label, open_err
            );
        }
        Ok(mut dataset) => {
            let text_params = InvertedIndexParams::default().with_position(config.with_positions);
            let reporter = TracingIndexProgress::arc(&config.name);

            let build_outcome = dataset
                .create_index_builder(
                    &[config.column.as_str()],
                    IndexType::Inverted,
                    &text_params,
                )
                .name(config.name.clone())
                .replace(true)
                .progress(reporter)
                .await;

            match build_outcome {
                Ok(metadata) => {
                    info!(
                        index_name = %metadata.name,
                        index_uuid = %metadata.uuid,
                        dataset_version = metadata.dataset_version,
                        "JSON FTS index created"
                    );
                }
                Err(build_err) => {
                    warn!(
                        "Failed to create physical JSON FTS index (dataset might be empty): {}",
                        build_err
                    );
                }
            }
        }
    }

    let definition = IndexDefinition::JsonFullText(config);
    self.schema_manager.add_index(definition)?;
    self.schema_manager.save().await?;
    Ok(())
}
514
/// Drops the index called `name`: first from the Lance dataset (best-effort),
/// then from the schema.
///
/// A failure to open the dataset or to drop the physical index is only logged;
/// the schema entry is removed and the schema saved regardless. If the index's
/// label is no longer present in the schema, the physical drop is skipped.
///
/// # Errors
/// Fails if no index named `name` is registered in the schema, or if removing
/// the entry / saving the schema fails.
#[cfg(feature = "lance-backend")]
#[instrument(skip(self), level = "info")]
pub async fn drop_index(&self, name: &str) -> Result<()> {
    info!("Dropping index '{}'", name);

    let Some(definition) = self.schema_manager.get_index(name) else {
        return Err(anyhow!("Index '{}' not found in schema", name));
    };
    let owning_label = definition.label();

    let schema = self.schema_manager.schema();
    if let Some(label_meta) = schema.labels.get(owning_label) {
        let vertex_ds = VertexDataset::new(&self.base_uri, owning_label, label_meta.id);
        match vertex_ds.open_raw().await {
            Err(open_err) => {
                debug!(
                    "Could not open dataset for label '{}' to drop physical index: {}",
                    owning_label, open_err
                );
            }
            Ok(mut dataset) => match dataset.drop_index(name).await {
                Ok(_) => {
                    info!("Physical index '{}' dropped from Lance dataset", name);
                }
                Err(drop_err) => {
                    warn!(
                        "Physical index drop for '{}' returned error (non-fatal): {}",
                        name, drop_err
                    );
                }
            },
        }
    }

    self.schema_manager.remove_index(name)?;
    self.schema_manager.save().await?;
    Ok(())
}
557
/// Re-creates every index registered in the schema for `label`.
///
/// The matching definitions are cloned out of the schema snapshot up front, then
/// each one is passed to the `create_*` method for its kind. Those methods
/// re-register the definition and save the schema themselves.
///
/// # Errors
/// Returns the first error produced by a `create_*` call; definitions after the
/// failing one are not processed. Unrecognised definition kinds are skipped with
/// a warning.
#[cfg(feature = "lance-backend")]
#[instrument(skip(self), level = "info")]
pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
    info!("Rebuilding all indexes for label '{}'", label);

    let schema = self.schema_manager.schema();
    let mut pending = Vec::new();
    for definition in schema.indexes.iter() {
        if definition.label() == label {
            pending.push(definition.clone());
        }
    }

    for definition in pending {
        match definition {
            IndexDefinition::Vector(cfg) => {
                self.create_vector_index(cfg).await?;
            }
            IndexDefinition::Scalar(cfg) => {
                self.create_scalar_index(cfg).await?;
            }
            IndexDefinition::FullText(cfg) => {
                self.create_fts_index(cfg).await?;
            }
            IndexDefinition::Inverted(cfg) => {
                self.create_inverted_index(cfg).await?;
            }
            IndexDefinition::JsonFullText(cfg) => {
                self.create_json_fts_index(cfg).await?;
            }
            _ => {
                warn!("Unknown index type encountered during rebuild, skipping");
            }
        }
    }

    Ok(())
}
585
/// Creates (or replaces) a scalar index spanning several properties of `label`
/// and registers it in the schema as a BTree scalar index.
///
/// The index is named `<label>_<properties joined by '_'>_composite`.
///
/// Unlike the other `create_*` methods, nothing is registered in the schema when
/// the label's dataset cannot be opened; that case is logged as a warning and
/// the call returns `Ok(())`. When the dataset opens but Lance rejects the
/// build, a warning is logged and the definition is still added and saved.
///
/// # Errors
/// Fails if `properties` is empty, if the label is unknown, or if adding/saving
/// the schema entry fails.
#[cfg(feature = "lance-backend")]
#[instrument(skip(self), level = "info")]
pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
    // An index over zero columns is meaningless and would register an empty
    // definition under the name `<label>__composite`.
    if properties.is_empty() {
        return Err(anyhow!(
            "Composite index on label '{}' requires at least one property",
            label
        ));
    }

    let schema = self.schema_manager.schema();
    let label_meta = schema
        .labels
        .get(label)
        .ok_or_else(|| anyhow!("Label '{}' not found", label))?;

    let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);

    let mut ds = match ds_wrapper.open_raw().await {
        Ok(ds) => ds,
        Err(e) => {
            // Keep the original "do nothing" outcome, but make it visible in the logs.
            warn!(
                "Dataset not found for label '{}' (composite index), skipping index creation. Error: {}",
                label, e
            );
            return Ok(());
        }
    };

    let index_name = format!("{}_{}_composite", label, properties.join("_"));
    let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();

    let progress = TracingIndexProgress::arc(&index_name);
    match ds
        .create_index_builder(&columns, IndexType::Scalar, &ScalarIndexParams::default())
        .name(index_name.clone())
        .replace(true)
        .progress(progress)
        .await
    {
        Ok(metadata) => {
            info!(
                index_name = %metadata.name,
                index_uuid = %metadata.uuid,
                dataset_version = metadata.dataset_version,
                "Composite index created"
            );
        }
        Err(e) => {
            // Non-fatal: the definition is still recorded below.
            warn!("Failed to create physical composite index: {}", e);
        }
    }

    let config = ScalarIndexConfig {
        name: index_name,
        label: label.to_string(),
        properties: properties.to_vec(),
        index_type: uni_common::core::schema::ScalarIndexType::BTree,
        where_clause: None,
        metadata: Default::default(),
    };

    self.schema_manager
        .add_index(IndexDefinition::Scalar(config))?;
    self.schema_manager.save().await?;

    Ok(())
}
643
644 pub async fn composite_lookup(
646 &self,
647 label: &str,
648 key_values: &HashMap<String, Value>,
649 ) -> Result<Option<Vid>> {
650 use crate::backend::types::ScanRequest;
651
652 let schema = self.schema_manager.schema();
653 let label_meta = schema
654 .labels
655 .get(label)
656 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
657
658 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
659 let table_name = ds_wrapper.table_name();
660 let backend = self.backend.as_ref();
661
662 if !backend.table_exists(&table_name).await.unwrap_or(false) {
663 return Ok(None);
664 }
665
666 let filter = key_values
668 .iter()
669 .map(|(k, v)| {
670 if !is_valid_column_name(k) {
672 anyhow::bail!("Invalid column name '{}': must contain only alphanumeric characters and underscores", k);
673 }
674
675 let val_str = match v {
676 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
677 Value::Number(n) => n.to_string(),
678 Value::Bool(b) => b.to_string(),
679 Value::Null => "null".to_string(),
680 _ => v.to_string(),
681 };
682 Ok(format!("\"{}\" = {}", k, val_str))
684 })
685 .collect::<Result<Vec<_>>>()?
686 .join(" AND ");
687
688 let request = ScanRequest::all(&table_name)
689 .with_filter(filter)
690 .with_limit(1)
691 .with_columns(vec!["_vid".to_string()]);
692
693 let batches = match backend.scan(request).await {
694 Ok(b) => b,
695 Err(_) => return Ok(None),
696 };
697
698 for batch in batches {
699 if batch.num_rows() > 0 {
700 let vid_col = batch
701 .column_by_name("_vid")
702 .ok_or_else(|| anyhow!("Missing _vid column"))?
703 .as_any()
704 .downcast_ref::<UInt64Array>()
705 .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
706
707 let vid = vid_col.value(0);
708 return Ok(Some(Vid::from(vid)));
709 }
710 }
711
712 Ok(None)
713 }
714
/// Applies an incremental update to the inverted index described by `config`.
///
/// `added` maps vertex ids to the string terms passed on for them, and
/// `removed` lists vertex ids passed on for removal; both are forwarded
/// unchanged to `InvertedIndex::apply_incremental_updates`. Only their sizes are
/// recorded in the tracing span and log event.
///
/// # Errors
/// Fails if the `InvertedIndex` cannot be constructed or if applying the
/// updates fails.
#[cfg(feature = "lance-backend")]
#[instrument(skip(self, added, removed), level = "info", fields(
    label = %config.label,
    property = %config.property
))]
pub async fn update_inverted_index_incremental(
    &self,
    config: &InvertedIndexConfig,
    added: &HashMap<Vid, Vec<String>>,
    removed: &HashSet<Vid>,
) -> Result<()> {
    let added_count = added.len();
    let removed_count = removed.len();
    info!(
        added = added_count,
        removed = removed_count,
        "Incrementally updating inverted index"
    );

    let mut inverted = InvertedIndex::new(&self.base_uri, config.clone()).await?;
    let outcome = inverted.apply_incremental_updates(added, removed).await;
    outcome
}
743}