1#[cfg(feature = "lance-backend")]
7use crate::storage::inverted_index::InvertedIndex;
8use crate::storage::vertex::VertexDataset;
9use anyhow::{Result, anyhow};
10use chrono::{DateTime, Utc};
11#[cfg(feature = "lance-backend")]
12use lance::index::DatasetIndexExt;
13#[cfg(feature = "lance-backend")]
14use lance::index::vector::VectorIndexParams;
15#[cfg(feature = "lance-backend")]
16use lance_index::IndexType;
17#[cfg(feature = "lance-backend")]
18use lance_index::progress::IndexBuildProgress;
19#[cfg(feature = "lance-backend")]
20use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
21#[cfg(feature = "lance-backend")]
22use lance_index::vector::bq::RQBuildParams;
23#[cfg(feature = "lance-backend")]
24use lance_index::vector::hnsw::builder::HnswBuildParams;
25#[cfg(feature = "lance-backend")]
26use lance_index::vector::ivf::IvfBuildParams;
27#[cfg(feature = "lance-backend")]
28use lance_index::vector::pq::PQBuildParams;
29#[cfg(feature = "lance-backend")]
30use lance_index::vector::sq::builder::SQBuildParams;
31#[cfg(feature = "lance-backend")]
32use lance_linalg::distance::MetricType;
33use serde::{Deserialize, Serialize};
34use std::collections::HashMap;
35#[cfg(feature = "lance-backend")]
36use std::collections::HashSet;
37use std::sync::Arc;
38#[cfg(feature = "lance-backend")]
39use tracing::{debug, info, instrument, warn};
40use uni_common::core::id::Vid;
41#[cfg(feature = "lance-backend")]
42use uni_common::core::schema::IndexDefinition;
43use uni_common::core::schema::SchemaManager;
44#[cfg(feature = "lance-backend")]
45use uni_common::core::schema::{
46 DistanceMetric, FullTextIndexConfig, InvertedIndexConfig, JsonFtsIndexConfig,
47 ScalarIndexConfig, ScalarIndexType, VectorIndexConfig, VectorIndexType,
48};
49
/// Progress sink for Lance index builds that forwards every build event to `tracing`.
///
/// Each event carries the index name under the `index` field so interleaved builds of
/// several indexes can be told apart in the logs.
#[cfg(feature = "lance-backend")]
#[derive(Debug)]
pub struct TracingIndexProgress {
    /// Name of the index being built; emitted as the `index` field of every event.
    index_name: String,
}

#[cfg(feature = "lance-backend")]
impl TracingIndexProgress {
    /// Returns a shared, type-erased reporter for the index called `index_name`,
    /// suitable for passing to a Lance index builder's `progress(..)` option.
    pub fn arc(index_name: &str) -> Arc<dyn IndexBuildProgress> {
        let reporter = Self {
            index_name: index_name.to_owned(),
        };
        Arc::new(reporter)
    }
}

#[cfg(feature = "lance-backend")]
#[async_trait::async_trait]
impl IndexBuildProgress for TracingIndexProgress {
    /// Logs (at `info`) that a build stage began, with its expected total and unit if known.
    async fn stage_start(&self, stage: &str, total: Option<u64>, unit: &str) -> lance::Result<()> {
        info!(
            index = %self.index_name,
            stage = stage,
            total = ?total,
            unit = unit,
            "Index build stage started"
        );
        Ok(())
    }

    /// Logs (at `debug`) how much of the current stage has completed so far.
    async fn stage_progress(&self, stage: &str, completed: u64) -> lance::Result<()> {
        debug!(
            index = %self.index_name,
            stage = stage,
            completed = completed,
            "Index build progress"
        );
        Ok(())
    }

