// uni_store/storage/index_rebuild.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Background index rebuild manager for async index building during bulk loading.
5//!
6//! This module provides `IndexRebuildManager` which handles background index
7//! rebuilding with status tracking, retry logic, and persistence for restart recovery.
8
9#[cfg(feature = "lance-backend")]
10use crate::storage::index_manager::IndexManager;
11use crate::storage::index_manager::{IndexRebuildStatus, IndexRebuildTask};
12use crate::storage::manager::StorageManager;
13use anyhow::{Result, anyhow};
14use chrono::Utc;
15use object_store::ObjectStore;
16use object_store::path::Path as ObjectPath;
17use parking_lot::RwLock;
18use serde::{Deserialize, Serialize};
19use std::collections::{HashMap, HashSet};
20use std::sync::Arc;
21use tracing::{error, info, warn};
22use uni_common::config::IndexRebuildConfig;
23use uni_common::core::schema::{IndexDefinition, IndexStatus, SchemaManager};
24use uni_common::core::snapshot::SnapshotManifest;
25use uuid::Uuid;
26
27/// Persisted state for index rebuild tasks.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct IndexRebuildState {
30    /// All tracked tasks.
31    pub tasks: Vec<IndexRebuildTask>,
32    /// When this state was last updated.
33    pub last_updated: chrono::DateTime<Utc>,
34}
35
36impl Default for IndexRebuildState {
37    fn default() -> Self {
38        Self {
39            tasks: Vec::new(),
40            last_updated: Utc::now(),
41        }
42    }
43}
44
45/// Checks whether indexes need rebuilding based on growth and time thresholds.
46pub struct RebuildTriggerChecker {
47    config: IndexRebuildConfig,
48}
49
50impl RebuildTriggerChecker {
51    pub fn new(config: IndexRebuildConfig) -> Self {
52        Self { config }
53    }
54
55    /// Returns the list of labels whose indexes need rebuilding based on
56    /// configured growth and time thresholds.
57    ///
58    /// Skips indexes with `Building` or `Failed` status.
59    pub fn labels_needing_rebuild(
60        &self,
61        manifest: &SnapshotManifest,
62        indexes: &[IndexDefinition],
63    ) -> Vec<String> {
64        let mut labels: HashSet<String> = HashSet::new();
65        let now = Utc::now();
66
67        for idx in indexes {
68            let meta = idx.metadata();
69
70            // Skip indexes that are already being rebuilt or have permanently failed
71            if meta.status == IndexStatus::Building || meta.status == IndexStatus::Failed {
72                continue;
73            }
74
75            let label = idx.label();
76
77            // Growth trigger: current count exceeds row_count_at_build * (1 + ratio)
78            if self.config.growth_trigger_ratio > 0.0
79                && let Some(built_count) = meta.row_count_at_build
80            {
81                let current_count = manifest.vertices.get(label).map(|ls| ls.count).unwrap_or(0);
82                let threshold =
83                    (built_count as f64 * (1.0 + self.config.growth_trigger_ratio)) as u64;
84                if current_count > threshold {
85                    labels.insert(label.to_string());
86                    continue;
87                }
88            }
89
90            // Time trigger: index age exceeds max_index_age
91            if let Some(max_age) = self.config.max_index_age
92                && let Some(built_at) = meta.last_built_at
93            {
94                let age = now.signed_duration_since(built_at);
95                if age.to_std().unwrap_or_default() > max_age {
96                    labels.insert(label.to_string());
97                }
98            }
99        }
100
101        labels.into_iter().collect()
102    }
103}
104
/// Manages background index rebuilding with status tracking and retry logic.
///
/// The manager keeps a set of index rebuild tasks keyed by task ID and
/// processes pending ones in the background. Tasks can be monitored via
/// `status()` and retried via `retry_failed()`. The task set is persisted
/// to the object store so it can be restored when a new manager is created.
pub struct IndexRebuildManager {
    /// Storage layer; supplies the object store, snapshots, and rebuild backend.
    storage: Arc<StorageManager>,
    /// Schema access used to read index definitions and update their metadata.
    schema_manager: Arc<SchemaManager>,
    /// All tracked tasks, keyed by task ID.
    tasks: Arc<RwLock<HashMap<String, IndexRebuildTask>>>,
    /// Retry limits, retry delay, and worker polling interval.
    config: IndexRebuildConfig,
    /// Object store used to persist and reload the task state.
    store: Arc<dyn ObjectStore>,
    /// Location of the persisted task state within `store`.
    state_path: ObjectPath,
}
118
119impl IndexRebuildManager {
120    /// Create a new IndexRebuildManager.
121    pub async fn new(
122        storage: Arc<StorageManager>,
123        schema_manager: Arc<SchemaManager>,
124        config: IndexRebuildConfig,
125    ) -> Result<Self> {
126        let store = storage.store();
127        let state_path = ObjectPath::from("index_rebuild_state.json");
128
129        let manager = Self {
130            storage,
131            schema_manager,
132            tasks: Arc::new(RwLock::new(HashMap::new())),
133            config,
134            store,
135            state_path,
136        };
137
138        // Load persisted state if it exists
139        manager.load_state().await?;
140
141        Ok(manager)
142    }
143
144    /// Load persisted state from storage.
145    async fn load_state(&self) -> Result<()> {
146        match self.store.get(&self.state_path).await {
147            Ok(result) => {
148                let bytes = result.bytes().await?;
149                let state: IndexRebuildState = serde_json::from_slice(&bytes)?;
150
151                let mut tasks = self.tasks.write();
152                for task in state.tasks {
153                    // Only restore non-completed tasks
154                    if task.status != IndexRebuildStatus::Completed {
155                        // Reset in-progress tasks to pending for retry
156                        let mut task = task;
157                        if task.status == IndexRebuildStatus::InProgress {
158                            task.status = IndexRebuildStatus::Pending;
159                            task.started_at = None;
160                        }
161                        tasks.insert(task.id.clone(), task);
162                    }
163                }
164                info!(
165                    "Loaded {} pending index rebuild tasks from state",
166                    tasks.len()
167                );
168            }
169            Err(object_store::Error::NotFound { .. }) => {
170                // No persisted state, start fresh
171            }
172            Err(e) => {
173                warn!("Failed to load index rebuild state: {}", e);
174            }
175        }
176        Ok(())
177    }
178
179    /// Save current state to storage.
180    async fn save_state(&self) -> Result<()> {
181        let tasks: Vec<IndexRebuildTask> = self.tasks.read().values().cloned().collect();
182        let state = IndexRebuildState {
183            tasks,
184            last_updated: Utc::now(),
185        };
186        let bytes = serde_json::to_vec_pretty(&state)?;
187        self.store
188            .put(&self.state_path, bytes.into())
189            .await
190            .map_err(|e| anyhow!("Failed to save index rebuild state: {}", e))?;
191        Ok(())
192    }
193
194    /// Schedule labels for background index rebuild.
195    ///
196    /// Returns the task IDs for the scheduled rebuilds.
197    pub async fn schedule(&self, labels: Vec<String>) -> Result<Vec<String>> {
198        let mut task_ids = Vec::with_capacity(labels.len());
199        let now = Utc::now();
200
201        {
202            let mut tasks = self.tasks.write();
203            for label in labels {
204                // Check if there's already a pending/in-progress task for this label
205                let existing = tasks
206                    .values()
207                    .find(|t| {
208                        t.label == label
209                            && (t.status == IndexRebuildStatus::Pending
210                                || t.status == IndexRebuildStatus::InProgress)
211                    })
212                    .map(|t| t.id.clone());
213
214                if let Some(existing_id) = existing {
215                    info!(
216                        "Index rebuild for label '{}' already scheduled (task {})",
217                        label, existing_id
218                    );
219                    task_ids.push(existing_id);
220                    continue;
221                }
222
223                let task_id = Uuid::new_v4().to_string();
224                let task = IndexRebuildTask {
225                    id: task_id.clone(),
226                    label: label.clone(),
227                    status: IndexRebuildStatus::Pending,
228                    created_at: now,
229                    started_at: None,
230                    completed_at: None,
231                    error: None,
232                    retry_count: 0,
233                };
234                tasks.insert(task_id.clone(), task);
235                task_ids.push(task_id);
236                info!("Scheduled index rebuild for label '{}'", label);
237            }
238        }
239
240        // Persist state
241        self.save_state().await?;
242
243        Ok(task_ids)
244    }
245
246    /// Get status of all tasks.
247    pub fn status(&self) -> Vec<IndexRebuildTask> {
248        self.tasks.read().values().cloned().collect()
249    }
250
251    /// Get status of a specific task by ID.
252    pub fn task_status(&self, task_id: &str) -> Option<IndexRebuildTask> {
253        self.tasks.read().get(task_id).cloned()
254    }
255
256    /// Check if a label has a pending or in-progress index rebuild.
257    pub fn is_index_building(&self, label: &str) -> bool {
258        self.tasks.read().values().any(|t| {
259            t.label == label
260                && (t.status == IndexRebuildStatus::Pending
261                    || t.status == IndexRebuildStatus::InProgress)
262        })
263    }
264
265    /// Retry all failed tasks.
266    pub async fn retry_failed(&self) -> Result<Vec<String>> {
267        let mut retried = Vec::new();
268
269        {
270            let mut tasks = self.tasks.write();
271            for task in tasks.values_mut() {
272                if task.status == IndexRebuildStatus::Failed
273                    && task.retry_count < self.config.max_retries
274                {
275                    task.status = IndexRebuildStatus::Pending;
276                    task.error = None;
277                    task.started_at = None;
278                    task.completed_at = None;
279                    retried.push(task.id.clone());
280                    info!(
281                        "Task {} for label '{}' scheduled for retry (attempt {})",
282                        task.id,
283                        task.label,
284                        task.retry_count + 1
285                    );
286                }
287            }
288        }
289
290        if !retried.is_empty() {
291            self.save_state().await?;
292        }
293
294        Ok(retried)
295    }
296
297    /// Cancel a pending task.
298    pub async fn cancel(&self, task_id: &str) -> Result<()> {
299        {
300            let mut tasks = self.tasks.write();
301            if let Some(task) = tasks.get_mut(task_id) {
302                if task.status == IndexRebuildStatus::Pending {
303                    tasks.remove(task_id);
304                    info!("Cancelled index rebuild task {}", task_id);
305                } else if task.status == IndexRebuildStatus::InProgress {
306                    return Err(anyhow!(
307                        "Cannot cancel in-progress task. Wait for completion or restart."
308                    ));
309                } else {
310                    return Err(anyhow!("Task {} is already completed or failed", task_id));
311                }
312            } else {
313                return Err(anyhow!("Task {} not found", task_id));
314            }
315        }
316
317        self.save_state().await?;
318        Ok(())
319    }
320
321    /// Remove completed/failed tasks from tracking.
322    pub async fn cleanup_completed(&self) -> Result<usize> {
323        let removed;
324        {
325            let mut tasks = self.tasks.write();
326            let before = tasks.len();
327            tasks.retain(|_, t| {
328                t.status == IndexRebuildStatus::Pending
329                    || t.status == IndexRebuildStatus::InProgress
330            });
331            removed = before - tasks.len();
332        }
333
334        if removed > 0 {
335            self.save_state().await?;
336        }
337
338        Ok(removed)
339    }
340
341    /// Start background worker that processes pending tasks.
342    ///
343    /// This spawns a tokio task that periodically checks for pending
344    /// tasks and processes them.
345    pub fn start_background_worker(
346        self: Arc<Self>,
347        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
348    ) -> tokio::task::JoinHandle<()> {
349        tokio::spawn(async move {
350            let mut interval = tokio::time::interval(self.config.worker_check_interval);
351
352            loop {
353                tokio::select! {
354                    _ = interval.tick() => {
355                        self.process_next_pending_task().await;
356                    }
357                    _ = shutdown_rx.recv() => {
358                        info!("Index rebuild worker shutting down");
359                        let _ = self.save_state().await;
360                        break;
361                    }
362                }
363            }
364        })
365    }
366
367    /// Claim and process the next pending index rebuild task, if any.
368    ///
369    /// Marks the task as in-progress, executes the rebuild, updates task
370    /// status and index metadata, and persists state.
371    async fn process_next_pending_task(self: &Arc<Self>) {
372        // Find and claim a pending task
373        let task_to_process = {
374            let mut tasks = self.tasks.write();
375            let pending = tasks
376                .values_mut()
377                .find(|t| t.status == IndexRebuildStatus::Pending);
378
379            if let Some(task) = pending {
380                task.status = IndexRebuildStatus::InProgress;
381                task.started_at = Some(Utc::now());
382                Some((task.id.clone(), task.label.clone()))
383            } else {
384                None
385            }
386        };
387
388        let Some((task_id, label)) = task_to_process else {
389            return;
390        };
391
392        // Save state before processing
393        if let Err(e) = self.save_state().await {
394            error!("Failed to save state before processing: {}", e);
395        }
396
397        info!("Starting index rebuild for label '{}'", label);
398        self.set_index_status_for_label(&label, IndexStatus::Building);
399
400        // Execute the index rebuild
401        let result = self.execute_rebuild(&label).await;
402
403        match result {
404            Ok(()) => self.handle_rebuild_success(&task_id, &label).await,
405            Err(e) => self.handle_rebuild_failure(&task_id, &label, e),
406        }
407
408        // Save state and schema after processing
409        if let Err(e) = self.save_state().await {
410            error!("Failed to save state after processing: {}", e);
411        }
412        if let Err(e) = self.schema_manager.save().await {
413            error!("Failed to save schema after index rebuild: {}", e);
414        }
415    }
416
417    /// Handle a successful index rebuild: mark task completed and update index metadata.
418    async fn handle_rebuild_success(&self, task_id: &str, label: &str) {
419        let now = Utc::now();
420        let row_count = self.get_label_row_count(label).await;
421
422        {
423            let mut tasks = self.tasks.write();
424            if let Some(task) = tasks.get_mut(task_id) {
425                task.status = IndexRebuildStatus::Completed;
426                task.completed_at = Some(now);
427                task.error = None;
428            }
429        }
430        info!("Index rebuild completed for label '{}'", label);
431
432        self.update_index_metadata_for_label(label, IndexStatus::Online, Some(now), row_count);
433    }
434
435    /// Handle a failed index rebuild: mark task failed and schedule retry if within limits.
436    fn handle_rebuild_failure(self: &Arc<Self>, task_id: &str, label: &str, err: anyhow::Error) {
437        let (retry_count, exhausted) = {
438            let mut tasks = self.tasks.write();
439            if let Some(task) = tasks.get_mut(task_id) {
440                task.status = IndexRebuildStatus::Failed;
441                task.completed_at = Some(Utc::now());
442                task.error = Some(err.to_string());
443                task.retry_count += 1;
444                (
445                    task.retry_count,
446                    task.retry_count >= self.config.max_retries,
447                )
448            } else {
449                (0, true)
450            }
451        };
452        error!("Index rebuild failed for label '{}': {}", label, err);
453
454        if exhausted {
455            self.set_index_status_for_label(label, IndexStatus::Failed);
456        } else {
457            self.set_index_status_for_label(label, IndexStatus::Stale);
458            info!(
459                "Will retry index rebuild for '{}' after delay (attempt {}/{})",
460                label, retry_count, self.config.max_retries
461            );
462            let manager = self.clone();
463            let task_id_owned = task_id.to_string();
464            let delay = self.config.retry_delay;
465            tokio::spawn(async move {
466                tokio::time::sleep(delay).await;
467                let mut tasks = manager.tasks.write();
468                if let Some(task) = tasks.get_mut(&task_id_owned)
469                    && task.status == IndexRebuildStatus::Failed
470                {
471                    task.status = IndexRebuildStatus::Pending;
472                }
473            });
474        }
475    }
476
477    /// Set the lifecycle status for all indexes on a given label.
478    fn set_index_status_for_label(&self, label: &str, status: IndexStatus) {
479        let schema = self.schema_manager.schema();
480        for idx in &schema.indexes {
481            if idx.label() == label {
482                let _ = self.schema_manager.update_index_metadata(idx.name(), |m| {
483                    m.status = status.clone();
484                });
485            }
486        }
487    }
488
489    /// Update index metadata (status, build time, row count) for all indexes on a label.
490    fn update_index_metadata_for_label(
491        &self,
492        label: &str,
493        status: IndexStatus,
494        last_built_at: Option<chrono::DateTime<Utc>>,
495        row_count: Option<u64>,
496    ) {
497        let schema = self.schema_manager.schema();
498        for idx in &schema.indexes {
499            if idx.label() == label {
500                let _ = self.schema_manager.update_index_metadata(idx.name(), |m| {
501                    m.status = status.clone();
502                    if let Some(ts) = last_built_at {
503                        m.last_built_at = Some(ts);
504                    }
505                    if let Some(count) = row_count {
506                        m.row_count_at_build = Some(count);
507                    }
508                });
509            }
510        }
511    }
512
513    /// Get the current row count for a label from the latest snapshot.
514    async fn get_label_row_count(&self, label: &str) -> Option<u64> {
515        let manifest = self
516            .storage
517            .snapshot_manager()
518            .load_latest_snapshot()
519            .await
520            .ok()
521            .flatten()?;
522        manifest.vertices.get(label).map(|ls| ls.count)
523    }
524
525    /// Execute the actual index rebuild for a label.
526    #[cfg(feature = "lance-backend")]
527    async fn execute_rebuild(&self, label: &str) -> Result<()> {
528        let idx_mgr = IndexManager::new(
529            self.storage.base_path(),
530            self.schema_manager.clone(),
531            self.storage.backend_arc(),
532        );
533        idx_mgr.rebuild_indexes_for_label(label).await
534    }
535
536    /// Execute the actual index rebuild for a label (stub when lance-backend is disabled).
537    #[cfg(not(feature = "lance-backend"))]
538    async fn execute_rebuild(&self, _label: &str) -> Result<()> {
539        Err(anyhow!("Index rebuild requires the lance-backend feature"))
540    }
541}
542
#[cfg(test)]
mod tests {
    use super::*;
    use uni_common::core::schema::IndexMetadata;

