1use crate::storage::index_manager::{IndexManager, IndexRebuildStatus, IndexRebuildTask};
10use crate::storage::manager::StorageManager;
11use anyhow::{Result, anyhow};
12use chrono::Utc;
13use object_store::ObjectStore;
14use object_store::path::Path as ObjectPath;
15use parking_lot::RwLock;
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::sync::Arc;
19use tracing::{error, info, warn};
20use uni_common::config::IndexRebuildConfig;
21use uni_common::core::schema::SchemaManager;
22use uuid::Uuid;
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct IndexRebuildState {
27 pub tasks: Vec<IndexRebuildTask>,
29 pub last_updated: chrono::DateTime<Utc>,
31}
32
33impl Default for IndexRebuildState {
34 fn default() -> Self {
35 Self {
36 tasks: Vec::new(),
37 last_updated: Utc::now(),
38 }
39 }
40}
41
42pub struct IndexRebuildManager {
48 storage: Arc<StorageManager>,
49 schema_manager: Arc<SchemaManager>,
50 tasks: Arc<RwLock<HashMap<String, IndexRebuildTask>>>,
51 config: IndexRebuildConfig,
52 store: Arc<dyn ObjectStore>,
53 state_path: ObjectPath,
54}
55
56impl IndexRebuildManager {
57 pub async fn new(
59 storage: Arc<StorageManager>,
60 schema_manager: Arc<SchemaManager>,
61 config: IndexRebuildConfig,
62 ) -> Result<Self> {
63 let store = storage.store();
64 let state_path = ObjectPath::from("index_rebuild_state.json");
65
66 let manager = Self {
67 storage,
68 schema_manager,
69 tasks: Arc::new(RwLock::new(HashMap::new())),
70 config,
71 store,
72 state_path,
73 };
74
75 manager.load_state().await?;
77
78 Ok(manager)
79 }
80
81 async fn load_state(&self) -> Result<()> {
83 match self.store.get(&self.state_path).await {
84 Ok(result) => {
85 let bytes = result.bytes().await?;
86 let state: IndexRebuildState = serde_json::from_slice(&bytes)?;
87
88 let mut tasks = self.tasks.write();
89 for task in state.tasks {
90 if task.status != IndexRebuildStatus::Completed {
92 let mut task = task;
94 if task.status == IndexRebuildStatus::InProgress {
95 task.status = IndexRebuildStatus::Pending;
96 task.started_at = None;
97 }
98 tasks.insert(task.id.clone(), task);
99 }
100 }
101 info!(
102 "Loaded {} pending index rebuild tasks from state",
103 tasks.len()
104 );
105 }
106 Err(object_store::Error::NotFound { .. }) => {
107 }
109 Err(e) => {
110 warn!("Failed to load index rebuild state: {}", e);
111 }
112 }
113 Ok(())
114 }
115
116 async fn save_state(&self) -> Result<()> {
118 let tasks: Vec<IndexRebuildTask> = self.tasks.read().values().cloned().collect();
119 let state = IndexRebuildState {
120 tasks,
121 last_updated: Utc::now(),
122 };
123 let bytes = serde_json::to_vec_pretty(&state)?;
124 self.store
125 .put(&self.state_path, bytes.into())
126 .await
127 .map_err(|e| anyhow!("Failed to save index rebuild state: {}", e))?;
128 Ok(())
129 }
130
131 pub async fn schedule(&self, labels: Vec<String>) -> Result<Vec<String>> {
135 let mut task_ids = Vec::with_capacity(labels.len());
136 let now = Utc::now();
137
138 {
139 let mut tasks = self.tasks.write();
140 for label in labels {
141 let existing = tasks
143 .values()
144 .find(|t| {
145 t.label == label
146 && (t.status == IndexRebuildStatus::Pending
147 || t.status == IndexRebuildStatus::InProgress)
148 })
149 .map(|t| t.id.clone());
150
151 if let Some(existing_id) = existing {
152 info!(
153 "Index rebuild for label '{}' already scheduled (task {})",
154 label, existing_id
155 );
156 task_ids.push(existing_id);
157 continue;
158 }
159
160 let task_id = Uuid::new_v4().to_string();
161 let task = IndexRebuildTask {
162 id: task_id.clone(),
163 label: label.clone(),
164 status: IndexRebuildStatus::Pending,
165 created_at: now,
166 started_at: None,
167 completed_at: None,
168 error: None,
169 retry_count: 0,
170 };
171 tasks.insert(task_id.clone(), task);
172 task_ids.push(task_id);
173 info!("Scheduled index rebuild for label '{}'", label);
174 }
175 }
176
177 self.save_state().await?;
179
180 Ok(task_ids)
181 }
182
183 pub fn status(&self) -> Vec<IndexRebuildTask> {
185 self.tasks.read().values().cloned().collect()
186 }
187
188 pub fn task_status(&self, task_id: &str) -> Option<IndexRebuildTask> {
190 self.tasks.read().get(task_id).cloned()
191 }
192
193 pub fn is_index_building(&self, label: &str) -> bool {
195 self.tasks.read().values().any(|t| {
196 t.label == label
197 && (t.status == IndexRebuildStatus::Pending
198 || t.status == IndexRebuildStatus::InProgress)
199 })
200 }
201
202 pub async fn retry_failed(&self) -> Result<Vec<String>> {
204 let mut retried = Vec::new();
205
206 {
207 let mut tasks = self.tasks.write();
208 for task in tasks.values_mut() {
209 if task.status == IndexRebuildStatus::Failed
210 && task.retry_count < self.config.max_retries
211 {
212 task.status = IndexRebuildStatus::Pending;
213 task.error = None;
214 task.started_at = None;
215 task.completed_at = None;
216 retried.push(task.id.clone());
217 info!(
218 "Task {} for label '{}' scheduled for retry (attempt {})",
219 task.id,
220 task.label,
221 task.retry_count + 1
222 );
223 }
224 }
225 }
226
227 if !retried.is_empty() {
228 self.save_state().await?;
229 }
230
231 Ok(retried)
232 }
233
234 pub async fn cancel(&self, task_id: &str) -> Result<()> {
236 {
237 let mut tasks = self.tasks.write();
238 if let Some(task) = tasks.get_mut(task_id) {
239 if task.status == IndexRebuildStatus::Pending {
240 tasks.remove(task_id);
241 info!("Cancelled index rebuild task {}", task_id);
242 } else if task.status == IndexRebuildStatus::InProgress {
243 return Err(anyhow!(
244 "Cannot cancel in-progress task. Wait for completion or restart."
245 ));
246 } else {
247 return Err(anyhow!("Task {} is already completed or failed", task_id));
248 }
249 } else {
250 return Err(anyhow!("Task {} not found", task_id));
251 }
252 }
253
254 self.save_state().await?;
255 Ok(())
256 }
257
258 pub async fn cleanup_completed(&self) -> Result<usize> {
260 let removed;
261 {
262 let mut tasks = self.tasks.write();
263 let before = tasks.len();
264 tasks.retain(|_, t| {
265 t.status == IndexRebuildStatus::Pending
266 || t.status == IndexRebuildStatus::InProgress
267 });
268 removed = before - tasks.len();
269 }
270
271 if removed > 0 {
272 self.save_state().await?;
273 }
274
275 Ok(removed)
276 }
277
278 pub fn start_background_worker(
283 self: Arc<Self>,
284 mut shutdown_rx: tokio::sync::broadcast::Receiver<()>,
285 ) -> tokio::task::JoinHandle<()> {
286 tokio::spawn(async move {
287 let mut interval = tokio::time::interval(self.config.worker_check_interval);
288
289 loop {
290 tokio::select! {
291 _ = interval.tick() => {
292
293 let task_to_process = {
295 let mut tasks = self.tasks.write();
296 let pending = tasks
297 .values_mut()
298 .find(|t| t.status == IndexRebuildStatus::Pending);
299
300 if let Some(task) = pending {
301 task.status = IndexRebuildStatus::InProgress;
302 task.started_at = Some(Utc::now());
303 Some((task.id.clone(), task.label.clone()))
304 } else {
305 None
306 }
307 };
308
309 if let Some((task_id, label)) = task_to_process {
310 if let Err(e) = self.save_state().await {
312 error!("Failed to save state before processing: {}", e);
313 }
314
315 info!("Starting index rebuild for label '{}'", label);
316
317 let result = self.execute_rebuild(&label).await;
319
320 {
322 let mut tasks = self.tasks.write();
323 if let Some(task) = tasks.get_mut(&task_id) {
324 match result {
325 Ok(()) => {
326 task.status = IndexRebuildStatus::Completed;
327 task.completed_at = Some(Utc::now());
328 task.error = None;
329 info!("Index rebuild completed for label '{}'", label);
330 }
331 Err(e) => {
332 task.status = IndexRebuildStatus::Failed;
333 task.completed_at = Some(Utc::now());
334 task.error = Some(e.to_string());
335 task.retry_count += 1;
336 error!("Index rebuild failed for label '{}': {}", label, e);
337
338 if task.retry_count < self.config.max_retries {
340 info!(
341 "Will retry index rebuild for '{}' after delay (attempt {}/{})",
342 label, task.retry_count, self.config.max_retries
343 );
344 let manager = self.clone();
346 let task_id_clone = task_id.clone();
347 let delay = self.config.retry_delay;
348 tokio::spawn(async move {
349 tokio::time::sleep(delay).await;
350 let mut tasks = manager.tasks.write();
351 if let Some(task) = tasks.get_mut(&task_id_clone)
352 && task.status == IndexRebuildStatus::Failed
353 {
354 task.status = IndexRebuildStatus::Pending;
355 }
356 });
357 }
358 }
359 }
360 }
361 }
362
363 if let Err(e) = self.save_state().await {
365 error!("Failed to save state after processing: {}", e);
366 }
367 }
368 }
369 _ = shutdown_rx.recv() => {
370 tracing::info!("Index rebuild worker shutting down");
371 let _ = self.save_state().await;
372 break;
373 }
374 }
375 }
376 })
377 }
378
379 async fn execute_rebuild(&self, label: &str) -> Result<()> {
381 let idx_mgr = IndexManager::new(
382 self.storage.base_path(),
383 self.schema_manager.clone(),
384 self.storage.lancedb_store_arc(),
385 );
386 idx_mgr.rebuild_indexes_for_label(label).await
387 }
388}
389
#[cfg(test)]
mod tests {
    use super::*;

    /// `Pending` serializes to the bare JSON string `"Pending"` and parses back.
    #[test]
    fn test_index_rebuild_status_serialize() {
        let encoded = serde_json::to_string(&IndexRebuildStatus::Pending).unwrap();
        assert_eq!(encoded, "\"Pending\"");

        let decoded: IndexRebuildStatus = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, IndexRebuildStatus::Pending);
    }

    /// A task survives a JSON round trip with its id, label and status intact.
    #[test]
    fn test_index_rebuild_task_serialize() {
        let original = IndexRebuildTask {
            id: "test-id".to_string(),
            label: "Person".to_string(),
            status: IndexRebuildStatus::Pending,
            created_at: Utc::now(),
            started_at: None,
            completed_at: None,
            error: None,
            retry_count: 0,
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: IndexRebuildTask = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.id, original.id);
        assert_eq!(decoded.label, original.label);
        assert_eq!(decoded.status, original.status);
    }
}
423}