1use crate::storage::inverted_index::InvertedIndex;
5use crate::storage::vertex::VertexDataset;
6use anyhow::{Result, anyhow};
7use arrow_array::UInt64Array;
8use chrono::{DateTime, Utc};
9use futures::TryStreamExt;
10use lance::index::vector::VectorIndexParams;
11use lance_index::scalar::{InvertedIndexParams, ScalarIndexParams};
12use lance_index::vector::hnsw::builder::HnswBuildParams;
13use lance_index::vector::ivf::IvfBuildParams;
14use lance_index::vector::pq::PQBuildParams;
15use lance_index::vector::sq::builder::SQBuildParams;
16use lance_index::{DatasetIndexExt, IndexType};
17use lance_linalg::distance::MetricType;
18use serde::{Deserialize, Serialize};
19use serde_json::Value;
20use std::collections::{HashMap, HashSet};
21use std::sync::Arc;
22use tracing::{info, instrument, warn};
23use uni_common::core::id::Vid;
24use uni_common::core::schema::{
25 DistanceMetric, FullTextIndexConfig, IndexDefinition, InvertedIndexConfig, JsonFtsIndexConfig,
26 ScalarIndexConfig, SchemaManager, VectorIndexConfig, VectorIndexType,
27};
28
/// Returns `true` when `name` is safe to splice into a double-quoted column
/// reference of a query filter.
///
/// A valid name is non-empty and consists solely of alphanumeric characters
/// (as defined by `char::is_alphanumeric`, so non-ASCII letters/digits are
/// accepted) and underscores.
fn is_valid_column_name(name: &str) -> bool {
    match name {
        "" => false,
        _ => name.chars().all(|ch| ch == '_' || char::is_alphanumeric(ch)),
    }
}
36
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub enum IndexRebuildStatus {
40 Pending,
42 InProgress,
44 Completed,
46 Failed,
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct IndexRebuildTask {
53 pub id: String,
55 pub label: String,
57 pub status: IndexRebuildStatus,
59 pub created_at: DateTime<Utc>,
61 pub started_at: Option<DateTime<Utc>>,
63 pub completed_at: Option<DateTime<Utc>>,
65 pub error: Option<String>,
67 pub retry_count: u32,
69}
70
/// Creates, drops, rebuilds and queries the indexes declared on vertex labels.
///
/// Physical index data is resolved relative to `base_uri`; index definitions
/// are persisted through the [`SchemaManager`].
pub struct IndexManager {
    /// Storage root handed to `VertexDataset::new` and `InvertedIndex::new`.
    base_uri: String,
    /// Source of label metadata and store of persisted index definitions.
    schema_manager: Arc<SchemaManager>,
    /// LanceDB handle used to open vertex tables for `composite_lookup`.
    lancedb_store: Arc<crate::lancedb::LanceDbStore>,
}
76
77impl IndexManager {
78 pub fn new(
79 base_uri: &str,
80 schema_manager: Arc<SchemaManager>,
81 lancedb_store: Arc<crate::lancedb::LanceDbStore>,
82 ) -> Self {
83 Self {
84 base_uri: base_uri.to_string(),
85 schema_manager,
86 lancedb_store,
87 }
88 }
89
90 #[instrument(skip(self), level = "info")]
91 pub async fn create_inverted_index(&self, config: InvertedIndexConfig) -> Result<()> {
92 let label = &config.label;
93 let property = &config.property;
94 info!(
95 "Creating Inverted Index '{}' on {}.{}",
96 config.name, label, property
97 );
98
99 let schema = self.schema_manager.schema();
100 let label_meta = schema
101 .labels
102 .get(label)
103 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
104
105 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
106 index.create_if_missing().await?;
107
108 let ds = VertexDataset::new(&self.base_uri, label, label_meta.id);
109
110 if ds.open_raw().await.is_ok() {
112 index
113 .build_from_dataset(&ds, |n| info!("Indexed {} terms", n))
114 .await?;
115 } else {
116 warn!(
117 "Dataset for label '{}' not found, creating empty inverted index",
118 label
119 );
120 }
121
122 self.schema_manager
123 .add_index(IndexDefinition::Inverted(config))?;
124 self.schema_manager.save().await?;
125
126 Ok(())
127 }
128
129 #[instrument(skip(self), level = "info")]
130 pub async fn create_vector_index(&self, config: VectorIndexConfig) -> Result<()> {
131 let label = &config.label;
132 let property = &config.property;
133 info!(
134 "Creating vector index '{}' on {}.{}",
135 config.name, label, property
136 );
137
138 let schema = self.schema_manager.schema();
139 let label_meta = schema
140 .labels
141 .get(label)
142 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
143
144 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
145
146 match ds_wrapper.open_raw().await {
147 Ok(mut lance_ds) => {
148 let metric_type = match config.metric {
149 DistanceMetric::L2 => MetricType::L2,
150 DistanceMetric::Cosine => MetricType::Cosine,
151 DistanceMetric::Dot => MetricType::Dot,
152 _ => return Err(anyhow!("Unsupported metric: {:?}", config.metric)),
153 };
154
155 let params = match config.index_type {
156 VectorIndexType::IvfPq {
157 num_partitions,
158 num_sub_vectors,
159 bits_per_subvector,
160 } => {
161 let ivf = IvfBuildParams::new(num_partitions as usize);
162 let pq = PQBuildParams::new(
163 num_sub_vectors as usize,
164 bits_per_subvector as usize,
165 );
166 VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
167 }
168 VectorIndexType::Hnsw {
169 m,
170 ef_construction,
171 ef_search: _,
172 } => {
173 let ivf = IvfBuildParams::new(1);
174 let hnsw = HnswBuildParams::default()
175 .num_edges(m as usize)
176 .ef_construction(ef_construction as usize);
177 let sq = SQBuildParams::default();
178 VectorIndexParams::with_ivf_hnsw_sq_params(metric_type, ivf, hnsw, sq)
179 }
180 VectorIndexType::Flat => {
181 let ivf = IvfBuildParams::new(1);
183 let pq = PQBuildParams::default();
184 VectorIndexParams::with_ivf_pq_params(metric_type, ivf, pq)
185 }
186 _ => {
187 return Err(anyhow!(
188 "Unsupported vector index type: {:?}",
189 config.index_type
190 ));
191 }
192 };
193
194 if let Err(e) = lance_ds
196 .create_index(
197 &[property],
198 IndexType::Vector,
199 Some(config.name.clone()),
200 ¶ms,
201 true,
202 )
203 .await
204 {
205 warn!(
206 "Failed to create physical vector index (dataset might be empty): {}",
207 e
208 );
209 }
210 }
211 Err(e) => {
212 warn!(
213 "Dataset not found for label '{}', skipping physical index creation but saving schema definition. Error: {}",
214 label, e
215 );
216 }
217 }
218
219 self.schema_manager
220 .add_index(IndexDefinition::Vector(config))?;
221 self.schema_manager.save().await?;
222
223 Ok(())
224 }
225
226 #[instrument(skip(self), level = "info")]
227 pub async fn create_scalar_index(&self, config: ScalarIndexConfig) -> Result<()> {
228 let label = &config.label;
229 let properties = &config.properties;
230 info!(
231 "Creating scalar index '{}' on {}.{:?}",
232 config.name, label, properties
233 );
234
235 let schema = self.schema_manager.schema();
236 let label_meta = schema
237 .labels
238 .get(label)
239 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
240
241 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
242
243 match ds_wrapper.open_raw().await {
244 Ok(mut lance_ds) => {
245 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
246
247 if let Err(e) = lance_ds
248 .create_index(
249 &columns,
250 IndexType::Scalar,
251 Some(config.name.clone()),
252 &ScalarIndexParams::default(),
253 true,
254 )
255 .await
256 {
257 warn!(
258 "Failed to create physical scalar index (dataset might be empty): {}",
259 e
260 );
261 }
262 }
263 Err(e) => {
264 warn!(
265 "Dataset not found for label '{}' (scalar index), skipping physical creation. Error: {}",
266 label, e
267 );
268 }
269 }
270
271 self.schema_manager
272 .add_index(IndexDefinition::Scalar(config))?;
273 self.schema_manager.save().await?;
274
275 Ok(())
276 }
277
278 #[instrument(skip(self), level = "info")]
279 pub async fn create_fts_index(&self, config: FullTextIndexConfig) -> Result<()> {
280 let label = &config.label;
281 info!(
282 "Creating FTS index '{}' on {}.{:?}",
283 config.name, label, config.properties
284 );
285
286 let schema = self.schema_manager.schema();
287 let label_meta = schema
288 .labels
289 .get(label)
290 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
291
292 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
293
294 match ds_wrapper.open_raw().await {
295 Ok(mut lance_ds) => {
296 let columns: Vec<&str> = config.properties.iter().map(|s| s.as_str()).collect();
297
298 let fts_params =
299 InvertedIndexParams::default().with_position(config.with_positions);
300
301 if let Err(e) = lance_ds
302 .create_index(
303 &columns,
304 IndexType::Inverted,
305 Some(config.name.clone()),
306 &fts_params,
307 true,
308 )
309 .await
310 {
311 warn!(
312 "Failed to create physical FTS index (dataset might be empty): {}",
313 e
314 );
315 }
316 }
317 Err(e) => {
318 warn!(
319 "Dataset not found for label '{}' (FTS index), skipping physical creation. Error: {}",
320 label, e
321 );
322 }
323 }
324
325 self.schema_manager
326 .add_index(IndexDefinition::FullText(config))?;
327 self.schema_manager.save().await?;
328
329 Ok(())
330 }
331
332 #[instrument(skip(self), level = "info")]
337 pub async fn create_json_fts_index(&self, config: JsonFtsIndexConfig) -> Result<()> {
338 let label = &config.label;
339 let column = &config.column;
340 info!(
341 "Creating JSON FTS index '{}' on {}.{}",
342 config.name, label, column
343 );
344
345 let schema = self.schema_manager.schema();
346 let label_meta = schema
347 .labels
348 .get(label)
349 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
350
351 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
352
353 match ds_wrapper.open_raw().await {
354 Ok(mut lance_ds) => {
355 let fts_params =
356 InvertedIndexParams::default().with_position(config.with_positions);
357
358 if let Err(e) = lance_ds
359 .create_index(
360 &[column],
361 IndexType::Inverted,
362 Some(config.name.clone()),
363 &fts_params,
364 true,
365 )
366 .await
367 {
368 warn!(
369 "Failed to create physical JSON FTS index (dataset might be empty): {}",
370 e
371 );
372 }
373 }
374 Err(e) => {
375 warn!(
376 "Dataset not found for label '{}' (JSON FTS index), skipping physical creation. Error: {}",
377 label, e
378 );
379 }
380 }
381
382 self.schema_manager
383 .add_index(IndexDefinition::JsonFullText(config))?;
384 self.schema_manager.save().await?;
385
386 Ok(())
387 }
388
389 #[instrument(skip(self), level = "info")]
390 pub async fn drop_index(&self, name: &str) -> Result<()> {
391 info!("Dropping index '{}'", name);
392
393 let _idx_def = self
395 .schema_manager
396 .get_index(name)
397 .ok_or_else(|| anyhow!("Index '{}' not found in schema", name))?;
398
399 warn!("Physical index drop not yet supported, removing from schema only.");
402
403 self.schema_manager.remove_index(name)?;
404 self.schema_manager.save().await?;
405 Ok(())
406 }
407
408 #[instrument(skip(self), level = "info")]
409 pub async fn rebuild_indexes_for_label(&self, label: &str) -> Result<()> {
410 info!("Rebuilding all indexes for label '{}'", label);
411 let schema = self.schema_manager.schema();
412
413 let indexes: Vec<_> = schema
415 .indexes
416 .iter()
417 .filter(|idx| idx.label() == label)
418 .cloned()
419 .collect();
420
421 for index in indexes {
422 match index {
423 IndexDefinition::Vector(cfg) => self.create_vector_index(cfg).await?,
424 IndexDefinition::Scalar(cfg) => self.create_scalar_index(cfg).await?,
425 IndexDefinition::FullText(cfg) => self.create_fts_index(cfg).await?,
426 IndexDefinition::Inverted(cfg) => self.create_inverted_index(cfg).await?,
427 IndexDefinition::JsonFullText(cfg) => self.create_json_fts_index(cfg).await?,
428 _ => warn!("Unknown index type encountered during rebuild, skipping"),
429 }
430 }
431 Ok(())
432 }
433
434 pub async fn create_composite_index(&self, label: &str, properties: &[String]) -> Result<()> {
436 let schema = self.schema_manager.schema();
437 let label_meta = schema
438 .labels
439 .get(label)
440 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
441
442 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
444
445 if let Ok(mut ds) = ds_wrapper.open_raw().await {
447 let index_name = format!("{}_{}_composite", label, properties.join("_"));
449
450 let columns: Vec<&str> = properties.iter().map(|s| s.as_str()).collect();
452
453 if let Err(e) = ds
454 .create_index(
455 &columns,
456 IndexType::Scalar,
457 Some(index_name.clone()),
458 &ScalarIndexParams::default(),
459 true,
460 )
461 .await
462 {
463 warn!("Failed to create physical composite index: {}", e);
464 }
465
466 let config = ScalarIndexConfig {
467 name: index_name,
468 label: label.to_string(),
469 properties: properties.to_vec(),
470 index_type: uni_common::core::schema::ScalarIndexType::BTree,
471 where_clause: None,
472 metadata: Default::default(),
473 };
474
475 self.schema_manager
476 .add_index(IndexDefinition::Scalar(config))?;
477 self.schema_manager.save().await?;
478 }
479
480 Ok(())
481 }
482
483 pub async fn composite_lookup(
485 &self,
486 label: &str,
487 key_values: &HashMap<String, Value>,
488 ) -> Result<Option<Vid>> {
489 use lancedb::query::{ExecutableQuery, QueryBase, Select};
490
491 let schema = self.schema_manager.schema();
492 let label_meta = schema
493 .labels
494 .get(label)
495 .ok_or_else(|| anyhow!("Label '{}' not found", label))?;
496
497 let ds_wrapper = VertexDataset::new(&self.base_uri, label, label_meta.id);
498 let table = match ds_wrapper.open_lancedb(&self.lancedb_store).await {
499 Ok(t) => t,
500 Err(_) => return Ok(None),
501 };
502
503 let filter = key_values
505 .iter()
506 .map(|(k, v)| {
507 if !is_valid_column_name(k) {
509 anyhow::bail!("Invalid column name '{}': must contain only alphanumeric characters and underscores", k);
510 }
511
512 let val_str = match v {
513 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
514 Value::Number(n) => n.to_string(),
515 Value::Bool(b) => b.to_string(),
516 Value::Null => "null".to_string(),
517 _ => v.to_string(),
518 };
519 Ok(format!("\"{}\" = {}", k, val_str))
521 })
522 .collect::<Result<Vec<_>>>()?
523 .join(" AND ");
524
525 let query = table
526 .query()
527 .only_if(&filter)
528 .limit(1)
529 .select(Select::Columns(vec!["_vid".to_string()]));
530
531 let stream = match query.execute().await {
532 Ok(s) => s,
533 Err(_) => return Ok(None),
534 };
535
536 let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await.unwrap_or_default();
537 for batch in batches {
538 if batch.num_rows() > 0 {
539 let vid_col = batch
540 .column_by_name("_vid")
541 .ok_or_else(|| anyhow!("Missing _vid column"))?
542 .as_any()
543 .downcast_ref::<UInt64Array>()
544 .ok_or_else(|| anyhow!("Invalid _vid column type"))?;
545
546 let vid = vid_col.value(0);
547 return Ok(Some(Vid::from(vid)));
548 }
549 }
550
551 Ok(None)
552 }
553
554 #[instrument(skip(self, added, removed), level = "info", fields(
563 label = %config.label,
564 property = %config.property
565 ))]
566 pub async fn update_inverted_index_incremental(
567 &self,
568 config: &InvertedIndexConfig,
569 added: &HashMap<Vid, Vec<String>>,
570 removed: &HashSet<Vid>,
571 ) -> Result<()> {
572 info!(
573 added = added.len(),
574 removed = removed.len(),
575 "Incrementally updating inverted index"
576 );
577
578 let mut index = InvertedIndex::new(&self.base_uri, config.clone()).await?;
579 index.apply_incremental_updates(added, removed).await
580 }
581}