    /// Label shared by all trigger tests.
    const LABEL: &str = "Person";

    /// Builds a manifest holding a single label with the given row count.
    fn make_test_manifest(label: &str, count: u64) -> SnapshotManifest {
        use uni_common::core::snapshot::LabelSnapshot;

        let snapshot = LabelSnapshot {
            version: 1,
            count,
            lance_version: 0,
        };
        let mut manifest = SnapshotManifest::new("test".into(), 1);
        manifest.vertices.insert(label.to_string(), snapshot);
        manifest
    }

    /// Builds a BTree scalar index on `label` whose metadata is `meta` with `status` applied.
    fn make_scalar_index(label: &str, status: IndexStatus, meta: IndexMetadata) -> IndexDefinition {
        use uni_common::core::schema::{ScalarIndexConfig, ScalarIndexType};

        let metadata = IndexMetadata { status, ..meta };
        IndexDefinition::Scalar(ScalarIndexConfig {
            name: format!("idx_{}", label),
            label: label.to_string(),
            properties: vec!["prop".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata,
        })
    }

    /// Checker with a 50% growth trigger and otherwise default configuration.
    fn growth_checker() -> RebuildTriggerChecker {
        RebuildTriggerChecker::new(IndexRebuildConfig {
            growth_trigger_ratio: 0.5,
            ..Default::default()
        })
    }

    /// Index on the shared label that was built when the label had 100 rows.
    fn index_built_at_100_rows(status: IndexStatus) -> IndexDefinition {
        make_scalar_index(
            LABEL,
            status,
            IndexMetadata {
                row_count_at_build: Some(100),
                ..Default::default()
            },
        )
    }

    #[test]
    fn test_index_rebuild_status_serialize() {
        let encoded = serde_json::to_string(&IndexRebuildStatus::Pending).unwrap();
        assert_eq!(encoded, "\"Pending\"");

        let decoded: IndexRebuildStatus = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, IndexRebuildStatus::Pending);
    }

