Skip to main content

ravenclaws/
background.rs

//! Background task manager for long-horizon async runs
//!
//! Supports assign-and-walk-away execution with persistence across restarts.
//! Tasks are serialized to JSON in a configurable directory and can be resumed
//! on process restart.
//!
//! # Architecture
//!
//! - `BackgroundTaskManager` — owns the task store and manages lifecycle
//! - `BackgroundTask` — a single task with status, prompt, and result
//! - `TaskStatus` — pending → running → completed / failed / cancelled
//! - Tasks are persisted as individual `<id>.json` files in the tasks
//!   directory (`<workdir>/tasks/` when built with `from_config`)
//!
//! # Usage
//!
//! ```ignore
//! let manager = BackgroundTaskManager::new(Path::new("/tmp/ravenclaws-tasks")).await?;
//! let task_id = manager
//!     .submit("Analyze this data".to_string(), "You are an analyst".to_string())
//!     .await?;
//! let result = manager.execute(&task_id, llm_client).await?;
//! let status = manager.status(&task_id).await?;
//! ```
21
22use crate::config::RuntimeConfig;
23use crate::error::{RavenClawsError, Result};
24use crate::llm::LLMProviderTrait;
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use tokio::sync::RwLock;
30use tracing::{info, instrument, warn};
31
32/// Status of a background task
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub enum TaskStatus {
35    /// Task has been submitted but not yet started
36    Pending,
37    /// Task is currently running
38    Running,
39    /// Task completed successfully
40    Completed,
41    /// Task failed with an error
42    Failed,
43    /// Task was cancelled by the user
44    Cancelled,
45}
46
47impl std::fmt::Display for TaskStatus {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        match self {
50            TaskStatus::Pending => write!(f, "pending"),
51            TaskStatus::Running => write!(f, "running"),
52            TaskStatus::Completed => write!(f, "completed"),
53            TaskStatus::Failed => write!(f, "failed"),
54            TaskStatus::Cancelled => write!(f, "cancelled"),
55        }
56    }
57}
58
59/// A single background task with full lifecycle tracking
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct BackgroundTask {
62    /// Unique task identifier (UUID v4)
63    pub id: String,
64    /// User-provided prompt for the agent
65    pub prompt: String,
66    /// System prompt used for this task
67    pub system_prompt: String,
68    /// Current status
69    pub status: TaskStatus,
70    /// Final result (set when completed)
71    pub result: Option<String>,
72    /// Error message (set when failed)
73    pub error: Option<String>,
74    /// When the task was created (ISO 8601)
75    pub created_at: String,
76    /// When the task was last updated (ISO 8601)
77    pub updated_at: String,
78    /// Number of agent loop iterations used
79    pub iterations: usize,
80    /// Provider name that executed this task
81    pub provider: Option<String>,
82    /// Model name that executed this task
83    pub model: Option<String>,
84    /// Serialized checkpoint state for durable execution (v0.9.12+)
85    /// When set, the agent loop can resume from this checkpoint instead of
86    /// starting from scratch after a process restart.
87    pub checkpoint: Option<String>,
88}
89
90impl BackgroundTask {
91    /// Create a new pending task
92    pub fn new(id: String, prompt: String, system_prompt: String) -> Self {
93        let now = chrono::Utc::now().to_rfc3339();
94        Self {
95            id,
96            prompt,
97            system_prompt,
98            status: TaskStatus::Pending,
99            result: None,
100            error: None,
101            created_at: now.clone(),
102            updated_at: now,
103            iterations: 0,
104            provider: None,
105            model: None,
106            checkpoint: None,
107        }
108    }
109}
110
111/// Manager for background tasks with disk persistence
112#[derive(Debug, Clone)]
113pub struct BackgroundTaskManager {
114    /// Directory where task files are stored
115    tasks_dir: PathBuf,
116    /// In-memory task index (id → task)
117    tasks: Arc<RwLock<HashMap<String, BackgroundTask>>>,
118}
119
120impl BackgroundTaskManager {
121    /// Create a new background task manager.
122    ///
123    /// The `tasks_dir` is where task JSON files are persisted.
124    /// If the directory doesn't exist, it will be created.
125    /// On creation, it loads any existing tasks from disk.
126    pub async fn new(tasks_dir: &Path) -> Result<Self> {
127        let tasks_dir = tasks_dir.to_path_buf();
128
129        // Create the tasks directory if it doesn't exist
130        std::fs::create_dir_all(&tasks_dir).map_err(|e| {
131            RavenClawsError::CommandExecution(format!(
132                "Failed to create tasks directory '{}': {}",
133                tasks_dir.display(),
134                e
135            ))
136        })?;
137
138        let tasks = Arc::new(RwLock::new(HashMap::new()));
139
140        let mut manager = Self { tasks_dir, tasks };
141
142        // Load existing tasks from disk
143        let count = manager.load_tasks().await?;
144        if count > 0 {
145            info!(count, "Loaded existing background tasks from disk");
146        }
147
148        Ok(manager)
149    }
150
151    /// Create from runtime config — uses `workdir/tasks/` as the tasks directory
152    pub async fn from_config(config: &RuntimeConfig) -> Result<Self> {
153        let tasks_dir = PathBuf::from(&config.workdir).join("tasks");
154        Self::new(&tasks_dir).await
155    }
156
157    /// Load all tasks from disk into memory
158    async fn load_tasks(&mut self) -> Result<usize> {
159        let mut count = 0;
160        let read_dir = match std::fs::read_dir(&self.tasks_dir) {
161            Ok(d) => d,
162            Err(_) => return Ok(0),
163        };
164
165        let mut tasks_to_insert = Vec::new();
166        for entry in read_dir.flatten() {
167            let path = entry.path();
168            if path.extension().is_some_and(|ext| ext == "json") {
169                match std::fs::read_to_string(&path) {
170                    Ok(content) => match serde_json::from_str::<BackgroundTask>(&content) {
171                        Ok(task) => {
172                            tasks_to_insert.push(task);
173                            count += 1;
174                        }
175                        Err(e) => {
176                            warn!(
177                                path = %path.display(),
178                                error = %e,
179                                "Failed to deserialize background task"
180                            );
181                        }
182                    },
183                    Err(e) => {
184                        warn!(
185                            path = %path.display(),
186                            error = %e,
187                            "Failed to read background task file"
188                        );
189                    }
190                }
191            }
192        }
193
194        let mut tasks = self.tasks.write().await;
195        for task in tasks_to_insert {
196            tasks.insert(task.id.clone(), task);
197        }
198
199        Ok(count)
200    }
201
202    /// Persist a single task to disk
203    fn save_task(&self, task: &BackgroundTask) -> Result<()> {
204        let path = self.tasks_dir.join(format!("{}.json", task.id));
205        let content = serde_json::to_string_pretty(task).map_err(|e| {
206            RavenClawsError::CommandExecution(format!("Failed to serialize task: {}", e))
207        })?;
208
209        std::fs::write(&path, content).map_err(|e| {
210            RavenClawsError::CommandExecution(format!(
211                "Failed to write task file '{}': {}",
212                path.display(),
213                e
214            ))
215        })?;
216
217        Ok(())
218    }
219
220    /// Submit a new background task and return its ID.
221    /// The task is persisted immediately and will be executed in the background.
222    pub async fn submit(&self, prompt: String, system_prompt: String) -> Result<String> {
223        let id = uuid::Uuid::new_v4().to_string();
224        let task = BackgroundTask::new(id.clone(), prompt, system_prompt);
225
226        // Persist to disk
227        self.save_task(&task)?;
228
229        // Add to in-memory index
230        let mut tasks = self.tasks.write().await;
231        tasks.insert(id.clone(), task);
232
233        info!(task_id = %id, "Background task submitted");
234        Ok(id)
235    }
236
237    /// Execute a background task with the given LLM client.
238    /// Updates the task status to Running, runs the agent loop, and saves the result.
239    #[instrument(skip(self, llm), fields(task_id = %task_id))]
240    pub async fn execute(&self, task_id: &str, llm: Arc<dyn LLMProviderTrait>) -> Result<String> {
241        // Get the task and mark as running
242        {
243            let mut tasks = self.tasks.write().await;
244            let task = tasks.get_mut(task_id).ok_or_else(|| {
245                RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
246            })?;
247
248            task.status = TaskStatus::Running;
249            task.provider = Some(llm.provider_name().to_string());
250            task.model = Some(llm.model().to_string());
251            task.updated_at = chrono::Utc::now().to_rfc3339();
252            self.save_task(task)?;
253        }
254
255        info!(
256            task_id = %task_id,
257            provider = llm.provider_name(),
258            model = llm.model(),
259            "Executing background task"
260        );
261
262        // Determine checkpoint directory for durable execution
263        let checkpoint_dir = self.tasks_dir.join("checkpoints");
264        let _ = std::fs::create_dir_all(&checkpoint_dir);
265
266        // Run the agent loop with checkpointing enabled
267        let loop_config = crate::agent::AgentLoopConfig {
268            max_iterations: 10,
269            enable_tools: true,
270            require_approval: false,
271            prompt_injection_protection: true,
272            token_lifetime_secs: 0,
273            no_final_required: true,
274            fallback_chain: None,
275            token_budget: None,
276            ravenfabric: None,
277            checkpoint_dir: Some(checkpoint_dir),
278            session_id: Some(task_id.to_string()),
279            metrics_callback: None,
280            load_manager: None,
281        };
282
283        let result = crate::agent::run_agent_loop(
284            llm.clone(),
285            &self.get_prompt(task_id).await?,
286            &self.get_system_prompt(task_id).await?,
287            loop_config,
288        )
289        .await;
290
291        // Update task with result
292        let mut tasks = self.tasks.write().await;
293        let task = tasks.get_mut(task_id).ok_or_else(|| {
294            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
295        })?;
296
297        match result {
298            Ok(response) => {
299                task.status = TaskStatus::Completed;
300                task.result = Some(response.clone());
301                task.updated_at = chrono::Utc::now().to_rfc3339();
302                self.save_task(task)?;
303
304                info!(
305                    task_id = %task_id,
306                    iterations = task.iterations,
307                    "Background task completed"
308                );
309
310                Ok(response)
311            }
312            Err(e) => {
313                task.status = TaskStatus::Failed;
314                task.error = Some(e.to_string());
315                task.updated_at = chrono::Utc::now().to_rfc3339();
316                self.save_task(task)?;
317
318                warn!(
319                    task_id = %task_id,
320                    error = %e,
321                    "Background task failed"
322                );
323
324                Err(e)
325            }
326        }
327    }
328
329    /// Get the current status of a task
330    #[allow(dead_code)]
331    pub async fn status(&self, task_id: &str) -> Result<TaskStatus> {
332        let tasks = self.tasks.read().await;
333        let task = tasks.get(task_id).ok_or_else(|| {
334            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
335        })?;
336        Ok(task.status.clone())
337    }
338
339    /// Get the full task details
340    pub async fn get_task(&self, task_id: &str) -> Result<BackgroundTask> {
341        let tasks = self.tasks.read().await;
342        tasks.get(task_id).cloned().ok_or_else(|| {
343            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
344        })
345    }
346
347    /// List all tasks with their status
348    pub async fn list_tasks(&self) -> Vec<BackgroundTask> {
349        let tasks = self.tasks.read().await;
350        let mut task_list: Vec<BackgroundTask> = tasks.values().cloned().collect();
351        // Sort by creation time (newest first)
352        task_list.sort_by(|a, b| b.created_at.cmp(&a.created_at));
353        task_list
354    }
355
356    /// Cancel a pending or running task
357    pub async fn cancel(&self, task_id: &str) -> Result<()> {
358        let mut tasks = self.tasks.write().await;
359        let task = tasks.get_mut(task_id).ok_or_else(|| {
360            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
361        })?;
362
363        match task.status {
364            TaskStatus::Pending | TaskStatus::Running => {
365                task.status = TaskStatus::Cancelled;
366                task.updated_at = chrono::Utc::now().to_rfc3339();
367                self.save_task(task)?;
368                info!(task_id = %task_id, "Background task cancelled");
369                Ok(())
370            }
371            _ => Err(RavenClawsError::CommandExecution(format!(
372                "Cannot cancel task '{}' in status '{}'",
373                task_id, task.status
374            ))),
375        }
376    }
377
378    /// Resume all incomplete tasks (Pending or Running) from disk.
379    /// Returns the list of task IDs that need execution.
380    pub async fn resume_incomplete(&self) -> Vec<String> {
381        let tasks = self.tasks.read().await;
382        let mut incomplete = Vec::new();
383
384        for task in tasks.values() {
385            if task.status == TaskStatus::Pending || task.status == TaskStatus::Running {
386                incomplete.push(task.id.clone());
387            }
388        }
389
390        if !incomplete.is_empty() {
391            info!(
392                count = incomplete.len(),
393                "Found incomplete background tasks to resume"
394            );
395        }
396
397        incomplete
398    }
399
400    /// Get the prompt for a task (internal helper)
401    async fn get_prompt(&self, task_id: &str) -> Result<String> {
402        let tasks = self.tasks.read().await;
403        let task = tasks.get(task_id).ok_or_else(|| {
404            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
405        })?;
406        Ok(task.prompt.clone())
407    }
408
409    /// Get the system prompt for a task (internal helper)
410    async fn get_system_prompt(&self, task_id: &str) -> Result<String> {
411        let tasks = self.tasks.read().await;
412        let task = tasks.get(task_id).ok_or_else(|| {
413            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
414        })?;
415        Ok(task.system_prompt.clone())
416    }
417}
418
419#[cfg(test)]
420mod tests {
421    use super::*;
422    use std::path::PathBuf;
423
424    fn test_dir(name: &str) -> PathBuf {
425        let dir = std::env::temp_dir().join(format!(
426            "ravenclaws-test-bg-{}-{}",
427            name,
428            std::process::id()
429        ));
430        let _ = std::fs::remove_dir_all(&dir);
431        dir
432    }
433
434    #[tokio::test]
435    async fn test_manager_new_creates_directory() {
436        let dir = test_dir("create_dir");
437        let manager = BackgroundTaskManager::new(&dir).await.unwrap();
438        assert!(dir.exists(), "Tasks directory should be created");
439        assert!(manager.tasks.read().await.is_empty());
440        let _ = std::fs::remove_dir_all(&dir);
441    }
442
443    #[tokio::test]
444    async fn test_submit_task() {
445        let dir = test_dir("submit");
446        let manager = BackgroundTaskManager::new(&dir).await.unwrap();
447
448        let task_id = manager
449            .submit("Test prompt".to_string(), "Test system".to_string())
450            .await
451            .unwrap();
452
453        let task = manager.get_task(&task_id).await.unwrap();
454        assert_eq!(task.prompt, "Test prompt");
455        assert_eq!(task.system_prompt, "Test system");
456        assert_eq!(task.status, TaskStatus::Pending);
457        assert!(task.result.is_none());
458
459        // Verify persistence
460        let task_path = dir.join(format!("{}.json", task_id));
461        assert!(task_path.exists(), "Task file should exist on disk");
462
463        let _ = std::fs::remove_dir_all(&dir);
464    }
465
466    #[tokio::test]
467    async fn test_status_transitions() {
468        let dir = test_dir("transitions");
469        let manager = BackgroundTaskManager::new(&dir).await.unwrap();
470
471        let task_id = manager
472            .submit("Test".to_string(), "System".to_string())
473            .await
474            .unwrap();
475
476        assert_eq!(manager.status(&task_id).await.unwrap(), TaskStatus::Pending);
477
478        // Cancel the task
479        manager.cancel(&task_id).await.unwrap();
480        assert_eq!(
481            manager.status(&task_id).await.unwrap(),
482            TaskStatus::Cancelled
483        );
484
485        let _ = std::fs::remove_dir_all(&dir);
486    }
487
488    #[tokio::test]
489    async fn test_list_tasks() {
490        let dir = test_dir("list");
491        let manager = BackgroundTaskManager::new(&dir).await.unwrap();
492
493        manager
494            .submit("Task 1".to_string(), "System".to_string())
495            .await
496            .unwrap();
497        manager
498            .submit("Task 2".to_string(), "System".to_string())
499            .await
500            .unwrap();
501
502        let tasks = manager.list_tasks().await;
503        assert_eq!(tasks.len(), 2);
504
505        let _ = std::fs::remove_dir_all(&dir);
506    }
507
508    #[tokio::test]
509    async fn test_cancel_completed_task_fails() {
510        let dir = test_dir("cancel_fail");
511        let manager = BackgroundTaskManager::new(&dir).await.unwrap();
512
513        let task_id = manager
514            .submit("Test".to_string(), "System".to_string())
515            .await
516            .unwrap();
517
518        // Manually set to completed
519        {
520            let mut tasks = manager.tasks.write().await;
521            let task = tasks.get_mut(&task_id).unwrap();
522            task.status = TaskStatus::Completed;
523        }
524
525        let result = manager.cancel(&task_id).await;
526        assert!(result.is_err(), "Cancelling a completed task should fail");
527
528        let _ = std::fs::remove_dir_all(&dir);
529    }
530
531    #[tokio::test]
532    async fn test_resume_incomplete_tasks() {
533        let dir = test_dir("resume");
534        let manager = BackgroundTaskManager::new(&dir).await.unwrap();
535
536        manager
537            .submit("Task 1".to_string(), "System".to_string())
538            .await
539            .unwrap();
540        manager
541            .submit("Task 2".to_string(), "System".to_string())
542            .await
543            .unwrap();
544
545        // Mark one as completed
546        {
547            let tasks = manager.tasks.read().await;
548            let tasks_vec: Vec<&BackgroundTask> = tasks.values().collect();
549            if let Some(task) = tasks_vec.first() {
550                let id = task.id.clone();
551                drop(tasks);
552                let mut tasks = manager.tasks.write().await;
553                if let Some(t) = tasks.get_mut(&id) {
554                    t.status = TaskStatus::Completed;
555                }
556            }
557        }
558
559        let incomplete = manager.resume_incomplete().await;
560        assert_eq!(incomplete.len(), 1, "One task should be incomplete");
561
562        let _ = std::fs::remove_dir_all(&dir);
563    }
564
565    #[tokio::test]
566    async fn test_task_not_found() {
567        let dir = test_dir("not_found");
568        let manager = BackgroundTaskManager::new(&dir).await.unwrap();
569
570        let result = manager.status("nonexistent").await;
571        assert!(result.is_err());
572
573        let _ = std::fs::remove_dir_all(&dir);
574    }
575
576    #[tokio::test]
577    async fn test_persistence_across_restart() {
578        let dir = test_dir("persist");
579
580        // First session: submit a task
581        {
582            let manager = BackgroundTaskManager::new(&dir).await.unwrap();
583            manager
584                .submit("Persist test".to_string(), "System".to_string())
585                .await
586                .unwrap();
587        } // manager drops
588
589        // Second session: load from disk
590        {
591            let manager = BackgroundTaskManager::new(&dir).await.unwrap();
592            let tasks = manager.list_tasks().await;
593            assert_eq!(tasks.len(), 1, "Task should persist across restarts");
594            assert_eq!(tasks[0].prompt, "Persist test");
595        }
596
597        let _ = std::fs::remove_dir_all(&dir);
598    }
599}