1#[cfg(feature = "lance-backend")]
10use crate::storage::index_manager::IndexManager;
11use crate::storage::index_manager::{IndexRebuildStatus, IndexRebuildTask};
12use crate::storage::manager::StorageManager;
13use anyhow::{Result, anyhow};
14use chrono::Utc;
15use object_store::ObjectStore;
16use object_store::path::Path as ObjectPath;
17use parking_lot::RwLock;
18use serde::{Deserialize, Serialize};
19use std::collections::{HashMap, HashSet};
20use std::sync::Arc;
21use tracing::{error, info, warn};
22use uni_common::config::IndexRebuildConfig;
23use uni_common::core::schema::{IndexDefinition, IndexStatus, SchemaManager};
24use uni_common::core::snapshot::SnapshotManifest;
25use uuid::Uuid;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct IndexRebuildState {
30 pub tasks: Vec<IndexRebuildTask>,
32 pub last_updated: chrono::DateTime<Utc>,
34}
35
36impl Default for IndexRebuildState {
37 fn default() -> Self {
38 Self {
39 tasks: Vec::new(),
40 last_updated: Utc::now(),
41 }
42 }
43}
44
45pub struct RebuildTriggerChecker {
47 config: IndexRebuildConfig,
48}
49
50impl RebuildTriggerChecker {
51 pub fn new(config: IndexRebuildConfig) -> Self {
52 Self { config }
53 }
54
55 pub fn labels_needing_rebuild(
60 &self,
61 manifest: &SnapshotManifest,
62 indexes: &[IndexDefinition],
63 ) -> Vec<String> {
64 let mut labels: HashSet<String> = HashSet::new();
65 let now = Utc::now();
66
67 for idx in indexes {
68 let meta = idx.metadata();
69
70 if meta.status == IndexStatus::Building || meta.status == IndexStatus::Failed {
72 continue;
73 }
74
75 let label = idx.label();
76
77 if self.config.growth_trigger_ratio > 0.0
79 && let Some(built_count) = meta.row_count_at_build
80 {
81 let current_count = manifest.vertices.get(label).map(|ls| ls.count).unwrap_or(0);
82 let threshold =
83 (built_count as f64 * (1.0 + self.config.growth_trigger_ratio)) as u64;
84 if current_count > threshold {
85 labels.insert(label.to_string());
86 continue;
87 }
88 }
89
90 if let Some(max_age) = self.config.max_index_age
92 && let Some(built_at) = meta.last_built_at
93 {
94 let age = now.signed_duration_since(built_at);
95 if age.to_std().unwrap_or_default() > max_age {
96 labels.insert(label.to_string());
97 }
98 }
99 }
100
101 labels.into_iter().collect()
102 }
103}
104
105pub struct IndexRebuildManager {
111 storage: Arc<StorageManager>,
112 schema_manager: Arc<SchemaManager>,
113 tasks: Arc<RwLock<HashMap<String, IndexRebuildTask>>>,
114 config: IndexRebuildConfig,
115 store: Arc<dyn ObjectStore>,
116 state_path: ObjectPath,
117}
118
119impl IndexRebuildManager {
120 pub async fn new(
122 storage: Arc<StorageManager>,
123 schema_manager: Arc<SchemaManager>,
124 config: IndexRebuildConfig,
125 ) -> Result<Self> {
126 let store = storage.store();
127 let state_path = ObjectPath::from("index_rebuild_state.json");
128
129 let manager = Self {
130 storage,
131 schema_manager,
132 tasks: Arc::new(RwLock::new(HashMap::new())),
133 config,
134 store,
135 state_path,
136 };
137
138 manager.load_state().await?;
140
141 Ok(manager)
142 }
143
144 async fn load_state(&self) -> Result<()> {
146 match self.store.get(&self.state_path).await {
147 Ok(result) => {
148 let bytes = result.bytes().await?;
149 let state: IndexRebuildState = serde_json::from_slice(&bytes)?;
150
151 let mut tasks = self.tasks.write();
152 for task in state.tasks {
153 if task.status != IndexRebuildStatus::Completed {
155 let mut task = task;
157 if task.status == IndexRebuildStatus::InProgress {
158 task.status = IndexRebuildStatus::Pending;
159 task.started_at = None;
160 }
161 tasks.insert(task.id.clone(), task);
162 }
163 }
164 info!(
165 "Loaded {} pending index rebuild tasks from state",
166 tasks.len()
167 );
168 }
169 Err(object_store::Error::NotFound { .. }) => {
170 }
172 Err(e) => {
173 warn!("Failed to load index rebuild state: {}", e);
174 }
175 }
176 Ok(())
177 }
178
179 async fn save_state(&self) -> Result<()> {
181 let tasks: Vec<IndexRebuildTask> = self.tasks.read().values().cloned().collect();
182 let state = IndexRebuildState {
183 tasks,
184 last_updated: Utc::now(),
185 };
186 let bytes = serde_json::to_vec_pretty(&state)?;
187 self.store
188 .put(&self.state_path, bytes.into())
189 .await
190 .map_err(|e| anyhow!("Failed to save index rebuild state: {}", e))?;
191 Ok(())
192 }
193
194 pub async fn schedule(&self, labels: Vec<String>) -> Result<Vec<String>> {
198 let mut task_ids = Vec::with_capacity(labels.len());
199 let now = Utc::now();
200
201 {
202 let mut tasks = self.tasks.write();
203 for label in labels {
204 let existing = tasks
206 .values()
207 .find(|t| {
208 t.label == label
209 && (t.status == IndexRebuildStatus::Pending
210 || t.status == IndexRebuildStatus::InProgress)
211 })
212 .map(|t| t.id.clone());
213
214 if let Some(existing_id) = existing {
215 info!(
216 "Index rebuild for label '{}' already scheduled (task {})",
217 label, existing_id
218 );
219 task_ids.push(existing_id);
220 continue;
221 }
222
223 let task_id = Uuid::new_v4().to_string();
224 let task = IndexRebuildTask {
225 id: task_id.clone(),
226 label: label.clone(),
227 status: IndexRebuildStatus::Pending,
228 created_at: now,
229 started_at: None,
230 completed_at: None,
231 error: None,
232 retry_count: 0,
233 };
234 tasks.insert(task_id.clone(), task);
235 task_ids.push(task_id);
236 info!("Scheduled index rebuild for label '{}'", label);
237 }
238 }
239
240 self.save_state().await?;
242
243 Ok(task_ids)
244 }
245
246 pub fn status(&self) -> Vec<IndexRebuildTask> {
248 self.tasks.read().values().cloned().collect()
249 }
250
251 pub fn task_status(&self, task_id: &str) -> Option<IndexRebuildTask> {
253 self.tasks.read().get(task_id).cloned()
254 }
255
256 pub fn is_index_building(&self, label: &str) -> bool {
258 self.tasks.read().values().any(|t| {
259 t.label == label
260 && (t.status == IndexRebuildStatus::Pending
261 || t.status == IndexRebuildStatus::InProgress)
262 })
263 }
264
265 pub async fn retry_failed(&self) -> Result<Vec<String>> {
267 let mut retried = Vec::new();
268
269 {
270 let mut tasks = self.tasks.write();
271 for task in tasks.values_mut() {
272 if task.status == IndexRebuildStatus::Failed
273 && task.retry_count < self.config.max_retries
274 {
275 task.status = IndexRebuildStatus::Pending;
276 task.error = None;
277 task.started_at = None;
278 task.completed_at = None;
279 retried.push(task.id.clone());
280 info!(
281 "Task {} for label '{}' scheduled for retry (attempt {})",
282 task.id,
283 task.label,
284 task.retry_count + 1
285 );
286 }
287 }
288 }
289
290 if !retried.is_empty() {
291 self.save_state().await?;
292 }
293
294 Ok(retried)
295 }
296
297 pub async fn cancel(&self, task_id: &str) -> Result<()> {
299 {
300 let mut tasks = self.tasks.write();
301 if let Some(task) = tasks.get_mut(task_id) {
302 if task.status == IndexRebuildStatus::Pending {
303 tasks.remove(task_id);
304 info!("Cancelled index rebuild task {}", task_id);
305 } else if task.status == IndexRebuildStatus::InProgress {
306 return Err(anyhow!(
307 "Cannot cancel in-progress task. Wait for completion or restart."
308 ));
309 } else {
310 return Err(anyhow!("Task {} is already completed or failed", task_id));
311 }
312 } else {
313 return Err(anyhow!("Task {} not found", task_id));
314 }
315 }
316
317 self.save_state().await?;
318 Ok(())
319 }
320
321 pub async fn cleanup_completed(&self) -> Result<usize> {
323 let removed;
324 {
325 let mut tasks = self.tasks.write();
326 let before = tasks.len();
327 tasks.retain(|_, t| {
328 t.status == IndexRebuildStatus::Pending
329 || t.status == IndexRebuildStatus::InProgress
330 });
331 removed = before - tasks.len();
332 }
333
334 if removed > 0 {
335 self.save_state().await?;
336 }
337
338 Ok(removed)
339 }
340
341 pub fn start_background_worker(
346 self: Arc<Self>,
347 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
348 ) -> tokio::task::JoinHandle<()> {
349 tokio::spawn(async move {
350 let mut interval = tokio::time::interval(self.config.worker_check_interval);
351
352 loop {
353 tokio::select! {
354 _ = interval.tick() => {
355 self.process_next_pending_task().await;
356 }
357 _ = shutdown_rx.recv() => {
358 info!("Index rebuild worker shutting down");
359 let _ = self.save_state().await;
360 break;
361 }
362 }
363 }
364 })
365 }
366
367 async fn process_next_pending_task(self: &Arc<Self>) {
372 let task_to_process = {
374 let mut tasks = self.tasks.write();
375 let pending = tasks
376 .values_mut()
377 .find(|t| t.status == IndexRebuildStatus::Pending);
378
379 if let Some(task) = pending {
380 task.status = IndexRebuildStatus::InProgress;
381 task.started_at = Some(Utc::now());
382 Some((task.id.clone(), task.label.clone()))
383 } else {
384 None
385 }
386 };
387
388 let Some((task_id, label)) = task_to_process else {
389 return;
390 };
391
392 if let Err(e) = self.save_state().await {
394 error!("Failed to save state before processing: {}", e);
395 }
396
397 info!("Starting index rebuild for label '{}'", label);
398 self.set_index_status_for_label(&label, IndexStatus::Building);
399
400 let result = self.execute_rebuild(&label).await;
402
403 match result {
404 Ok(()) => self.handle_rebuild_success(&task_id, &label).await,
405 Err(e) => self.handle_rebuild_failure(&task_id, &label, e),
406 }
407
408 if let Err(e) = self.save_state().await {
410 error!("Failed to save state after processing: {}", e);
411 }
412 if let Err(e) = self.schema_manager.save().await {
413 error!("Failed to save schema after index rebuild: {}", e);
414 }
415 }
416
417 async fn handle_rebuild_success(&self, task_id: &str, label: &str) {
419 let now = Utc::now();
420 let row_count = self.get_label_row_count(label).await;
421
422 {
423 let mut tasks = self.tasks.write();
424 if let Some(task) = tasks.get_mut(task_id) {
425 task.status = IndexRebuildStatus::Completed;
426 task.completed_at = Some(now);
427 task.error = None;
428 }
429 }
430 info!("Index rebuild completed for label '{}'", label);
431
432 self.update_index_metadata_for_label(label, IndexStatus::Online, Some(now), row_count);
433 }
434
435 fn handle_rebuild_failure(self: &Arc<Self>, task_id: &str, label: &str, err: anyhow::Error) {
437 let (retry_count, exhausted) = {
438 let mut tasks = self.tasks.write();
439 if let Some(task) = tasks.get_mut(task_id) {
440 task.status = IndexRebuildStatus::Failed;
441 task.completed_at = Some(Utc::now());
442 task.error = Some(err.to_string());
443 task.retry_count += 1;
444 (
445 task.retry_count,
446 task.retry_count >= self.config.max_retries,
447 )
448 } else {
449 (0, true)
450 }
451 };
452 error!("Index rebuild failed for label '{}': {}", label, err);
453
454 if exhausted {
455 self.set_index_status_for_label(label, IndexStatus::Failed);
456 } else {
457 self.set_index_status_for_label(label, IndexStatus::Stale);
458 info!(
459 "Will retry index rebuild for '{}' after delay (attempt {}/{})",
460 label, retry_count, self.config.max_retries
461 );
462 let manager = self.clone();
463 let task_id_owned = task_id.to_string();
464 let delay = self.config.retry_delay;
465 tokio::spawn(async move {
466 tokio::time::sleep(delay).await;
467 let mut tasks = manager.tasks.write();
468 if let Some(task) = tasks.get_mut(&task_id_owned)
469 && task.status == IndexRebuildStatus::Failed
470 {
471 task.status = IndexRebuildStatus::Pending;
472 }
473 });
474 }
475 }
476
477 fn set_index_status_for_label(&self, label: &str, status: IndexStatus) {
479 let schema = self.schema_manager.schema();
480 for idx in &schema.indexes {
481 if idx.label() == label {
482 let _ = self.schema_manager.update_index_metadata(idx.name(), |m| {
483 m.status = status.clone();
484 });
485 }
486 }
487 }
488
489 fn update_index_metadata_for_label(
491 &self,
492 label: &str,
493 status: IndexStatus,
494 last_built_at: Option<chrono::DateTime<Utc>>,
495 row_count: Option<u64>,
496 ) {
497 let schema = self.schema_manager.schema();
498 for idx in &schema.indexes {
499 if idx.label() == label {
500 let _ = self.schema_manager.update_index_metadata(idx.name(), |m| {
501 m.status = status.clone();
502 if let Some(ts) = last_built_at {
503 m.last_built_at = Some(ts);
504 }
505 if let Some(count) = row_count {
506 m.row_count_at_build = Some(count);
507 }
508 });
509 }
510 }
511 }
512
513 async fn get_label_row_count(&self, label: &str) -> Option<u64> {
515 let manifest = self
516 .storage
517 .snapshot_manager()
518 .load_latest_snapshot()
519 .await
520 .ok()
521 .flatten()?;
522 manifest.vertices.get(label).map(|ls| ls.count)
523 }
524
525 #[cfg(feature = "lance-backend")]
527 async fn execute_rebuild(&self, label: &str) -> Result<()> {
528 let idx_mgr = IndexManager::new(
529 self.storage.base_path(),
530 self.schema_manager.clone(),
531 self.storage.backend_arc(),
532 );
533 idx_mgr.rebuild_indexes_for_label(label).await
534 }
535
536 #[cfg(not(feature = "lance-backend"))]
538 async fn execute_rebuild(&self, _label: &str) -> Result<()> {
539 Err(anyhow!("Index rebuild requires the lance-backend feature"))
540 }
541}
542
#[cfg(test)]
mod tests {
    use super::*;
    use uni_common::core::schema::IndexMetadata;
    use uni_common::core::snapshot::LabelSnapshot;