    /// Logs (at `info`) that a build stage finished.
    async fn stage_complete(&self, stage: &str) -> lance::Result<()> {
        info!(
            index = %self.index_name,
            stage = stage,
            "Index build stage complete"
        );
        Ok(())
    }
}
102
103#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
105pub enum IndexRebuildStatus {
106 Pending,
108 InProgress,
110 Completed,
112 Failed,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize)]
118pub struct IndexRebuildTask {
119 pub id: String,
121 pub label: String,
123 pub status: IndexRebuildStatus,
125 pub created_at: DateTime<Utc>,
127 pub started_at: Option<DateTime<Utc>>,
129 pub completed_at: Option<DateTime<Utc>>,
131 pub error: Option<String>,
133 pub retry_count: u32,
135}
136
137pub struct IndexManager {
139 base_uri: String,
140 schema_manager: Arc<SchemaManager>,
141}
142
143impl std::fmt::Debug for IndexManager {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 f.debug_struct("IndexManager")
146 .field("base_uri", &self.base_uri)
147 .finish_non_exhaustive()
148 }
149}
150
151impl IndexManager {
152 pub fn new(base_uri: &str, schema_manager: Arc<SchemaManager>) -> Self {
154 Self {
155 base_uri: base_uri.to_string(),
156 schema_manager,
157 }
158 }
159
160 #[cfg(feature = "lance-backend")]
162 #[instrument(skip(self), level = "info")]
163 pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
164 let label = &config.label;
165 let property = &config.property;
166 info!(
167 "Creating Inverted Index '{}' on {}.{}",
168 config.name, label, property
169 );
170
171 let schema = self.schema_manager.schema();
172 let label_meta = schema
173 .labels
174 .get(label)
175 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
176
177 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
178
179 let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
180
181 if ds.open_raw().await.is_ok() {
183 index
184 .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
185 .await?;
186 } else {
187 warn!(
188 "Dataset for label '{}' not found, creating empty inverted index",
189 label
190 );
191 }
192
193 self.schema_manager
194 .add_index(IndexDefinition::Inverted(config))?;
195 self.schema_manager.save().await?;
196
197 Ok(())
198 }
199
200 #[cfg(feature = "lance-backend")]
202 #[instrument(skip(self), level = "info")]
203 pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
204 let label = &config.label;
205 let property = &config.property;
206 info!(
207 "Creating vector index '{}' on {}.{}",
208 config.name, label, property
209 );
210
211 let schema = self.schema_manager.schema();
212 let label_meta = schema
213 .labels
214 .get(label)
215 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
216
217 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
218
219 match ds_wrapper.open_raw().await {
220 Ok(mut lance_ds) => {
221 let metric_type = match config.metric {
222 DistanceMetric::L2 => MetricType::L2,
223 DistanceMetric::Cosine => MetricType::Cosine,
224 DistanceMetric::Dot => MetricType::Dot,
225 _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
226 };
227
228 let params = match config.index_type {
229 VectorIndexType::Flat => {
230 let ivf = IvfBuildParams::new(1);
231 VectorIndexParams::with_ivf_flat_params(metric_type, ivf)
232 }
233 VectorIndexType::IvfFlat { num_partitions } => {
234 let ivf = IvfBuildParams::new(num_partitions as usize);
235 VectorIndexParams::with_ivf_flat_params(metric_type, ivf)
236 }
237 VectorIndexType::IvfPq {
238 num_partitions,
239 num_sub_vectors,
240 bits_per_subvector,
241 } => {
242 let ivf = IvfBuildParams::new(num_partitions as usize);
243 let pq = PQBuildParams::new(
244 num_sub_vectors as usize,
245 bits_per_subvector as usize,
246 );
247 VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
248 }
249 VectorIndexType::IvfSq { num_partitions } => {
250 let ivf = IvfBuildParams::new(num_partitions as usize);
251 let sq = SQBuildParams::default();
252 VectorIndexParams::with_ivf_sq_params(metric_type, ivf, sq)
253 }
254 VectorIndexType::IvfRq {
255 num_partitions,
256 num_bits,
257 } => {
258 let ivf = IvfBuildParams::new(num_partitions as usize);
259 let mut rq = RQBuildParams::default();
260 if let Some(bits) = num_bits {
261 rq.num_bits = bits;
262 }
263 VectorIndexParams::with_ivf_rq_params(metric_type, ivf, rq)
264 }
265 VectorIndexType::HnswFlat {
266 m,
267 ef_construction,
268 num_partitions,
269 } => {
270 let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
271 let hnsw = HnswBuildParams::default()
272 .num_edges(m as usize)
273 .ef_construction(ef_construction as usize);
274 VectorIndexParams::ivf_hnsw(metric_type, ivf, hnsw)
275 }
276 VectorIndexType::HnswSq {
277 m,
278 ef_construction,
279 num_partitions,
280 } => {
281 let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
282 let hnsw = HnswBuildParams::default()
283 .num_edges(m as usize)
284 .ef_construction(ef_construction as usize);
285 let sq = SQBuildParams::default();
286 VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
287 }
288 VectorIndexType::HnswPq {
289 m,
290 ef_construction,
291 num_sub_vectors,
292 num_partitions,
293 } => {
294 let ivf = IvfBuildParams::new(num_partitions.unwrap_or(1) as usize);
295 let hnsw = HnswBuildParams::default()
296 .num_edges(m as usize)
297 .ef_construction(ef_construction as usize);
298 let pq = PQBuildParams::new(num_sub_vectors as usize, 8);
299 VectorIndexParams::with_ivf_hnsw_pq_params(metric_type, ivf, hnsw, pq)
300 }
301 _ => {
302 return Err(anyhow!(
303 "Unsupported vector index type: {:?}",
304 config.index_type
305 ));
306 }
307 };
308
309 let progress = TracingIndexProgress::arc(&config.name);
311 match lance_ds
312 .create_index_builder(&[property], IndexType::Vector, ¶ms)
313 .name(config.name.clone())
314 .replace(true)
315 .progress(progress)
316 .await
317 {
318 Ok(metadata) => {
319 info!(
320 index_name = %metadata.name,
321 index_uuid = %metadata.uuid,
322 dataset_version = metadata.dataset_version,
323 "Vector index created"
324 );
325 }
326 Err(e) => {
327 warn!(
328 "Failed to create physical vector index (dataset might be empty): {}",
329 e
330 );
331 }
332 }
333 }
334 Err(e) => {
335 warn!(
336 "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
337 label, e
338 );
339 }
340 }
341
342 self.schema_manager
343 .add_index(IndexDefinition::Vector(config))?;
344 self.schema_manager.save().await?;
345
346 Ok(())
347 }
348
349 #[cfg(feature = "lance-backend")]
351 #[instrument(skip(self), level = "info")]
352 pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
353 let label = &config.label;
354 let properties = &config.properties;
355 info!(
356 "Creating scalar index '{}' on {}.{:?}",
357 config.name, label, properties
358 );
359
360 let schema = self.schema_manager.schema();
361 let label_meta = schema
362 .labels
363 .get(label)
364 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
365
366 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
367
368 match ds_wrapper.open_raw().await {
369 Ok(mut lance_ds) => {
370 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
371
372 let progress = TracingIndexProgress::arc(&config.name);
373 let scalar_params = match config.index_type {
374 ScalarIndexType::Bitmap => {
375 ScalarIndexParams::for_builtin(BuiltinIndexType::Bitmap)
376 }
377 ScalarIndexType::LabelList => {
378 ScalarIndexParams::for_builtin(BuiltinIndexType::LabelList)
379 }
380 _ => ScalarIndexParams::default(),
381 };
382 match lance_ds
383 .create_index_builder(&columns, IndexType::Scalar, &scalar_params)
384 .name(config.name.clone())
385 .replace(true)
386 .progress(progress)
387 .await
388 {
389 Ok(metadata) => {
390 info!(
391 index_name = %metadata.name,
392 index_uuid = %metadata.uuid,
393 dataset_version = metadata.dataset_version,
394 "Scalar index created"
395 );
396 }
397 Err(e) => {
398 warn!(
399 "Failed to create physical scalar index (dataset might be empty): {}",
400 e
401 );
402 }
403 }
404 }
405 Err(e) => {
406 warn!(
407 "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
408 label, e
409 );
410 }
411 }
412
413 self.schema_manager
414 .add_index(IndexDefinition::Scalar(config))?;
415 self.schema_manager.save().await?;
416
417 Ok(())
418 }
419
420 #[cfg(feature = "lance-backend")]
422 #[instrument(skip(self), level = "info")]
423 pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
424 let label = &config.label;
425 info!(
426 "Creating FTS index '{}' on {}.{:?}",
427 config.name, label, config.properties
428 );
429
430 let schema = self.schema_manager.schema();
431 let label_meta = schema
432 .labels
433 .get(label)
434 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
435
436 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
437
438 match ds_wrapper.open_raw().await {
439 Ok(mut lance_ds) => {
440 let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
441
442 let fts_params =
443 InvertedIndexParams::default().with_position(config.with_positions);
444
445 let progress = TracingIndexProgress::arc(&config.name);
446 match lance_ds
447 .create_index_builder(&columns, IndexType::Inverted, &fts_params)
448 .name(config.name.clone())
449 .replace(true)
450 .progress(progress)
451 .await
452 {
453 Ok(metadata) => {
454 info!(
455 index_name = %metadata.name,
456 index_uuid = %metadata.uuid,
457 dataset_version = metadata.dataset_version,
458 "FTS index created"
459 );
460 }
461 Err(e) => {
462 warn!(
463 "Failed to create physical FTS index (dataset might be empty): {}",
464 e
465 );
466 }
467 }
468 }
469 Err(e) => {
470 warn!(
471 "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
472 label, e
473 );
474 }
475 }
476
477 self.schema_manager
478 .add_index(IndexDefinition::FullText(config))?;
479 self.schema_manager.save().await?;
480
481 Ok(())
482 }
483
484 #[cfg(feature = "lance-backend")]
489 #[instrument(skip(self), level = "info")]
490 pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
491 let label = &config.label;
492 let column = &config.column;
493 info!(
494 "Creating JSON FTS index '{}' on {}.{}",
495 config.name, label, column
496 );
497
498 let schema = self.schema_manager.schema();
499 let label_meta = schema
500 .labels
501 .get(label)
502 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
503
504 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
505
506 match ds_wrapper.open_raw().await {
507 Ok(mut lance_ds) => {
508 let fts_params =
509 InvertedIndexParams::default().with_position(config.with_positions);
510
511 let progress = TracingIndexProgress::arc(&config.name);
512 match lance_ds
513 .create_index_builder(&[column.as_str()], IndexType::Inverted, &fts_params)
514 .name(config.name.clone())
515 .replace(true)
516 .progress(progress)
517 .await
518 {
519 Ok(metadata) => {
520 info!(
521 index_name = %metadata.name,
522 index_uuid = %metadata.uuid,
523 dataset_version = metadata.dataset_version,
524 "JSON FTS index created"
525 );
526 }
527 Err(e) => {
528 warn!(
529 "Failed to create physical JSON FTS index (dataset might be empty): {}",
530 e
531 );
532 }
533 }
534 }
535 Err(e) => {
536 warn!(
537 "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
538 label, e
539 );
540 }
541 }
542
543 self.schema_manager
544 .add_index(IndexDefinition::JsonFullText(config))?;
545 self.schema_manager.save().await?;
546
547 Ok(())
548 }
549
550 #[cfg(feature = "lance-backend")]
552 #[instrument(skip(self), level = "info")]
553 pub async fn drop_index(&self, name: &str) -> Result<()> {
554 info!("Dropping index '{}'", name);
555
556 let idx_def = self
557 .schema_manager
558 .get_index(name)
559 .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
560
561 let label = idx_def.label();
563 let schema = self.schema_manager.schema();
564 if let Some(label_meta) = schema.labels.get(label) {
565 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
566 match ds_wrapper.open_raw().await {
567 Ok(mut lance_ds) => {
568 if let Err(e) = lance_ds.drop_index(name).await {
569 warn!(
572 "Physical index drop for '{}' returned error (non-fatal): {}",
573 name, e
574 );
575 } else {
576 info!("Physical index '{}' dropped from Lance dataset", name);
577 }
578 }
579 Err(e) => {
580 debug!(
581 "Could not open dataset for label '{}' to drop physical index: {}",
582 label, e
583 );
584 }
585 }
586 }
587
588 self.schema_manager.remove_index(name)?;
589 self.schema_manager.save().await?;
590 Ok(())
591 }
592
593 #[cfg(feature = "lance-backend")]
595 #[instrument(skip(self), level = "info")]
596 pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
597 info!("Rebuilding all indexes for label '{}'", label);
598 let schema = self.schema_manager.schema();
599
600 let indexes: Vec<_> = schema
602 .indexes
603 .iter()
604 .filter(|idx| idx.label() == label)
605 .cloned()
606 .collect();
607
608 for index in indexes {
609 match index {
610 IndexDefinition::Vector(cfg) => self.create_vector_index(cfg).await?,
611 IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
612 IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
613 IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
614 IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
615 _ => warn!("Unknown index type encountered during rebuild, skipping"),
616 }
617 }
618 Ok(())
619 }
620
621 #[cfg(feature = "lance-backend")]
623 pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
624 let schema = self.schema_manager.schema();
625 let label_meta = schema
626 .labels
627 .get(label)
628 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
629
630 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
632
633 if let Ok(mut ds) = ds_wrapper.open_raw().await {
635 let index_name = format!("{}_{}_composite", label, properties.join("_"));
637
638 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
640
641 let progress = TracingIndexProgress::arc(&index_name);
642 match ds
643 .create_index_builder(&columns, IndexType::Scalar, &ScalarIndexParams::default())
644 .name(index_name.clone())
645 .replace(true)
646 .progress(progress)
647 .await
648 {
649 Ok(metadata) => {
650 info!(
651 index_name = %metadata.name,
652 index_uuid = %metadata.uuid,
653 dataset_version = metadata.dataset_version,
654 "Composite index created"
655 );
656 }
657 Err(e) => {
658 warn!("Failed to create physical composite index: {}", e);
659 }
660 }
661
662 let config = ScalarIndexConfig {
663 name: index_name,
664 label: label.to_string(),
665 properties: properties.to_vec(),
666 index_type: uni_common::core::schema::ScalarIndexType::BTree,
667 where_clause: None,
668 metadata: Default::default(),
669 };
670
671 self.schema_manager
672 .add_index(IndexDefinition::Scalar(config))?;
673 self.schema_manager.save().await?;
674 }
675
676 Ok(())
677 }
678
679 #[cfg(feature = "lance-backend")]
688 #[instrument(skip(self, added, removed), level = "info", fields(
689 label = %config.label,
690 property = %config.property
691 ))]
692 pub async fn update_inverted_index_incremental(
693 &self,
694 config: &InvertedIndexConfig,
695 added: &HashMap<Vid, Vec<String>>,
696 removed: &HashSet<Vid>,
697 ) -> Result<()> {
698 info!(
699 added = added.len(),
700 removed = removed.len(),
701 "Incrementally updating inverted index"
702 );
703
704 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
705 index.apply_incremental_updates(added, removed).await
706 }
707}