1use crate::context::AppContext;
2use crate::task::{Task, TaskStatus};
3use crate::task_manager::TaskManager;
4use crate::task_storage::TaskStorage;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7use tokio::time::{Duration, sleep};
8
9pub struct TaskExecutor {
11 context: Arc<AppContext>,
12 storage: Arc<Mutex<Box<dyn TaskStorage>>>,
13 running: Arc<Mutex<bool>>,
14}
15
16impl TaskExecutor {
17 pub fn new(context: Arc<AppContext>, storage: Arc<Mutex<Box<dyn TaskStorage>>>) -> Self {
19 Self {
20 context,
21 storage,
22 running: Arc::new(Mutex::new(false)),
23 }
24 }
25
26 pub async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
28 let mut running = self.running.lock().await;
29 if *running {
30 return Err("Executor is already running".into());
31 }
32 *running = true;
33 drop(running);
34
35 println!("Task executor started");
36
37 self.context
39 .terminal_operations()
40 .set_title("TSK Server Idle");
41
42 loop {
43 if !*self.running.lock().await {
45 println!("Task executor stopping");
46 self.context.terminal_operations().restore_title();
47 break;
48 }
49
50 let storage = self.storage.lock().await;
52 let tasks = storage.list_tasks().await?;
53 drop(storage);
54
55 let queued_task = tasks.into_iter().find(|t| t.status == TaskStatus::Queued);
56
57 match queued_task {
58 Some(task) => {
59 println!("Executing task: {} ({})", task.name, task.id);
60
61 self.context
63 .terminal_operations()
64 .set_title(&format!("TSK: {}", task.name));
65
66 let mut running_task = task.clone();
68 running_task.status = TaskStatus::Running;
69 running_task.started_at = Some(chrono::Utc::now());
70
71 let storage = self.storage.lock().await;
72 storage.update_task(running_task.clone()).await?;
73 drop(storage);
74
75 let execution_result = self.execute_task(&running_task).await;
77
78 match execution_result {
79 Ok(_) => {
80 println!("Task completed successfully: {}", running_task.id);
81
82 let mut completed_task = running_task.clone();
84 completed_task.status = TaskStatus::Complete;
85 completed_task.completed_at = Some(chrono::Utc::now());
86
87 let storage = self.storage.lock().await;
88 storage.update_task(completed_task).await?;
89 }
90 Err(e) => {
91 let error_message = e.to_string();
92 eprintln!("Task failed: {} - {}", running_task.id, error_message);
93
94 let mut failed_task = running_task.clone();
96 failed_task.status = TaskStatus::Failed;
97 failed_task.completed_at = Some(chrono::Utc::now());
98 failed_task.error_message = Some(error_message);
99
100 let storage = self.storage.lock().await;
101 storage.update_task(failed_task).await?;
102 }
103 }
104
105 self.context
107 .terminal_operations()
108 .set_title("TSK Server Idle");
109 }
110 None => {
111 sleep(Duration::from_secs(5)).await;
113 }
114 }
115 }
116
117 Ok(())
118 }
119
120 pub async fn stop(&self) {
122 *self.running.lock().await = false;
123 }
124
125 async fn execute_task(
127 &self,
128 task: &Task,
129 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
130 let task_manager = TaskManager::with_storage(&self.context)?;
132
133 let result = task_manager.execute_queued_task(task).await;
135
136 match result {
137 Ok(_) => Ok(()),
138 Err(e) => Err(e.message.into()),
139 }
140 }
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146 use crate::context::file_system::tests::MockFileSystem;
147 use crate::storage::XdgDirectories;
148 use crate::task_storage::get_task_storage;
149 use tempfile::TempDir;
150
151 #[tokio::test]
152 async fn test_executor_lifecycle() {
153 let temp_dir = TempDir::new().unwrap();
154 unsafe {
155 std::env::set_var("XDG_DATA_HOME", temp_dir.path().join("data"));
156 }
157 unsafe {
158 std::env::set_var("XDG_RUNTIME_DIR", temp_dir.path().join("runtime"));
159 }
160
161 let xdg = Arc::new(XdgDirectories::new().unwrap());
162 xdg.ensure_directories().unwrap();
163
164 let fs = Arc::new(MockFileSystem::new());
165 let storage = Arc::new(Mutex::new(get_task_storage(xdg.clone(), fs.clone())));
166
167 let app_context = crate::context::AppContext::builder()
168 .with_file_system(fs)
169 .with_xdg_directories(xdg)
170 .build();
171
172 let executor = TaskExecutor::new(Arc::new(app_context), storage);
173
174 assert!(!*executor.running.lock().await);
176
177 let executor_clone = Arc::new(executor);
179 let exec_handle = {
180 let exec = executor_clone.clone();
181 tokio::spawn(async move {
182 let _ = exec.start().await;
183 })
184 };
185
186 tokio::time::sleep(Duration::from_millis(100)).await;
188 assert!(*executor_clone.running.lock().await);
189
190 executor_clone.stop().await;
192 let _ = exec_handle.await;
193
194 assert!(!*executor_clone.running.lock().await);
195 }
196}