    #[test]
    fn test_index_rebuild_task_serialize() {
        let original = IndexRebuildTask {
            id: "test-id".to_string(),
            label: "Person".to_string(),
            status: IndexRebuildStatus::Pending,
            created_at: Utc::now(),
            started_at: None,
            completed_at: None,
            error: None,
            retry_count: 0,
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: IndexRebuildTask = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.id, original.id);
        assert_eq!(decoded.label, original.label);
        assert_eq!(decoded.status, original.status);
    }

    #[test]
    fn test_trigger_growth_fires() {
        let checker = growth_checker();

        // Built at 100 rows, now 151 (> 100 * 1.5 = 150)
        let manifest = make_test_manifest(LABEL, 151);
        let indexes = vec![index_built_at_100_rows(IndexStatus::Online)];

        let labels = checker.labels_needing_rebuild(&manifest, &indexes);
        assert_eq!(labels.len(), 1);
        assert_eq!(labels[0], "Person");
    }

    #[test]
    fn test_trigger_growth_below_threshold() {
        let checker = growth_checker();

        // Built at 100 rows, now 120 (< 100 * 1.5 = 150)
        let manifest = make_test_manifest(LABEL, 120);
        let indexes = vec![index_built_at_100_rows(IndexStatus::Online)];

        let labels = checker.labels_needing_rebuild(&manifest, &indexes);
        assert!(labels.is_empty());
    }

