1use crate::storage::inverted_index::InvertedIndex;
5use crate::storage::vertex::VertexDataset;
6use anyhow::{Result, anyhow};
7use arrow_array::UInt64Array;
8use chrono::{DateTime, Utc};
9use futures::TryStreamExt;
10use lance::index::vector::VectorIndexParams;
11use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
12use lance_index::vector::hnsw::builder::HnswBuildParams;
13use lance_index::vector::ivf::IvfBuildParams;
14use lance_index::vector::pq::PQBuildParams;
15use lance_index::vector::sq::builder::SQBuildParams;
16use lance_index::{DatasetIndexExt, IndexType};
17use lance_linalg::distance::MetricType;
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use tracing::{info, instrument, warn};
23use uni_common::core::id::Vid;
24use uni_common::core::schema::{
25 DistanceMetric, FullTextIndexConfig, IndexDefinition, InvertedIndexConfig, JsonFtsIndexConfig,
26 ScalarIndexConfig, SchemaManager, VectorIndexConfig, VectorIndexType,
27};
28
/// Returns `true` when `name` is non-empty and consists solely of
/// alphanumeric characters (per `char::is_alphanumeric`, so Unicode letters
/// and digits are accepted) and underscores.
///
/// Used to vet column names before they are spliced into a filter expression.
fn is_valid_column_name(name: &str) -> bool {
    if name.is_empty() {
        return false;
    }
    // Reject as soon as a single character falls outside the allowed set.
    !name.chars().any(|ch| ch != '_' && !ch.is_alphanumeric())
}
36
/// Status of an [`IndexRebuildTask`].
///
/// NOTE(review): the variant descriptions below are inferred from the variant
/// names; nothing in the visible part of this file transitions between them.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum IndexRebuildStatus {
    /// Presumably: the task exists but work has not started yet.
    Pending,
    /// Presumably: the rebuild is currently running.
    InProgress,
    /// Presumably: the rebuild finished successfully.
    Completed,
    /// Presumably: the rebuild ended with an error.
    Failed,
}
49
/// Serializable record describing one index rebuild task.
///
/// NOTE(review): field meanings are inferred from names and types; confirm
/// against the code that creates and consumes these records.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexRebuildTask {
    /// Identifier of the task.
    pub id: String,
    /// Presumably the vertex label whose indexes are to be rebuilt.
    pub label: String,
    /// Current status of the task.
    pub status: IndexRebuildStatus,
    /// Timestamp (UTC) recorded for task creation.
    pub created_at: DateTime<Utc>,
    /// Timestamp (UTC) for the start of work; `None` presumably until started.
    pub started_at: Option<DateTime<Utc>>,
    /// Timestamp (UTC) for completion; `None` presumably until finished.
    pub completed_at: Option<DateTime<Utc>>,
    /// Error text, if any; presumably set when the task fails.
    pub error: Option<String>,
    /// Presumably the number of retries attempted so far.
    pub retry_count: u32,
}
70
/// Creates, drops and rebuilds indexes on per-label vertex datasets and keeps
/// the index definitions registered in the schema.
pub struct IndexManager {
    /// Base URI handed to `VertexDataset::new` and `InvertedIndex::new`.
    base_uri: String,
    /// Schema access: label lookup plus adding, removing and saving index
    /// definitions.
    schema_manager: Arc<SchemaManager>,
    /// LanceDB store used to open tables for composite-key lookups.
    lancedb_store: Arc<crate::lancedb::LanceDbStore>,
}
76
77impl IndexManager {
78 pub fn new(
79 base_uri: &str,
80 schema_manager: Arc<SchemaManager>,
81 lancedb_store: Arc<crate::lancedb::LanceDbStore>,
82 ) -> Self {
83 Self {
84 base_uri: base_uri.to_string(),
85 schema_manager,
86 lancedb_store,
87 }
88 }
89
90 #[instrument(skip(self), level = "info")]
91 pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
92 let label = &config.label;
93 let property = &config.property;
94 info!(
95 "Creating Inverted Index '{}' on {}.{}",
96 config.name, label, property
97 );
98
99 let schema = self.schema_manager.schema();
100 let label_meta = schema
101 .labels
102 .get(label)
103 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
104
105 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
106 index.create_if_missing().await?;
107
108 let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
109
110 if ds.open_raw().await.is_ok() {
112 index
113 .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
114 .await?;
115 } else {
116 warn!(
117 "Dataset for label '{}' not found, creating empty inverted index",
118 label
119 );
120 }
121
122 self.schema_manager
123 .add_index(IndexDefinition::Inverted(config))?;
124 self.schema_manager.save().await?;
125
126 Ok(())
127 }
128
129 #[instrument(skip(self), level = "info")]
130 pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
131 let label = &config.label;
132 let property = &config.property;
133 info!(
134 "Creating vector index '{}' on {}.{}",
135 config.name, label, property
136 );
137
138 let schema = self.schema_manager.schema();
139 let label_meta = schema
140 .labels
141 .get(label)
142 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
143
144 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
145
146 match ds_wrapper.open_raw().await {
147 Ok(mut lance_ds) => {
148 let metric_type = match config.metric {
149 DistanceMetric::L2 => MetricType::L2,
150 DistanceMetric::Cosine => MetricType::Cosine,
151 DistanceMetric::Dot => MetricType::Dot,
152 _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
153 };
154
155 let params = match config.index_type {
156 VectorIndexType::IvfPq {
157 num_partitions,
158 num_sub_vectors,
159 bits_per_subvector,
160 } => {
161 let ivf = IvfBuildParams::new(num_partitions as usize);
162 let pq = PQBuildParams::new(
163 num_sub_vectors as usize,
164 bits_per_subvector as usize,
165 );
166 VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
167 }
168 VectorIndexType::Hnsw {
169 m,
170 ef_construction,
171 ef_search: _,
172 } => {
173 let ivf = IvfBuildParams::new(1);
174 let hnsw = HnswBuildParams::default()
175 .num_edges(m as usize)
176 .ef_construction(ef_construction as usize);
177 let sq = SQBuildParams::default();
178 VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
179 }
180 VectorIndexType::Flat => {
181 let ivf = IvfBuildParams::new(1);
183 let pq = PQBuildParams::default();
184 VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
185 }
186 _ => {
187 return Err(anyhow!(
188 "Unsupported vector index type: {:?}",
189 config.index_type
190 ));
191 }
192 };
193
194 if let Err(e) = lance_ds
196 .create_index(
197 &[property],
198 IndexType::Vector,
199 Some(config.name.clone()),
200 ¶ms,
201 true,
202 )
203 .await
204 {
205 warn!(
206 "Failed to create physical vector index (dataset might be empty): {}",
207 e
208 );
209 }
210 }
211 Err(e) => {
212 warn!(
213 "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
214 label, e
215 );
216 }
217 }
218
219 self.schema_manager
220 .add_index(IndexDefinition::Vector(config))?;
221 self.schema_manager.save().await?;
222
223 Ok(())
224 }
225
226 #[instrument(skip(self), level = "info")]
227 pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
228 let label = &config.label;
229 let properties = &config.properties;
230 info!(
231 "Creating scalar index '{}' on {}.{:?}",
232 config.name, label, properties
233 );
234
235 let schema = self.schema_manager.schema();
236 let label_meta = schema
237 .labels
238 .get(label)
239 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
240
241 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
242
243 match ds_wrapper.open_raw().await {
244 Ok(mut lance_ds) => {
245 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
246
247 if let Err(e) = lance_ds
248 .create_index(
249 &columns,
250 IndexType::Scalar,
251 Some(config.name.clone()),
252 &ScalarIndexParams::default(),
253 true,
254 )
255 .await
256 {
257 warn!(
258 "Failed to create physical scalar index (dataset might be empty): {}",
259 e
260 );
261 }
262 }
263 Err(e) => {
264 warn!(
265 "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
266 label, e
267 );
268 }
269 }
270
271 self.schema_manager
272 .add_index(IndexDefinition::Scalar(config))?;
273 self.schema_manager.save().await?;
274
275 Ok(())
276 }
277
278 #[instrument(skip(self), level = "info")]
279 pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
280 let label = &config.label;
281 info!(
282 "Creating FTS index '{}' on {}.{:?}",
283 config.name, label, config.properties
284 );
285
286 let schema = self.schema_manager.schema();
287 let label_meta = schema
288 .labels
289 .get(label)
290 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
291
292 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
293
294 match ds_wrapper.open_raw().await {
295 Ok(mut lance_ds) => {
296 let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
297
298 let fts_params =
299 InvertedIndexParams::default().with_position(config.with_positions);
300
301 if let Err(e) = lance_ds
302 .create_index(
303 &columns,
304 IndexType::Inverted,
305 Some(config.name.clone()),
306 &fts_params,
307 true,
308 )
309 .await
310 {
311 warn!(
312 "Failed to create physical FTS index (dataset might be empty): {}",
313 e
314 );
315 }
316 }
317 Err(e) => {
318 warn!(
319 "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
320 label, e
321 );
322 }
323 }
324
325 self.schema_manager
326 .add_index(IndexDefinition::FullText(config))?;
327 self.schema_manager.save().await?;
328
329 Ok(())
330 }
331
332 #[instrument(skip(self), level = "info")]
337 pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
338 let label = &config.label;
339 let column = &config.column;
340 info!(
341 "Creating JSON FTS index '{}' on {}.{}",
342 config.name, label, column
343 );
344
345 let schema = self.schema_manager.schema();
346 let label_meta = schema
347 .labels
348 .get(label)
349 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
350
351 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
352
353 match ds_wrapper.open_raw().await {
354 Ok(mut lance_ds) => {
355 let fts_params =
356 InvertedIndexParams::default().with_position(config.with_positions);
357
358 if let Err(e) = lance_ds
359 .create_index(
360 &[column],
361 IndexType::Inverted,
362 Some(config.name.clone()),
363 &fts_params,
364 true,
365 )
366 .await
367 {
368 warn!(
369 "Failed to create physical JSON FTS index (dataset might be empty): {}",
370 e
371 );
372 }
373 }
374 Err(e) => {
375 warn!(
376 "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
377 label, e
378 );
379 }
380 }
381
382 self.schema_manager
383 .add_index(IndexDefinition::JsonFullText(config))?;
384 self.schema_manager.save().await?;
385
386 Ok(())
387 }
388
389 #[instrument(skip(self), level = "info")]
390 pub async fn drop_index(&self, name: &str) -> Result<()> {
391 info!("Dropping index '{}'", name);
392
393 let idx_def = self
394 .schema_manager
395 .get_index(name)
396 .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
397
398 let label = idx_def.label();
399
400 if let Some(label_meta) = self.schema_manager.schema().labels.get(label) {
401 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
402 if let Ok(_lance_ds) = ds_wrapper.open_raw().await {
403 warn!(
408 "Physical index drop not supported by current Lance version, removing from schema only."
409 );
410 }
411 }
412
413 self.schema_manager.remove_index(name)?;
414 self.schema_manager.save().await?;
415 Ok(())
416 }
417
418 #[instrument(skip(self), level = "info")]
419 pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
420 info!("Rebuilding all indexes for label '{}'", label);
421 let schema = self.schema_manager.schema();
422
423 let indexes = schema.indexes.clone();
425
426 for index in indexes {
427 match index {
428 IndexDefinition::Vector(cfg) => {
429 if cfg.label == label {
430 self.create_vector_index(cfg).await?;
431 }
432 }
433 IndexDefinition::Scalar(cfg) => {
434 if cfg.label == label {
435 self.create_scalar_index(cfg).await?;
436 }
437 }
438 IndexDefinition::FullText(cfg) => {
439 if cfg.label == label {
440 self.create_fts_index(cfg).await?;
441 }
442 }
443 IndexDefinition::Inverted(cfg) => {
444 if cfg.label == label {
445 self.create_inverted_index(cfg).await?;
446 }
447 }
448 IndexDefinition::JsonFullText(cfg) => {
449 if cfg.label == label {
450 self.create_json_fts_index(cfg).await?;
451 }
452 }
453 _ => {
454 log::warn!("Unknown index type encountered during rebuild, skipping");
455 }
456 }
457 }
458 Ok(())
459 }
460
461 pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
463 let schema = self.schema_manager.schema();
464 let label_meta = schema
465 .labels
466 .get(label)
467 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
468
469 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
471
472 if let Ok(mut ds) = ds_wrapper.open_raw().await {
474 let index_name = format!("{}_{}_composite", label, properties.join("_"));
476
477 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
479
480 if let Err(e) = ds
481 .create_index(
482 &columns,
483 IndexType::Scalar,
484 Some(index_name.clone()),
485 &ScalarIndexParams::default(),
486 true,
487 )
488 .await
489 {
490 warn!("Failed to create physical composite index: {}", e);
491 }
492
493 let config = ScalarIndexConfig {
494 name: index_name,
495 label: label.to_string(),
496 properties: properties.to_vec(),
497 index_type: uni_common::core::schema::ScalarIndexType::BTree,
498 where_clause: None,
499 };
500
501 self.schema_manager
502 .add_index(IndexDefinition::Scalar(config))?;
503 self.schema_manager.save().await?;
504 }
505
506 Ok(())
507 }
508
509 pub async fn composite_lookup(
511 &self,
512 label: &str,
513 key_values: &HashMap<String, Value>,
514 ) -> Result<Option<Vid>> {
515 use lancedb::query::{ExecutableQuery, QueryBase, Select};
516
517 let schema = self.schema_manager.schema();
518 let label_meta = schema
519 .labels
520 .get(label)
521 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
522
523 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
524 let table = match ds_wrapper.open_lancedb(&self.lancedb_store).await {
525 Ok(t) => t,
526 Err(_) => return Ok(None),
527 };
528
529 let filter = key_values
531 .iter()
532 .map(|(k, v)| {
533 if !is_valid_column_name(k) {
535 anyhow::bail!("Invalid column name '{}': must contain only alphanumeric characters and underscores", k);
536 }
537
538 let val_str = match v {
539 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
540 Value::Number(n) => n.to_string(),
541 Value::Bool(b) => b.to_string(),
542 Value::Null => "null".to_string(),
543 _ => v.to_string(),
544 };
545 Ok(format!("\"{}\" = {}", k, val_str))
547 })
548 .collect::<Result<Vec<_>>>()?
549 .join(" AND ");
550
551 let query = table
552 .query()
553 .only_if(&filter)
554 .limit(1)
555 .select(Select::Columns(vec!["_vid".to_string()]));
556
557 let stream = match query.execute().await {
558 Ok(s) => s,
559 Err(_) => return Ok(None),
560 };
561
562 let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await.unwrap_or_default();
563 for batch in batches {
564 if batch.num_rows() > 0 {
565 let vid_col = batch
566 .column_by_name("_vid")
567 .ok_or_else(|| anyhow!("Missing _vid column"))?
568 .as_any()
569 .downcast_ref::<UInt64Array>()
570 .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
571
572 let vid = vid_col.value(0);
573 return Ok(Some(Vid::from(vid)));
574 }
575 }
576
577 Ok(None)
578 }
579
580 #[instrument(skip(self, added, removed), level = "info", fields(
589 label = %config.label,
590 property = %config.property
591 ))]
592 pub async fn update_inverted_index_incremental(
593 &self,
594 config: &InvertedIndexConfig,
595 added: &HashMap<Vid, Vec<String>>,
596 removed: &HashSet<Vid>,
597 ) -> Result<()> {
598 info!(
599 added = added.len(),
600 removed = removed.len(),
601 "Incrementally updating inverted index"
602 );
603
604 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
605 index.apply_incremental_updates(added, removed).await
606 }
607}