Skip to main content

uni_store/storage/
index_rebuild.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Background index rebuild manager for async index building during bulk loading.
//!
//! This module provides `IndexRebuildManager`, which handles background index
//! rebuilding with status tracking, retry logic, and persistence for restart recovery.
8
9use crate::storage::index_manager::{IndexManager, IndexRebuildStatus, IndexRebuildTask};
10use crate::storage::manager::StorageManager;
11use anyhow::{Result, anyhow};
12use chrono::Utc;
13use object_store::ObjectStore;
14use object_store::path::Path as ObjectPath;
15use parking_lot::RwLock;
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18use std::sync::Arc;
19use tracing::{error, info, warn};
20use uni_common::config::IndexRebuildConfig;
21use uni_common::core::schema::{IndexDefinition, IndexStatus, SchemaManager};
22use uni_common::core::snapshot::SnapshotManifest;
23use uuid::Uuid;
24
25/// Persisted state for index rebuild tasks.
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct IndexRebuildState {
28    /// All tracked tasks.
29    pub tasks: Vec<IndexRebuildTask>,
30    /// When this state was last updated.
31    pub last_updated: chrono::DateTime<Utc>,
32}
33
34impl Default for IndexRebuildState {
35    fn default() -> Self {
36        Self {
37            tasks: Vec::new(),
38            last_updated: Utc::now(),
39        }
40    }
41}
42
43/// Checks whether indexes need rebuilding based on growth and time thresholds.
44pub struct RebuildTriggerChecker {
45    config: IndexRebuildConfig,
46}
47
48impl RebuildTriggerChecker {
49    pub fn new(config: IndexRebuildConfig) -> Self {
50        Self { config }
51    }
52
53    /// Returns the list of labels whose indexes need rebuilding based on
54    /// configured growth and time thresholds.
55    ///
56    /// Skips indexes with `Building` or `Failed` status.
57    pub fn labels_needing_rebuild(
58        &self,
59        manifest: &SnapshotManifest,
60        indexes: &[IndexDefinition],
61    ) -> Vec<String> {
62        let mut labels: HashSet<String> = HashSet::new();
63        let now = Utc::now();
64
65        for idx in indexes {
66            let meta = idx.metadata();
67
68            // Skip indexes that are already being rebuilt or have permanently failed
69            if meta.status == IndexStatus::Building || meta.status == IndexStatus::Failed {
70                continue;
71            }
72
73            let label = idx.label();
74
75            // Growth trigger: current count exceeds row_count_at_build * (1 + ratio)
76            if self.config.growth_trigger_ratio > 0.0
77                && let Some(built_count) = meta.row_count_at_build
78            {
79                let current_count = manifest.vertices.get(label).map(|ls| ls.count).unwrap_or(0);
80                let threshold =
81                    (built_count as f64 * (1.0 + self.config.growth_trigger_ratio)) as u64;
82                if current_count > threshold {
83                    labels.insert(label.to_string());
84                    continue;
85                }
86            }
87
88            // Time trigger: index age exceeds max_index_age
89            if let Some(max_age) = self.config.max_index_age
90                && let Some(built_at) = meta.last_built_at
91            {
92                let age = now.signed_duration_since(built_at);
93                if age.to_std().unwrap_or_default() > max_age {
94                    labels.insert(label.to_string());
95                }
96            }
97        }
98
99        labels.into_iter().collect()
100    }
101}
102
103/// Manages background index rebuilding with status tracking and retry logic.
104///
105/// The manager maintains a queue of index rebuild tasks and processes them
106/// in the background. Tasks can be monitored via `status()` and retried
107/// via `retry_failed()`.
108pub struct IndexRebuildManager {
109    storage: Arc<StorageManager>,
110    schema_manager: Arc<SchemaManager>,
111    tasks: Arc<RwLock<HashMap<String, IndexRebuildTask>>>,
112    config: IndexRebuildConfig,
113    store: Arc<dyn ObjectStore>,
114    state_path: ObjectPath,
115}
116
117impl IndexRebuildManager {
118    /// Create a new IndexRebuildManager.
119    pub async fn new(
120        storage: Arc<StorageManager>,
121        schema_manager: Arc<SchemaManager>,
122        config: IndexRebuildConfig,
123    ) -> Result<Self> {
124        let store = storage.store();
125        let state_path = ObjectPath::from("index_rebuild_state.json");
126
127        let manager = Self {
128            storage,
129            schema_manager,
130            tasks: Arc::new(RwLock::new(HashMap::new())),
131            config,
132            store,
133            state_path,
134        };
135
136        // Load persisted state if it exists
137        manager.load_state().await?;
138
139        Ok(manager)
140    }
141
142    /// Load persisted state from storage.
143    async fn load_state(&self) -> Result<()> {
144        match self.store.get(&self.state_path).await {
145            Ok(result) => {
146                let bytes = result.bytes().await?;
147                let state: IndexRebuildState = serde_json::from_slice(&bytes)?;
148
149                let mut tasks = self.tasks.write();
150                for task in state.tasks {
151                    // Only restore non-completed tasks
152                    if task.status != IndexRebuildStatus::Completed {
153                        // Reset in-progress tasks to pending for retry
154                        let mut task = task;
155                        if task.status == IndexRebuildStatus::InProgress {
156                            task.status = IndexRebuildStatus::Pending;
157                            task.started_at = None;
158                        }
159                        tasks.insert(task.id.clone(), task);
160                    }
161                }
162                info!(
163                    "Loaded {} pending index rebuild tasks from state",
164                    tasks.len()
165                );
166            }
167            Err(object_store::Error::NotFound { .. }) => {
168                // No persisted state, start fresh
169            }
170            Err(e) => {
171                warn!("Failed to load index rebuild state: {}", e);
172            }
173        }
174        Ok(())
175    }
176
177    /// Save current state to storage.
178    async fn save_state(&self) -> Result<()> {
179        let tasks: Vec<IndexRebuildTask> = self.tasks.read().values().cloned().collect();
180        let state = IndexRebuildState {
181            tasks,
182            last_updated: Utc::now(),
183        };
184        let bytes = serde_json::to_vec_pretty(&state)?;
185        self.store
186            .put(&self.state_path, bytes.into())
187            .await
188            .map_err(|e| anyhow!("Failed to save index rebuild state: {}", e))?;
189        Ok(())
190    }
191
192    /// Schedule labels for background index rebuild.
193    ///
194    /// Returns the task IDs for the scheduled rebuilds.
195    pub async fn schedule(&self, labels: Vec<String>) -> Result<Vec<String>> {
196        let mut task_ids = Vec::with_capacity(labels.len());
197        let now = Utc::now();
198
199        {
200            let mut tasks = self.tasks.write();
201            for label in labels {
202                // Check if there's already a pending/in-progress task for this label
203                let existing = tasks
204                    .values()
205                    .find(|t| {
206                        t.label == label
207                            && (t.status == IndexRebuildStatus::Pending
208                                || t.status == IndexRebuildStatus::InProgress)
209                    })
210                    .map(|t| t.id.clone());
211
212                if let Some(existing_id) = existing {
213                    info!(
214                        "Index rebuild for label '{}' already scheduled (task {})",
215                        label, existing_id
216                    );
217                    task_ids.push(existing_id);
218                    continue;
219                }
220
221                let task_id = Uuid::new_v4().to_string();
222                let task = IndexRebuildTask {
223                    id: task_id.clone(),
224                    label: label.clone(),
225                    status: IndexRebuildStatus::Pending,
226                    created_at: now,
227                    started_at: None,
228                    completed_at: None,
229                    error: None,
230                    retry_count: 0,
231                };
232                tasks.insert(task_id.clone(), task);
233                task_ids.push(task_id);
234                info!("Scheduled index rebuild for label '{}'", label);
235            }
236        }
237
238        // Persist state
239        self.save_state().await?;
240
241        Ok(task_ids)
242    }
243
244    /// Get status of all tasks.
245    pub fn status(&self) -> Vec<IndexRebuildTask> {
246        self.tasks.read().values().cloned().collect()
247    }
248
249    /// Get status of a specific task by ID.
250    pub fn task_status(&self, task_id: &str) -> Option<IndexRebuildTask> {
251        self.tasks.read().get(task_id).cloned()
252    }
253
254    /// Check if a label has a pending or in-progress index rebuild.
255    pub fn is_index_building(&self, label: &str) -> bool {
256        self.tasks.read().values().any(|t| {
257            t.label == label
258                && (t.status == IndexRebuildStatus::Pending
259                    || t.status == IndexRebuildStatus::InProgress)
260        })
261    }
262
263    /// Retry all failed tasks.
264    pub async fn retry_failed(&self) -> Result<Vec<String>> {
265        let mut retried = Vec::new();
266
267        {
268            let mut tasks = self.tasks.write();
269            for task in tasks.values_mut() {
270                if task.status == IndexRebuildStatus::Failed
271                    && task.retry_count < self.config.max_retries
272                {
273                    task.status = IndexRebuildStatus::Pending;
274                    task.error = None;
275                    task.started_at = None;
276                    task.completed_at = None;
277                    retried.push(task.id.clone());
278                    info!(
279                        "Task {} for label '{}' scheduled for retry (attempt {})",
280                        task.id,
281                        task.label,
282                        task.retry_count + 1
283                    );
284                }
285            }
286        }
287
288        if !retried.is_empty() {
289            self.save_state().await?;
290        }
291
292        Ok(retried)
293    }
294
295    /// Cancel a pending task.
296    pub async fn cancel(&self, task_id: &str) -> Result<()> {
297        {
298            let mut tasks = self.tasks.write();
299            if let Some(task) = tasks.get_mut(task_id) {
300                if task.status == IndexRebuildStatus::Pending {
301                    tasks.remove(task_id);
302                    info!("Cancelled index rebuild task {}", task_id);
303                } else if task.status == IndexRebuildStatus::InProgress {
304                    return Err(anyhow!(
305                        "Cannot cancel in-progress task. Wait for completion or restart."
306                    ));
307                } else {
308                    return Err(anyhow!("Task {} is already completed or failed", task_id));
309                }
310            } else {
311                return Err(anyhow!("Task {} not found", task_id));
312            }
313        }
314
315        self.save_state().await?;
316        Ok(())
317    }
318
319    /// Remove completed/failed tasks from tracking.
320    pub async fn cleanup_completed(&self) -> Result<usize> {
321        let removed;
322        {
323            let mut tasks = self.tasks.write();
324            let before = tasks.len();
325            tasks.retain(|_, t| {
326                t.status == IndexRebuildStatus::Pending
327                    || t.status == IndexRebuildStatus::InProgress
328            });
329            removed = before - tasks.len();
330        }
331
332        if removed > 0 {
333            self.save_state().await?;
334        }
335
336        Ok(removed)
337    }
338
339    /// Start background worker that processes pending tasks.
340    ///
341    /// This spawns a tokio task that periodically checks for pending
342    /// tasks and processes them.
343    pub fn start_background_worker(
344        self: Arc<Self>,
345        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
346    ) -> tokio::task::JoinHandle<()> {
347        tokio::spawn(async move {
348            let mut interval = tokio::time::interval(self.config.worker_check_interval);
349
350            loop {
351                tokio::select! {
352                    _ = interval.tick() => {
353                        self.process_next_pending_task().await;
354                    }
355                    _ = shutdown_rx.recv() => {
356                        info!("Index rebuild worker shutting down");
357                        let _ = self.save_state().await;
358                        break;
359                    }
360                }
361            }
362        })
363    }
364
365    /// Claim and process the next pending index rebuild task, if any.
366    ///
367    /// Marks the task as in-progress, executes the rebuild, updates task
368    /// status and index metadata, and persists state.
369    async fn process_next_pending_task(self: &Arc<Self>) {
370        // Find and claim a pending task
371        let task_to_process = {
372            let mut tasks = self.tasks.write();
373            let pending = tasks
374                .values_mut()
375                .find(|t| t.status == IndexRebuildStatus::Pending);
376
377            if let Some(task) = pending {
378                task.status = IndexRebuildStatus::InProgress;
379                task.started_at = Some(Utc::now());
380                Some((task.id.clone(), task.label.clone()))
381            } else {
382                None
383            }
384        };
385
386        let Some((task_id, label)) = task_to_process else {
387            return;
388        };
389
390        // Save state before processing
391        if let Err(e) = self.save_state().await {
392            error!("Failed to save state before processing: {}", e);
393        }
394
395        info!("Starting index rebuild for label '{}'", label);
396        self.set_index_status_for_label(&label, IndexStatus::Building);
397
398        // Execute the index rebuild
399        let result = self.execute_rebuild(&label).await;
400
401        match result {
402            Ok(()) => self.handle_rebuild_success(&task_id, &label).await,
403            Err(e) => self.handle_rebuild_failure(&task_id, &label, e),
404        }
405
406        // Save state and schema after processing
407        if let Err(e) = self.save_state().await {
408            error!("Failed to save state after processing: {}", e);
409        }
410        if let Err(e) = self.schema_manager.save().await {
411            error!("Failed to save schema after index rebuild: {}", e);
412        }
413    }
414
415    /// Handle a successful index rebuild: mark task completed and update index metadata.
416    async fn handle_rebuild_success(&self, task_id: &str, label: &str) {
417        let now = Utc::now();
418        let row_count = self.get_label_row_count(label).await;
419
420        {
421            let mut tasks = self.tasks.write();
422            if let Some(task) = tasks.get_mut(task_id) {
423                task.status = IndexRebuildStatus::Completed;
424                task.completed_at = Some(now);
425                task.error = None;
426            }
427        }
428        info!("Index rebuild completed for label '{}'", label);
429
430        self.update_index_metadata_for_label(label, IndexStatus::Online, Some(now), row_count);
431    }
432
433    /// Handle a failed index rebuild: mark task failed and schedule retry if within limits.
434    fn handle_rebuild_failure(self: &Arc<Self>, task_id: &str, label: &str, err: anyhow::Error) {
435        let (retry_count, exhausted) = {
436            let mut tasks = self.tasks.write();
437            if let Some(task) = tasks.get_mut(task_id) {
438                task.status = IndexRebuildStatus::Failed;
439                task.completed_at = Some(Utc::now());
440                task.error = Some(err.to_string());
441                task.retry_count += 1;
442                (
443                    task.retry_count,
444                    task.retry_count >= self.config.max_retries,
445                )
446            } else {
447                (0, true)
448            }
449        };
450        error!("Index rebuild failed for label '{}': {}", label, err);
451
452        if exhausted {
453            self.set_index_status_for_label(label, IndexStatus::Failed);
454        } else {
455            self.set_index_status_for_label(label, IndexStatus::Stale);
456            info!(
457                "Will retry index rebuild for '{}' after delay (attempt {}/{})",
458                label, retry_count, self.config.max_retries
459            );
460            let manager = self.clone();
461            let task_id_owned = task_id.to_string();
462            let delay = self.config.retry_delay;
463            tokio::spawn(async move {
464                tokio::time::sleep(delay).await;
465                let mut tasks = manager.tasks.write();
466                if let Some(task) = tasks.get_mut(&task_id_owned)
467                    && task.status == IndexRebuildStatus::Failed
468                {
469                    task.status = IndexRebuildStatus::Pending;
470                }
471            });
472        }
473    }
474
475    /// Set the lifecycle status for all indexes on a given label.
476    fn set_index_status_for_label(&self, label: &str, status: IndexStatus) {
477        let schema = self.schema_manager.schema();
478        for idx in &schema.indexes {
479            if idx.label() == label {
480                let _ = self.schema_manager.update_index_metadata(idx.name(), |m| {
481                    m.status = status.clone();
482                });
483            }
484        }
485    }
486
487    /// Update index metadata (status, build time, row count) for all indexes on a label.
488    fn update_index_metadata_for_label(
489        &self,
490        label: &str,
491        status: IndexStatus,
492        last_built_at: Option<chrono::DateTime<Utc>>,
493        row_count: Option<u64>,
494    ) {
495        let schema = self.schema_manager.schema();
496        for idx in &schema.indexes {
497            if idx.label() == label {
498                let _ = self.schema_manager.update_index_metadata(idx.name(), |m| {
499                    m.status = status.clone();
500                    if let Some(ts) = last_built_at {
501                        m.last_built_at = Some(ts);
502                    }
503                    if let Some(count) = row_count {
504                        m.row_count_at_build = Some(count);
505                    }
506                });
507            }
508        }
509    }
510
511    /// Get the current row count for a label from the latest snapshot.
512    async fn get_label_row_count(&self, label: &str) -> Option<u64> {
513        let manifest = self
514            .storage
515            .snapshot_manager()
516            .load_latest_snapshot()
517            .await
518            .ok()
519            .flatten()?;
520        manifest.vertices.get(label).map(|ls| ls.count)
521    }
522
523    /// Execute the actual index rebuild for a label.
524    async fn execute_rebuild(&self, label: &str) -> Result<()> {
525        let idx_mgr = IndexManager::new(
526            self.storage.base_path(),
527            self.schema_manager.clone(),
528            self.storage.lancedb_store_arc(),
529        );
530        idx_mgr.rebuild_indexes_for_label(label).await
531    }
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use uni_common::core::schema::IndexMetadata;

    /// Label shared by all trigger tests.
    const LABEL: &str = "Person";

    /// Builds a manifest containing a single label with `count` rows.
    fn make_test_manifest(label: &str, count: u64) -> SnapshotManifest {
        use uni_common::core::snapshot::LabelSnapshot;

        let snapshot = LabelSnapshot {
            version: 1,
            count,
            lance_version: 0,
        };
        let mut manifest = SnapshotManifest::new("test".into(), 1);
        manifest.vertices.insert(label.to_string(), snapshot);
        manifest
    }

    /// Builds a BTree scalar index on `label` with the given status; the
    /// remaining metadata fields are taken from `meta`.
    fn make_scalar_index(label: &str, status: IndexStatus, meta: IndexMetadata) -> IndexDefinition {
        use uni_common::core::schema::{ScalarIndexConfig, ScalarIndexType};

        let metadata = IndexMetadata { status, ..meta };
        IndexDefinition::Scalar(ScalarIndexConfig {
            name: format!("idx_{}", label),
            label: label.to_string(),
            properties: vec!["prop".to_string()],
            index_type: ScalarIndexType::BTree,
            where_clause: None,
            metadata,
        })
    }

    /// Checker whose growth trigger fires above `1 + ratio` times the built count.
    fn growth_checker(ratio: f64) -> RebuildTriggerChecker {
        RebuildTriggerChecker::new(IndexRebuildConfig {
            growth_trigger_ratio: ratio,
            ..Default::default()
        })
    }

    /// Metadata of an index that was built when the label had `rows` rows.
    fn built_with_rows(rows: u64) -> IndexMetadata {
        IndexMetadata {
            row_count_at_build: Some(rows),
            ..Default::default()
        }
    }

    #[test]
    fn test_index_rebuild_status_serialize() {
        let json = serde_json::to_string(&IndexRebuildStatus::Pending).unwrap();
        assert_eq!(json, "\"Pending\"");

        let round_tripped: IndexRebuildStatus = serde_json::from_str(&json).unwrap();
        assert_eq!(round_tripped, IndexRebuildStatus::Pending);
    }

    #[test]
    fn test_index_rebuild_task_serialize() {
        let original = IndexRebuildTask {
            id: "test-id".to_string(),
            label: "Person".to_string(),
            status: IndexRebuildStatus::Pending,
            created_at: Utc::now(),
            started_at: None,
            completed_at: None,
            error: None,
            retry_count: 0,
        };

        let json = serde_json::to_string(&original).unwrap();
        let round_tripped: IndexRebuildTask = serde_json::from_str(&json).unwrap();

        assert_eq!(round_tripped.id, original.id);
        assert_eq!(round_tripped.label, original.label);
        assert_eq!(round_tripped.status, original.status);
    }

    #[test]
    fn test_trigger_growth_fires() {
        let checker = growth_checker(0.5);

        // Built at 100 rows, now 151 (> 100 * 1.5 = 150)
        let manifest = make_test_manifest(LABEL, 151);
        let indexes = [make_scalar_index(
            LABEL,
            IndexStatus::Online,
            built_with_rows(100),
        )];

        let labels = checker.labels_needing_rebuild(&manifest, &indexes);
        assert_eq!(labels, vec!["Person".to_string()]);
    }

    #[test]
    fn test_trigger_growth_below_threshold() {
        let checker = growth_checker(0.5);

        // Built at 100 rows, now 120 (< 100 * 1.5 = 150)
        let manifest = make_test_manifest(LABEL, 120);
        let indexes = [make_scalar_index(
            LABEL,
            IndexStatus::Online,
            built_with_rows(100),
        )];

        let labels = checker.labels_needing_rebuild(&manifest, &indexes);
        assert!(labels.is_empty());
    }

    #[test]
    fn test_trigger_time_based() {
        let checker = RebuildTriggerChecker::new(IndexRebuildConfig {
            growth_trigger_ratio: 0.0, // disable growth trigger
            max_index_age: Some(std::time::Duration::from_secs(3600)), // 1 hour
            ..Default::default()
        });

        // Built 2 hours ago
        let built_at = Utc::now() - chrono::Duration::hours(2);
        let manifest = make_test_manifest(LABEL, 100);
        let indexes = [make_scalar_index(
            LABEL,
            IndexStatus::Online,
            IndexMetadata {
                last_built_at: Some(built_at),
                ..built_with_rows(100)
            },
        )];

        let labels = checker.labels_needing_rebuild(&manifest, &indexes);
        assert_eq!(labels.len(), 1);
    }

    #[test]
    fn test_trigger_skips_building_and_failed() {
        let checker = growth_checker(0.5);

        // Would trigger (151 > 150) if the index were online
        let manifest = make_test_manifest(LABEL, 151);

        // Status is Building: skipped
        let building = [make_scalar_index(
            LABEL,
            IndexStatus::Building,
            built_with_rows(100),
        )];
        assert!(
            checker
                .labels_needing_rebuild(&manifest, &building)
                .is_empty()
        );

        // Same with Failed status
        let failed = [make_scalar_index(
            LABEL,
            IndexStatus::Failed,
            built_with_rows(100),
        )];
        assert!(
            checker
                .labels_needing_rebuild(&manifest, &failed)
                .is_empty()
        );
    }
}