tsk/server/
executor.rs

1use crate::context::AppContext;
2use crate::task::{Task, TaskStatus};
3use crate::task_manager::TaskManager;
4use crate::task_storage::TaskStorage;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7use tokio::time::{Duration, sleep};
8
9/// Task executor that runs tasks sequentially
10pub struct TaskExecutor {
11    context: Arc<AppContext>,
12    storage: Arc<Mutex<Box<dyn TaskStorage>>>,
13    running: Arc<Mutex<bool>>,
14}
15
16impl TaskExecutor {
17    /// Create a new task executor
18    pub fn new(context: Arc<AppContext>, storage: Arc<Mutex<Box<dyn TaskStorage>>>) -> Self {
19        Self {
20            context,
21            storage,
22            running: Arc::new(Mutex::new(false)),
23        }
24    }
25
26    /// Start executing tasks from the queue
27    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
28        let mut running = self.running.lock().await;
29        if *running {
30            return Err("Executor is already running".into());
31        }
32        *running = true;
33        drop(running);
34
35        println!("Task executor started");
36
37        // Set initial idle title
38        self.context
39            .terminal_operations()
40            .set_title("TSK Server Idle");
41
42        loop {
43            // Check if we should continue running
44            if !*self.running.lock().await {
45                println!("Task executor stopping");
46                self.context.terminal_operations().restore_title();
47                break;
48            }
49
50            // Get the next queued task
51            let storage = self.storage.lock().await;
52            let tasks = storage.list_tasks().await?;
53            drop(storage);
54
55            let queued_task = tasks.into_iter().find(|t| t.status == TaskStatus::Queued);
56
57            match queued_task {
58                Some(task) => {
59                    println!("Executing task: {} ({})", task.name, task.id);
60
61                    // Update terminal title to show current task
62                    self.context
63                        .terminal_operations()
64                        .set_title(&format!("TSK: {}", task.name));
65
66                    // Update task status to running
67                    let mut running_task = task.clone();
68                    running_task.status = TaskStatus::Running;
69                    running_task.started_at = Some(chrono::Utc::now());
70
71                    let storage = self.storage.lock().await;
72                    storage.update_task(running_task.clone()).await?;
73                    drop(storage);
74
75                    // Execute the task
76                    let execution_result = self.execute_task(&running_task).await;
77
78                    match execution_result {
79                        Ok(_) => {
80                            println!("Task completed successfully: {}", running_task.id);
81
82                            // Update task status to complete
83                            let mut completed_task = running_task.clone();
84                            completed_task.status = TaskStatus::Complete;
85                            completed_task.completed_at = Some(chrono::Utc::now());
86
87                            let storage = self.storage.lock().await;
88                            storage.update_task(completed_task).await?;
89                        }
90                        Err(e) => {
91                            let error_message = e.to_string();
92                            eprintln!("Task failed: {} - {}", running_task.id, error_message);
93
94                            // Update task status to failed
95                            let mut failed_task = running_task.clone();
96                            failed_task.status = TaskStatus::Failed;
97                            failed_task.completed_at = Some(chrono::Utc::now());
98                            failed_task.error_message = Some(error_message);
99
100                            let storage = self.storage.lock().await;
101                            storage.update_task(failed_task).await?;
102                        }
103                    }
104
105                    // Restore idle title after task completion
106                    self.context
107                        .terminal_operations()
108                        .set_title("TSK Server Idle");
109                }
110                None => {
111                    // No tasks to execute, wait a bit before checking again
112                    sleep(Duration::from_secs(5)).await;
113                }
114            }
115        }
116
117        Ok(())
118    }
119
120    /// Stop the executor
121    pub async fn stop(&self) {
122        *self.running.lock().await = false;
123    }
124
125    /// Execute a single task
126    async fn execute_task(
127        &self,
128        task: &Task,
129    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
130        // Create a task manager with the current context
131        let task_manager = TaskManager::with_storage(&self.context)?;
132
133        // Execute the task
134        let result = task_manager.execute_queued_task(task).await;
135
136        match result {
137            Ok(_) => Ok(()),
138            Err(e) => Err(e.message.into()),
139        }
140    }
141}
142
143#[cfg(test)]
144mod tests {
145    use super::*;
146    use crate::context::file_system::tests::MockFileSystem;
147    use crate::storage::XdgDirectories;
148    use crate::task_storage::get_task_storage;
149    use tempfile::TempDir;
150
151    #[tokio::test]
152    async fn test_executor_lifecycle() {
153        let temp_dir = TempDir::new().unwrap();
154        unsafe {
155            std::env::set_var("XDG_DATA_HOME", temp_dir.path().join("data"));
156        }
157        unsafe {
158            std::env::set_var("XDG_RUNTIME_DIR", temp_dir.path().join("runtime"));
159        }
160
161        let xdg = Arc::new(XdgDirectories::new().unwrap());
162        xdg.ensure_directories().unwrap();
163
164        let fs = Arc::new(MockFileSystem::new());
165        let storage = Arc::new(Mutex::new(get_task_storage(xdg.clone(), fs.clone())));
166
167        let app_context = crate::context::AppContext::builder()
168            .with_file_system(fs)
169            .with_xdg_directories(xdg)
170            .build();
171
172        let executor = TaskExecutor::new(Arc::new(app_context), storage);
173
174        // Test that executor can be started and stopped
175        assert!(!*executor.running.lock().await);
176
177        // Start executor in background
178        let executor_clone = Arc::new(executor);
179        let exec_handle = {
180            let exec = executor_clone.clone();
181            tokio::spawn(async move {
182                let _ = exec.start().await;
183            })
184        };
185
186        // Give it time to start
187        tokio::time::sleep(Duration::from_millis(100)).await;
188        assert!(*executor_clone.running.lock().await);
189
190        // Stop executor
191        executor_clone.stop().await;
192        let _ = exec_handle.await;
193
194        assert!(!*executor_clone.running.lock().await);
195    }
196}