Skip to main content

ravenclaws/
background.rs

1//! Background task manager for long-horizon async runs
2//!
3//! Supports assign-and-walk-away execution with persistence across restarts.
4//! Tasks are serialized to JSON in a configurable directory and can be resumed
5//! on process restart.
6//!
7//! # Architecture
8//!
9//! - `BackgroundTaskManager` — owns the task store and manages lifecycle
10//! - `BackgroundTask` — a single task with status, prompt, and result
//! - `TaskStatus` — pending → running → completed / failed / cancelled
12//! - Tasks are persisted as individual JSON files in `tasks/` directory
13//!
14//! # Usage
15//!
//! ```ignore
//! let manager = BackgroundTaskManager::new(Path::new("/tmp/ravenclaws-tasks")).await?;
//! let task_id = manager
//!     .submit("Analyze this data".to_string(), system_prompt)
//!     .await?;
//! let result = manager.execute(&task_id, llm_client).await?;
//! let status = manager.status(&task_id).await?;
//! ```
21
22use crate::config::RuntimeConfig;
23use crate::error::{RavenClawsError, Result};
24use crate::llm::LLMProviderTrait;
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use tokio::sync::RwLock;
30use tracing::{info, instrument, warn};
31
32/// Status of a background task
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub enum TaskStatus {
35    /// Task has been submitted but not yet started
36    Pending,
37    /// Task is currently running
38    Running,
39    /// Task completed successfully
40    Completed,
41    /// Task failed with an error
42    Failed,
43    /// Task was cancelled by the user
44    Cancelled,
45}
46
47impl std::fmt::Display for TaskStatus {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        match self {
50            TaskStatus::Pending => write!(f, "pending"),
51            TaskStatus::Running => write!(f, "running"),
52            TaskStatus::Completed => write!(f, "completed"),
53            TaskStatus::Failed => write!(f, "failed"),
54            TaskStatus::Cancelled => write!(f, "cancelled"),
55        }
56    }
57}
58
59/// A single background task with full lifecycle tracking
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct BackgroundTask {
62    /// Unique task identifier (UUID v4)
63    pub id: String,
64    /// User-provided prompt for the agent
65    pub prompt: String,
66    /// System prompt used for this task
67    pub system_prompt: String,
68    /// Current status
69    pub status: TaskStatus,
70    /// Final result (set when completed)
71    pub result: Option<String>,
72    /// Error message (set when failed)
73    pub error: Option<String>,
74    /// When the task was created (ISO 8601)
75    pub created_at: String,
76    /// When the task was last updated (ISO 8601)
77    pub updated_at: String,
78    /// Number of agent loop iterations used
79    pub iterations: usize,
80    /// Provider name that executed this task
81    pub provider: Option<String>,
82    /// Model name that executed this task
83    pub model: Option<String>,
84    /// Serialized checkpoint state for durable execution (v0.9.12+)
85    /// When set, the agent loop can resume from this checkpoint instead of
86    /// starting from scratch after a process restart.
87    pub checkpoint: Option<String>,
88}
89
90impl BackgroundTask {
91    /// Create a new pending task
92    pub fn new(id: String, prompt: String, system_prompt: String) -> Self {
93        let now = chrono::Utc::now().to_rfc3339();
94        Self {
95            id,
96            prompt,
97            system_prompt,
98            status: TaskStatus::Pending,
99            result: None,
100            error: None,
101            created_at: now.clone(),
102            updated_at: now,
103            iterations: 0,
104            provider: None,
105            model: None,
106            checkpoint: None,
107        }
108    }
109}
110
111/// Manager for background tasks with disk persistence
112#[derive(Debug, Clone)]
113pub struct BackgroundTaskManager {
114    /// Directory where task files are stored
115    tasks_dir: PathBuf,
116    /// In-memory task index (id → task)
117    tasks: Arc<RwLock<HashMap<String, BackgroundTask>>>,
118}
119
120impl BackgroundTaskManager {
121    /// Create a new background task manager.
122    ///
123    /// The `tasks_dir` is where task JSON files are persisted.
124    /// If the directory doesn't exist, it will be created.
125    /// On creation, it loads any existing tasks from disk.
126    pub async fn new(tasks_dir: &Path) -> Result<Self> {
127        let tasks_dir = tasks_dir.to_path_buf();
128
129        // Create the tasks directory if it doesn't exist
130        std::fs::create_dir_all(&tasks_dir).map_err(|e| {
131            RavenClawsError::CommandExecution(format!(
132                "Failed to create tasks directory '{}': {}",
133                tasks_dir.display(),
134                e
135            ))
136        })?;
137
138        let tasks = Arc::new(RwLock::new(HashMap::new()));
139
140        let mut manager = Self { tasks_dir, tasks };
141
142        // Load existing tasks from disk
143        let count = manager.load_tasks().await?;
144        if count > 0 {
145            info!(count, "Loaded existing background tasks from disk");
146        }
147
148        Ok(manager)
149    }
150
151    /// Create from runtime config — uses `workdir/tasks/` as the tasks directory
152    pub async fn from_config(config: &RuntimeConfig) -> Result<Self> {
153        let tasks_dir = PathBuf::from(&config.workdir).join("tasks");
154        Self::new(&tasks_dir).await
155    }
156
157    /// Load all tasks from disk into memory
158    async fn load_tasks(&mut self) -> Result<usize> {
159        let mut count = 0;
160        let read_dir = match std::fs::read_dir(&self.tasks_dir) {
161            Ok(d) => d,
162            Err(_) => return Ok(0),
163        };
164
165        let mut tasks_to_insert = Vec::new();
166        for entry in read_dir.flatten() {
167            let path = entry.path();
168            if path.extension().is_some_and(|ext| ext == "json") {
169                match std::fs::read_to_string(&path) {
170                    Ok(content) => match serde_json::from_str::<BackgroundTask>(&content) {
171                        Ok(task) => {
172                            tasks_to_insert.push(task);
173                            count += 1;
174                        }
175                        Err(e) => {
176                            warn!(
177                                path = %path.display(),
178                                error = %e,
179                                "Failed to deserialize background task"
180                            );
181                        }
182                    },
183                    Err(e) => {
184                        warn!(
185                            path = %path.display(),
186                            error = %e,
187                            "Failed to read background task file"
188                        );
189                    }
190                }
191            }
192        }
193
194        let mut tasks = self.tasks.write().await;
195        for task in tasks_to_insert {
196            tasks.insert(task.id.clone(), task);
197        }
198
199        Ok(count)
200    }
201
202    /// Persist a single task to disk
203    fn save_task(&self, task: &BackgroundTask) -> Result<()> {
204        let path = self.tasks_dir.join(format!("{}.json", task.id));
205        let content = serde_json::to_string_pretty(task).map_err(|e| {
206            RavenClawsError::CommandExecution(format!("Failed to serialize task: {}", e))
207        })?;
208
209        std::fs::write(&path, content).map_err(|e| {
210            RavenClawsError::CommandExecution(format!(
211                "Failed to write task file '{}': {}",
212                path.display(),
213                e
214            ))
215        })?;
216
217        Ok(())
218    }
219
220    /// Submit a new background task and return its ID.
221    /// The task is persisted immediately and will be executed in the background.
222    pub async fn submit(&self, prompt: String, system_prompt: String) -> Result<String> {
223        let id = uuid::Uuid::new_v4().to_string();
224        let task = BackgroundTask::new(id.clone(), prompt, system_prompt);
225
226        // Persist to disk
227        self.save_task(&task)?;
228
229        // Add to in-memory index
230        let mut tasks = self.tasks.write().await;
231        tasks.insert(id.clone(), task);
232
233        info!(task_id = %id, "Background task submitted");
234        Ok(id)
235    }
236
237    /// Execute a background task with the given LLM client.
238    /// Updates the task status to Running, runs the agent loop, and saves the result.
239    #[instrument(skip(self, llm), fields(task_id = %task_id))]
240    pub async fn execute(&self, task_id: &str, llm: Arc<dyn LLMProviderTrait>) -> Result<String> {
241        // Get the task and mark as running
242        {
243            let mut tasks = self.tasks.write().await;
244            let task = tasks.get_mut(task_id).ok_or_else(|| {
245                RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
246            })?;
247
248            task.status = TaskStatus::Running;
249            task.provider = Some(llm.provider_name().to_string());
250            task.model = Some(llm.model().to_string());
251            task.updated_at = chrono::Utc::now().to_rfc3339();
252            self.save_task(task)?;
253        }
254
255        info!(
256            task_id = %task_id,
257            provider = llm.provider_name(),
258            model = llm.model(),
259            "Executing background task"
260        );
261
262        // Determine checkpoint directory for durable execution
263        let checkpoint_dir = self.tasks_dir.join("checkpoints");
264        let _ = std::fs::create_dir_all(&checkpoint_dir);
265
266        // Run the agent loop with checkpointing enabled
267        let loop_config = crate::agent::AgentLoopConfig {
268            max_iterations: 10,
269            enable_tools: true,
270            require_approval: false,
271            prompt_injection_protection: true,
272            token_lifetime_secs: 0,
273            no_final_required: false,
274            fallback_chain: None,
275            token_budget: None,
276            ravenfabric: None,
277            checkpoint_dir: Some(checkpoint_dir),
278            session_id: Some(task_id.to_string()),
279            metrics_callback: None,
280        };
281
282        let result = crate::agent::run_agent_loop(
283            llm.clone(),
284            &self.get_prompt(task_id).await?,
285            &self.get_system_prompt(task_id).await?,
286            loop_config,
287        )
288        .await;
289
290        // Update task with result
291        let mut tasks = self.tasks.write().await;
292        let task = tasks.get_mut(task_id).ok_or_else(|| {
293            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
294        })?;
295
296        match result {
297            Ok(response) => {
298                task.status = TaskStatus::Completed;
299                task.result = Some(response.clone());
300                task.updated_at = chrono::Utc::now().to_rfc3339();
301                self.save_task(task)?;
302
303                info!(
304                    task_id = %task_id,
305                    iterations = task.iterations,
306                    "Background task completed"
307                );
308
309                Ok(response)
310            }
311            Err(e) => {
312                task.status = TaskStatus::Failed;
313                task.error = Some(e.to_string());
314                task.updated_at = chrono::Utc::now().to_rfc3339();
315                self.save_task(task)?;
316
317                warn!(
318                    task_id = %task_id,
319                    error = %e,
320                    "Background task failed"
321                );
322
323                Err(e)
324            }
325        }
326    }
327
328    /// Get the current status of a task
329    #[allow(dead_code)]
330    pub async fn status(&self, task_id: &str) -> Result<TaskStatus> {
331        let tasks = self.tasks.read().await;
332        let task = tasks.get(task_id).ok_or_else(|| {
333            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
334        })?;
335        Ok(task.status.clone())
336    }
337
338    /// Get the full task details
339    pub async fn get_task(&self, task_id: &str) -> Result<BackgroundTask> {
340        let tasks = self.tasks.read().await;
341        tasks.get(task_id).cloned().ok_or_else(|| {
342            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
343        })
344    }
345
346    /// List all tasks with their status
347    pub async fn list_tasks(&self) -> Vec<BackgroundTask> {
348        let tasks = self.tasks.read().await;
349        let mut task_list: Vec<BackgroundTask> = tasks.values().cloned().collect();
350        // Sort by creation time (newest first)
351        task_list.sort_by(|a, b| b.created_at.cmp(&a.created_at));
352        task_list
353    }
354
355    /// Cancel a pending or running task
356    pub async fn cancel(&self, task_id: &str) -> Result<()> {
357        let mut tasks = self.tasks.write().await;
358        let task = tasks.get_mut(task_id).ok_or_else(|| {
359            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
360        })?;
361
362        match task.status {
363            TaskStatus::Pending | TaskStatus::Running => {
364                task.status = TaskStatus::Cancelled;
365                task.updated_at = chrono::Utc::now().to_rfc3339();
366                self.save_task(task)?;
367                info!(task_id = %task_id, "Background task cancelled");
368                Ok(())
369            }
370            _ => Err(RavenClawsError::CommandExecution(format!(
371                "Cannot cancel task '{}' in status '{}'",
372                task_id, task.status
373            ))),
374        }
375    }
376
377    /// Resume all incomplete tasks (Pending or Running) from disk.
378    /// Returns the list of task IDs that need execution.
379    pub async fn resume_incomplete(&self) -> Vec<String> {
380        let tasks = self.tasks.read().await;
381        let mut incomplete = Vec::new();
382
383        for task in tasks.values() {
384            if task.status == TaskStatus::Pending || task.status == TaskStatus::Running {
385                incomplete.push(task.id.clone());
386            }
387        }
388
389        if !incomplete.is_empty() {
390            info!(
391                count = incomplete.len(),
392                "Found incomplete background tasks to resume"
393            );
394        }
395
396        incomplete
397    }
398
399    /// Get the prompt for a task (internal helper)
400    async fn get_prompt(&self, task_id: &str) -> Result<String> {
401        let tasks = self.tasks.read().await;
402        let task = tasks.get(task_id).ok_or_else(|| {
403            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
404        })?;
405        Ok(task.prompt.clone())
406    }
407
408    /// Get the system prompt for a task (internal helper)
409    async fn get_system_prompt(&self, task_id: &str) -> Result<String> {
410        let tasks = self.tasks.read().await;
411        let task = tasks.get(task_id).ok_or_else(|| {
412            RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
413        })?;
414        Ok(task.system_prompt.clone())
415    }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Return a per-process scratch directory for `name`, wiping any leftovers
    /// from an earlier run. The directory itself is not created here.
    fn test_dir(name: &str) -> PathBuf {
        let unique = format!("ravenclaws-test-bg-{}-{}", name, std::process::id());
        let dir = std::env::temp_dir().join(unique);
        let _ = std::fs::remove_dir_all(&dir);
        dir
    }

    /// Constructing a manager creates the directory and starts with no tasks.
    #[tokio::test]
    async fn test_manager_new_creates_directory() {
        let dir = test_dir("create_dir");
        let manager = BackgroundTaskManager::new(&dir).await.unwrap();

        assert!(dir.exists(), "Tasks directory should be created");
        assert!(manager.tasks.read().await.is_empty());

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// A submitted task is pending, keeps its prompts, and has a file on disk.
    #[tokio::test]
    async fn test_submit_task() {
        let dir = test_dir("submit");
        let manager = BackgroundTaskManager::new(&dir).await.unwrap();

        let prompt = "Test prompt".to_string();
        let system = "Test system".to_string();
        let task_id = manager.submit(prompt, system).await.unwrap();

        let stored = manager.get_task(&task_id).await.unwrap();
        assert_eq!(stored.prompt, "Test prompt");
        assert_eq!(stored.system_prompt, "Test system");
        assert_eq!(stored.status, TaskStatus::Pending);
        assert!(stored.result.is_none());

        // Verify persistence
        let task_path = dir.join(format!("{}.json", task_id));
        assert!(task_path.exists(), "Task file should exist on disk");

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Pending → Cancelled is reflected by `status`.
    #[tokio::test]
    async fn test_status_transitions() {
        let dir = test_dir("transitions");
        let manager = BackgroundTaskManager::new(&dir).await.unwrap();

        let task_id = manager
            .submit("Test".to_string(), "System".to_string())
            .await
            .unwrap();

        let before = manager.status(&task_id).await.unwrap();
        assert_eq!(before, TaskStatus::Pending);

        // Cancel the task
        manager.cancel(&task_id).await.unwrap();
        let after = manager.status(&task_id).await.unwrap();
        assert_eq!(after, TaskStatus::Cancelled);

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Every submitted task shows up in the listing.
    #[tokio::test]
    async fn test_list_tasks() {
        let dir = test_dir("list");
        let manager = BackgroundTaskManager::new(&dir).await.unwrap();

        for prompt in ["Task 1", "Task 2"] {
            manager
                .submit(prompt.to_string(), "System".to_string())
                .await
                .unwrap();
        }

        let listed = manager.list_tasks().await;
        assert_eq!(listed.len(), 2);

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// A task that has already completed cannot be cancelled.
    #[tokio::test]
    async fn test_cancel_completed_task_fails() {
        let dir = test_dir("cancel_fail");
        let manager = BackgroundTaskManager::new(&dir).await.unwrap();

        let task_id = manager
            .submit("Test".to_string(), "System".to_string())
            .await
            .unwrap();

        // Manually set to completed; the guard is a temporary and is released
        // at the end of the statement.
        manager
            .tasks
            .write()
            .await
            .get_mut(&task_id)
            .unwrap()
            .status = TaskStatus::Completed;

        let outcome = manager.cancel(&task_id).await;
        assert!(outcome.is_err(), "Cancelling a completed task should fail");

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Only tasks that are still pending or running are reported for resumption.
    #[tokio::test]
    async fn test_resume_incomplete_tasks() {
        let dir = test_dir("resume");
        let manager = BackgroundTaskManager::new(&dir).await.unwrap();

        for prompt in ["Task 1", "Task 2"] {
            manager
                .submit(prompt.to_string(), "System".to_string())
                .await
                .unwrap();
        }

        // Mark one as completed. The read guard is released at the end of the
        // `let` statement, before the write lock is requested.
        let some_id = manager.tasks.read().await.keys().next().cloned();
        if let Some(id) = some_id {
            if let Some(t) = manager.tasks.write().await.get_mut(&id) {
                t.status = TaskStatus::Completed;
            }
        }

        let incomplete = manager.resume_incomplete().await;
        assert_eq!(incomplete.len(), 1, "One task should be incomplete");

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Looking up an unknown id is an error.
    #[tokio::test]
    async fn test_task_not_found() {
        let dir = test_dir("not_found");
        let manager = BackgroundTaskManager::new(&dir).await.unwrap();

        let lookup = manager.status("nonexistent").await;
        assert!(lookup.is_err());

        let _ = std::fs::remove_dir_all(&dir);
    }

    /// A task submitted through one manager is visible to a later manager
    /// opened on the same directory.
    #[tokio::test]
    async fn test_persistence_across_restart() {
        let dir = test_dir("persist");

        // First session: submit a task
        {
            let first = BackgroundTaskManager::new(&dir).await.unwrap();
            first
                .submit("Persist test".to_string(), "System".to_string())
                .await
                .unwrap();
        } // manager drops

        // Second session: load from disk
        {
            let second = BackgroundTaskManager::new(&dir).await.unwrap();
            let reloaded = second.list_tasks().await;
            assert_eq!(reloaded.len(), 1, "Task should persist across restarts");
            assert_eq!(reloaded[0].prompt, "Persist test");
        }

        let _ = std::fs::remove_dir_all(&dir);
    }
}