1use crate::config::RuntimeConfig;
23use crate::error::{RavenClawsError, Result};
24use crate::llm::LLMProviderTrait;
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use tokio::sync::RwLock;
30use tracing::{info, instrument, warn};
31
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub enum TaskStatus {
35 Pending,
37 Running,
39 Completed,
41 Failed,
43 Cancelled,
45}
46
47impl std::fmt::Display for TaskStatus {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 match self {
50 TaskStatus::Pending => write!(f, "pending"),
51 TaskStatus::Running => write!(f, "running"),
52 TaskStatus::Completed => write!(f, "completed"),
53 TaskStatus::Failed => write!(f, "failed"),
54 TaskStatus::Cancelled => write!(f, "cancelled"),
55 }
56 }
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct BackgroundTask {
62 pub id: String,
64 pub prompt: String,
66 pub system_prompt: String,
68 pub status: TaskStatus,
70 pub result: Option<String>,
72 pub error: Option<String>,
74 pub created_at: String,
76 pub updated_at: String,
78 pub iterations: usize,
80 pub provider: Option<String>,
82 pub model: Option<String>,
84}
85
86impl BackgroundTask {
87 pub fn new(id: String, prompt: String, system_prompt: String) -> Self {
89 let now = chrono::Utc::now().to_rfc3339();
90 Self {
91 id,
92 prompt,
93 system_prompt,
94 status: TaskStatus::Pending,
95 result: None,
96 error: None,
97 created_at: now.clone(),
98 updated_at: now,
99 iterations: 0,
100 provider: None,
101 model: None,
102 }
103 }
104}
105
106#[derive(Debug, Clone)]
108pub struct BackgroundTaskManager {
109 tasks_dir: PathBuf,
111 tasks: Arc<RwLock<HashMap<String, BackgroundTask>>>,
113}
114
115impl BackgroundTaskManager {
116 pub async fn new(tasks_dir: &Path) -> Result<Self> {
122 let tasks_dir = tasks_dir.to_path_buf();
123
124 std::fs::create_dir_all(&tasks_dir).map_err(|e| {
126 RavenClawsError::CommandExecution(format!(
127 "Failed to create tasks directory '{}': {}",
128 tasks_dir.display(),
129 e
130 ))
131 })?;
132
133 let tasks = Arc::new(RwLock::new(HashMap::new()));
134
135 let mut manager = Self { tasks_dir, tasks };
136
137 let count = manager.load_tasks().await?;
139 if count > 0 {
140 info!(count, "Loaded existing background tasks from disk");
141 }
142
143 Ok(manager)
144 }
145
146 pub async fn from_config(config: &RuntimeConfig) -> Result<Self> {
148 let tasks_dir = PathBuf::from(&config.workdir).join("tasks");
149 Self::new(&tasks_dir).await
150 }
151
152 async fn load_tasks(&mut self) -> Result<usize> {
154 let mut count = 0;
155 let read_dir = match std::fs::read_dir(&self.tasks_dir) {
156 Ok(d) => d,
157 Err(_) => return Ok(0),
158 };
159
160 let mut tasks_to_insert = Vec::new();
161 for entry in read_dir.flatten() {
162 let path = entry.path();
163 if path.extension().is_some_and(|ext| ext == "json") {
164 match std::fs::read_to_string(&path) {
165 Ok(content) => match serde_json::from_str::<BackgroundTask>(&content) {
166 Ok(task) => {
167 tasks_to_insert.push(task);
168 count += 1;
169 }
170 Err(e) => {
171 warn!(
172 path = %path.display(),
173 error = %e,
174 "Failed to deserialize background task"
175 );
176 }
177 },
178 Err(e) => {
179 warn!(
180 path = %path.display(),
181 error = %e,
182 "Failed to read background task file"
183 );
184 }
185 }
186 }
187 }
188
189 let mut tasks = self.tasks.write().await;
190 for task in tasks_to_insert {
191 tasks.insert(task.id.clone(), task);
192 }
193
194 Ok(count)
195 }
196
197 fn save_task(&self, task: &BackgroundTask) -> Result<()> {
199 let path = self.tasks_dir.join(format!("{}.json", task.id));
200 let content = serde_json::to_string_pretty(task).map_err(|e| {
201 RavenClawsError::CommandExecution(format!("Failed to serialize task: {}", e))
202 })?;
203
204 std::fs::write(&path, content).map_err(|e| {
205 RavenClawsError::CommandExecution(format!(
206 "Failed to write task file '{}': {}",
207 path.display(),
208 e
209 ))
210 })?;
211
212 Ok(())
213 }
214
215 pub async fn submit(&self, prompt: String, system_prompt: String) -> Result<String> {
218 let id = uuid::Uuid::new_v4().to_string();
219 let task = BackgroundTask::new(id.clone(), prompt, system_prompt);
220
221 self.save_task(&task)?;
223
224 let mut tasks = self.tasks.write().await;
226 tasks.insert(id.clone(), task);
227
228 info!(task_id = %id, "Background task submitted");
229 Ok(id)
230 }
231
232 #[instrument(skip(self, llm), fields(task_id = %task_id))]
235 pub async fn execute(&self, task_id: &str, llm: Arc<dyn LLMProviderTrait>) -> Result<String> {
236 {
238 let mut tasks = self.tasks.write().await;
239 let task = tasks.get_mut(task_id).ok_or_else(|| {
240 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
241 })?;
242
243 task.status = TaskStatus::Running;
244 task.provider = Some(llm.provider_name().to_string());
245 task.model = Some(llm.model().to_string());
246 task.updated_at = chrono::Utc::now().to_rfc3339();
247 self.save_task(task)?;
248 }
249
250 info!(
251 task_id = %task_id,
252 provider = llm.provider_name(),
253 model = llm.model(),
254 "Executing background task"
255 );
256
257 let loop_config = crate::agent::AgentLoopConfig {
259 max_iterations: 10,
260 enable_tools: true,
261 require_approval: false,
262 prompt_injection_protection: true,
263 token_lifetime_secs: 0,
264 no_final_required: false,
265 fallback_chain: None,
266 token_budget: None,
267 ravenfabric: None,
268 };
269
270 let result = crate::agent::run_agent_loop(
271 llm.clone(),
272 &self.get_prompt(task_id).await?,
273 &self.get_system_prompt(task_id).await?,
274 loop_config,
275 )
276 .await;
277
278 let mut tasks = self.tasks.write().await;
280 let task = tasks.get_mut(task_id).ok_or_else(|| {
281 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
282 })?;
283
284 match result {
285 Ok(response) => {
286 task.status = TaskStatus::Completed;
287 task.result = Some(response.clone());
288 task.updated_at = chrono::Utc::now().to_rfc3339();
289 self.save_task(task)?;
290
291 info!(
292 task_id = %task_id,
293 iterations = task.iterations,
294 "Background task completed"
295 );
296
297 Ok(response)
298 }
299 Err(e) => {
300 task.status = TaskStatus::Failed;
301 task.error = Some(e.to_string());
302 task.updated_at = chrono::Utc::now().to_rfc3339();
303 self.save_task(task)?;
304
305 warn!(
306 task_id = %task_id,
307 error = %e,
308 "Background task failed"
309 );
310
311 Err(e)
312 }
313 }
314 }
315
316 #[allow(dead_code)]
318 pub async fn status(&self, task_id: &str) -> Result<TaskStatus> {
319 let tasks = self.tasks.read().await;
320 let task = tasks.get(task_id).ok_or_else(|| {
321 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
322 })?;
323 Ok(task.status.clone())
324 }
325
326 pub async fn get_task(&self, task_id: &str) -> Result<BackgroundTask> {
328 let tasks = self.tasks.read().await;
329 tasks.get(task_id).cloned().ok_or_else(|| {
330 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
331 })
332 }
333
334 pub async fn list_tasks(&self) -> Vec<BackgroundTask> {
336 let tasks = self.tasks.read().await;
337 let mut task_list: Vec<BackgroundTask> = tasks.values().cloned().collect();
338 task_list.sort_by(|a, b| b.created_at.cmp(&a.created_at));
340 task_list
341 }
342
343 pub async fn cancel(&self, task_id: &str) -> Result<()> {
345 let mut tasks = self.tasks.write().await;
346 let task = tasks.get_mut(task_id).ok_or_else(|| {
347 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
348 })?;
349
350 match task.status {
351 TaskStatus::Pending | TaskStatus::Running => {
352 task.status = TaskStatus::Cancelled;
353 task.updated_at = chrono::Utc::now().to_rfc3339();
354 self.save_task(task)?;
355 info!(task_id = %task_id, "Background task cancelled");
356 Ok(())
357 }
358 _ => Err(RavenClawsError::CommandExecution(format!(
359 "Cannot cancel task '{}' in status '{}'",
360 task_id, task.status
361 ))),
362 }
363 }
364
365 pub async fn resume_incomplete(&self) -> Vec<String> {
368 let tasks = self.tasks.read().await;
369 let mut incomplete = Vec::new();
370
371 for task in tasks.values() {
372 if task.status == TaskStatus::Pending || task.status == TaskStatus::Running {
373 incomplete.push(task.id.clone());
374 }
375 }
376
377 if !incomplete.is_empty() {
378 info!(
379 count = incomplete.len(),
380 "Found incomplete background tasks to resume"
381 );
382 }
383
384 incomplete
385 }
386
387 async fn get_prompt(&self, task_id: &str) -> Result<String> {
389 let tasks = self.tasks.read().await;
390 let task = tasks.get(task_id).ok_or_else(|| {
391 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
392 })?;
393 Ok(task.prompt.clone())
394 }
395
396 async fn get_system_prompt(&self, task_id: &str) -> Result<String> {
398 let tasks = self.tasks.read().await;
399 let task = tasks.get(task_id).ok_or_else(|| {
400 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
401 })?;
402 Ok(task.system_prompt.clone())
403 }
404}
405
406#[cfg(test)]
407mod tests {
408 use super::*;
409 use std::path::PathBuf;
410
411 fn test_dir(name: &str) -> PathBuf {
412 let dir = std::env::temp_dir().join(format!(
413 "ravenclaws-test-bg-{}-{}",
414 name,
415 std::process::id()
416 ));
417 let _ = std::fs::remove_dir_all(&dir);
418 dir
419 }
420
421 #[tokio::test]
422 async fn test_manager_new_creates_directory() {
423 let dir = test_dir("create_dir");
424 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
425 assert!(dir.exists(), "Tasks directory should be created");
426 assert!(manager.tasks.read().await.is_empty());
427 let _ = std::fs::remove_dir_all(&dir);
428 }
429
430 #[tokio::test]
431 async fn test_submit_task() {
432 let dir = test_dir("submit");
433 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
434
435 let task_id = manager
436 .submit("Test prompt".to_string(), "Test system".to_string())
437 .await
438 .unwrap();
439
440 let task = manager.get_task(&task_id).await.unwrap();
441 assert_eq!(task.prompt, "Test prompt");
442 assert_eq!(task.system_prompt, "Test system");
443 assert_eq!(task.status, TaskStatus::Pending);
444 assert!(task.result.is_none());
445
446 let task_path = dir.join(format!("{}.json", task_id));
448 assert!(task_path.exists(), "Task file should exist on disk");
449
450 let _ = std::fs::remove_dir_all(&dir);
451 }
452
453 #[tokio::test]
454 async fn test_status_transitions() {
455 let dir = test_dir("transitions");
456 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
457
458 let task_id = manager
459 .submit("Test".to_string(), "System".to_string())
460 .await
461 .unwrap();
462
463 assert_eq!(manager.status(&task_id).await.unwrap(), TaskStatus::Pending);
464
465 manager.cancel(&task_id).await.unwrap();
467 assert_eq!(
468 manager.status(&task_id).await.unwrap(),
469 TaskStatus::Cancelled
470 );
471
472 let _ = std::fs::remove_dir_all(&dir);
473 }
474
475 #[tokio::test]
476 async fn test_list_tasks() {
477 let dir = test_dir("list");
478 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
479
480 manager
481 .submit("Task 1".to_string(), "System".to_string())
482 .await
483 .unwrap();
484 manager
485 .submit("Task 2".to_string(), "System".to_string())
486 .await
487 .unwrap();
488
489 let tasks = manager.list_tasks().await;
490 assert_eq!(tasks.len(), 2);
491
492 let _ = std::fs::remove_dir_all(&dir);
493 }
494
495 #[tokio::test]
496 async fn test_cancel_completed_task_fails() {
497 let dir = test_dir("cancel_fail");
498 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
499
500 let task_id = manager
501 .submit("Test".to_string(), "System".to_string())
502 .await
503 .unwrap();
504
505 {
507 let mut tasks = manager.tasks.write().await;
508 let task = tasks.get_mut(&task_id).unwrap();
509 task.status = TaskStatus::Completed;
510 }
511
512 let result = manager.cancel(&task_id).await;
513 assert!(result.is_err(), "Cancelling a completed task should fail");
514
515 let _ = std::fs::remove_dir_all(&dir);
516 }
517
518 #[tokio::test]
519 async fn test_resume_incomplete_tasks() {
520 let dir = test_dir("resume");
521 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
522
523 manager
524 .submit("Task 1".to_string(), "System".to_string())
525 .await
526 .unwrap();
527 manager
528 .submit("Task 2".to_string(), "System".to_string())
529 .await
530 .unwrap();
531
532 {
534 let tasks = manager.tasks.read().await;
535 let tasks_vec: Vec<&BackgroundTask> = tasks.values().collect();
536 if let Some(task) = tasks_vec.first() {
537 let id = task.id.clone();
538 drop(tasks);
539 let mut tasks = manager.tasks.write().await;
540 if let Some(t) = tasks.get_mut(&id) {
541 t.status = TaskStatus::Completed;
542 }
543 }
544 }
545
546 let incomplete = manager.resume_incomplete().await;
547 assert_eq!(incomplete.len(), 1, "One task should be incomplete");
548
549 let _ = std::fs::remove_dir_all(&dir);
550 }
551
552 #[tokio::test]
553 async fn test_task_not_found() {
554 let dir = test_dir("not_found");
555 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
556
557 let result = manager.status("nonexistent").await;
558 assert!(result.is_err());
559
560 let _ = std::fs::remove_dir_all(&dir);
561 }
562
563 #[tokio::test]
564 async fn test_persistence_across_restart() {
565 let dir = test_dir("persist");
566
567 {
569 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
570 manager
571 .submit("Persist test".to_string(), "System".to_string())
572 .await
573 .unwrap();
574 } {
578 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
579 let tasks = manager.list_tasks().await;
580 assert_eq!(tasks.len(), 1, "Task should persist across restarts");
581 assert_eq!(tasks[0].prompt, "Persist test");
582 }
583
584 let _ = std::fs::remove_dir_all(&dir);
585 }
586}