1use crate::storage::inverted_index::InvertedIndex;
7use crate::storage::vertex::VertexDataset;
8use anyhow::{Result, anyhow};
9use arrow_array::UInt64Array;
10use chrono::{DateTime, Utc};
11use futures::TryStreamExt;
12use lance::index::vector::VectorIndexParams;
13use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
14use lance_index::vector::hnsw::builder::HnswBuildParams;
15use lance_index::vector::ivf::IvfBuildParams;
16use lance_index::vector::pq::PQBuildParams;
17use lance_index::vector::sq::builder::SQBuildParams;
18use lance_index::{DatasetIndexExt, IndexType};
19use lance_linalg::distance::MetricType;
20use serde::{Deserialize, Serialize};
21use serde_json::Value;
22use std::collections::{HashMap, HashSet};
23use std::sync::Arc;
24use tracing::{info, instrument, warn};
25use uni_common::core::id::Vid;
26use uni_common::core::schema::{
27 DistanceMetric, FullTextIndexConfig, IndexDefinition, InvertedIndexConfig, JsonFtsIndexConfig,
28 ScalarIndexConfig, SchemaManager, VectorIndexConfig, VectorIndexType,
29};
30
/// Returns `true` when `name` is safe to interpolate as a quoted column
/// identifier in a filter expression.
///
/// A valid name is non-empty and consists solely of Unicode alphanumeric
/// characters and underscores.
fn is_valid_column_name(name: &str) -> bool {
    if name.is_empty() {
        return false;
    }
    // Reject as soon as any character falls outside the allowed set.
    !name
        .chars()
        .any(|ch| !(ch == '_' || ch.is_alphanumeric()))
}
38
39#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
41pub enum IndexRebuildStatus {
42 Pending,
44 InProgress,
46 Completed,
48 Failed,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct IndexRebuildTask {
55 pub id: String,
57 pub label: String,
59 pub status: IndexRebuildStatus,
61 pub created_at: DateTime<Utc>,
63 pub started_at: Option<DateTime<Utc>>,
65 pub completed_at: Option<DateTime<Utc>>,
67 pub error: Option<String>,
69 pub retry_count: u32,
71}
72
73pub struct IndexManager {
75 base_uri: String,
76 schema_manager: Arc<SchemaManager>,
77 lancedb_store: Arc<crate::lancedb::LanceDbStore>,
78}
79
80impl std::fmt::Debug for IndexManager {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 f.debug_struct("IndexManager")
83 .field("base_uri", &self.base_uri)
84 .finish_non_exhaustive()
85 }
86}
87
88impl IndexManager {
89 pub fn new(
91 base_uri: &str,
92 schema_manager: Arc<SchemaManager>,
93 lancedb_store: Arc<crate::lancedb::LanceDbStore>,
94 ) -> Self {
95 Self {
96 base_uri: base_uri.to_string(),
97 schema_manager,
98 lancedb_store,
99 }
100 }
101
102 #[instrument(skip(self), level = "info")]
104 pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
105 let label = &config.label;
106 let property = &config.property;
107 info!(
108 "Creating Inverted Index '{}' on {}.{}",
109 config.name, label, property
110 );
111
112 let schema = self.schema_manager.schema();
113 let label_meta = schema
114 .labels
115 .get(label)
116 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
117
118 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
119
120 let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
121
122 if ds.open_raw().await.is_ok() {
124 index
125 .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
126 .await?;
127 } else {
128 warn!(
129 "Dataset for label '{}' not found, creating empty inverted index",
130 label
131 );
132 }
133
134 self.schema_manager
135 .add_index(IndexDefinition::Inverted(config))?;
136 self.schema_manager.save().await?;
137
138 Ok(())
139 }
140
141 #[instrument(skip(self), level = "info")]
143 pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
144 let label = &config.label;
145 let property = &config.property;
146 info!(
147 "Creating vector index '{}' on {}.{}",
148 config.name, label, property
149 );
150
151 let schema = self.schema_manager.schema();
152 let label_meta = schema
153 .labels
154 .get(label)
155 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
156
157 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
158
159 match ds_wrapper.open_raw().await {
160 Ok(mut lance_ds) => {
161 let metric_type = match config.metric {
162 DistanceMetric::L2 => MetricType::L2,
163 DistanceMetric::Cosine => MetricType::Cosine,
164 DistanceMetric::Dot => MetricType::Dot,
165 _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
166 };
167
168 let params = match config.index_type {
169 VectorIndexType::IvfPq {
170 num_partitions,
171 num_sub_vectors,
172 bits_per_subvector,
173 } => {
174 let ivf = IvfBuildParams::new(num_partitions as usize);
175 let pq = PQBuildParams::new(
176 num_sub_vectors as usize,
177 bits_per_subvector as usize,
178 );
179 VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
180 }
181 VectorIndexType::Hnsw {
182 m,
183 ef_construction,
184 ef_search: _,
185 } => {
186 let ivf = IvfBuildParams::new(1);
187 let hnsw = HnswBuildParams::default()
188 .num_edges(m as usize)
189 .ef_construction(ef_construction as usize);
190 let sq = SQBuildParams::default();
191 VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
192 }
193 VectorIndexType::Flat => {
194 let ivf = IvfBuildParams::new(1);
196 let pq = PQBuildParams::default();
197 VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
198 }
199 _ => {
200 return Err(anyhow!(
201 "Unsupported vector index type: {:?}",
202 config.index_type
203 ));
204 }
205 };
206
207 if let Err(e) = lance_ds
209 .create_index(
210 &[property],
211 IndexType::Vector,
212 Some(config.name.clone()),
213 ¶ms,
214 true,
215 )
216 .await
217 {
218 warn!(
219 "Failed to create physical vector index (dataset might be empty): {}",
220 e
221 );
222 }
223 }
224 Err(e) => {
225 warn!(
226 "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
227 label, e
228 );
229 }
230 }
231
232 self.schema_manager
233 .add_index(IndexDefinition::Vector(config))?;
234 self.schema_manager.save().await?;
235
236 Ok(())
237 }
238
239 #[instrument(skip(self), level = "info")]
241 pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
242 let label = &config.label;
243 let properties = &config.properties;
244 info!(
245 "Creating scalar index '{}' on {}.{:?}",
246 config.name, label, properties
247 );
248
249 let schema = self.schema_manager.schema();
250 let label_meta = schema
251 .labels
252 .get(label)
253 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
254
255 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
256
257 match ds_wrapper.open_raw().await {
258 Ok(mut lance_ds) => {
259 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
260
261 if let Err(e) = lance_ds
262 .create_index(
263 &columns,
264 IndexType::Scalar,
265 Some(config.name.clone()),
266 &ScalarIndexParams::default(),
267 true,
268 )
269 .await
270 {
271 warn!(
272 "Failed to create physical scalar index (dataset might be empty): {}",
273 e
274 );
275 }
276 }
277 Err(e) => {
278 warn!(
279 "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
280 label, e
281 );
282 }
283 }
284
285 self.schema_manager
286 .add_index(IndexDefinition::Scalar(config))?;
287 self.schema_manager.save().await?;
288
289 Ok(())
290 }
291
292 #[instrument(skip(self), level = "info")]
294 pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
295 let label = &config.label;
296 info!(
297 "Creating FTS index '{}' on {}.{:?}",
298 config.name, label, config.properties
299 );
300
301 let schema = self.schema_manager.schema();
302 let label_meta = schema
303 .labels
304 .get(label)
305 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
306
307 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
308
309 match ds_wrapper.open_raw().await {
310 Ok(mut lance_ds) => {
311 let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
312
313 let fts_params =
314 InvertedIndexParams::default().with_position(config.with_positions);
315
316 if let Err(e) = lance_ds
317 .create_index(
318 &columns,
319 IndexType::Inverted,
320 Some(config.name.clone()),
321 &fts_params,
322 true,
323 )
324 .await
325 {
326 warn!(
327 "Failed to create physical FTS index (dataset might be empty): {}",
328 e
329 );
330 }
331 }
332 Err(e) => {
333 warn!(
334 "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
335 label, e
336 );
337 }
338 }
339
340 self.schema_manager
341 .add_index(IndexDefinition::FullText(config))?;
342 self.schema_manager.save().await?;
343
344 Ok(())
345 }
346
347 #[instrument(skip(self), level = "info")]
352 pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
353 let label = &config.label;
354 let column = &config.column;
355 info!(
356 "Creating JSON FTS index '{}' on {}.{}",
357 config.name, label, column
358 );
359
360 let schema = self.schema_manager.schema();
361 let label_meta = schema
362 .labels
363 .get(label)
364 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
365
366 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
367
368 match ds_wrapper.open_raw().await {
369 Ok(mut lance_ds) => {
370 let fts_params =
371 InvertedIndexParams::default().with_position(config.with_positions);
372
373 if let Err(e) = lance_ds
374 .create_index(
375 &[column],
376 IndexType::Inverted,
377 Some(config.name.clone()),
378 &fts_params,
379 true,
380 )
381 .await
382 {
383 warn!(
384 "Failed to create physical JSON FTS index (dataset might be empty): {}",
385 e
386 );
387 }
388 }
389 Err(e) => {
390 warn!(
391 "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
392 label, e
393 );
394 }
395 }
396
397 self.schema_manager
398 .add_index(IndexDefinition::JsonFullText(config))?;
399 self.schema_manager.save().await?;
400
401 Ok(())
402 }
403
404 #[instrument(skip(self), level = "info")]
406 pub async fn drop_index(&self, name: &str) -> Result<()> {
407 info!("Dropping index '{}'", name);
408
409 let _idx_def = self
411 .schema_manager
412 .get_index(name)
413 .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
414
415 warn!("Physical index drop not yet supported, removing from schema only.");
418
419 self.schema_manager.remove_index(name)?;
420 self.schema_manager.save().await?;
421 Ok(())
422 }
423
424 #[instrument(skip(self), level = "info")]
426 pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
427 info!("Rebuilding all indexes for label '{}'", label);
428 let schema = self.schema_manager.schema();
429
430 let indexes: Vec<_> = schema
432 .indexes
433 .iter()
434 .filter(|idx| idx.label() == label)
435 .cloned()
436 .collect();
437
438 for index in indexes {
439 match index {
440 IndexDefinition::Vector(cfg) => self.create_vector_index(cfg).await?,
441 IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
442 IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
443 IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
444 IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
445 _ => warn!("Unknown index type encountered during rebuild, skipping"),
446 }
447 }
448 Ok(())
449 }
450
451 pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
453 let schema = self.schema_manager.schema();
454 let label_meta = schema
455 .labels
456 .get(label)
457 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
458
459 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
461
462 if let Ok(mut ds) = ds_wrapper.open_raw().await {
464 let index_name = format!("{}_{}_composite", label, properties.join("_"));
466
467 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
469
470 if let Err(e) = ds
471 .create_index(
472 &columns,
473 IndexType::Scalar,
474 Some(index_name.clone()),
475 &ScalarIndexParams::default(),
476 true,
477 )
478 .await
479 {
480 warn!("Failed to create physical composite index: {}", e);
481 }
482
483 let config = ScalarIndexConfig {
484 name: index_name,
485 label: label.to_string(),
486 properties: properties.to_vec(),
487 index_type: uni_common::core::schema::ScalarIndexType::BTree,
488 where_clause: None,
489 metadata: Default::default(),
490 };
491
492 self.schema_manager
493 .add_index(IndexDefinition::Scalar(config))?;
494 self.schema_manager.save().await?;
495 }
496
497 Ok(())
498 }
499
500 pub async fn composite_lookup(
502 &self,
503 label: &str,
504 key_values: &HashMap<String, Value>,
505 ) -> Result<Option<Vid>> {
506 use lancedb::query::{ExecutableQuery, QueryBase, Select};
507
508 let schema = self.schema_manager.schema();
509 let label_meta = schema
510 .labels
511 .get(label)
512 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
513
514 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
515 let table = match ds_wrapper.open_lancedb(&self.lancedb_store).await {
516 Ok(t) => t,
517 Err(_) => return Ok(None),
518 };
519
520 let filter = key_values
522 .iter()
523 .map(|(k, v)| {
524 if !is_valid_column_name(k) {
526 anyhow::bail!("Invalid column name '{}': must contain only alphanumeric characters and underscores", k);
527 }
528
529 let val_str = match v {
530 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
531 Value::Number(n) => n.to_string(),
532 Value::Bool(b) => b.to_string(),
533 Value::Null => "null".to_string(),
534 _ => v.to_string(),
535 };
536 Ok(format!("\"{}\" = {}", k, val_str))
538 })
539 .collect::<Result<Vec<_>>>()?
540 .join(" AND ");
541
542 let query = table
543 .query()
544 .only_if(&filter)
545 .limit(1)
546 .select(Select::Columns(vec!["_vid".to_string()]));
547
548 let stream = match query.execute().await {
549 Ok(s) => s,
550 Err(_) => return Ok(None),
551 };
552
553 let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await.unwrap_or_default();
554 for batch in batches {
555 if batch.num_rows() > 0 {
556 let vid_col = batch
557 .column_by_name("_vid")
558 .ok_or_else(|| anyhow!("Missing _vid column"))?
559 .as_any()
560 .downcast_ref::<UInt64Array>()
561 .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
562
563 let vid = vid_col.value(0);
564 return Ok(Some(Vid::from(vid)));
565 }
566 }
567
568 Ok(None)
569 }
570
571 #[instrument(skip(self, added, removed), level = "info", fields(
580 label = %config.label,
581 property = %config.property
582 ))]
583 pub async fn update_inverted_index_incremental(
584 &self,
585 config: &InvertedIndexConfig,
586 added: &HashMap<Vid, Vec<String>>,
587 removed: &HashSet<Vid>,
588 ) -> Result<()> {
589 info!(
590 added = added.len(),
591 removed = removed.len(),
592 "Incrementally updating inverted index"
593 );
594
595 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
596 index.apply_incremental_updates(added, removed).await
597 }
598}