Skip to main content

ravenclaws/
background.rs

//! Background task manager for long-horizon async runs.
//!
//! Supports assign-and-walk-away execution with persistence across restarts.
//! Tasks are serialized to JSON in a configurable directory and can be resumed
//! on process restart.
//!
//! # Architecture
//!
//! - `BackgroundTaskManager` — owns the task store and manages lifecycle
//! - `BackgroundTask` — a single task with status, prompt, and result
//! - `TaskStatus` — pending → running → completed / failed, or cancelled
//!   while still pending or running
//! - Tasks are persisted as individual JSON files in the tasks directory
//!   (`<workdir>/tasks/` when the manager is built with `from_config`)
//!
//! # Usage
//!
//! ```ignore
//! let manager = BackgroundTaskManager::new(Path::new("/tmp/ravenclaws-tasks")).await?;
//! let task_id = manager
//!     .submit("Analyze this data".to_string(), system_prompt)
//!     .await?;
//! let result = manager.execute(&task_id, llm_client).await?;
//! let status = manager.status(&task_id).await?;
//! ```
21
22use crate::config::RuntimeConfig;
23use crate::error::{RavenClawsError, Result};
24use crate::llm::LLMProviderTrait;
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use tokio::sync::RwLock;
30use tracing::{info, instrument, warn};
31
32/// Status of a background task
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub enum TaskStatus {
35    /// Task has been submitted but not yet started
36    Pending,
37    /// Task is currently running
38    Running,
39    /// Task completed successfully
40    Completed,
41    /// Task failed with an error
42    Failed,
43    /// Task was cancelled by the user
44    Cancelled,
45}
46
47impl std::fmt::Display for TaskStatus {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        match self {
50            TaskStatus::Pending => write!(f, "pending"),
51            TaskStatus::Running => write!(f, "running"),
52            TaskStatus::Completed => write!(f, "completed"),
53            TaskStatus::Failed => write!(f, "failed"),
54            TaskStatus::Cancelled => write!(f, "cancelled"),
55        }
56    }
57}
58
59/// A single background task with full lifecycle tracking
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct BackgroundTask {
62    /// Unique task identifier (UUID v4)
63    pub id: String,
64    /// User-provided prompt for the agent
65    pub prompt: String,
66    /// System prompt used for this task
67    pub system_prompt: String,
68    /// Current status
69    pub status: TaskStatus,
70    /// Final result (set when completed)
71    pub result: Option<String>,
72    /// Error message (set when failed)
73    pub error: Option<String>,
74    /// When the task was created (ISO 8601)
75    pub created_at: String,
76    /// When the task was last updated (ISO 8601)
77    pub updated_at: String,
78    /// Number of agent loop iterations used
79    pub iterations: usize,
80    /// Provider name that executed this task
81    pub provider: Option<String>,
82    /// Model name that executed this task
83    pub model: Option<String>,
84    /// Serialized checkpoint state for durable execution (v0.9.12+)
85    /// When set, the agent loop can resume from this checkpoint instead of
86    /// starting from scratch after a process restart.
87    pub checkpoint: Option<String>,
88}
89
90impl BackgroundTask {
91    /// Create a new pending task
92    pub fn new(id: String, prompt: String, system_prompt: String) -> Self {
93        let now = chrono::Utc::now().to_rfc3339();
94        Self {
95            id,
96            prompt,
97            system_prompt,
98            status: TaskStatus::Pending,
99            result: None,
100            error: None,
101            created_at: now.clone(),
102            updated_at: now,
103            iterations: 0,
104            provider: None,
105            model: None,
106            checkpoint: None,
107        }
108    }
109}
110
111/// Manager for background tasks with disk persistence
112#[derive(Debug, Clone)]
113pub struct BackgroundTaskManager {
114    /// Directory where task files are stored
115    tasks_dir: PathBuf,
116    /// In-memory task index (id → task)
117    tasks: Arc<RwLock<HashMap<String, BackgroundTask>>>,
118}
119
120impl BackgroundTaskManager {
121    /// Create a new background task manager.
122    ///
123    /// The `tasks_dir` is where task JSON files are persisted.
124    /// If the directory doesn't exist, it will be created.
125    /// On creation, it loads any existing tasks from disk.
126    pub async fn new(tasks_dir: &Path) -> Result<Self> {
127        let tasks_dir = tasks_dir.to_path_buf();
128
129        // Create the tasks directory if it doesn't exist
130        std::fs::create_dir_all(&tasks_dir).map_err(|e| {
131            RavenClawsError::CommandExecution(format!(
132                "Failed to create tasks directory '{}': {}",
133                tasks_dir.display(),
134                e
135            ))
136        })?;
137
138        let tasks = Arc::new(RwLock::new(HashMap::new()));
139
140        let mut manager = Self { tasks_dir, tasks };
141
142        // Load existing tasks from disk
143        let count = manager.load_tasks().await?;
144        if count > 0 {
145            info!(count, "Loaded existing background tasks from disk");
146        }
147
148        Ok(manager)
149    }
150
151    /// Create from runtime config — uses `workdir/tasks/` as the tasks directory
152    pub async fn from_config(config: &RuntimeConfig) -> Result<Self> {
153        let tasks_dir = PathBuf::from(&config.workdir).join("tasks");
154        Self::new(&tasks_dir).await
155    }
156
157    /// Load all tasks from disk into memory
158    async fn load_tasks(&mut self) -> Result<usize> {
159        let mut count = 0;
160        let read_dir = match std::fs::read_dir(&self.tasks_dir) {
161            Ok(d) => d,
162            Err(_) => return Ok(0),
163        };
164
165        let mut tasks_to_insert = Vec::new();
166        for entry in read_dir.flatten() {
167            let path = entry.path();
168            if path.extension().is_some_and(|ext| ext == "json") {
169                match std::fs::read_to_string(&path) {
170                    Ok(content) => match serde_json::from_str::<BackgroundTask>(&content) {
171                        Ok(task) => {
172                            tasks_to_insert.push(task);
173                            count += 1;
174                        }
175                        Err(e) => {
176                            warn!(
177                                path = %path.display(),
178                                error = %e,
179                                "Failed to deserialize background task"
180                            );
181                        }
182                    },
183                    Err(e) => {
184                        warn!(
185                            path = %path.display(),
186                            error = %e,
187                            "Failed to read background task file"
188                        );
189                    }
190                }
191            }
192        }
193
194        let mut tasks = self.tasks.write().await;
195        for task in tasks_to_insert {
196            tasks.insert(task.id.clone(), task);
197        }
198
199        Ok(count)
200    }
201
202    /// Persist a single task to disk
203    fn save_task(&self, task: &BackgroundTask) -> Result<()> {
204        let path = self.tasks_dir.join(format!("{}.json", task.id));
205        let content = serde_json::to_string_pretty(task).map_err(|e| {
206            RavenClawsError::CommandExecution(format!("Failed to serialize task: {}", e))
207        })?;
208
209        std::fs::write(&path, content).map_err(|e| {
210            RavenClawsError::CommandExecution(format!(
211                "Failed to write task file '{}': {}",
212                path.display(),
213                e
214            ))
215        })?;
216
217        Ok(())
218    }
219
220    /// Submit a new background task and return its ID.
221    /// The task is persisted immediately and will be executed in the background.
222    pub async fn submit(&self, prompt: String, system_prompt: String) -> Result<String> {
223        let id = uuid::Uuid::new_v4().to_string();
224        let task = BackgroundTask::new(id.clone(), prompt, system_prompt);
225
226        // Persist to disk
227        self.save_task(&task)?;
228
229        // Add to in-memory index
230        let mut tasks = self.tasks.write().await;
231        tasks.insert(id.clone(), task);
232
233        info!(task_id = %id, "Background task submitted");
234        Ok(id)
235    }
236
237    /// Execute a background task with the given LLM client.
238    /// Updates the task status to Running, runs the agent loop, and saves the result.
239    #[instrument(skip(self, llm), fields(task_id = %task_id))]
240    pub async fn execute(&self, task_id: &str, llm: Arc<dyn LLMProviderTrait>) -> Result<String> {
241        // Get the task and mark as running
242        {
243            let mut tasks = self.tasks.write().await;
244            let task = tasks.get_mut(task_id).ok_or_else(|| {
245                RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
246            })?;
247
248            task.status = TaskStatus::Running;
249            task.provider = Some(llm.provider_name().to_string());
250            task.model = Some(llm.model().to_string());
251            task.updated_at = chrono::Utc::now().to_rfc3339();
252            self.save_task(task)?;
253        }
254
255        info!(
256            task_id = %task_id,
257            provider = llm.provider_name(),
258            model = llm.model(),
259            "Executing background task"
260        );
261
262        // Determine checkpoint directory for durable execution
263        let checkpoint_dir = self.tasks_dir.join("checkpoints");
264        let _ = std::fs::create_dir_all(&checkpoint_dir);
265
266        // Run the agent loop with checkpointing enabled
267        let loop_config = crate::agent::AgentLoopConfig {
268            max_iterations: 10,
269            enable_tools: true,
270            require_approval: false,
271            prompt_injection_protection: true,
272            token_lifetime_secs: 0,
273            no_final_required: true,
274            fallback_chain: None,
275            token_budget: None,
276            ravenfabric: None,
277            checkpoint_dir: Some(checkpoint_dir),
278            session_id: Some(task_id.to_string()),
279            metrics_callback: None,
280            load_manager: None,
281            retry_config: None,
282        };
283
284        let result = crate::agent::run_agent_loop(
285            llm.clone(),
286            &self.get_prompt(task_id).await?,
287            &self.get_system_prompt(task_id).await?,
288            loop_config,
289        )
290        .await;
291
292        // Update task with result
293        let mut tasks = self.tasks.write().await;
294        let task = tasks.get_mut(task_id).ok_or_else(|| {
295            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
296        })?;
297
298        match result {
299            Ok(response) => {
300                task.status = TaskStatus::Completed;
301                task.result = Some(response.clone());
302                task.updated_at = chrono::Utc::now().to_rfc3339();
303                self.save_task(task)?;
304
305                info!(
306                    task_id = %task_id,
307                    iterations = task.iterations,
308                    "Background task completed"
309                );
310
311                Ok(response)
312            }
313            Err(e) => {
314                task.status = TaskStatus::Failed;
315                task.error = Some(e.to_string());
316                task.updated_at = chrono::Utc::now().to_rfc3339();
317                self.save_task(task)?;
318
319                warn!(
320                    task_id = %task_id,
321                    error = %e,
322                    "Background task failed"
323                );
324
325                Err(e)
326            }
327        }
328    }
329
330    /// Get the current status of a task
331    #[allow(dead_code)]
332    pub async fn status(&self, task_id: &str) -> Result<TaskStatus> {
333        let tasks = self.tasks.read().await;
334        let task = tasks.get(task_id).ok_or_else(|| {
335            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
336        })?;
337        Ok(task.status.clone())
338    }
339
340    /// Get the full task details
341    pub async fn get_task(&self, task_id: &str) -> Result<BackgroundTask> {
342        let tasks = self.tasks.read().await;
343        tasks.get(task_id).cloned().ok_or_else(|| {
344            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
345        })
346    }
347
348    /// List all tasks with their status
349    pub async fn list_tasks(&self) -> Vec<BackgroundTask> {
350        let tasks = self.tasks.read().await;
351        let mut task_list: Vec<BackgroundTask> = tasks.values().cloned().collect();
352        // Sort by creation time (newest first)
353        task_list.sort_by(|a, b| b.created_at.cmp(&a.created_at));
354        task_list
355    }
356
357    /// Cancel a pending or running task
358    pub async fn cancel(&self, task_id: &str) -> Result<()> {
359        let mut tasks = self.tasks.write().await;
360        let task = tasks.get_mut(task_id).ok_or_else(|| {
361            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
362        })?;
363
364        match task.status {
365            TaskStatus::Pending | TaskStatus::Running => {
366                task.status = TaskStatus::Cancelled;
367                task.updated_at = chrono::Utc::now().to_rfc3339();
368                self.save_task(task)?;
369                info!(task_id = %task_id, "Background task cancelled");
370                Ok(())
371            }
372            _ => Err(RavenClawsError::CommandExecution(format!(
373                "Cannot cancel task '{}' in status '{}'",
374                task_id, task.status
375            ))),
376        }
377    }
378
379    /// Resume all incomplete tasks (Pending or Running) from disk.
380    /// Returns the list of task IDs that need execution.
381    pub async fn resume_incomplete(&self) -> Vec<String> {
382        let tasks = self.tasks.read().await;
383        let mut incomplete = Vec::new();
384
385        for task in tasks.values() {
386            if task.status == TaskStatus::Pending || task.status == TaskStatus::Running {
387                incomplete.push(task.id.clone());
388            }
389        }
390
391        if !incomplete.is_empty() {
392            info!(
393                count = incomplete.len(),
394                "Found incomplete background tasks to resume"
395            );
396        }
397
398        incomplete
399    }
400
401    /// Get the prompt for a task (internal helper)
402    async fn get_prompt(&self, task_id: &str) -> Result<String> {
403        let tasks = self.tasks.read().await;
404        let task = tasks.get(task_id).ok_or_else(|| {
405            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
406        })?;
407        Ok(task.prompt.clone())
408    }
409
410    /// Get the system prompt for a task (internal helper)
411    async fn get_system_prompt(&self, task_id: &str) -> Result<String> {
412        let tasks = self.tasks.read().await;
413        let task = tasks.get(task_id).ok_or_else(|| {
414            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
415        })?;
416        Ok(task.system_prompt.clone())
417    }
418}
419
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Scratch directory path unique to this test name and process.
    /// Any leftover directory from an earlier run is removed first.
    fn test_dir(name: &str) -> PathBuf {
        let dir_name = format!("ravenclaws-test-bg-{}-{}", name, std::process::id());
        let dir = std::env::temp_dir().join(dir_name);
        let _ = std::fs::remove_dir_all(&dir);
        dir
    }

    /// Open a manager on `dir`, panicking if construction fails.
    async fn open_manager(dir: &Path) -> BackgroundTaskManager {
        BackgroundTaskManager::new(dir).await.unwrap()
    }

    /// Best-effort removal of a test's scratch directory.
    fn cleanup(dir: &Path) {
        let _ = std::fs::remove_dir_all(dir);
    }

    #[tokio::test]
    async fn test_manager_new_creates_directory() {
        let dir = test_dir("create_dir");
        let manager = open_manager(&dir).await;

        assert!(dir.exists(), "Tasks directory should be created");
        assert!(manager.tasks.read().await.is_empty());

        cleanup(&dir);
    }

    #[tokio::test]
    async fn test_submit_task() {
        let dir = test_dir("submit");
        let manager = open_manager(&dir).await;

        let task_id = manager
            .submit("Test prompt".to_string(), "Test system".to_string())
            .await
            .unwrap();

        let submitted = manager.get_task(&task_id).await.unwrap();
        assert_eq!(submitted.prompt, "Test prompt");
        assert_eq!(submitted.system_prompt, "Test system");
        assert_eq!(submitted.status, TaskStatus::Pending);
        assert!(submitted.result.is_none());

        // The task must also have been written to disk.
        let task_path = dir.join(format!("{}.json", task_id));
        assert!(task_path.exists(), "Task file should exist on disk");

        cleanup(&dir);
    }

    #[tokio::test]
    async fn test_status_transitions() {
        let dir = test_dir("transitions");
        let manager = open_manager(&dir).await;

        let task_id = manager
            .submit("Test".to_string(), "System".to_string())
            .await
            .unwrap();

        let before = manager.status(&task_id).await.unwrap();
        assert_eq!(before, TaskStatus::Pending);

        // Pending -> Cancelled
        manager.cancel(&task_id).await.unwrap();
        let after = manager.status(&task_id).await.unwrap();
        assert_eq!(after, TaskStatus::Cancelled);

        cleanup(&dir);
    }

    #[tokio::test]
    async fn test_list_tasks() {
        let dir = test_dir("list");
        let manager = open_manager(&dir).await;

        for prompt in ["Task 1", "Task 2"] {
            manager
                .submit(prompt.to_string(), "System".to_string())
                .await
                .unwrap();
        }

        let listed = manager.list_tasks().await;
        assert_eq!(listed.len(), 2);

        cleanup(&dir);
    }

    #[tokio::test]
    async fn test_cancel_completed_task_fails() {
        let dir = test_dir("cancel_fail");
        let manager = open_manager(&dir).await;

        let task_id = manager
            .submit("Test".to_string(), "System".to_string())
            .await
            .unwrap();

        // Force the in-memory status to Completed without going through execute.
        manager
            .tasks
            .write()
            .await
            .get_mut(&task_id)
            .unwrap()
            .status = TaskStatus::Completed;

        let outcome = manager.cancel(&task_id).await;
        assert!(outcome.is_err(), "Cancelling a completed task should fail");

        cleanup(&dir);
    }

    #[tokio::test]
    async fn test_resume_incomplete_tasks() {
        let dir = test_dir("resume");
        let manager = open_manager(&dir).await;

        for prompt in ["Task 1", "Task 2"] {
            manager
                .submit(prompt.to_string(), "System".to_string())
                .await
                .unwrap();
        }

        // Mark whichever task the map yields first as completed.
        // The read guard is released before the write lock is taken.
        let first_id = manager.tasks.read().await.keys().next().cloned();
        if let Some(id) = first_id {
            if let Some(done) = manager.tasks.write().await.get_mut(&id) {
                done.status = TaskStatus::Completed;
            }
        }

        let incomplete = manager.resume_incomplete().await;
        assert_eq!(incomplete.len(), 1, "One task should be incomplete");

        cleanup(&dir);
    }

    #[tokio::test]
    async fn test_task_not_found() {
        let dir = test_dir("not_found");
        let manager = open_manager(&dir).await;

        let lookup = manager.status("nonexistent").await;
        assert!(lookup.is_err());

        cleanup(&dir);
    }

    #[tokio::test]
    async fn test_persistence_across_restart() {
        let dir = test_dir("persist");

        // First session: submit a task, then let the manager go out of scope.
        {
            let first_session = open_manager(&dir).await;
            first_session
                .submit("Persist test".to_string(), "System".to_string())
                .await
                .unwrap();
        }

        // Second session: a new manager on the same directory sees the task.
        {
            let second_session = open_manager(&dir).await;
            let restored = second_session.list_tasks().await;
            assert_eq!(restored.len(), 1, "Task should persist across restarts");
            assert_eq!(restored[0].prompt, "Persist test");
        }

        cleanup(&dir);
    }
}