Skip to main content

uni_store/storage/
index_rebuild.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Background index rebuild manager for async index building during bulk loading.
5//!
6//! This module provides `IndexRebuildManager` which handles background index
7//! rebuilding with status tracking, retry logic, and persistence for restart recovery.
8
9use crate::storage::index_manager::{IndexManager, IndexRebuildStatus, IndexRebuildTask};
10use crate::storage::manager::StorageManager;
11use anyhow::{Result, anyhow};
12use chrono::Utc;
13use object_store::ObjectStore;
14use object_store::path::Path as ObjectPath;
15use parking_lot::RwLock;
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::sync::Arc;
19use tracing::{error, info, warn};
20use uni_common::config::IndexRebuildConfig;
21use uni_common::core::schema::SchemaManager;
22use uuid::Uuid;
23
24/// Persisted state for index rebuild tasks.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct IndexRebuildState {
27    /// All tracked tasks.
28    pub tasks: Vec<IndexRebuildTask>,
29    /// When this state was last updated.
30    pub last_updated: chrono::DateTime<Utc>,
31}
32
33impl Default for IndexRebuildState {
34    fn default() -> Self {
35        Self {
36            tasks: Vec::new(),
37            last_updated: Utc::now(),
38        }
39    }
40}
41
42/// Manages background index rebuilding with status tracking and retry logic.
43///
44/// The manager maintains a queue of index rebuild tasks and processes them
45/// in the background. Tasks can be monitored via `status()` and retried
46/// via `retry_failed()`.
47pub struct IndexRebuildManager {
48    storage: Arc<StorageManager>,
49    schema_manager: Arc<SchemaManager>,
50    tasks: Arc<RwLock<HashMap<String, IndexRebuildTask>>>,
51    config: IndexRebuildConfig,
52    store: Arc<dyn ObjectStore>,
53    state_path: ObjectPath,
54}
55
56impl IndexRebuildManager {
57    /// Create a new IndexRebuildManager.
58    pub async fn new(
59        storage: Arc<StorageManager>,
60        schema_manager: Arc<SchemaManager>,
61        config: IndexRebuildConfig,
62    ) -> Result<Self> {
63        let store = storage.store();
64        let state_path = ObjectPath::from("index_rebuild_state.json");
65
66        let manager = Self {
67            storage,
68            schema_manager,
69            tasks: Arc::new(RwLock::new(HashMap::new())),
70            config,
71            store,
72            state_path,
73        };
74
75        // Load persisted state if it exists
76        manager.load_state().await?;
77
78        Ok(manager)
79    }
80
81    /// Load persisted state from storage.
82    async fn load_state(&self) -> Result<()> {
83        match self.store.get(&self.state_path).await {
84            Ok(result) => {
85                let bytes = result.bytes().await?;
86                let state: IndexRebuildState = serde_json::from_slice(&bytes)?;
87
88                let mut tasks = self.tasks.write();
89                for task in state.tasks {
90                    // Only restore non-completed tasks
91                    if task.status != IndexRebuildStatus::Completed {
92                        // Reset in-progress tasks to pending for retry
93                        let mut task = task;
94                        if task.status == IndexRebuildStatus::InProgress {
95                            task.status = IndexRebuildStatus::Pending;
96                            task.started_at = None;
97                        }
98                        tasks.insert(task.id.clone(), task);
99                    }
100                }
101                info!(
102                    "Loaded {} pending index rebuild tasks from state",
103                    tasks.len()
104                );
105            }
106            Err(object_store::Error::NotFound { .. }) => {
107                // No persisted state, start fresh
108            }
109            Err(e) => {
110                warn!("Failed to load index rebuild state: {}", e);
111            }
112        }
113        Ok(())
114    }
115
116    /// Save current state to storage.
117    async fn save_state(&self) -> Result<()> {
118        let tasks: Vec<IndexRebuildTask> = self.tasks.read().values().cloned().collect();
119        let state = IndexRebuildState {
120            tasks,
121            last_updated: Utc::now(),
122        };
123        let bytes = serde_json::to_vec_pretty(&state)?;
124        self.store
125            .put(&self.state_path, bytes.into())
126            .await
127            .map_err(|e| anyhow!("Failed to save index rebuild state: {}", e))?;
128        Ok(())
129    }
130
131    /// Schedule labels for background index rebuild.
132    ///
133    /// Returns the task IDs for the scheduled rebuilds.
134    pub async fn schedule(&self, labels: Vec<String>) -> Result<Vec<String>> {
135        let mut task_ids = Vec::with_capacity(labels.len());
136        let now = Utc::now();
137
138        {
139            let mut tasks = self.tasks.write();
140            for label in labels {
141                // Check if there's already a pending/in-progress task for this label
142                let existing = tasks
143                    .values()
144                    .find(|t| {
145                        t.label == label
146                            && (t.status == IndexRebuildStatus::Pending
147                                || t.status == IndexRebuildStatus::InProgress)
148                    })
149                    .map(|t| t.id.clone());
150
151                if let Some(existing_id) = existing {
152                    info!(
153                        "Index rebuild for label '{}' already scheduled (task {})",
154                        label, existing_id
155                    );
156                    task_ids.push(existing_id);
157                    continue;
158                }
159
160                let task_id = Uuid::new_v4().to_string();
161                let task = IndexRebuildTask {
162                    id: task_id.clone(),
163                    label: label.clone(),
164                    status: IndexRebuildStatus::Pending,
165                    created_at: now,
166                    started_at: None,
167                    completed_at: None,
168                    error: None,
169                    retry_count: 0,
170                };
171                tasks.insert(task_id.clone(), task);
172                task_ids.push(task_id);
173                info!("Scheduled index rebuild for label '{}'", label);
174            }
175        }
176
177        // Persist state
178        self.save_state().await?;
179
180        Ok(task_ids)
181    }
182
183    /// Get status of all tasks.
184    pub fn status(&self) -> Vec<IndexRebuildTask> {
185        self.tasks.read().values().cloned().collect()
186    }
187
188    /// Get status of a specific task by ID.
189    pub fn task_status(&self, task_id: &str) -> Option<IndexRebuildTask> {
190        self.tasks.read().get(task_id).cloned()
191    }
192
193    /// Check if a label has a pending or in-progress index rebuild.
194    pub fn is_index_building(&self, label: &str) -> bool {
195        self.tasks.read().values().any(|t| {
196            t.label == label
197                && (t.status == IndexRebuildStatus::Pending
198                    || t.status == IndexRebuildStatus::InProgress)
199        })
200    }
201
202    /// Retry all failed tasks.
203    pub async fn retry_failed(&self) -> Result<Vec<String>> {
204        let mut retried = Vec::new();
205
206        {
207            let mut tasks = self.tasks.write();
208            for task in tasks.values_mut() {
209                if task.status == IndexRebuildStatus::Failed
210                    && task.retry_count < self.config.max_retries
211                {
212                    task.status = IndexRebuildStatus::Pending;
213                    task.error = None;
214                    task.started_at = None;
215                    task.completed_at = None;
216                    retried.push(task.id.clone());
217                    info!(
218                        "Task {} for label '{}' scheduled for retry (attempt {})",
219                        task.id,
220                        task.label,
221                        task.retry_count + 1
222                    );
223                }
224            }
225        }
226
227        if !retried.is_empty() {
228            self.save_state().await?;
229        }
230
231        Ok(retried)
232    }
233
234    /// Cancel a pending task.
235    pub async fn cancel(&self, task_id: &str) -> Result<()> {
236        {
237            let mut tasks = self.tasks.write();
238            if let Some(task) = tasks.get_mut(task_id) {
239                if task.status == IndexRebuildStatus::Pending {
240                    tasks.remove(task_id);
241                    info!("Cancelled index rebuild task {}", task_id);
242                } else if task.status == IndexRebuildStatus::InProgress {
243                    return Err(anyhow!(
244                        "Cannot cancel in-progress task. Wait for completion or restart."
245                    ));
246                } else {
247                    return Err(anyhow!("Task {} is already completed or failed", task_id));
248                }
249            } else {
250                return Err(anyhow!("Task {} not found", task_id));
251            }
252        }
253
254        self.save_state().await?;
255        Ok(())
256    }
257
258    /// Remove completed/failed tasks from tracking.
259    pub async fn cleanup_completed(&self) -> Result<usize> {
260        let removed;
261        {
262            let mut tasks = self.tasks.write();
263            let before = tasks.len();
264            tasks.retain(|_, t| {
265                t.status == IndexRebuildStatus::Pending
266                    || t.status == IndexRebuildStatus::InProgress
267            });
268            removed = before - tasks.len();
269        }
270
271        if removed > 0 {
272            self.save_state().await?;
273        }
274
275        Ok(removed)
276    }
277
278    /// Start background worker that processes pending tasks.
279    ///
280    /// This spawns a tokio task that periodically checks for pending
281    /// tasks and processes them.
282    pub fn start_background_worker(
283        self: Arc<Self>,
284        mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
285    ) -> tokio::task::JoinHandle<()> {
286        tokio::spawn(async move {
287            let mut interval = tokio::time::interval(self.config.worker_check_interval);
288
289            loop {
290                tokio::select! {
291                    _ = interval.tick() => {
292
293                // Find a pending task
294                let task_to_process = {
295                    let mut tasks = self.tasks.write();
296                    let pending = tasks
297                        .values_mut()
298                        .find(|t| t.status == IndexRebuildStatus::Pending);
299
300                    if let Some(task) = pending {
301                        task.status = IndexRebuildStatus::InProgress;
302                        task.started_at = Some(Utc::now());
303                        Some((task.id.clone(), task.label.clone()))
304                    } else {
305                        None
306                    }
307                };
308
309                if let Some((task_id, label)) = task_to_process {
310                    // Save state before processing
311                    if let Err(e) = self.save_state().await {
312                        error!("Failed to save state before processing: {}", e);
313                    }
314
315                    info!("Starting index rebuild for label '{}'", label);
316
317                    // Execute the index rebuild
318                    let result = self.execute_rebuild(&label).await;
319
320                    // Update task status
321                    {
322                        let mut tasks = self.tasks.write();
323                        if let Some(task) = tasks.get_mut(&task_id) {
324                            match result {
325                                Ok(()) => {
326                                    task.status = IndexRebuildStatus::Completed;
327                                    task.completed_at = Some(Utc::now());
328                                    task.error = None;
329                                    info!("Index rebuild completed for label '{}'", label);
330                                }
331                                Err(e) => {
332                                    task.status = IndexRebuildStatus::Failed;
333                                    task.completed_at = Some(Utc::now());
334                                    task.error = Some(e.to_string());
335                                    task.retry_count += 1;
336                                    error!("Index rebuild failed for label '{}': {}", label, e);
337
338                                    // Auto-retry if within limits
339                                    if task.retry_count < self.config.max_retries {
340                                        info!(
341                                            "Will retry index rebuild for '{}' after delay (attempt {}/{})",
342                                            label, task.retry_count, self.config.max_retries
343                                        );
344                                        // Schedule retry by setting status back to pending after delay
345                                        let manager = self.clone();
346                                        let task_id_clone = task_id.clone();
347                                        let delay = self.config.retry_delay;
348                                        tokio::spawn(async move {
349                                            tokio::time::sleep(delay).await;
350                                            let mut tasks = manager.tasks.write();
351                                            if let Some(task) = tasks.get_mut(&task_id_clone)
352                                                && task.status == IndexRebuildStatus::Failed
353                                            {
354                                                task.status = IndexRebuildStatus::Pending;
355                                            }
356                                        });
357                                    }
358                                }
359                            }
360                        }
361                    }
362
363                    // Save state after processing
364                    if let Err(e) = self.save_state().await {
365                        error!("Failed to save state after processing: {}", e);
366                    }
367                }
368                    }
369                    _ = shutdown_rx.recv() => {
370                        tracing::info!("Index rebuild worker shutting down");
371                        let _ = self.save_state().await;
372                        break;
373                    }
374                }
375            }
376        })
377    }
378
379    /// Execute the actual index rebuild for a label.
380    async fn execute_rebuild(&self, label: &str) -> Result<()> {
381        let idx_mgr = IndexManager::new(
382            self.storage.base_path(),
383            self.schema_manager.clone(),
384            self.storage.lancedb_store_arc(),
385        );
386        idx_mgr.rebuild_indexes_for_label(label).await
387    }
388}
389
#[cfg(test)]
mod tests {
    use super::*;

    /// `IndexRebuildStatus` encodes as a bare JSON string and decodes back unchanged.
    #[test]
    fn test_index_rebuild_status_serialize() {
        let encoded = serde_json::to_string(&IndexRebuildStatus::Pending).unwrap();
        assert_eq!(encoded, "\"Pending\"");

        let decoded = serde_json::from_str::<IndexRebuildStatus>(&encoded).unwrap();
        assert_eq!(decoded, IndexRebuildStatus::Pending);
    }

    /// An `IndexRebuildTask` keeps its id, label and status across a JSON round trip.
    #[test]
    fn test_index_rebuild_task_serialize() {
        let original = IndexRebuildTask {
            id: String::from("test-id"),
            label: String::from("Person"),
            status: IndexRebuildStatus::Pending,
            created_at: Utc::now(),
            started_at: None,
            completed_at: None,
            error: None,
            retry_count: 0,
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded = serde_json::from_str::<IndexRebuildTask>(&encoded).unwrap();

        assert_eq!(decoded.id, original.id);
        assert_eq!(decoded.label, original.label);
        assert_eq!(decoded.status, original.status);
    }
}