1pub mod executor;
2pub mod lifecycle;
3pub mod protocol;
4
5use crate::context::AppContext;
6use crate::task_storage::{TaskStorage, get_task_storage};
7use executor::TaskExecutor;
8use lifecycle::ServerLifecycle;
9use protocol::{Request, Response};
10use std::path::PathBuf;
11use std::sync::Arc;
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
13use tokio::net::{UnixListener, UnixStream};
14use tokio::sync::Mutex;
15
16pub struct TskServer {
18 app_context: Arc<AppContext>,
19 storage: Arc<Mutex<Box<dyn TaskStorage>>>,
20 socket_path: PathBuf,
21 shutdown_signal: Arc<Mutex<bool>>,
22 executor: Arc<TaskExecutor>,
23 lifecycle: ServerLifecycle,
24}
25
26impl TskServer {
27 pub fn new(app_context: Arc<AppContext>) -> Self {
29 let xdg_directories = app_context.xdg_directories();
30 let socket_path = xdg_directories.socket_path();
31 let storage = get_task_storage(xdg_directories.clone(), app_context.file_system());
32 let storage = Arc::new(Mutex::new(storage));
33
34 let executor = Arc::new(TaskExecutor::new(app_context.clone(), storage.clone()));
35 let lifecycle = ServerLifecycle::new(xdg_directories);
36
37 Self {
38 app_context,
39 storage,
40 socket_path,
41 shutdown_signal: Arc::new(Mutex::new(false)),
42 executor,
43 lifecycle,
44 }
45 }
46
47 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
49 if self.lifecycle.is_server_running() {
51 return Err("Server is already running".into());
52 }
53
54 self.lifecycle.write_pid()?;
56
57 if self.socket_path.exists() {
59 std::fs::remove_file(&self.socket_path)?;
60 }
61
62 let listener = UnixListener::bind(&self.socket_path)?;
64 println!("TSK Server listening on: {:?}", self.socket_path);
65
66 let executor = self.executor.clone();
68 let executor_handle = tokio::spawn(async move {
69 if let Err(e) = executor.start().await {
70 eprintln!("Executor error: {e}");
71 }
72 });
73
74 loop {
76 if *self.shutdown_signal.lock().await {
78 println!("Server shutting down...");
79 break;
80 }
81
82 match listener.accept().await {
83 Ok((stream, _)) => {
84 let app_context = self.app_context.clone();
85 let storage = self.storage.clone();
86 let shutdown_signal = self.shutdown_signal.clone();
87
88 tokio::spawn(async move {
90 if let Err(e) =
91 handle_client(stream, app_context, storage, shutdown_signal).await
92 {
93 eprintln!("Error handling client: {e}");
94 }
95 });
96 }
97 Err(e) => {
98 eprintln!("Error accepting connection: {e}");
99 }
100 }
101 }
102
103 self.executor.stop().await;
105 executor_handle.await?;
106
107 self.lifecycle.cleanup()?;
109
110 self.app_context.terminal_operations().restore_title();
112
113 Ok(())
114 }
115
116 pub async fn shutdown(&self) {
118 *self.shutdown_signal.lock().await = true;
119 }
120}
121
122async fn handle_client(
124 stream: UnixStream,
125 _app_context: Arc<AppContext>,
126 storage: Arc<Mutex<Box<dyn TaskStorage>>>,
127 shutdown_signal: Arc<Mutex<bool>>,
128) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
129 let (reader, mut writer) = stream.into_split();
130 let mut reader = BufReader::new(reader);
131 let mut line = String::new();
132
133 let bytes_read = reader.read_line(&mut line).await?;
135
136 if bytes_read == 0 || line.trim().is_empty() {
138 return Ok(());
140 }
141
142 let request: Request = serde_json::from_str(&line)?;
143
144 let response = match request {
146 Request::AddTask { repo_path: _, task } => {
147 let storage = storage.lock().await;
148 match storage.add_task(*task).await {
149 Ok(_) => Response::Success {
150 message: "Task added successfully".to_string(),
151 },
152 Err(e) => Response::Error {
153 message: e.to_string(),
154 },
155 }
156 }
157 Request::ListTasks => {
158 let storage = storage.lock().await;
159 match storage.list_tasks().await {
160 Ok(tasks) => Response::TaskList { tasks },
161 Err(e) => Response::Error {
162 message: e.to_string(),
163 },
164 }
165 }
166 Request::GetStatus { task_id } => {
167 let storage = storage.lock().await;
168 match storage.get_task(&task_id).await {
169 Ok(Some(task)) => Response::TaskStatus {
170 status: task.status,
171 },
172 Ok(None) => Response::Error {
173 message: "Task not found".to_string(),
174 },
175 Err(e) => Response::Error {
176 message: e.to_string(),
177 },
178 }
179 }
180 Request::Shutdown => {
181 *shutdown_signal.lock().await = true;
182 Response::Success {
183 message: "Server shutting down".to_string(),
184 }
185 }
186 };
187
188 let response_json = serde_json::to_string(&response)?;
190 writer.write_all(response_json.as_bytes()).await?;
191 writer.write_all(b"\n").await?;
192 writer.flush().await?;
193
194 Ok(())
195}
196
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::AppContext;
    use crate::test_utils::NoOpDockerClient;
    use std::sync::Arc;
    use tempfile::TempDir;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::UnixStream;

    /// Builds an `AppContext` whose XDG data and runtime directories live in a
    /// fresh temporary directory.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive for
    /// the duration of the test.
    fn create_test_context() -> (Arc<AppContext>, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let data_home = temp_dir.path().join("data");
        let runtime_dir = temp_dir.path().join("runtime");

        // NOTE(review): `set_var` is only sound while no other thread touches
        // the process environment; the default parallel test runner does not
        // guarantee that. Consider serialising the tests that call this.
        unsafe {
            std::env::set_var("XDG_DATA_HOME", data_home);
            std::env::set_var("XDG_RUNTIME_DIR", runtime_dir);
        }

        let context = AppContext::builder()
            .with_docker_client(Arc::new(NoOpDockerClient))
            .build();

        context.xdg_directories().ensure_directories().unwrap();

        (Arc::new(context), temp_dir)
    }

    /// Creates the shared task storage and a cleared shutdown flag, in the
    /// shape `handle_client` expects.
    fn create_handler_state(
        context: &AppContext,
    ) -> (Arc<Mutex<Box<dyn TaskStorage>>>, Arc<Mutex<bool>>) {
        let backend = get_task_storage(context.xdg_directories(), context.file_system());
        (Arc::new(Mutex::new(backend)), Arc::new(Mutex::new(false)))
    }

    /// A peer that disconnects without sending anything is not an error.
    #[tokio::test]
    async fn test_handle_client_with_empty_connection() {
        let (app_context, _temp_dir) = create_test_context();
        let (storage, shutdown_signal) = create_handler_state(&app_context);

        let (client, server) = UnixStream::pair().unwrap();

        // Closing the client end makes the handler's first read hit EOF.
        drop(client);

        let result = handle_client(server, app_context, storage, shutdown_signal).await;

        assert!(result.is_ok(), "Expected Ok(()), got {:?}", result);
    }

    /// A well-formed `ListTasks` request produces a response line and the
    /// handler finishes successfully.
    #[tokio::test]
    async fn test_handle_client_with_valid_request() {
        let (app_context, _temp_dir) = create_test_context();
        let (storage, shutdown_signal) = create_handler_state(&app_context);

        let (mut client, server) = UnixStream::pair().unwrap();

        // Queue the newline-terminated request before the handler starts.
        let mut request_line = serde_json::to_vec(&Request::ListTasks).unwrap();
        request_line.push(b'\n');
        client.write_all(&request_line).await.unwrap();
        client.flush().await.unwrap();

        let server_handle =
            tokio::spawn(handle_client(server, app_context, storage, shutdown_signal));

        let mut buf = [0u8; 1024];
        let received = client.read(&mut buf).await.unwrap();
        let response_str = String::from_utf8_lossy(&buf[..received]);

        assert!(response_str.contains("TaskList") || response_str.contains("Error"));

        let outcome = server_handle.await.unwrap();
        assert!(outcome.is_ok());
    }
}