1use crate::storage::index_manager::{IndexManager, IndexRebuildStatus, IndexRebuildTask};
10use crate::storage::manager::StorageManager;
11use anyhow::{Result, anyhow};
12use chrono::Utc;
13use object_store::ObjectStore;
14use object_store::path::Path as ObjectPath;
15use parking_lot::RwLock;
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18use std::sync::Arc;
19use tracing::{error, info, warn};
20use uni_common::config::IndexRebuildConfig;
21use uni_common::core::schema::{IndexDefinition, IndexStatus, SchemaManager};
22use uni_common::core::snapshot::SnapshotManifest;
23use uuid::Uuid;
24
25#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct IndexRebuildState {
28 pub tasks: Vec<IndexRebuildTask>,
30 pub last_updated: chrono::DateTime<Utc>,
32}
33
34impl Default for IndexRebuildState {
35 fn default() -> Self {
36 Self {
37 tasks: Vec::new(),
38 last_updated: Utc::now(),
39 }
40 }
41}
42
43pub struct RebuildTriggerChecker {
45 config: IndexRebuildConfig,
46}
47
48impl RebuildTriggerChecker {
49 pub fn new(config: IndexRebuildConfig) -> Self {
50 Self { config }
51 }
52
53 pub fn labels_needing_rebuild(
58 &self,
59 manifest: &SnapshotManifest,
60 indexes: &[IndexDefinition],
61 ) -> Vec<String> {
62 let mut labels: HashSet<String> = HashSet::new();
63 let now = Utc::now();
64
65 for idx in indexes {
66 let meta = idx.metadata();
67
68 if meta.status == IndexStatus::Building || meta.status == IndexStatus::Failed {
70 continue;
71 }
72
73 let label = idx.label();
74
75 if self.config.growth_trigger_ratio > 0.0
77 && let Some(built_count) = meta.row_count_at_build
78 {
79 let current_count = manifest.vertices.get(label).map(|ls| ls.count).unwrap_or(0);
80 let threshold =
81 (built_count as f64 * (1.0 + self.config.growth_trigger_ratio)) as u64;
82 if current_count > threshold {
83 labels.insert(label.to_string());
84 continue;
85 }
86 }
87
88 if let Some(max_age) = self.config.max_index_age
90 && let Some(built_at) = meta.last_built_at
91 {
92 let age = now.signed_duration_since(built_at);
93 if age.to_std().unwrap_or_default() > max_age {
94 labels.insert(label.to_string());
95 }
96 }
97 }
98
99 labels.into_iter().collect()
100 }
101}
102
103pub struct IndexRebuildManager {
109 storage: Arc<StorageManager>,
110 schema_manager: Arc<SchemaManager>,
111 tasks: Arc<RwLock<HashMap<String, IndexRebuildTask>>>,
112 config: IndexRebuildConfig,
113 store: Arc<dyn ObjectStore>,
114 state_path: ObjectPath,
115}
116
117impl IndexRebuildManager {
118 pub async fn new(
120 storage: Arc<StorageManager>,
121 schema_manager: Arc<SchemaManager>,
122 config: IndexRebuildConfig,
123 ) -> Result<Self> {
124 let store = storage.store();
125 let state_path = ObjectPath::from("index_rebuild_state.json");
126
127 let manager = Self {
128 storage,
129 schema_manager,
130 tasks: Arc::new(RwLock::new(HashMap::new())),
131 config,
132 store,
133 state_path,
134 };
135
136 manager.load_state().await?;
138
139 Ok(manager)
140 }
141
142 async fn load_state(&self) -> Result<()> {
144 match self.store.get(&self.state_path).await {
145 Ok(result) => {
146 let bytes = result.bytes().await?;
147 let state: IndexRebuildState = serde_json::from_slice(&bytes)?;
148
149 let mut tasks = self.tasks.write();
150 for task in state.tasks {
151 if task.status != IndexRebuildStatus::Completed {
153 let mut task = task;
155 if task.status == IndexRebuildStatus::InProgress {
156 task.status = IndexRebuildStatus::Pending;
157 task.started_at = None;
158 }
159 tasks.insert(task.id.clone(), task);
160 }
161 }
162 info!(
163 "Loaded {} pending index rebuild tasks from state",
164 tasks.len()
165 );
166 }
167 Err(object_store::Error::NotFound { .. }) => {
168 }
170 Err(e) => {
171 warn!("Failed to load index rebuild state: {}", e);
172 }
173 }
174 Ok(())
175 }
176
177 async fn save_state(&self) -> Result<()> {
179 let tasks: Vec<IndexRebuildTask> = self.tasks.read().values().cloned().collect();
180 let state = IndexRebuildState {
181 tasks,
182 last_updated: Utc::now(),
183 };
184 let bytes = serde_json::to_vec_pretty(&state)?;
185 self.store
186 .put(&self.state_path, bytes.into())
187 .await
188 .map_err(|e| anyhow!("Failed to save index rebuild state: {}", e))?;
189 Ok(())
190 }
191
192 pub async fn schedule(&self, labels: Vec<String>) -> Result<Vec<String>> {
196 let mut task_ids = Vec::with_capacity(labels.len());
197 let now = Utc::now();
198
199 {
200 let mut tasks = self.tasks.write();
201 for label in labels {
202 let existing = tasks
204 .values()
205 .find(|t| {
206 t.label == label
207 && (t.status == IndexRebuildStatus::Pending
208 || t.status == IndexRebuildStatus::InProgress)
209 })
210 .map(|t| t.id.clone());
211
212 if let Some(existing_id) = existing {
213 info!(
214 "Index rebuild for label '{}' already scheduled (task {})",
215 label, existing_id
216 );
217 task_ids.push(existing_id);
218 continue;
219 }
220
221 let task_id = Uuid::new_v4().to_string();
222 let task = IndexRebuildTask {
223 id: task_id.clone(),
224 label: label.clone(),
225 status: IndexRebuildStatus::Pending,
226 created_at: now,
227 started_at: None,
228 completed_at: None,
229 error: None,
230 retry_count: 0,
231 };
232 tasks.insert(task_id.clone(), task);
233 task_ids.push(task_id);
234 info!("Scheduled index rebuild for label '{}'", label);
235 }
236 }
237
238 self.save_state().await?;
240
241 Ok(task_ids)
242 }
243
244 pub fn status(&self) -> Vec<IndexRebuildTask> {
246 self.tasks.read().values().cloned().collect()
247 }
248
249 pub fn task_status(&self, task_id: &str) -> Option<IndexRebuildTask> {
251 self.tasks.read().get(task_id).cloned()
252 }
253
254 pub fn is_index_building(&self, label: &str) -> bool {
256 self.tasks.read().values().any(|t| {
257 t.label == label
258 && (t.status == IndexRebuildStatus::Pending
259 || t.status == IndexRebuildStatus::InProgress)
260 })
261 }
262
263 pub async fn retry_failed(&self) -> Result<Vec<String>> {
265 let mut retried = Vec::new();
266
267 {
268 let mut tasks = self.tasks.write();
269 for task in tasks.values_mut() {
270 if task.status == IndexRebuildStatus::Failed
271 && task.retry_count < self.config.max_retries
272 {
273 task.status = IndexRebuildStatus::Pending;
274 task.error = None;
275 task.started_at = None;
276 task.completed_at = None;
277 retried.push(task.id.clone());
278 info!(
279 "Task {} for label '{}' scheduled for retry (attempt {})",
280 task.id,
281 task.label,
282 task.retry_count + 1
283 );
284 }
285 }
286 }
287
288 if !retried.is_empty() {
289 self.save_state().await?;
290 }
291
292 Ok(retried)
293 }
294
295 pub async fn cancel(&self, task_id: &str) -> Result<()> {
297 {
298 let mut tasks = self.tasks.write();
299 if let Some(task) = tasks.get_mut(task_id) {
300 if task.status == IndexRebuildStatus::Pending {
301 tasks.remove(task_id);
302 info!("Cancelled index rebuild task {}", task_id);
303 } else if task.status == IndexRebuildStatus::InProgress {
304 return Err(anyhow!(
305 "Cannot cancel in-progress task. Wait for completion or restart."
306 ));
307 } else {
308 return Err(anyhow!("Task {} is already completed or failed", task_id));
309 }
310 } else {
311 return Err(anyhow!("Task {} not found", task_id));
312 }
313 }
314
315 self.save_state().await?;
316 Ok(())
317 }
318
319 pub async fn cleanup_completed(&self) -> Result<usize> {
321 let removed;
322 {
323 let mut tasks = self.tasks.write();
324 let before = tasks.len();
325 tasks.retain(|_, t| {
326 t.status == IndexRebuildStatus::Pending
327 || t.status == IndexRebuildStatus::InProgress
328 });
329 removed = before - tasks.len();
330 }
331
332 if removed > 0 {
333 self.save_state().await?;
334 }
335
336 Ok(removed)
337 }
338
339 pub fn start_background_worker(
344 self: Arc<Self>,
345 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
346 ) -> tokio::task::JoinHandle<()> {
347 tokio::spawn(async move {
348 let mut interval = tokio::time::interval(self.config.worker_check_interval);
349
350 loop {
351 tokio::select! {
352 _ = interval.tick() => {
353 self.process_next_pending_task().await;
354 }
355 _ = shutdown_rx.recv() => {
356 info!("Index rebuild worker shutting down");
357 let _ = self.save_state().await;
358 break;
359 }
360 }
361 }
362 })
363 }
364
365 async fn process_next_pending_task(self: &Arc<Self>) {
370 let task_to_process = {
372 let mut tasks = self.tasks.write();
373 let pending = tasks
374 .values_mut()
375 .find(|t| t.status == IndexRebuildStatus::Pending);
376
377 if let Some(task) = pending {
378 task.status = IndexRebuildStatus::InProgress;
379 task.started_at = Some(Utc::now());
380 Some((task.id.clone(), task.label.clone()))
381 } else {
382 None
383 }
384 };
385
386 let Some((task_id, label)) = task_to_process else {
387 return;
388 };
389
390 if let Err(e) = self.save_state().await {
392 error!("Failed to save state before processing: {}", e);
393 }
394
395 info!("Starting index rebuild for label '{}'", label);
396 self.set_index_status_for_label(&label, IndexStatus::Building);
397
398 let result = self.execute_rebuild(&label).await;
400
401 match result {
402 Ok(()) => self.handle_rebuild_success(&task_id, &label).await,
403 Err(e) => self.handle_rebuild_failure(&task_id, &label, e),
404 }
405
406 if let Err(e) = self.save_state().await {
408 error!("Failed to save state after processing: {}", e);
409 }
410 if let Err(e) = self.schema_manager.save().await {
411 error!("Failed to save schema after index rebuild: {}", e);
412 }
413 }
414
415 async fn handle_rebuild_success(&self, task_id: &str, label: &str) {
417 let now = Utc::now();
418 let row_count = self.get_label_row_count(label).await;
419
420 {
421 let mut tasks = self.tasks.write();
422 if let Some(task) = tasks.get_mut(task_id) {
423 task.status = IndexRebuildStatus::Completed;
424 task.completed_at = Some(now);
425 task.error = None;
426 }
427 }
428 info!("Index rebuild completed for label '{}'", label);
429
430 self.update_index_metadata_for_label(label, IndexStatus::Online, Some(now), row_count);
431 }
432
433 fn handle_rebuild_failure(self: &Arc<Self>, task_id: &str, label: &str, err: anyhow::Error) {
435 let (retry_count, exhausted) = {
436 let mut tasks = self.tasks.write();
437 if let Some(task) = tasks.get_mut(task_id) {
438 task.status = IndexRebuildStatus::Failed;
439 task.completed_at = Some(Utc::now());
440 task.error = Some(err.to_string());
441 task.retry_count += 1;
442 (
443 task.retry_count,
444 task.retry_count >= self.config.max_retries,
445 )
446 } else {
447 (0, true)
448 }
449 };
450 error!("Index rebuild failed for label '{}': {}", label, err);
451
452 if exhausted {
453 self.set_index_status_for_label(label, IndexStatus::Failed);
454 } else {
455 self.set_index_status_for_label(label, IndexStatus::Stale);
456 info!(
457 "Will retry index rebuild for '{}' after delay (attempt {}/{})",
458 label, retry_count, self.config.max_retries
459 );
460 let manager = self.clone();
461 let task_id_owned = task_id.to_string();
462 let delay = self.config.retry_delay;
463 tokio::spawn(async move {
464 tokio::time::sleep(delay).await;
465 let mut tasks = manager.tasks.write();
466 if let Some(task) = tasks.get_mut(&task_id_owned)
467 && task.status == IndexRebuildStatus::Failed
468 {
469 task.status = IndexRebuildStatus::Pending;
470 }
471 });
472 }
473 }
474
475 fn set_index_status_for_label(&self, label: &str, status: IndexStatus) {
477 let schema = self.schema_manager.schema();
478 for idx in &schema.indexes {
479 if idx.label() == label {
480 let _ = self.schema_manager.update_index_metadata(idx.name(), |m| {
481 m.status = status.clone();
482 });
483 }
484 }
485 }
486
487 fn update_index_metadata_for_label(
489 &self,
490 label: &str,
491 status: IndexStatus,
492 last_built_at: Option<chrono::DateTime<Utc>>,
493 row_count: Option<u64>,
494 ) {
495 let schema = self.schema_manager.schema();
496 for idx in &schema.indexes {
497 if idx.label() == label {
498 let _ = self.schema_manager.update_index_metadata(idx.name(), |m| {
499 m.status = status.clone();
500 if let Some(ts) = last_built_at {
501 m.last_built_at = Some(ts);
502 }
503 if let Some(count) = row_count {
504 m.row_count_at_build = Some(count);
505 }
506 });
507 }
508 }
509 }
510
511 async fn get_label_row_count(&self, label: &str) -> Option<u64> {
513 let manifest = self
514 .storage
515 .snapshot_manager()
516 .load_latest_snapshot()
517 .await
518 .ok()
519 .flatten()?;
520 manifest.vertices.get(label).map(|ls| ls.count)
521 }
522
523 async fn execute_rebuild(&self, label: &str) -> Result<()> {
525 let idx_mgr = IndexManager::new(
526 self.storage.base_path(),
527 self.schema_manager.clone(),
528 self.storage.lancedb_store_arc(),
529 );
530 idx_mgr.rebuild_indexes_for_label(label).await
531 }
532}
533
534#[cfg(test)]
535mod tests {
536 use super::*;
537 use uni_common::core::schema::IndexMetadata;
538
// `IndexRebuildStatus` round-trips through JSON as a bare variant-name string.
#[test]
fn test_index_rebuild_status_serialize() {
    let encoded = serde_json::to_string(&IndexRebuildStatus::Pending).unwrap();
    assert_eq!(encoded, "\"Pending\"");

    let decoded: IndexRebuildStatus = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded, IndexRebuildStatus::Pending);
}
548
// A task survives a JSON round trip with id, label and status intact.
#[test]
fn test_index_rebuild_task_serialize() {
    let original = IndexRebuildTask {
        id: "test-id".to_string(),
        label: "Person".to_string(),
        status: IndexRebuildStatus::Pending,
        created_at: Utc::now(),
        started_at: None,
        completed_at: None,
        error: None,
        retry_count: 0,
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: IndexRebuildTask = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.id, original.id);
    assert_eq!(decoded.label, original.label);
    assert_eq!(decoded.status, original.status);
}
568
569 fn make_test_manifest(label: &str, count: u64) -> SnapshotManifest {
570 use uni_common::core::snapshot::LabelSnapshot;
571
572 let mut manifest = SnapshotManifest::new("test".into(), 1);
573 manifest.vertices.insert(
574 label.to_string(),
575 LabelSnapshot {
576 version: 1,
577 count,
578 lance_version: 0,
579 },
580 );
581 manifest
582 }
583
584 fn make_scalar_index(label: &str, status: IndexStatus, meta: IndexMetadata) -> IndexDefinition {
585 use uni_common::core::schema::{ScalarIndexConfig, ScalarIndexType};
586 IndexDefinition::Scalar(ScalarIndexConfig {
587 name: format!("idx_{}", label),
588 label: label.to_string(),
589 properties: vec!["prop".to_string()],
590 index_type: ScalarIndexType::BTree,
591 where_clause: None,
592 metadata: IndexMetadata { status, ..meta },
593 })
594 }
595
// Built at 100 rows with a 0.5 growth ratio => threshold 150; 151 rows exceeds it.
#[test]
fn test_trigger_growth_fires() {
    let checker = RebuildTriggerChecker::new(IndexRebuildConfig {
        growth_trigger_ratio: 0.5,
        ..Default::default()
    });

    let manifest = make_test_manifest("Person", 151);
    let built_at_100 = IndexMetadata {
        row_count_at_build: Some(100),
        ..Default::default()
    };
    let indexes = vec![make_scalar_index(
        "Person",
        IndexStatus::Online,
        built_at_100,
    )];

    let labels = checker.labels_needing_rebuild(&manifest, &indexes);
    assert_eq!(labels.len(), 1);
    assert_eq!(labels[0], "Person");
}
619
// Built at 100 rows with a 0.5 growth ratio => threshold 150; 120 rows stays below it.
#[test]
fn test_trigger_growth_below_threshold() {
    let checker = RebuildTriggerChecker::new(IndexRebuildConfig {
        growth_trigger_ratio: 0.5,
        ..Default::default()
    });

    let manifest = make_test_manifest("Person", 120);
    let built_at_100 = IndexMetadata {
        row_count_at_build: Some(100),
        ..Default::default()
    };
    let indexes = vec![make_scalar_index(
        "Person",
        IndexStatus::Online,
        built_at_100,
    )];

    let labels = checker.labels_needing_rebuild(&manifest, &indexes);
    assert!(labels.is_empty());
}
642
// With the growth trigger disabled (ratio 0.0), an index built two hours ago
// exceeds a one-hour maximum age and must be reported.
#[test]
fn test_trigger_time_based() {
    let checker = RebuildTriggerChecker::new(IndexRebuildConfig {
        growth_trigger_ratio: 0.0,
        max_index_age: Some(std::time::Duration::from_secs(3600)),
        ..Default::default()
    });

    let two_hours_ago = Utc::now() - chrono::Duration::hours(2);
    let manifest = make_test_manifest("Person", 100);
    let stale_meta = IndexMetadata {
        last_built_at: Some(two_hours_ago),
        row_count_at_build: Some(100),
        ..Default::default()
    };
    let indexes = vec![make_scalar_index("Person", IndexStatus::Online, stale_meta)];

    let labels = checker.labels_needing_rebuild(&manifest, &indexes);
    assert_eq!(labels.len(), 1);
}
668
// Indexes in `Building` or `Failed` status are never reported, even when the
// growth threshold (150 here) is exceeded.
#[test]
fn test_trigger_skips_building_and_failed() {
    let checker = RebuildTriggerChecker::new(IndexRebuildConfig {
        growth_trigger_ratio: 0.5,
        ..Default::default()
    });
    let manifest = make_test_manifest("Person", 151);

    // Runs the checker against a single "Person" index in the given status.
    let is_skipped = |status: IndexStatus| {
        let indexes = vec![make_scalar_index(
            "Person",
            status,
            IndexMetadata {
                row_count_at_build: Some(100),
                ..Default::default()
            },
        )];
        checker
            .labels_needing_rebuild(&manifest, &indexes)
            .is_empty()
    };

    assert!(is_skipped(IndexStatus::Building));
    assert!(is_skipped(IndexStatus::Failed));
}
708}