1use crate::config::RuntimeConfig;
23use crate::error::{RavenClawsError, Result};
24use crate::llm::LLMProviderTrait;
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use tokio::sync::RwLock;
30use tracing::{info, instrument, warn};
31
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub enum TaskStatus {
35 Pending,
37 Running,
39 Completed,
41 Failed,
43 Cancelled,
45}
46
47impl std::fmt::Display for TaskStatus {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 match self {
50 TaskStatus::Pending => write!(f, "pending"),
51 TaskStatus::Running => write!(f, "running"),
52 TaskStatus::Completed => write!(f, "completed"),
53 TaskStatus::Failed => write!(f, "failed"),
54 TaskStatus::Cancelled => write!(f, "cancelled"),
55 }
56 }
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct BackgroundTask {
62 pub id: String,
64 pub prompt: String,
66 pub system_prompt: String,
68 pub status: TaskStatus,
70 pub result: Option<String>,
72 pub error: Option<String>,
74 pub created_at: String,
76 pub updated_at: String,
78 pub iterations: usize,
80 pub provider: Option<String>,
82 pub model: Option<String>,
84 pub checkpoint: Option<String>,
88}
89
90impl BackgroundTask {
91 pub fn new(id: String, prompt: String, system_prompt: String) -> Self {
93 let now = chrono::Utc::now().to_rfc3339();
94 Self {
95 id,
96 prompt,
97 system_prompt,
98 status: TaskStatus::Pending,
99 result: None,
100 error: None,
101 created_at: now.clone(),
102 updated_at: now,
103 iterations: 0,
104 provider: None,
105 model: None,
106 checkpoint: None,
107 }
108 }
109}
110
111#[derive(Debug, Clone)]
113pub struct BackgroundTaskManager {
114 tasks_dir: PathBuf,
116 tasks: Arc<RwLock<HashMap<String, BackgroundTask>>>,
118}
119
120impl BackgroundTaskManager {
121 pub async fn new(tasks_dir: &Path) -> Result<Self> {
127 let tasks_dir = tasks_dir.to_path_buf();
128
129 std::fs::create_dir_all(&tasks_dir).map_err(|e| {
131 RavenClawsError::CommandExecution(format!(
132 "Failed to create tasks directory '{}': {}",
133 tasks_dir.display(),
134 e
135 ))
136 })?;
137
138 let tasks = Arc::new(RwLock::new(HashMap::new()));
139
140 let mut manager = Self { tasks_dir, tasks };
141
142 let count = manager.load_tasks().await?;
144 if count > 0 {
145 info!(count, "Loaded existing background tasks from disk");
146 }
147
148 Ok(manager)
149 }
150
151 pub async fn from_config(config: &RuntimeConfig) -> Result<Self> {
153 let tasks_dir = PathBuf::from(&config.workdir).join("tasks");
154 Self::new(&tasks_dir).await
155 }
156
157 async fn load_tasks(&mut self) -> Result<usize> {
159 let mut count = 0;
160 let read_dir = match std::fs::read_dir(&self.tasks_dir) {
161 Ok(d) => d,
162 Err(_) => return Ok(0),
163 };
164
165 let mut tasks_to_insert = Vec::new();
166 for entry in read_dir.flatten() {
167 let path = entry.path();
168 if path.extension().is_some_and(|ext| ext == "json") {
169 match std::fs::read_to_string(&path) {
170 Ok(content) => match serde_json::from_str::<BackgroundTask>(&content) {
171 Ok(task) => {
172 tasks_to_insert.push(task);
173 count += 1;
174 }
175 Err(e) => {
176 warn!(
177 path = %path.display(),
178 error = %e,
179 "Failed to deserialize background task"
180 );
181 }
182 },
183 Err(e) => {
184 warn!(
185 path = %path.display(),
186 error = %e,
187 "Failed to read background task file"
188 );
189 }
190 }
191 }
192 }
193
194 let mut tasks = self.tasks.write().await;
195 for task in tasks_to_insert {
196 tasks.insert(task.id.clone(), task);
197 }
198
199 Ok(count)
200 }
201
202 fn save_task(&self, task: &BackgroundTask) -> Result<()> {
204 let path = self.tasks_dir.join(format!("{}.json", task.id));
205 let content = serde_json::to_string_pretty(task).map_err(|e| {
206 RavenClawsError::CommandExecution(format!("Failed to serialize task: {}", e))
207 })?;
208
209 std::fs::write(&path, content).map_err(|e| {
210 RavenClawsError::CommandExecution(format!(
211 "Failed to write task file '{}': {}",
212 path.display(),
213 e
214 ))
215 })?;
216
217 Ok(())
218 }
219
220 pub async fn submit(&self, prompt: String, system_prompt: String) -> Result<String> {
223 let id = uuid::Uuid::new_v4().to_string();
224 let task = BackgroundTask::new(id.clone(), prompt, system_prompt);
225
226 self.save_task(&task)?;
228
229 let mut tasks = self.tasks.write().await;
231 tasks.insert(id.clone(), task);
232
233 info!(task_id = %id, "Background task submitted");
234 Ok(id)
235 }
236
237 #[instrument(skip(self, llm), fields(task_id = %task_id))]
240 pub async fn execute(&self, task_id: &str, llm: Arc<dyn LLMProviderTrait>) -> Result<String> {
241 {
243 let mut tasks = self.tasks.write().await;
244 let task = tasks.get_mut(task_id).ok_or_else(|| {
245 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
246 })?;
247
248 task.status = TaskStatus::Running;
249 task.provider = Some(llm.provider_name().to_string());
250 task.model = Some(llm.model().to_string());
251 task.updated_at = chrono::Utc::now().to_rfc3339();
252 self.save_task(task)?;
253 }
254
255 info!(
256 task_id = %task_id,
257 provider = llm.provider_name(),
258 model = llm.model(),
259 "Executing background task"
260 );
261
262 let checkpoint_dir = self.tasks_dir.join("checkpoints");
264 let _ = std::fs::create_dir_all(&checkpoint_dir);
265
266 let loop_config = crate::agent::AgentLoopConfig {
268 max_iterations: 10,
269 enable_tools: true,
270 require_approval: false,
271 prompt_injection_protection: true,
272 token_lifetime_secs: 0,
273 no_final_required: true,
274 fallback_chain: None,
275 token_budget: None,
276 ravenfabric: None,
277 checkpoint_dir: Some(checkpoint_dir),
278 session_id: Some(task_id.to_string()),
279 metrics_callback: None,
280 load_manager: None,
281 };
282
283 let result = crate::agent::run_agent_loop(
284 llm.clone(),
285 &self.get_prompt(task_id).await?,
286 &self.get_system_prompt(task_id).await?,
287 loop_config,
288 )
289 .await;
290
291 let mut tasks = self.tasks.write().await;
293 let task = tasks.get_mut(task_id).ok_or_else(|| {
294 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
295 })?;
296
297 match result {
298 Ok(response) => {
299 task.status = TaskStatus::Completed;
300 task.result = Some(response.clone());
301 task.updated_at = chrono::Utc::now().to_rfc3339();
302 self.save_task(task)?;
303
304 info!(
305 task_id = %task_id,
306 iterations = task.iterations,
307 "Background task completed"
308 );
309
310 Ok(response)
311 }
312 Err(e) => {
313 task.status = TaskStatus::Failed;
314 task.error = Some(e.to_string());
315 task.updated_at = chrono::Utc::now().to_rfc3339();
316 self.save_task(task)?;
317
318 warn!(
319 task_id = %task_id,
320 error = %e,
321 "Background task failed"
322 );
323
324 Err(e)
325 }
326 }
327 }
328
329 #[allow(dead_code)]
331 pub async fn status(&self, task_id: &str) -> Result<TaskStatus> {
332 let tasks = self.tasks.read().await;
333 let task = tasks.get(task_id).ok_or_else(|| {
334 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
335 })?;
336 Ok(task.status.clone())
337 }
338
339 pub async fn get_task(&self, task_id: &str) -> Result<BackgroundTask> {
341 let tasks = self.tasks.read().await;
342 tasks.get(task_id).cloned().ok_or_else(|| {
343 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
344 })
345 }
346
347 pub async fn list_tasks(&self) -> Vec<BackgroundTask> {
349 let tasks = self.tasks.read().await;
350 let mut task_list: Vec<BackgroundTask> = tasks.values().cloned().collect();
351 task_list.sort_by(|a, b| b.created_at.cmp(&a.created_at));
353 task_list
354 }
355
356 pub async fn cancel(&self, task_id: &str) -> Result<()> {
358 let mut tasks = self.tasks.write().await;
359 let task = tasks.get_mut(task_id).ok_or_else(|| {
360 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
361 })?;
362
363 match task.status {
364 TaskStatus::Pending | TaskStatus::Running => {
365 task.status = TaskStatus::Cancelled;
366 task.updated_at = chrono::Utc::now().to_rfc3339();
367 self.save_task(task)?;
368 info!(task_id = %task_id, "Background task cancelled");
369 Ok(())
370 }
371 _ => Err(RavenClawsError::CommandExecution(format!(
372 "Cannot cancel task '{}' in status '{}'",
373 task_id, task.status
374 ))),
375 }
376 }
377
378 pub async fn resume_incomplete(&self) -> Vec<String> {
381 let tasks = self.tasks.read().await;
382 let mut incomplete = Vec::new();
383
384 for task in tasks.values() {
385 if task.status == TaskStatus::Pending || task.status == TaskStatus::Running {
386 incomplete.push(task.id.clone());
387 }
388 }
389
390 if !incomplete.is_empty() {
391 info!(
392 count = incomplete.len(),
393 "Found incomplete background tasks to resume"
394 );
395 }
396
397 incomplete
398 }
399
400 async fn get_prompt(&self, task_id: &str) -> Result<String> {
402 let tasks = self.tasks.read().await;
403 let task = tasks.get(task_id).ok_or_else(|| {
404 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
405 })?;
406 Ok(task.prompt.clone())
407 }
408
409 async fn get_system_prompt(&self, task_id: &str) -> Result<String> {
411 let tasks = self.tasks.read().await;
412 let task = tasks.get(task_id).ok_or_else(|| {
413 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
414 })?;
415 Ok(task.system_prompt.clone())
416 }
417}
418
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};

    /// Per-test scratch directory that is removed again when the guard drops.
    ///
    /// Cleaning up in `Drop` means the directory is also removed when an
    /// assertion fails and the test unwinds before reaching its last line.
    struct TestDir(PathBuf);

    impl TestDir {
        /// Picks a path unique to this test name and process, clearing any
        /// leftover from an earlier run. The directory itself is not created.
        fn new(name: &str) -> Self {
            let dir = std::env::temp_dir().join(format!(
                "ravenclaws-test-bg-{}-{}",
                name,
                std::process::id()
            ));
            let _ = std::fs::remove_dir_all(&dir);
            Self(dir)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[tokio::test]
    async fn test_manager_new_creates_directory() {
        let dir = TestDir::new("create_dir");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        assert!(dir.path().exists(), "Tasks directory should be created");
        assert!(manager.tasks.read().await.is_empty());
    }

    #[tokio::test]
    async fn test_submit_task() {
        let dir = TestDir::new("submit");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        let task_id = manager
            .submit("Test prompt".to_string(), "Test system".to_string())
            .await
            .unwrap();

        let task = manager.get_task(&task_id).await.unwrap();
        assert_eq!(task.prompt, "Test prompt");
        assert_eq!(task.system_prompt, "Test system");
        assert_eq!(task.status, TaskStatus::Pending);
        assert!(task.result.is_none());

        let task_path = dir.path().join(format!("{}.json", task_id));
        assert!(task_path.exists(), "Task file should exist on disk");
    }

    #[tokio::test]
    async fn test_status_transitions() {
        let dir = TestDir::new("transitions");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        let task_id = manager
            .submit("Test".to_string(), "System".to_string())
            .await
            .unwrap();

        assert_eq!(manager.status(&task_id).await.unwrap(), TaskStatus::Pending);

        manager.cancel(&task_id).await.unwrap();
        assert_eq!(
            manager.status(&task_id).await.unwrap(),
            TaskStatus::Cancelled
        );
    }

    #[tokio::test]
    async fn test_list_tasks() {
        let dir = TestDir::new("list");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        manager
            .submit("Task 1".to_string(), "System".to_string())
            .await
            .unwrap();
        manager
            .submit("Task 2".to_string(), "System".to_string())
            .await
            .unwrap();

        let tasks = manager.list_tasks().await;
        assert_eq!(tasks.len(), 2);
    }

    #[tokio::test]
    async fn test_cancel_completed_task_fails() {
        let dir = TestDir::new("cancel_fail");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        let task_id = manager
            .submit("Test".to_string(), "System".to_string())
            .await
            .unwrap();

        {
            // Force the in-memory state; scoped so the write lock is released
            // before `cancel` takes it again.
            let mut tasks = manager.tasks.write().await;
            let task = tasks.get_mut(&task_id).unwrap();
            task.status = TaskStatus::Completed;
        }

        let result = manager.cancel(&task_id).await;
        assert!(result.is_err(), "Cancelling a completed task should fail");
    }

    #[tokio::test]
    async fn test_resume_incomplete_tasks() {
        let dir = TestDir::new("resume");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        let finished_id = manager
            .submit("Task 1".to_string(), "System".to_string())
            .await
            .unwrap();
        manager
            .submit("Task 2".to_string(), "System".to_string())
            .await
            .unwrap();

        {
            // Mark exactly one of the two tasks as done.
            let mut tasks = manager.tasks.write().await;
            let task = tasks
                .get_mut(&finished_id)
                .expect("submitted task should be tracked");
            task.status = TaskStatus::Completed;
        }

        let incomplete = manager.resume_incomplete().await;
        assert_eq!(incomplete.len(), 1, "One task should be incomplete");
    }

    #[tokio::test]
    async fn test_task_not_found() {
        let dir = TestDir::new("not_found");
        let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();

        let result = manager.status("nonexistent").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_persistence_across_restart() {
        let dir = TestDir::new("persist");

        {
            // First "process": submit a task and drop the manager.
            let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();
            manager
                .submit("Persist test".to_string(), "System".to_string())
                .await
                .unwrap();
        }

        {
            // Second "process": a new manager must reload the task from disk.
            let manager = BackgroundTaskManager::new(dir.path()).await.unwrap();
            let tasks = manager.list_tasks().await;
            assert_eq!(tasks.len(), 1, "Task should persist across restarts");
            assert_eq!(tasks[0].prompt, "Persist test");
        }
    }
}