    #[test]
    fn test_index_rebuild_status_serialize() {
        let status = IndexRebuildStatus::Pending;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, "\"Pending\"");

        let parsed: IndexRebuildStatus = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed, IndexRebuildStatus::Pending);
    }

    #[test]
    fn test_index_rebuild_task_serialize() {
        let task = IndexRebuildTask {
            id: "test-id".to_string(),
            label: "Person".to_string(),
            status: IndexRebuildStatus::Pending,
            created_at: Utc::now(),
            started_at: None,
            completed_at: None,
            error: None,
            retry_count: 0,
        };

        let json = serde_json::to_string(&task).unwrap();
        let parsed: IndexRebuildTask = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.id, task.id);
        assert_eq!(parsed.label, task.label);
        assert_eq!(parsed.status, task.status);
    }

    /// Builds a manifest containing a single `label` with `count` vertices.
    fn make_test_manifest(label: &str, count: u64) -> SnapshotManifest {
        let mut manifest = SnapshotManifest::new("test".into(), 1);
        add_label(&mut manifest, label, count);
        manifest
    }

    /// Inserts (or replaces) `label` with `count` vertices in `manifest`.
    fn add_label(manifest: &mut SnapshotManifest, label: &str, count: u64) {
        manifest.vertices.insert(
            label.to_string(),
            LabelSnapshot {
                version: 1,
                count,
                lance_version: 0,
            },
        );
    }

    /// Builds a BTree scalar index on `label` with the given status and metadata.
    fn make_scalar_index(label: &str, status: IndexStatus, meta: IndexMetadata) -> IndexDefinition {
        use uni_common::core::schema::{ScalarIndexConfig, ScalarIndexType};
        IndexDefinition::Scalar(ScalarIndexConfig {
            name: format!("idx_{}", label),
            label: label.to_string(),
            properties: vec!["prop".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata: IndexMetadata { status, ..meta },
        })
    }

    #[test]
    fn test_trigger_growth_fires() {
        let config = IndexRebuildConfig {
            growth_trigger_ratio: 0.5,
            ..Default::default()
        };
        let checker = RebuildTriggerChecker::new(config);

        // 151 > 100 * 1.5
        let manifest = make_test_manifest("Person", 151);
        let indexes = vec![make_scalar_index(
            "Person",
            IndexStatus::Online,
            IndexMetadata {
                row_count_at_build: Some(100),
                ..Default::default()
            },
        )];

        let labels = checker.labels_needing_rebuild(&manifest, &indexes);
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0], "Person");
    }

    #[test]
    fn test_trigger_growth_below_threshold() {
        let config = IndexRebuildConfig {
            growth_trigger_ratio: 0.5,
            ..Default::default()
        };
        let checker = RebuildTriggerChecker::new(config);

        // 120 <= 100 * 1.5
        let manifest = make_test_manifest("Person", 120);
        let indexes = vec![make_scalar_index(
            "Person",
            IndexStatus::Online,
            IndexMetadata {
                row_count_at_build: Some(100),
                ..Default::default()
            },
        )];

        let labels = checker.labels_needing_rebuild(&manifest, &indexes);
        assert!(labels.is_empty());
    }

    #[test]
    fn test_trigger_time_based() {
        let config = IndexRebuildConfig {
            growth_trigger_ratio: 0.0,
            max_index_age: Some(std::time::Duration::from_secs(3600)),
            ..Default::default()
        };
        let checker = RebuildTriggerChecker::new(config);

        // Built two hours ago, limit is one hour.
        let two_hours_ago = Utc::now() - chrono::Duration::hours(2);
        let manifest = make_test_manifest("Person", 100);
        let indexes = vec![make_scalar_index(
            "Person",
            IndexStatus::Online,
            IndexMetadata {
                last_built_at: Some(two_hours_ago),
                row_count_at_build: Some(100),
                ..Default::default()
            },
        )];

        let labels = checker.labels_needing_rebuild(&manifest, &indexes);
        assert_eq!(labels.len(), 1);
    }

    #[test]
    fn test_trigger_time_based_fresh_index_not_flagged() {
        let config = IndexRebuildConfig {
            growth_trigger_ratio: 0.0,
            max_index_age: Some(std::time::Duration::from_secs(3600)),
            ..Default::default()
        };
        let checker = RebuildTriggerChecker::new(config);

        // Built ten minutes ago, well within the one-hour limit.
        let ten_minutes_ago = Utc::now() - chrono::Duration::minutes(10);
        let manifest = make_test_manifest("Person", 100);
        let indexes = vec![make_scalar_index(
            "Person",
            IndexStatus::Online,
            IndexMetadata {
                last_built_at: Some(ten_minutes_ago),
                row_count_at_build: Some(100),
                ..Default::default()
            },
        )];

        assert!(checker.labels_needing_rebuild(&manifest, &indexes).is_empty());
    }

    #[test]
    fn test_trigger_disabled_when_ratio_zero_and_no_max_age() {
        let config = IndexRebuildConfig {
            growth_trigger_ratio: 0.0,
            max_index_age: None,
            ..Default::default()
        };
        let checker = RebuildTriggerChecker::new(config);

        // Tenfold growth must not trigger when the growth trigger is disabled.
        let manifest = make_test_manifest("Person", 1000);
        let indexes = vec![make_scalar_index(
            "Person",
            IndexStatus::Online,
            IndexMetadata {
                row_count_at_build: Some(100),
                ..Default::default()
            },
        )];

        assert!(checker.labels_needing_rebuild(&manifest, &indexes).is_empty());
    }

    #[test]
    fn test_trigger_reports_each_label_once() {
        let config = IndexRebuildConfig {
            growth_trigger_ratio: 0.5,
            max_index_age: None,
            ..Default::default()
        };
        let checker = RebuildTriggerChecker::new(config);

        let mut manifest = make_test_manifest("Person", 151);
        add_label(&mut manifest, "Company", 300);

        let built_at_100 = || IndexMetadata {
            row_count_at_build: Some(100),
            ..Default::default()
        };
        let indexes = vec![
            make_scalar_index("Person", IndexStatus::Online, built_at_100()),
            make_scalar_index("Person", IndexStatus::Online, built_at_100()),
            make_scalar_index("Company", IndexStatus::Online, built_at_100()),
            // Absent from the manifest: treated as 0 vertices, so no growth.
            make_scalar_index("Movie", IndexStatus::Online, built_at_100()),
        ];

        let mut labels = checker.labels_needing_rebuild(&manifest, &indexes);
        labels.sort();
        assert_eq!(labels, vec!["Company".to_string(), "Person".to_string()]);
    }

    #[test]
    fn test_trigger_skips_building_and_failed() {
        let config = IndexRebuildConfig {
            growth_trigger_ratio: 0.5,
            ..Default::default()
        };
        let checker = RebuildTriggerChecker::new(config);

        // Growth alone would trigger, but the index status must suppress it.
        let manifest = make_test_manifest("Person", 151);
        let building = vec![make_scalar_index(
            "Person",
            IndexStatus::Building,
            IndexMetadata {
                row_count_at_build: Some(100),
                ..Default::default()
            },
        )];
        assert!(
            checker
                .labels_needing_rebuild(&manifest, &building)
                .is_empty()
        );

        let failed = vec![make_scalar_index(
            "Person",
            IndexStatus::Failed,
            IndexMetadata {
                row_count_at_build: Some(100),
                ..Default::default()
            },
        )];
        assert!(
            checker
                .labels_needing_rebuild(&manifest, &failed)
                .is_empty()
        );
    }
}