// ravenclaws/background.rs

//! Background task manager for long-horizon async runs
//!
//! Supports assign-and-walk-away execution with persistence across restarts.
//! Tasks are serialized to JSON in a configurable directory and can be resumed
//! on process restart.
//!
//! # Architecture
//!
//! - `BackgroundTaskManager` — owns the task store and manages lifecycle
//! - `BackgroundTask` — a single task with status, prompt, and result
//! - `TaskStatus` — pending → running → completed / failed; pending or running
//!   tasks can also be cancelled
//! - Tasks are persisted as individual `<task-id>.json` files in the tasks
//!   directory (`<workdir>/tasks/` when built via `from_config`)
//!
//! # Usage
//!
//! ```ignore
//! let manager = BackgroundTaskManager::new(Path::new("/tmp/ravenclaws-tasks")).await?;
//! let task_id = manager
//!     .submit("Analyze this data".to_string(), system_prompt)
//!     .await?;
//! let output = manager.execute(&task_id, llm_client).await?;
//! let status = manager.status(&task_id).await?;
//! ```

use crate::config::RuntimeConfig;
use crate::error::{RavenClawsError, Result};
use crate::llm::LLMProviderTrait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{info, instrument, warn};

32/// Status of a background task
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub enum TaskStatus {
35    /// Task has been submitted but not yet started
36    Pending,
37    /// Task is currently running
38    Running,
39    /// Task completed successfully
40    Completed,
41    /// Task failed with an error
42    Failed,
43    /// Task was cancelled by the user
44    Cancelled,
45}
46
47impl std::fmt::Display for TaskStatus {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        match self {
50            TaskStatus::Pending => write!(f, "pending"),
51            TaskStatus::Running => write!(f, "running"),
52            TaskStatus::Completed => write!(f, "completed"),
53            TaskStatus::Failed => write!(f, "failed"),
54            TaskStatus::Cancelled => write!(f, "cancelled"),
55        }
56    }
57}
58
59/// A single background task with full lifecycle tracking
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct BackgroundTask {
62    /// Unique task identifier (UUID v4)
63    pub id: String,
64    /// User-provided prompt for the agent
65    pub prompt: String,
66    /// System prompt used for this task
67    pub system_prompt: String,
68    /// Current status
69    pub status: TaskStatus,
70    /// Final result (set when completed)
71    pub result: Option<String>,
72    /// Error message (set when failed)
73    pub error: Option<String>,
74    /// When the task was created (ISO 8601)
75    pub created_at: String,
76    /// When the task was last updated (ISO 8601)
77    pub updated_at: String,
78    /// Number of agent loop iterations used
79    pub iterations: usize,
80    /// Provider name that executed this task
81    pub provider: Option<String>,
82    /// Model name that executed this task
83    pub model: Option<String>,
84}
85
86impl BackgroundTask {
87    /// Create a new pending task
88    pub fn new(id: String, prompt: String, system_prompt: String) -> Self {
89        let now = chrono::Utc::now().to_rfc3339();
90        Self {
91            id,
92            prompt,
93            system_prompt,
94            status: TaskStatus::Pending,
95            result: None,
96            error: None,
97            created_at: now.clone(),
98            updated_at: now,
99            iterations: 0,
100            provider: None,
101            model: None,
102        }
103    }
104}
105
106/// Manager for background tasks with disk persistence
107#[derive(Debug, Clone)]
108pub struct BackgroundTaskManager {
109    /// Directory where task files are stored
110    tasks_dir: PathBuf,
111    /// In-memory task index (id → task)
112    tasks: Arc<RwLock<HashMap<String, BackgroundTask>>>,
113}
114
115impl BackgroundTaskManager {
116    /// Create a new background task manager.
117    ///
118    /// The `tasks_dir` is where task JSON files are persisted.
119    /// If the directory doesn't exist, it will be created.
120    /// On creation, it loads any existing tasks from disk.
121    pub async fn new(tasks_dir: &Path) -> Result<Self> {
122        let tasks_dir = tasks_dir.to_path_buf();
123
124        // Create the tasks directory if it doesn't exist
125        std::fs::create_dir_all(&tasks_dir).map_err(|e| {
126            RavenClawsError::CommandExecution(format!(
127                "Failed to create tasks directory '{}': {}",
128                tasks_dir.display(),
129                e
130            ))
131        })?;
132
133        let tasks = Arc::new(RwLock::new(HashMap::new()));
134
135        let mut manager = Self { tasks_dir, tasks };
136
137        // Load existing tasks from disk
138        let count = manager.load_tasks().await?;
139        if count > 0 {
140            info!(count, "Loaded existing background tasks from disk");
141        }
142
143        Ok(manager)
144    }
145
146    /// Create from runtime config — uses `workdir/tasks/` as the tasks directory
147    pub async fn from_config(config: &RuntimeConfig) -> Result<Self> {
148        let tasks_dir = PathBuf::from(&config.workdir).join("tasks");
149        Self::new(&tasks_dir).await
150    }
151
152    /// Load all tasks from disk into memory
153    async fn load_tasks(&mut self) -> Result<usize> {
154        let mut count = 0;
155        let read_dir = match std::fs::read_dir(&self.tasks_dir) {
156            Ok(d) => d,
157            Err(_) => return Ok(0),
158        };
159
160        let mut tasks_to_insert = Vec::new();
161        for entry in read_dir.flatten() {
162            let path = entry.path();
163            if path.extension().is_some_and(|ext| ext == "json") {
164                match std::fs::read_to_string(&path) {
165                    Ok(content) => match serde_json::from_str::<BackgroundTask>(&content) {
166                        Ok(task) => {
167                            tasks_to_insert.push(task);
168                            count += 1;
169                        }
170                        Err(e) => {
171                            warn!(
172                                path = %path.display(),
173                                error = %e,
174                                "Failed to deserialize background task"
175                            );
176                        }
177                    },
178                    Err(e) => {
179                        warn!(
180                            path = %path.display(),
181                            error = %e,
182                            "Failed to read background task file"
183                        );
184                    }
185                }
186            }
187        }
188
189        let mut tasks = self.tasks.write().await;
190        for task in tasks_to_insert {
191            tasks.insert(task.id.clone(), task);
192        }
193
194        Ok(count)
195    }
196
197    /// Persist a single task to disk
198    fn save_task(&self, task: &BackgroundTask) -> Result<()> {
199        let path = self.tasks_dir.join(format!("{}.json", task.id));
200        let content = serde_json::to_string_pretty(task).map_err(|e| {
201            RavenClawsError::CommandExecution(format!("Failed to serialize task: {}", e))
202        })?;
203
204        std::fs::write(&path, content).map_err(|e| {
205            RavenClawsError::CommandExecution(format!(
206                "Failed to write task file '{}': {}",
207                path.display(),
208                e
209            ))
210        })?;
211
212        Ok(())
213    }
214
215    /// Submit a new background task and return its ID.
216    /// The task is persisted immediately and will be executed in the background.
217    pub async fn submit(&self, prompt: String, system_prompt: String) -> Result<String> {
218        let id = uuid::Uuid::new_v4().to_string();
219        let task = BackgroundTask::new(id.clone(), prompt, system_prompt);
220
221        // Persist to disk
222        self.save_task(&task)?;
223
224        // Add to in-memory index
225        let mut tasks = self.tasks.write().await;
226        tasks.insert(id.clone(), task);
227
228        info!(task_id = %id, "Background task submitted");
229        Ok(id)
230    }
231
232    /// Execute a background task with the given LLM client.
233    /// Updates the task status to Running, runs the agent loop, and saves the result.
234    #[instrument(skip(self, llm), fields(task_id = %task_id))]
235    pub async fn execute(&self, task_id: &str, llm: Arc<dyn LLMProviderTrait>) -> Result<String> {
236        // Get the task and mark as running
237        {
238            let mut tasks = self.tasks.write().await;
239            let task = tasks.get_mut(task_id).ok_or_else(|| {
240                RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
241            })?;
242
243            task.status = TaskStatus::Running;
244            task.provider = Some(llm.provider_name().to_string());
245            task.model = Some(llm.model().to_string());
246            task.updated_at = chrono::Utc::now().to_rfc3339();
247            self.save_task(task)?;
248        }
249
250        info!(
251            task_id = %task_id,
252            provider = llm.provider_name(),
253            model = llm.model(),
254            "Executing background task"
255        );
256
257        // Run the agent loop
258        let loop_config = crate::agent::AgentLoopConfig {
259            max_iterations: 10,
260            enable_tools: true,
261            require_approval: false,
262            prompt_injection_protection: true,
263            token_lifetime_secs: 0,
264            no_final_required: false,
265            fallback_chain: None,
266            token_budget: None,
267            ravenfabric: None,
268        };
269
270        let result = crate::agent::run_agent_loop(
271            llm.clone(),
272            &self.get_prompt(task_id).await?,
273            &self.get_system_prompt(task_id).await?,
274            loop_config,
275        )
276        .await;
277
278        // Update task with result
279        let mut tasks = self.tasks.write().await;
280        let task = tasks.get_mut(task_id).ok_or_else(|| {
281            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
282        })?;
283
284        match result {
285            Ok(response) => {
286                task.status = TaskStatus::Completed;
287                task.result = Some(response.clone());
288                task.updated_at = chrono::Utc::now().to_rfc3339();
289                self.save_task(task)?;
290
291                info!(
292                    task_id = %task_id,
293                    iterations = task.iterations,
294                    "Background task completed"
295                );
296
297                Ok(response)
298            }
299            Err(e) => {
300                task.status = TaskStatus::Failed;
301                task.error = Some(e.to_string());
302                task.updated_at = chrono::Utc::now().to_rfc3339();
303                self.save_task(task)?;
304
305                warn!(
306                    task_id = %task_id,
307                    error = %e,
308                    "Background task failed"
309                );
310
311                Err(e)
312            }
313        }
314    }
315
316    /// Get the current status of a task
317    #[allow(dead_code)]
318    pub async fn status(&self, task_id: &str) -> Result<TaskStatus> {
319        let tasks = self.tasks.read().await;
320        let task = tasks.get(task_id).ok_or_else(|| {
321            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
322        })?;
323        Ok(task.status.clone())
324    }
325
326    /// Get the full task details
327    pub async fn get_task(&self, task_id: &str) -> Result<BackgroundTask> {
328        let tasks = self.tasks.read().await;
329        tasks.get(task_id).cloned().ok_or_else(|| {
330            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
331        })
332    }
333
334    /// List all tasks with their status
335    pub async fn list_tasks(&self) -> Vec<BackgroundTask> {
336        let tasks = self.tasks.read().await;
337        let mut task_list: Vec<BackgroundTask> = tasks.values().cloned().collect();
338        // Sort by creation time (newest first)
339        task_list.sort_by(|a, b| b.created_at.cmp(&a.created_at));
340        task_list
341    }
342
343    /// Cancel a pending or running task
344    pub async fn cancel(&self, task_id: &str) -> Result<()> {
345        let mut tasks = self.tasks.write().await;
346        let task = tasks.get_mut(task_id).ok_or_else(|| {
347            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
348        })?;
349
350        match task.status {
351            TaskStatus::Pending | TaskStatus::Running => {
352                task.status = TaskStatus::Cancelled;
353                task.updated_at = chrono::Utc::now().to_rfc3339();
354                self.save_task(task)?;
355                info!(task_id = %task_id, "Background task cancelled");
356                Ok(())
357            }
358            _ => Err(RavenClawsError::CommandExecution(format!(
359                "Cannot cancel task '{}' in status '{}'",
360                task_id, task.status
361            ))),
362        }
363    }
364
365    /// Resume all incomplete tasks (Pending or Running) from disk.
366    /// Returns the list of task IDs that need execution.
367    pub async fn resume_incomplete(&self) -> Vec<String> {
368        let tasks = self.tasks.read().await;
369        let mut incomplete = Vec::new();
370
371        for task in tasks.values() {
372            if task.status == TaskStatus::Pending || task.status == TaskStatus::Running {
373                incomplete.push(task.id.clone());
374            }
375        }
376
377        if !incomplete.is_empty() {
378            info!(
379                count = incomplete.len(),
380                "Found incomplete background tasks to resume"
381            );
382        }
383
384        incomplete
385    }
386
387    /// Get the prompt for a task (internal helper)
388    async fn get_prompt(&self, task_id: &str) -> Result<String> {
389        let tasks = self.tasks.read().await;
390        let task = tasks.get(task_id).ok_or_else(|| {
391            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
392        })?;
393        Ok(task.prompt.clone())
394    }
395
396    /// Get the system prompt for a task (internal helper)
397    async fn get_system_prompt(&self, task_id: &str) -> Result<String> {
398        let tasks = self.tasks.read().await;
399        let task = tasks.get(task_id).ok_or_else(|| {
400            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
401        })?;
402        Ok(task.system_prompt.clone())
403    }
404}
405
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};

    /// Scratch directory for one test, removed again when dropped.
    ///
    /// Cleanup happens in `Drop`, so the directory is removed even when an
    /// assertion fails part-way through a test.
    struct TestDir(PathBuf);

    impl TestDir {
        fn new(name: &str) -> Self {
            let dir = std::env::temp_dir().join(format!(
                "ravenclaws-test-bg-{}-{}",
                name,
                std::process::id()
            ));
            // Clear leftovers from an earlier run that was killed before cleanup.
            let _ = std::fs::remove_dir_all(&dir);
            Self(dir)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[tokio::test]
    async fn test_manager_new_creates_directory() {
        let dir = TestDir::new("create_dir");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();
        assert!(dir.path().exists(), "Tasks directory should be created");
        assert!(manager.tasks.read().await.is_empty());
    }

    #[tokio::test]
    async fn test_submit_task() {
        let dir = TestDir::new("submit");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        let task_id = manager
            .submit("Test prompt".to_string(), "Test system".to_string())
            .await
            .unwrap();

        let task = manager.get_task(&task_id).await.unwrap();
        assert_eq!(task.prompt, "Test prompt");
        assert_eq!(task.system_prompt, "Test system");
        assert_eq!(task.status, TaskStatus::Pending);
        assert!(task.result.is_none());

        // Verify persistence
        let task_path = dir.path().join(format!("{}.json", task_id));
        assert!(task_path.exists(), "Task file should exist on disk");
    }

    #[tokio::test]
    async fn test_save_leaves_no_temp_files() {
        let dir = TestDir::new("no_tmp");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        let task_id = manager
            .submit("Test".to_string(), "System".to_string())
            .await
            .unwrap();
        manager.cancel(&task_id).await.unwrap();

        let names: Vec<String> = std::fs::read_dir(dir.path())
            .unwrap()
            .map(|entry| entry.unwrap().file_name().to_string_lossy().into_owned())
            .collect();
        assert_eq!(names, vec![format!("{}.json", task_id)]);
    }

    #[tokio::test]
    async fn test_status_transitions() {
        let dir = TestDir::new("transitions");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        let task_id = manager
            .submit("Test".to_string(), "System".to_string())
            .await
            .unwrap();

        assert_eq!(manager.status(&task_id).await.unwrap(), TaskStatus::Pending);

        manager.cancel(&task_id).await.unwrap();
        assert_eq!(
            manager.status(&task_id).await.unwrap(),
            TaskStatus::Cancelled
        );
    }

    #[tokio::test]
    async fn test_cancel_persists_across_restart() {
        let dir = TestDir::new("cancel_persist");

        let task_id = {
            let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();
            let id = manager
                .submit("Test".to_string(), "System".to_string())
                .await
                .unwrap();
            manager.cancel(&id).await.unwrap();
            id
        };

        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();
        assert_eq!(
            manager.status(&task_id).await.unwrap(),
            TaskStatus::Cancelled
        );
        assert!(
            manager.resume_incomplete().await.is_empty(),
            "A cancelled task must not be resumed"
        );
    }

    #[tokio::test]
    async fn test_list_tasks() {
        let dir = TestDir::new("list");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        manager
            .submit("Task 1".to_string(), "System".to_string())
            .await
            .unwrap();
        manager
            .submit("Task 2".to_string(), "System".to_string())
            .await
            .unwrap();

        let tasks = manager.list_tasks().await;
        assert_eq!(tasks.len(), 2);
    }

    #[tokio::test]
    async fn test_cancel_completed_task_fails() {
        let dir = TestDir::new("cancel_fail");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        let task_id = manager
            .submit("Test".to_string(), "System".to_string())
            .await
            .unwrap();

        // Manually set to completed
        manager
            .tasks
            .write()
            .await
            .get_mut(&task_id)
            .unwrap()
            .status = TaskStatus::Completed;

        let result = manager.cancel(&task_id).await;
        assert!(result.is_err(), "Cancelling a completed task should fail");
    }

    #[tokio::test]
    async fn test_resume_incomplete_tasks() {
        let dir = TestDir::new("resume");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        let done_id = manager
            .submit("Task 1".to_string(), "System".to_string())
            .await
            .unwrap();
        let pending_id = manager
            .submit("Task 2".to_string(), "System".to_string())
            .await
            .unwrap();

        // Mark one as completed
        manager
            .tasks
            .write()
            .await
            .get_mut(&done_id)
            .unwrap()
            .status = TaskStatus::Completed;

        let incomplete = manager.resume_incomplete().await;
        assert_eq!(
            incomplete,
            vec![pending_id],
            "Only the pending task should be incomplete"
        );
    }

    #[tokio::test]
    async fn test_task_not_found() {
        let dir = TestDir::new("not_found");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        let result = manager.status("nonexistent").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_load_skips_malformed_and_foreign_files() {
        let dir = TestDir::new("malformed");
        std::fs::create_dir_all(dir.path()).unwrap();
        std::fs::write(dir.path().join("broken.json"), "not json").unwrap();
        std::fs::write(dir.path().join("notes.txt"), "ignored").unwrap();

        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();
        assert!(
            manager.list_tasks().await.is_empty(),
            "Malformed and non-JSON files must be skipped, not loaded"
        );
    }

    #[tokio::test]
    async fn test_persistence_across_restart() {
        let dir = TestDir::new("persist");

        // First session: submit a task
        {
            let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();
            manager
                .submit("Persist test".to_string(), "System".to_string())
                .await
                .unwrap();
        } // manager drops

        // Second session: load from disk
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();
        let tasks = manager.list_tasks().await;
        assert_eq!(tasks.len(), 1, "Task should persist across restarts");
        assert_eq!(tasks[0].prompt, "Persist test");
    }
}