    #[test]
    fn test_trigger_time_based() {
        let checker = RebuildTriggerChecker::new(IndexRebuildConfig {
            growth_trigger_ratio: 0.0, // disable growth trigger
            max_index_age: Some(std::time::Duration::from_secs(3600)), // 1 hour
            ..Default::default()
        });

        // Built 2 hours ago
        let two_hours_ago = Utc::now() - chrono::Duration::hours(2);
        let manifest = make_test_manifest(LABEL, 100);
        let indexes = vec![make_scalar_index(
            LABEL,
            IndexStatus::Online,
            IndexMetadata {
                last_built_at: Some(two_hours_ago),
                row_count_at_build: Some(100),
                ..Default::default()
            },
        )];

        let labels = checker.labels_needing_rebuild(&manifest, &indexes);
        assert_eq!(labels.len(), 1);
    }

    #[test]
    fn test_trigger_skips_building_and_failed() {
        let checker = growth_checker();

        // Growth alone would trigger (151 > 150), but these statuses are skipped.
        let manifest = make_test_manifest(LABEL, 151);
        for status in [IndexStatus::Building, IndexStatus::Failed] {
            let indexes = vec![index_built_at_100_rows(status)];
            assert!(
                checker
                    .labels_needing_rebuild(&manifest, &indexes)
                    .is_empty()
            );
        }
    }
}