1use crate::config::RuntimeConfig;
23use crate::error::{RavenClawsError, Result};
24use crate::llm::LLMProviderTrait;
25use serde::{Deserialize, Serialize};
26use std::collections::HashMap;
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use tokio::sync::RwLock;
30use tracing::{info, instrument, warn};
31
32#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub enum TaskStatus {
35 Pending,
37 Running,
39 Completed,
41 Failed,
43 Cancelled,
45}
46
47impl std::fmt::Display for TaskStatus {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 match self {
50 TaskStatus::Pending => write!(f, "pending"),
51 TaskStatus::Running => write!(f, "running"),
52 TaskStatus::Completed => write!(f, "completed"),
53 TaskStatus::Failed => write!(f, "failed"),
54 TaskStatus::Cancelled => write!(f, "cancelled"),
55 }
56 }
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct BackgroundTask {
62 pub id: String,
64 pub prompt: String,
66 pub system_prompt: String,
68 pub status: TaskStatus,
70 pub result: Option<String>,
72 pub error: Option<String>,
74 pub created_at: String,
76 pub updated_at: String,
78 pub iterations: usize,
80 pub provider: Option<String>,
82 pub model: Option<String>,
84 pub checkpoint: Option<String>,
88}
89
90impl BackgroundTask {
91 pub fn new(id: String, prompt: String, system_prompt: String) -> Self {
93 let now = chrono::Utc::now().to_rfc3339();
94 Self {
95 id,
96 prompt,
97 system_prompt,
98 status: TaskStatus::Pending,
99 result: None,
100 error: None,
101 created_at: now.clone(),
102 updated_at: now,
103 iterations: 0,
104 provider: None,
105 model: None,
106 checkpoint: None,
107 }
108 }
109}
110
111#[derive(Debug, Clone)]
113pub struct BackgroundTaskManager {
114 tasks_dir: PathBuf,
116 tasks: Arc<RwLock<HashMap<String, BackgroundTask>>>,
118}
119
120impl BackgroundTaskManager {
121 pub async fn new(tasks_dir: &Path) -> Result<Self> {
127 let tasks_dir = tasks_dir.to_path_buf();
128
129 std::fs::create_dir_all(&tasks_dir).map_err(|e| {
131 RavenClawsError::CommandExecution(format!(
132 "Failed to create tasks directory '{}': {}",
133 tasks_dir.display(),
134 e
135 ))
136 })?;
137
138 let tasks = Arc::new(RwLock::new(HashMap::new()));
139
140 let mut manager = Self { tasks_dir, tasks };
141
142 let count = manager.load_tasks().await?;
144 if count > 0 {
145 info!(count, "Loaded existing background tasks from disk");
146 }
147
148 Ok(manager)
149 }
150
151 pub async fn from_config(config: &RuntimeConfig) -> Result<Self> {
153 let tasks_dir = PathBuf::from(&config.workdir).join("tasks");
154 Self::new(&tasks_dir).await
155 }
156
157 async fn load_tasks(&mut self) -> Result<usize> {
159 let mut count = 0;
160 let read_dir = match std::fs::read_dir(&self.tasks_dir) {
161 Ok(d) => d,
162 Err(_) => return Ok(0),
163 };
164
165 let mut tasks_to_insert = Vec::new();
166 for entry in read_dir.flatten() {
167 let path = entry.path();
168 if path.extension().is_some_and(|ext| ext == "json") {
169 match std::fs::read_to_string(&path) {
170 Ok(content) => match serde_json::from_str::<BackgroundTask>(&content) {
171 Ok(task) => {
172 tasks_to_insert.push(task);
173 count += 1;
174 }
175 Err(e) => {
176 warn!(
177 path = %path.display(),
178 error = %e,
179 "Failed to deserialize background task"
180 );
181 }
182 },
183 Err(e) => {
184 warn!(
185 path = %path.display(),
186 error = %e,
187 "Failed to read background task file"
188 );
189 }
190 }
191 }
192 }
193
194 let mut tasks = self.tasks.write().await;
195 for task in tasks_to_insert {
196 tasks.insert(task.id.clone(), task);
197 }
198
199 Ok(count)
200 }
201
202 fn save_task(&self, task: &BackgroundTask) -> Result<()> {
204 let path = self.tasks_dir.join(format!("{}.json", task.id));
205 let content = serde_json::to_string_pretty(task).map_err(|e| {
206 RavenClawsError::CommandExecution(format!("Failed to serialize task: {}", e))
207 })?;
208
209 std::fs::write(&path, content).map_err(|e| {
210 RavenClawsError::CommandExecution(format!(
211 "Failed to write task file '{}': {}",
212 path.display(),
213 e
214 ))
215 })?;
216
217 Ok(())
218 }
219
220 pub async fn submit(&self, prompt: String, system_prompt: String) -> Result<String> {
223 let id = uuid::Uuid::new_v4().to_string();
224 let task = BackgroundTask::new(id.clone(), prompt, system_prompt);
225
226 self.save_task(&task)?;
228
229 let mut tasks = self.tasks.write().await;
231 tasks.insert(id.clone(), task);
232
233 info!(task_id = %id, "Background task submitted");
234 Ok(id)
235 }
236
237 #[instrument(skip(self, llm), fields(task_id = %task_id))]
240 pub async fn execute(&self, task_id: &str, llm: Arc<dyn LLMProviderTrait>) -> Result<String> {
241 {
243 let mut tasks = self.tasks.write().await;
244 let task = tasks.get_mut(task_id).ok_or_else(|| {
245 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
246 })?;
247
248 task.status = TaskStatus::Running;
249 task.provider = Some(llm.provider_name().to_string());
250 task.model = Some(llm.model().to_string());
251 task.updated_at = chrono::Utc::now().to_rfc3339();
252 self.save_task(task)?;
253 }
254
255 info!(
256 task_id = %task_id,
257 provider = llm.provider_name(),
258 model = llm.model(),
259 "Executing background task"
260 );
261
262 let checkpoint_dir = self.tasks_dir.join("checkpoints");
264 let _ = std::fs::create_dir_all(&checkpoint_dir);
265
266 let loop_config = crate::agent::AgentLoopConfig {
268 max_iterations: 10,
269 enable_tools: true,
270 require_approval: false,
271 prompt_injection_protection: true,
272 token_lifetime_secs: 0,
273 no_final_required: true,
274 fallback_chain: None,
275 token_budget: None,
276 ravenfabric: None,
277 checkpoint_dir: Some(checkpoint_dir),
278 session_id: Some(task_id.to_string()),
279 metrics_callback: None,
280 };
281
282 let result = crate::agent::run_agent_loop(
283 llm.clone(),
284 &self.get_prompt(task_id).await?,
285 &self.get_system_prompt(task_id).await?,
286 loop_config,
287 )
288 .await;
289
290 let mut tasks = self.tasks.write().await;
292 let task = tasks.get_mut(task_id).ok_or_else(|| {
293 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
294 })?;
295
296 match result {
297 Ok(response) => {
298 task.status = TaskStatus::Completed;
299 task.result = Some(response.clone());
300 task.updated_at = chrono::Utc::now().to_rfc3339();
301 self.save_task(task)?;
302
303 info!(
304 task_id = %task_id,
305 iterations = task.iterations,
306 "Background task completed"
307 );
308
309 Ok(response)
310 }
311 Err(e) => {
312 task.status = TaskStatus::Failed;
313 task.error = Some(e.to_string());
314 task.updated_at = chrono::Utc::now().to_rfc3339();
315 self.save_task(task)?;
316
317 warn!(
318 task_id = %task_id,
319 error = %e,
320 "Background task failed"
321 );
322
323 Err(e)
324 }
325 }
326 }
327
328 #[allow(dead_code)]
330 pub async fn status(&self, task_id: &str) -> Result<TaskStatus> {
331 let tasks = self.tasks.read().await;
332 let task = tasks.get(task_id).ok_or_else(|| {
333 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
334 })?;
335 Ok(task.status.clone())
336 }
337
338 pub async fn get_task(&self, task_id: &str) -> Result<BackgroundTask> {
340 let tasks = self.tasks.read().await;
341 tasks.get(task_id).cloned().ok_or_else(|| {
342 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
343 })
344 }
345
346 pub async fn list_tasks(&self) -> Vec<BackgroundTask> {
348 let tasks = self.tasks.read().await;
349 let mut task_list: Vec<BackgroundTask> = tasks.values().cloned().collect();
350 task_list.sort_by(|a, b| b.created_at.cmp(&a.created_at));
352 task_list
353 }
354
355 pub async fn cancel(&self, task_id: &str) -> Result<()> {
357 let mut tasks = self.tasks.write().await;
358 let task = tasks.get_mut(task_id).ok_or_else(|| {
359 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
360 })?;
361
362 match task.status {
363 TaskStatus::Pending | TaskStatus::Running => {
364 task.status = TaskStatus::Cancelled;
365 task.updated_at = chrono::Utc::now().to_rfc3339();
366 self.save_task(task)?;
367 info!(task_id = %task_id, "Background task cancelled");
368 Ok(())
369 }
370 _ => Err(RavenClawsError::CommandExecution(format!(
371 "Cannot cancel task '{}' in status '{}'",
372 task_id, task.status
373 ))),
374 }
375 }
376
377 pub async fn resume_incomplete(&self) -> Vec<String> {
380 let tasks = self.tasks.read().await;
381 let mut incomplete = Vec::new();
382
383 for task in tasks.values() {
384 if task.status == TaskStatus::Pending || task.status == TaskStatus::Running {
385 incomplete.push(task.id.clone());
386 }
387 }
388
389 if !incomplete.is_empty() {
390 info!(
391 count = incomplete.len(),
392 "Found incomplete background tasks to resume"
393 );
394 }
395
396 incomplete
397 }
398
399 async fn get_prompt(&self, task_id: &str) -> Result<String> {
401 let tasks = self.tasks.read().await;
402 let task = tasks.get(task_id).ok_or_else(|| {
403 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
404 })?;
405 Ok(task.prompt.clone())
406 }
407
408 async fn get_system_prompt(&self, task_id: &str) -> Result<String> {
410 let tasks = self.tasks.read().await;
411 let task = tasks.get(task_id).ok_or_else(|| {
412 RavenClawsError::CommandExecution(format!("Task '{}' not found", task_id))
413 })?;
414 Ok(task.system_prompt.clone())
415 }
416}
417
418#[cfg(test)]
419mod tests {
420 use super::*;
421 use std::path::PathBuf;
422
423 fn test_dir(name: &str) -> PathBuf {
424 let dir = std::env::temp_dir().join(format!(
425 "ravenclaws-test-bg-{}-{}",
426 name,
427 std::process::id()
428 ));
429 let _ = std::fs::remove_dir_all(&dir);
430 dir
431 }
432
433 #[tokio::test]
434 async fn test_manager_new_creates_directory() {
435 let dir = test_dir("create_dir");
436 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
437 assert!(dir.exists(), "Tasks directory should be created");
438 assert!(manager.tasks.read().await.is_empty());
439 let _ = std::fs::remove_dir_all(&dir);
440 }
441
442 #[tokio::test]
443 async fn test_submit_task() {
444 let dir = test_dir("submit");
445 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
446
447 let task_id = manager
448 .submit("Test prompt".to_string(), "Test system".to_string())
449 .await
450 .unwrap();
451
452 let task = manager.get_task(&task_id).await.unwrap();
453 assert_eq!(task.prompt, "Test prompt");
454 assert_eq!(task.system_prompt, "Test system");
455 assert_eq!(task.status, TaskStatus::Pending);
456 assert!(task.result.is_none());
457
458 let task_path = dir.join(format!("{}.json", task_id));
460 assert!(task_path.exists(), "Task file should exist on disk");
461
462 let _ = std::fs::remove_dir_all(&dir);
463 }
464
465 #[tokio::test]
466 async fn test_status_transitions() {
467 let dir = test_dir("transitions");
468 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
469
470 let task_id = manager
471 .submit("Test".to_string(), "System".to_string())
472 .await
473 .unwrap();
474
475 assert_eq!(manager.status(&task_id).await.unwrap(), TaskStatus::Pending);
476
477 manager.cancel(&task_id).await.unwrap();
479 assert_eq!(
480 manager.status(&task_id).await.unwrap(),
481 TaskStatus::Cancelled
482 );
483
484 let _ = std::fs::remove_dir_all(&dir);
485 }
486
487 #[tokio::test]
488 async fn test_list_tasks() {
489 let dir = test_dir("list");
490 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
491
492 manager
493 .submit("Task 1".to_string(), "System".to_string())
494 .await
495 .unwrap();
496 manager
497 .submit("Task 2".to_string(), "System".to_string())
498 .await
499 .unwrap();
500
501 let tasks = manager.list_tasks().await;
502 assert_eq!(tasks.len(), 2);
503
504 let _ = std::fs::remove_dir_all(&dir);
505 }
506
507 #[tokio::test]
508 async fn test_cancel_completed_task_fails() {
509 let dir = test_dir("cancel_fail");
510 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
511
512 let task_id = manager
513 .submit("Test".to_string(), "System".to_string())
514 .await
515 .unwrap();
516
517 {
519 let mut tasks = manager.tasks.write().await;
520 let task = tasks.get_mut(&task_id).unwrap();
521 task.status = TaskStatus::Completed;
522 }
523
524 let result = manager.cancel(&task_id).await;
525 assert!(result.is_err(), "Cancelling a completed task should fail");
526
527 let _ = std::fs::remove_dir_all(&dir);
528 }
529
530 #[tokio::test]
531 async fn test_resume_incomplete_tasks() {
532 let dir = test_dir("resume");
533 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
534
535 manager
536 .submit("Task 1".to_string(), "System".to_string())
537 .await
538 .unwrap();
539 manager
540 .submit("Task 2".to_string(), "System".to_string())
541 .await
542 .unwrap();
543
544 {
546 let tasks = manager.tasks.read().await;
547 let tasks_vec: Vec<&BackgroundTask> = tasks.values().collect();
548 if let Some(task) = tasks_vec.first() {
549 let id = task.id.clone();
550 drop(tasks);
551 let mut tasks = manager.tasks.write().await;
552 if let Some(t) = tasks.get_mut(&id) {
553 t.status = TaskStatus::Completed;
554 }
555 }
556 }
557
558 let incomplete = manager.resume_incomplete().await;
559 assert_eq!(incomplete.len(), 1, "One task should be incomplete");
560
561 let _ = std::fs::remove_dir_all(&dir);
562 }
563
564 #[tokio::test]
565 async fn test_task_not_found() {
566 let dir = test_dir("not_found");
567 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
568
569 let result = manager.status("nonexistent").await;
570 assert!(result.is_err());
571
572 let _ = std::fs::remove_dir_all(&dir);
573 }
574
575 #[tokio::test]
576 async fn test_persistence_across_restart() {
577 let dir = test_dir("persist");
578
579 {
581 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
582 manager
583 .submit("Persist test".to_string(), "System".to_string())
584 .await
585 .unwrap();
586 } {
590 let manager = BackgroundTaskManager::new(&dir).await.unwrap();
591 let tasks = manager.list_tasks().await;
592 assert_eq!(tasks.len(), 1, "Task should persist across restarts");
593 assert_eq!(tasks[0].prompt, "Persist test");
594 }
595
596 let _ = std::fs::remove_dir_all(&dir);
597 }
598}