tsk/server/
mod.rs

1pub mod executor;
2pub mod lifecycle;
3pub mod protocol;
4
5use crate::context::AppContext;
6use crate::task_storage::{TaskStorage, get_task_storage};
7use executor::TaskExecutor;
8use lifecycle::ServerLifecycle;
9use protocol::{Request, Response};
10use std::path::PathBuf;
11use std::sync::Arc;
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
13use tokio::net::{UnixListener, UnixStream};
14use tokio::sync::Mutex;
15
16/// Main TSK server that handles task management
17pub struct TskServer {
18    app_context: Arc<AppContext>,
19    storage: Arc<Mutex<Box<dyn TaskStorage>>>,
20    socket_path: PathBuf,
21    shutdown_signal: Arc<Mutex<bool>>,
22    executor: Arc<TaskExecutor>,
23    lifecycle: ServerLifecycle,
24}
25
26impl TskServer {
27    /// Create a new TSK server instance
28    pub fn new(app_context: Arc<AppContext>) -> Self {
29        let xdg_directories = app_context.xdg_directories();
30        let socket_path = xdg_directories.socket_path();
31        let storage = get_task_storage(xdg_directories.clone(), app_context.file_system());
32        let storage = Arc::new(Mutex::new(storage));
33
34        let executor = Arc::new(TaskExecutor::new(app_context.clone(), storage.clone()));
35        let lifecycle = ServerLifecycle::new(xdg_directories);
36
37        Self {
38            app_context,
39            storage,
40            socket_path,
41            shutdown_signal: Arc::new(Mutex::new(false)),
42            executor,
43            lifecycle,
44        }
45    }
46
47    /// Start the server and listen for connections
48    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
49        // Check if server is already running
50        if self.lifecycle.is_server_running() {
51            return Err("Server is already running".into());
52        }
53
54        // Write PID file
55        self.lifecycle.write_pid()?;
56
57        // Ensure socket doesn't already exist
58        if self.socket_path.exists() {
59            std::fs::remove_file(&self.socket_path)?;
60        }
61
62        // Create and bind to socket
63        let listener = UnixListener::bind(&self.socket_path)?;
64        println!("TSK Server listening on: {:?}", self.socket_path);
65
66        // Start the task executor in the background
67        let executor = self.executor.clone();
68        let executor_handle = tokio::spawn(async move {
69            if let Err(e) = executor.start().await {
70                eprintln!("Executor error: {e}");
71            }
72        });
73
74        // Accept connections in a loop
75        loop {
76            // Check shutdown signal
77            if *self.shutdown_signal.lock().await {
78                println!("Server shutting down...");
79                break;
80            }
81
82            match listener.accept().await {
83                Ok((stream, _)) => {
84                    let app_context = self.app_context.clone();
85                    let storage = self.storage.clone();
86                    let shutdown_signal = self.shutdown_signal.clone();
87
88                    // Handle each client in a separate task
89                    tokio::spawn(async move {
90                        if let Err(e) =
91                            handle_client(stream, app_context, storage, shutdown_signal).await
92                        {
93                            eprintln!("Error handling client: {e}");
94                        }
95                    });
96                }
97                Err(e) => {
98                    eprintln!("Error accepting connection: {e}");
99                }
100            }
101        }
102
103        // Stop the executor
104        self.executor.stop().await;
105        executor_handle.await?;
106
107        // Clean up server resources
108        self.lifecycle.cleanup()?;
109
110        // Ensure terminal title is restored
111        self.app_context.terminal_operations().restore_title();
112
113        Ok(())
114    }
115
116    /// Signal the server to shut down
117    pub async fn shutdown(&self) {
118        *self.shutdown_signal.lock().await = true;
119    }
120}
121
122/// Handle a single client connection
123async fn handle_client(
124    stream: UnixStream,
125    _app_context: Arc<AppContext>,
126    storage: Arc<Mutex<Box<dyn TaskStorage>>>,
127    shutdown_signal: Arc<Mutex<bool>>,
128) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
129    let (reader, mut writer) = stream.into_split();
130    let mut reader = BufReader::new(reader);
131    let mut line = String::new();
132
133    // Read request
134    let bytes_read = reader.read_line(&mut line).await?;
135
136    // Handle empty connections (e.g., from is_server_available checks)
137    if bytes_read == 0 || line.trim().is_empty() {
138        // Client connected but didn't send data - this is normal for availability checks
139        return Ok(());
140    }
141
142    let request: Request = serde_json::from_str(&line)?;
143
144    // Process request
145    let response = match request {
146        Request::AddTask { repo_path: _, task } => {
147            let storage = storage.lock().await;
148            match storage.add_task(*task).await {
149                Ok(_) => Response::Success {
150                    message: "Task added successfully".to_string(),
151                },
152                Err(e) => Response::Error {
153                    message: e.to_string(),
154                },
155            }
156        }
157        Request::ListTasks => {
158            let storage = storage.lock().await;
159            match storage.list_tasks().await {
160                Ok(tasks) => Response::TaskList { tasks },
161                Err(e) => Response::Error {
162                    message: e.to_string(),
163                },
164            }
165        }
166        Request::GetStatus { task_id } => {
167            let storage = storage.lock().await;
168            match storage.get_task(&task_id).await {
169                Ok(Some(task)) => Response::TaskStatus {
170                    status: task.status,
171                },
172                Ok(None) => Response::Error {
173                    message: "Task not found".to_string(),
174                },
175                Err(e) => Response::Error {
176                    message: e.to_string(),
177                },
178            }
179        }
180        Request::Shutdown => {
181            *shutdown_signal.lock().await = true;
182            Response::Success {
183                message: "Server shutting down".to_string(),
184            }
185        }
186    };
187
188    // Send response
189    let response_json = serde_json::to_string(&response)?;
190    writer.write_all(response_json.as_bytes()).await?;
191    writer.write_all(b"\n").await?;
192    writer.flush().await?;
193
194    Ok(())
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200    use crate::context::AppContext;
201    use crate::test_utils::NoOpDockerClient;
202    use std::sync::Arc;
203    use tempfile::TempDir;
204    use tokio::io::{AsyncReadExt, AsyncWriteExt};
205    use tokio::net::UnixStream;
206
207    /// Helper to create a test AppContext
208    fn create_test_context() -> (Arc<AppContext>, TempDir) {
209        let temp_dir = TempDir::new().unwrap();
210        unsafe {
211            std::env::set_var("XDG_DATA_HOME", temp_dir.path().join("data"));
212        }
213        unsafe {
214            std::env::set_var("XDG_RUNTIME_DIR", temp_dir.path().join("runtime"));
215        }
216
217        let context = AppContext::builder()
218            .with_docker_client(Arc::new(NoOpDockerClient))
219            .build();
220
221        // Ensure directories exist
222        context.xdg_directories().ensure_directories().unwrap();
223
224        (Arc::new(context), temp_dir)
225    }
226
227    #[tokio::test]
228    async fn test_handle_client_with_empty_connection() {
229        // This test verifies that the server gracefully handles connections
230        // that don't send any data (like is_server_available checks)
231        let (app_context, _temp_dir) = create_test_context();
232        let storage = get_task_storage(app_context.xdg_directories(), app_context.file_system());
233        let storage = Arc::new(Mutex::new(storage));
234        let shutdown_signal = Arc::new(Mutex::new(false));
235
236        // Create a pair of connected Unix sockets
237        let (client, server) = UnixStream::pair().unwrap();
238
239        // Close the client side immediately (simulating is_server_available)
240        drop(client);
241
242        // Handle the server side - this should not error with EOF
243        let result = handle_client(server, app_context, storage, shutdown_signal).await;
244
245        // The function should return Ok(()) without panicking or erroring
246        assert!(result.is_ok(), "Expected Ok(()), got {:?}", result);
247    }
248
249    #[tokio::test]
250    async fn test_handle_client_with_valid_request() {
251        // This test verifies that valid requests still work correctly
252        let (app_context, _temp_dir) = create_test_context();
253        let storage = get_task_storage(app_context.xdg_directories(), app_context.file_system());
254        let storage = Arc::new(Mutex::new(storage));
255        let shutdown_signal = Arc::new(Mutex::new(false));
256
257        // Create a pair of connected Unix sockets
258        let (mut client, server) = UnixStream::pair().unwrap();
259
260        // Send a valid request from the client side
261        let request = Request::ListTasks;
262        let request_json = serde_json::to_string(&request).unwrap();
263        client.write_all(request_json.as_bytes()).await.unwrap();
264        client.write_all(b"\n").await.unwrap();
265        client.flush().await.unwrap();
266
267        // Handle the request on the server side
268        let server_handle = tokio::spawn(async move {
269            handle_client(server, app_context, storage, shutdown_signal).await
270        });
271
272        // Read the response
273        let mut buf = vec![0; 1024];
274        let n = client.read(&mut buf).await.unwrap();
275        let response_str = String::from_utf8_lossy(&buf[..n]);
276
277        // Verify we got a valid response
278        assert!(response_str.contains("TaskList") || response_str.contains("Error"));
279
280        // Ensure the server handler completes successfully
281        let result = server_handle.await.unwrap();
282        assert!(result.is_ok());
283    }
284}