1use crate::config::RuntimeConfig;
23use crate::error::{RavenClawsError, Result};
24use crate::llm::LLMProviderTrait;
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use tokio::sync::RwLock;
30use tracing::{info, instrument, warn};
31
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub enum TaskStatus {
35 Pending,
37 Running,
39 Completed,
41 Failed,
43 Cancelled,
45}
46
47impl std::fmt::Display for TaskStatus {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 match self {
50 TaskStatus::Pending => write!(f, "pending"),
51 TaskStatus::Running => write!(f, "running"),
52 TaskStatus::Completed => write!(f, "completed"),
53 TaskStatus::Failed => write!(f, "failed"),
54 TaskStatus::Cancelled => write!(f, "cancelled"),
55 }
56 }
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct BackgroundTask {
62 pub id: String,
64 pub prompt: String,
66 pub system_prompt: String,
68 pub status: TaskStatus,
70 pub result: Option<String>,
72 pub error: Option<String>,
74 pub created_at: String,
76 pub updated_at: String,
78 pub iterations: usize,
80 pub provider: Option<String>,
82 pub model: Option<String>,
84 pub checkpoint: Option<String>,
88}
89
90impl BackgroundTask {
91 pub fn new(id: String, prompt: String, system_prompt: String) -> Self {
93 let now = chrono::Utc::now().to_rfc3339();
94 Self {
95 id,
96 prompt,
97 system_prompt,
98 status: TaskStatus::Pending,
99 result: None,
100 error: None,
101 created_at: now.clone(),
102 updated_at: now,
103 iterations: 0,
104 provider: None,
105 model: None,
106 checkpoint: None,
107 }
108 }
109}
110
111#[derive(Debug, Clone)]
113pub struct BackgroundTaskManager {
114 tasks_dir: PathBuf,
116 tasks: Arc<RwLock<HashMap<String, BackgroundTask>>>,
118}
119
120impl BackgroundTaskManager {
121 pub async fn new(tasks_dir: &Path) -> Result<Self> {
127 let tasks_dir = tasks_dir.to_path_buf();
128
129 std::fs::create_dir_all(&tasks_dir).map_err(|e| {
131 RavenClawsError::CommandExecution(format!(
132 "Failed to create tasks directory '{}': {}",
133 tasks_dir.display(),
134 e
135 ))
136 })?;
137
138 let tasks = Arc::new(RwLock::new(HashMap::new()));
139
140 let mut manager = Self { tasks_dir, tasks };
141
142 let count = manager.load_tasks().await?;
144 if count > 0 {
145 info!(count, "Loaded existing background tasks from disk");
146 }
147
148 Ok(manager)
149 }
150
151 pub async fn from_config(config: &RuntimeConfig) -> Result<Self> {
153 let tasks_dir = PathBuf::from(&config.workdir).join("tasks");
154 Self::new(&tasks_dir).await
155 }
156
157 async fn load_tasks(&mut self) -> Result<usize> {
159 let mut count = 0;
160 let read_dir = match std::fs::read_dir(&self.tasks_dir) {
161 Ok(d) => d,
162 Err(_) => return Ok(0),
163 };
164
165 let mut tasks_to_insert = Vec::new();
166 for entry in read_dir.flatten() {
167 let path = entry.path();
168 if path.extension().is_some_and(|ext| ext == "json") {
169 match std::fs::read_to_string(&path) {
170 Ok(content) => match serde_json::from_str::<BackgroundTask>(&content) {
171 Ok(task) => {
172 tasks_to_insert.push(task);
173 count += 1;
174 }
175 Err(e) => {
176 warn!(
177 path = %path.display(),
178 error = %e,
179 "Failed to deserialize background task"
180 );
181 }
182 },
183 Err(e) => {
184 warn!(
185 path = %path.display(),
186 error = %e,
187 "Failed to read background task file"
188 );
189 }
190 }
191 }
192 }
193
194 let mut tasks = self.tasks.write().await;
195 for task in tasks_to_insert {
196 tasks.insert(task.id.clone(), task);
197 }
198
199 Ok(count)
200 }
201
202 fn save_task(&self, task: &BackgroundTask) -> Result<()> {
204 let path = self.tasks_dir.join(format!("{}.json", task.id));
205 let content = serde_json::to_string_pretty(task).map_err(|e| {
206 RavenClawsError::CommandExecution(format!("Failed to serialize task: {}", e))
207 })?;
208
209 std::fs::write(&path, content).map_err(|e| {
210 RavenClawsError::CommandExecution(format!(
211 "Failed to write task file '{}': {}",
212 path.display(),
213 e
214 ))
215 })?;
216
217 Ok(())
218 }
219
220 pub async fn submit(&self, prompt: String, system_prompt: String) -> Result<String> {
223 let id = uuid::Uuid::new_v4().to_string();
224 let task = BackgroundTask::new(id.clone(), prompt, system_prompt);
225
226 self.save_task(&task)?;
228
229 let mut tasks = self.tasks.write().await;
231 tasks.insert(id.clone(), task);
232
233 info!(task_id = %id, "Background task submitted");
234 Ok(id)
235 }
236
237 #[instrument(skip(self, llm), fields(task_id = %task_id))]
240 pub async fn execute(&self, task_id: &str, llm: Arc<dyn LLMProviderTrait>) -> Result<String> {
241 {
243 let mut tasks = self.tasks.write().await;
244 let task = tasks.get_mut(task_id).ok_or_else(|| {
245 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
246 })?;
247
248 task.status = TaskStatus::Running;
249 task.provider = Some(llm.provider_name().to_string());
250 task.model = Some(llm.model().to_string());
251 task.updated_at = chrono::Utc::now().to_rfc3339();
252 self.save_task(task)?;
253 }
254
255 info!(
256 task_id = %task_id,
257 provider = llm.provider_name(),
258 model = llm.model(),
259 "Executing background task"
260 );
261
262 let checkpoint_dir = self.tasks_dir.join("checkpoints");
264 let _ = std::fs::create_dir_all(&checkpoint_dir);
265
266 let loop_config = crate::agent::AgentLoopConfig {
268 max_iterations: 10,
269 enable_tools: true,
270 require_approval: false,
271 prompt_injection_protection: true,
272 token_lifetime_secs: 0,
273 no_final_required: true,
274 fallback_chain: None,
275 token_budget: None,
276 ravenfabric: None,
277 checkpoint_dir: Some(checkpoint_dir),
278 session_id: Some(task_id.to_string()),
279 metrics_callback: None,
280 load_manager: None,
281 retry_config: None,
282 };
283
284 let result = crate::agent::run_agent_loop(
285 llm.clone(),
286 &self.get_prompt(task_id).await?,
287 &self.get_system_prompt(task_id).await?,
288 loop_config,
289 )
290 .await;
291
292 let mut tasks = self.tasks.write().await;
294 let task = tasks.get_mut(task_id).ok_or_else(|| {
295 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
296 })?;
297
298 match result {
299 Ok(response) => {
300 task.status = TaskStatus::Completed;
301 task.result = Some(response.clone());
302 task.updated_at = chrono::Utc::now().to_rfc3339();
303 self.save_task(task)?;
304
305 info!(
306 task_id = %task_id,
307 iterations = task.iterations,
308 "Background task completed"
309 );
310
311 Ok(response)
312 }
313 Err(e) => {
314 task.status = TaskStatus::Failed;
315 task.error = Some(e.to_string());
316 task.updated_at = chrono::Utc::now().to_rfc3339();
317 self.save_task(task)?;
318
319 warn!(
320 task_id = %task_id,
321 error = %e,
322 "Background task failed"
323 );
324
325 Err(e)
326 }
327 }
328 }
329
330 #[allow(dead_code)]
332 pub async fn status(&self, task_id: &str) -> Result<TaskStatus> {
333 let tasks = self.tasks.read().await;
334 let task = tasks.get(task_id).ok_or_else(|| {
335 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
336 })?;
337 Ok(task.status.clone())
338 }
339
340 pub async fn get_task(&self, task_id: &str) -> Result<BackgroundTask> {
342 let tasks = self.tasks.read().await;
343 tasks.get(task_id).cloned().ok_or_else(|| {
344 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
345 })
346 }
347
348 pub async fn list_tasks(&self) -> Vec<BackgroundTask> {
350 let tasks = self.tasks.read().await;
351 let mut task_list: Vec<BackgroundTask> = tasks.values().cloned().collect();
352 task_list.sort_by(|a, b| b.created_at.cmp(&a.created_at));
354 task_list
355 }
356
357 pub async fn cancel(&self, task_id: &str) -> Result<()> {
359 let mut tasks = self.tasks.write().await;
360 let task = tasks.get_mut(task_id).ok_or_else(|| {
361 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
362 })?;
363
364 match task.status {
365 TaskStatus::Pending | TaskStatus::Running => {
366 task.status = TaskStatus::Cancelled;
367 task.updated_at = chrono::Utc::now().to_rfc3339();
368 self.save_task(task)?;
369 info!(task_id = %task_id, "Background task cancelled");
370 Ok(())
371 }
372 _ => Err(RavenClawsError::CommandExecution(format!(
373 "Cannot cancel task '{}' in status '{}'",
374 task_id, task.status
375 ))),
376 }
377 }
378
379 pub async fn resume_incomplete(&self) -> Vec<String> {
382 let tasks = self.tasks.read().await;
383 let mut incomplete = Vec::new();
384
385 for task in tasks.values() {
386 if task.status == TaskStatus::Pending || task.status == TaskStatus::Running {
387 incomplete.push(task.id.clone());
388 }
389 }
390
391 if !incomplete.is_empty() {
392 info!(
393 count = incomplete.len(),
394 "Found incomplete background tasks to resume"
395 );
396 }
397
398 incomplete
399 }
400
401 async fn get_prompt(&self, task_id: &str) -> Result<String> {
403 let tasks = self.tasks.read().await;
404 let task = tasks.get(task_id).ok_or_else(|| {
405 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
406 })?;
407 Ok(task.prompt.clone())
408 }
409
410 async fn get_system_prompt(&self, task_id: &str) -> Result<String> {
412 let tasks = self.tasks.read().await;
413 let task = tasks.get(task_id).ok_or_else(|| {
414 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
415 })?;
416 Ok(task.system_prompt.clone())
417 }
418}
419
420#[cfg(test)]
421mod tests {
422 use super::*;
423 use std::path::PathBuf;
424
425 fn test_dir(name: &str) -> PathBuf {
426 let dir = std::env::temp_dir().join(format!(
427 "ravenclaws-test-bg-{}-{}",
428 name,
429 std::process::id()
430 ));
431 let _ = std::fs::remove_dir_all(&dir);
432 dir
433 }
434
435 #[tokio::test]
436 async fn test_manager_new_creates_directory() {
437 let dir = test_dir("create_dir");
438 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
439 assert!(dir.exists(), "Tasks directory should be created");
440 assert!(manager.tasks.read().await.is_empty());
441 let _ = std::fs::remove_dir_all(&dir);
442 }
443
444 #[tokio::test]
445 async fn test_submit_task() {
446 let dir = test_dir("submit");
447 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
448
449 let task_id = manager
450 .submit("Test prompt".to_string(), "Test system".to_string())
451 .await
452 .unwrap();
453
454 let task = manager.get_task(&task_id).await.unwrap();
455 assert_eq!(task.prompt, "Test prompt");
456 assert_eq!(task.system_prompt, "Test system");
457 assert_eq!(task.status, TaskStatus::Pending);
458 assert!(task.result.is_none());
459
460 let task_path = dir.join(format!("{}.json", task_id));
462 assert!(task_path.exists(), "Task file should exist on disk");
463
464 let _ = std::fs::remove_dir_all(&dir);
465 }
466
467 #[tokio::test]
468 async fn test_status_transitions() {
469 let dir = test_dir("transitions");
470 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
471
472 let task_id = manager
473 .submit("Test".to_string(), "System".to_string())
474 .await
475 .unwrap();
476
477 assert_eq!(manager.status(&task_id).await.unwrap(), TaskStatus::Pending);
478
479 manager.cancel(&task_id).await.unwrap();
481 assert_eq!(
482 manager.status(&task_id).await.unwrap(),
483 TaskStatus::Cancelled
484 );
485
486 let _ = std::fs::remove_dir_all(&dir);
487 }
488
489 #[tokio::test]
490 async fn test_list_tasks() {
491 let dir = test_dir("list");
492 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
493
494 manager
495 .submit("Task 1".to_string(), "System".to_string())
496 .await
497 .unwrap();
498 manager
499 .submit("Task 2".to_string(), "System".to_string())
500 .await
501 .unwrap();
502
503 let tasks = manager.list_tasks().await;
504 assert_eq!(tasks.len(), 2);
505
506 let _ = std::fs::remove_dir_all(&dir);
507 }
508
509 #[tokio::test]
510 async fn test_cancel_completed_task_fails() {
511 let dir = test_dir("cancel_fail");
512 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
513
514 let task_id = manager
515 .submit("Test".to_string(), "System".to_string())
516 .await
517 .unwrap();
518
519 {
521 let mut tasks = manager.tasks.write().await;
522 let task = tasks.get_mut(&task_id).unwrap();
523 task.status = TaskStatus::Completed;
524 }
525
526 let result = manager.cancel(&task_id).await;
527 assert!(result.is_err(), "Cancelling a completed task should fail");
528
529 let _ = std::fs::remove_dir_all(&dir);
530 }
531
532 #[tokio::test]
533 async fn test_resume_incomplete_tasks() {
534 let dir = test_dir("resume");
535 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
536
537 manager
538 .submit("Task 1".to_string(), "System".to_string())
539 .await
540 .unwrap();
541 manager
542 .submit("Task 2".to_string(), "System".to_string())
543 .await
544 .unwrap();
545
546 {
548 let tasks = manager.tasks.read().await;
549 let tasks_vec: Vec<&BackgroundTask> = tasks.values().collect();
550 if let Some(task) = tasks_vec.first() {
551 let id = task.id.clone();
552 drop(tasks);
553 let mut tasks = manager.tasks.write().await;
554 if let Some(t) = tasks.get_mut(&id) {
555 t.status = TaskStatus::Completed;
556 }
557 }
558 }
559
560 let incomplete = manager.resume_incomplete().await;
561 assert_eq!(incomplete.len(), 1, "One task should be incomplete");
562
563 let _ = std::fs::remove_dir_all(&dir);
564 }
565
566 #[tokio::test]
567 async fn test_task_not_found() {
568 let dir = test_dir("not_found");
569 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
570
571 let result = manager.status("nonexistent").await;
572 assert!(result.is_err());
573
574 let _ = std::fs::remove_dir_all(&dir);
575 }
576
577 #[tokio::test]
578 async fn test_persistence_across_restart() {
579 let dir = test_dir("persist");
580
581 {
583 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
584 manager
585 .submit("Persist test".to_string(), "System".to_string())
586 .await
587 .unwrap();
588 } {
592 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
593 let tasks = manager.list_tasks().await;
594 assert_eq!(tasks.len(), 1, "Task should persist across restarts");
595 assert_eq!(tasks[0].prompt, "Persist test");
596 }
597
598 let _ = std::fs::remove_dir_all(&dir);
599 }
600}