1use crate::client_connection::handle_client;
2use crate::error::{DaemonError, DaemonResult};
3use crate::pid_lockfile::PidLockfile;
4use crate::workspace_registry::WorkspaceRegistry;
5use std::fs::{create_dir_all, remove_file};
6use std::future::pending;
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::time::{Duration, Instant};
11use tokio::net::UnixListener;
12use tokio::select;
13use tokio::spawn;
14use tokio::sync::{RwLock, oneshot};
15use tokio::time::sleep;
16use uuid::Uuid;
17
18#[doc = include_str!("docs/daemon.md")]
19pub struct LspDaemon {
20 socket_path: PathBuf,
21 idle_timeout: Option<Duration>,
22 workspace_registry: WorkspaceRegistry,
23}
24
25impl LspDaemon {
26 pub fn new(socket_path: PathBuf, idle_timeout: Option<Duration>) -> Self {
28 Self { socket_path, idle_timeout, workspace_registry: WorkspaceRegistry::new() }
29 }
30
31 pub async fn run(self) -> DaemonResult<()> {
33 if let Some(parent) = self.socket_path.parent() {
34 create_dir_all(parent).map_err(DaemonError::Io)?;
35 }
36
37 let _lockfile = PidLockfile::acquire(&self.socket_path.with_extension("lock"))
38 .map_err(|e| DaemonError::LockfileError(e.to_string()))?;
39
40 let _ = remove_file(&self.socket_path);
41
42 let shutdown_rx = spawn_shutdown_signal_handler();
43
44 tracing::info!("Daemon listening on {:?}", self.socket_path);
45 self.run_listener_loop(shutdown_rx).await?;
46
47 tracing::info!("Shutting down LSP servers");
48 self.workspace_registry.shutdown().await;
49
50 let _ = remove_file(&self.socket_path);
51 tracing::info!("Daemon shutdown complete");
52
53 Ok(())
54 }
55
56 async fn run_listener_loop(&self, mut shutdown_rx: oneshot::Receiver<()>) -> DaemonResult<()> {
58 let listener = UnixListener::bind(&self.socket_path).map_err(DaemonError::BindFailed)?;
59 let client_count = Arc::new(AtomicUsize::new(0));
60 let last_activity = Arc::new(RwLock::new(Instant::now()));
61
62 loop {
63 select! {
64 biased;
65
66 _ = &mut shutdown_rx => {
67 tracing::info!("Shutting down");
68 return Ok(());
69 }
70
71 result = listener.accept() => {
72 match result {
73 Ok((stream, _)) => {
74 let client_id = Uuid::new_v4();
75 let registry = self.workspace_registry.clone();
76 let client_count = Arc::clone(&client_count);
77 let last_activity = Arc::clone(&last_activity);
78
79 client_count.fetch_add(1, Ordering::Relaxed);
80 *last_activity.write().await = Instant::now();
81
82 spawn(async move {
83 handle_client(stream, registry, client_id).await;
84 client_count.fetch_sub(1, Ordering::Relaxed);
85 *last_activity.write().await = Instant::now();
86 tracing::debug!("Client {} handler complete", client_id);
87 });
88 }
89 Err(e) => {
90 tracing::warn!("Failed to accept connection: {}", e);
91 }
92 }
93 }
94
95 () = check_idle_timeout(client_count.clone(), last_activity.clone(), self.idle_timeout) => {
96 tracing::info!("Idle timeout reached, shutting down");
97 return Ok(());
98 }
99
100 () = check_workspace_liveness(&self.workspace_registry, Duration::from_secs(10)) => {
101 tracing::info!("All workspace roots deleted, shutting down");
102 return Ok(());
103 }
104 }
105 }
106 }
107}
108
109pub async fn run_daemon(socket_path: PathBuf, idle_timeout: Option<Duration>) -> DaemonResult<()> {
111 LspDaemon::new(socket_path, idle_timeout).run().await
112}
113
114async fn check_idle_timeout(
116 client_count: Arc<AtomicUsize>,
117 last_activity: Arc<RwLock<Instant>>,
118 timeout: Option<Duration>,
119) {
120 check_idle_timeout_with_interval(client_count, last_activity, timeout, Duration::from_secs(10)).await;
121}
122
123async fn check_idle_timeout_with_interval(
125 client_count: Arc<AtomicUsize>,
126 last_activity: Arc<RwLock<Instant>>,
127 timeout: Option<Duration>,
128 poll_interval: Duration,
129) {
130 let Some(timeout) = timeout else {
131 pending::<()>().await;
132 return;
133 };
134
135 loop {
136 sleep(poll_interval).await;
137
138 let count = client_count.load(Ordering::Relaxed);
139 if count > 0 {
140 continue;
141 }
142
143 let last = *last_activity.read().await;
144 if last.elapsed() >= timeout {
145 return;
146 }
147 }
148}
149
150fn all_roots_deleted(roots: &[PathBuf]) -> bool {
152 !roots.is_empty() && roots.iter().all(|root| !root.exists())
153}
154
155async fn check_workspace_liveness(workspace_registry: &WorkspaceRegistry, poll_interval: Duration) {
158 loop {
159 sleep(poll_interval).await;
160 let roots = workspace_registry.workspace_roots().await;
161 if all_roots_deleted(&roots) {
162 return;
163 }
164 }
165}
166
167fn spawn_shutdown_signal_handler() -> oneshot::Receiver<()> {
169 let (tx, rx) = oneshot::channel::<()>();
170
171 #[cfg(unix)]
172 {
173 use tokio::signal::unix::{SignalKind, signal};
174 spawn(async move {
175 let mut sigterm = signal(SignalKind::terminate()).expect("Failed to register SIGTERM handler");
176
177 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to register SIGINT handler");
178
179 select! {
180 _ = sigterm.recv() => {
181 tracing::info!("Received SIGTERM");
182 }
183 _ = sigint.recv() => {
184 tracing::info!("Received SIGINT");
185 }
186 }
187 let _ = tx.send(());
188 });
189 }
190
191 rx
192}
193
194#[cfg(test)]
195mod tests {
196 use super::*;
197 use tokio::time::timeout;
198
199 #[tokio::test]
200 async fn idle_timeout_none_never_completes() {
201 let client_count = Arc::new(AtomicUsize::new(0));
202 let last_activity = Arc::new(RwLock::new(Instant::now()));
203
204 let result = timeout(
205 Duration::from_millis(40),
206 check_idle_timeout_with_interval(client_count, last_activity, None, Duration::from_millis(5)),
207 )
208 .await;
209
210 assert!(result.is_err(), "None timeout should not complete");
211 }
212
213 #[tokio::test]
214 async fn idle_timeout_completes_when_idle_elapsed() {
215 let client_count = Arc::new(AtomicUsize::new(0));
216 let stale_activity = Instant::now()
217 .checked_sub(Duration::from_millis(50))
218 .expect("subtracting from current instant should succeed");
219 let last_activity = Arc::new(RwLock::new(stale_activity));
220
221 let result = timeout(
222 Duration::from_millis(100),
223 check_idle_timeout_with_interval(
224 client_count,
225 last_activity,
226 Some(Duration::from_millis(10)),
227 Duration::from_millis(5),
228 ),
229 )
230 .await;
231
232 assert!(result.is_ok(), "Idle timeout should complete");
233 }
234
235 #[test]
236 fn all_roots_deleted_empty_returns_false() {
237 assert!(!all_roots_deleted(&[]));
238 }
239
240 #[test]
241 fn all_roots_deleted_existing_dir_returns_false() {
242 let dir = tempfile::tempdir().unwrap();
243 assert!(!all_roots_deleted(&[dir.path().to_path_buf()]));
244 }
245
246 #[test]
247 fn all_roots_deleted_nonexistent_returns_true() {
248 let gone = PathBuf::from("/tmp/aether-lspd-test-nonexistent-dir-that-does-not-exist");
249 assert!(all_roots_deleted(&[gone]));
250 }
251
252 #[test]
253 fn all_roots_deleted_mixed_returns_false() {
254 let dir = tempfile::tempdir().unwrap();
255 let gone = PathBuf::from("/tmp/aether-lspd-test-nonexistent-dir-that-does-not-exist");
256 assert!(!all_roots_deleted(&[dir.path().to_path_buf(), gone]));
257 }
258
259 #[test]
260 fn all_roots_deleted_after_tempdir_drop() {
261 let dir = tempfile::tempdir().unwrap();
262 let root = dir.path().to_path_buf();
263 assert!(!all_roots_deleted(std::slice::from_ref(&root)));
264 drop(dir);
265 assert!(all_roots_deleted(&[root]));
266 }
267}