Skip to main content

aether_lspd/
daemon.rs

1use crate::client_connection::handle_client;
2use crate::error::{DaemonError, DaemonResult};
3use crate::pid_lockfile::PidLockfile;
4use crate::workspace_registry::WorkspaceRegistry;
5use std::fs::{create_dir_all, remove_file};
6use std::future::pending;
7use std::path::PathBuf;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::time::{Duration, Instant};
11use tokio::net::UnixListener;
12use tokio::select;
13use tokio::spawn;
14use tokio::sync::{RwLock, oneshot};
15use tokio::time::sleep;
16use uuid::Uuid;
17
18#[doc = include_str!("docs/daemon.md")]
19pub struct LspDaemon {
20    socket_path: PathBuf,
21    idle_timeout: Option<Duration>,
22    workspace_registry: WorkspaceRegistry,
23}
24
25impl LspDaemon {
26    /// Create a daemon with socket and idle-timeout settings.
27    pub fn new(socket_path: PathBuf, idle_timeout: Option<Duration>) -> Self {
28        Self { socket_path, idle_timeout, workspace_registry: WorkspaceRegistry::new() }
29    }
30
31    /// Run the daemon until shutdown.
32    pub async fn run(self) -> DaemonResult<()> {
33        if let Some(parent) = self.socket_path.parent() {
34            create_dir_all(parent).map_err(DaemonError::Io)?;
35        }
36
37        let _lockfile = PidLockfile::acquire(&self.socket_path.with_extension("lock"))
38            .map_err(|e| DaemonError::LockfileError(e.to_string()))?;
39
40        let _ = remove_file(&self.socket_path);
41
42        let shutdown_rx = spawn_shutdown_signal_handler();
43
44        tracing::info!("Daemon listening on {:?}", self.socket_path);
45        self.run_listener_loop(shutdown_rx).await?;
46
47        tracing::info!("Shutting down LSP servers");
48        self.workspace_registry.shutdown().await;
49
50        let _ = remove_file(&self.socket_path);
51        tracing::info!("Daemon shutdown complete");
52
53        Ok(())
54    }
55
56    /// Main listener loop that handles connections and shutdown signals.
57    async fn run_listener_loop(&self, mut shutdown_rx: oneshot::Receiver<()>) -> DaemonResult<()> {
58        let listener = UnixListener::bind(&self.socket_path).map_err(DaemonError::BindFailed)?;
59        let client_count = Arc::new(AtomicUsize::new(0));
60        let last_activity = Arc::new(RwLock::new(Instant::now()));
61
62        loop {
63            select! {
64                biased;
65
66                _ = &mut shutdown_rx => {
67                    tracing::info!("Shutting down");
68                    return Ok(());
69                }
70
71                result = listener.accept() => {
72                    match result {
73                        Ok((stream, _)) => {
74                            let client_id = Uuid::new_v4();
75                            let registry = self.workspace_registry.clone();
76                            let client_count = Arc::clone(&client_count);
77                            let last_activity = Arc::clone(&last_activity);
78
79                            client_count.fetch_add(1, Ordering::Relaxed);
80                            *last_activity.write().await = Instant::now();
81
82                            spawn(async move {
83                                handle_client(stream, registry, client_id).await;
84                                client_count.fetch_sub(1, Ordering::Relaxed);
85                                *last_activity.write().await = Instant::now();
86                                tracing::debug!("Client {} handler complete", client_id);
87                            });
88                        }
89                        Err(e) => {
90                            tracing::warn!("Failed to accept connection: {}", e);
91                        }
92                    }
93                }
94
95                () = check_idle_timeout(client_count.clone(), last_activity.clone(), self.idle_timeout) => {
96                    tracing::info!("Idle timeout reached, shutting down");
97                    return Ok(());
98                }
99
100                () = check_workspace_liveness(&self.workspace_registry, Duration::from_secs(10)) => {
101                    tracing::info!("All workspace roots deleted, shutting down");
102                    return Ok(());
103                }
104            }
105        }
106    }
107}
108
109/// Compatibility wrapper for the previous daemon entrypoint.
110pub async fn run_daemon(socket_path: PathBuf, idle_timeout: Option<Duration>) -> DaemonResult<()> {
111    LspDaemon::new(socket_path, idle_timeout).run().await
112}
113
114/// Wait until idle timeout is reached
115async fn check_idle_timeout(
116    client_count: Arc<AtomicUsize>,
117    last_activity: Arc<RwLock<Instant>>,
118    timeout: Option<Duration>,
119) {
120    check_idle_timeout_with_interval(client_count, last_activity, timeout, Duration::from_secs(10)).await;
121}
122
123/// Wait until idle timeout is reached, polling at a configurable interval.
124async fn check_idle_timeout_with_interval(
125    client_count: Arc<AtomicUsize>,
126    last_activity: Arc<RwLock<Instant>>,
127    timeout: Option<Duration>,
128    poll_interval: Duration,
129) {
130    let Some(timeout) = timeout else {
131        pending::<()>().await;
132        return;
133    };
134
135    loop {
136        sleep(poll_interval).await;
137
138        let count = client_count.load(Ordering::Relaxed);
139        if count > 0 {
140            continue;
141        }
142
143        let last = *last_activity.read().await;
144        if last.elapsed() >= timeout {
145            return;
146        }
147    }
148}
149
/// Returns `true` when the list is non-empty and every root is confirmed to be
/// absent from disk.
///
/// A root whose existence cannot be determined (for example a permission or
/// I/O error while reading its metadata) is treated as still present: this
/// result is used to decide whether the daemon should shut down, so an
/// unverifiable root must not count as deleted.
fn all_roots_deleted(roots: &[PathBuf]) -> bool {
    // `try_exists` separates "confirmed missing" (`Ok(false)`) from "could not
    // check" (`Err(_)`); only the former counts as deleted.
    !roots.is_empty() && roots.iter().all(|root| matches!(root.try_exists(), Ok(false)))
}
154
155/// Resolves when every workspace root managed by `lsp_manager` has been deleted
156/// from disk. Polls at `poll_interval`.
157async fn check_workspace_liveness(workspace_registry: &WorkspaceRegistry, poll_interval: Duration) {
158    loop {
159        sleep(poll_interval).await;
160        let roots = workspace_registry.workspace_roots().await;
161        if all_roots_deleted(&roots) {
162            return;
163        }
164    }
165}
166
167/// Spawn a task to handle shutdown signals (SIGTERM, SIGINT)
168fn spawn_shutdown_signal_handler() -> oneshot::Receiver<()> {
169    let (tx, rx) = oneshot::channel::<()>();
170
171    #[cfg(unix)]
172    {
173        use tokio::signal::unix::{SignalKind, signal};
174        spawn(async move {
175            let mut sigterm = signal(SignalKind::terminate()).expect("Failed to register SIGTERM handler");
176
177            let mut sigint = signal(SignalKind::interrupt()).expect("Failed to register SIGINT handler");
178
179            select! {
180                _ = sigterm.recv() => {
181                    tracing::info!("Received SIGTERM");
182                }
183                _ = sigint.recv() => {
184                    tracing::info!("Received SIGINT");
185                }
186            }
187            let _ = tx.send(());
188        });
189    }
190
191    rx
192}
193
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time::timeout;

    /// Poll interval short enough to keep the async tests fast.
    const FAST_POLL: Duration = Duration::from_millis(5);

    /// A path that is expected not to exist on the test machine.
    fn missing_root() -> PathBuf {
        PathBuf::from("/tmp/aether-lspd-test-nonexistent-dir-that-does-not-exist")
    }

    /// Zero-client counter paired with a last-activity stamp `age` in the past.
    fn idle_state(age: Duration) -> (Arc<AtomicUsize>, Arc<RwLock<Instant>>) {
        let stamp = Instant::now().checked_sub(age).expect("subtracting from current instant should succeed");
        (Arc::new(AtomicUsize::new(0)), Arc::new(RwLock::new(stamp)))
    }

    // With no timeout configured the watcher must still be pending when the
    // outer deadline fires.
    #[tokio::test]
    async fn idle_timeout_none_never_completes() {
        let (clients, activity) = idle_state(Duration::ZERO);

        let watcher = check_idle_timeout_with_interval(clients, activity, None, FAST_POLL);
        let outcome = timeout(Duration::from_millis(40), watcher).await;

        assert!(outcome.is_err(), "None timeout should not complete");
    }

    // Activity 50ms in the past already exceeds a 10ms limit, so the first
    // poll should resolve the watcher.
    #[tokio::test]
    async fn idle_timeout_completes_when_idle_elapsed() {
        let (clients, activity) = idle_state(Duration::from_millis(50));

        let watcher =
            check_idle_timeout_with_interval(clients, activity, Some(Duration::from_millis(10)), FAST_POLL);
        let outcome = timeout(Duration::from_millis(100), watcher).await;

        assert!(outcome.is_ok(), "Idle timeout should complete");
    }

    #[test]
    fn all_roots_deleted_empty_returns_false() {
        let no_roots: [PathBuf; 0] = [];
        assert!(!all_roots_deleted(&no_roots));
    }

    #[test]
    fn all_roots_deleted_existing_dir_returns_false() {
        let live = tempfile::tempdir().unwrap();
        let roots = [live.path().to_path_buf()];
        assert!(!all_roots_deleted(&roots));
    }

    #[test]
    fn all_roots_deleted_nonexistent_returns_true() {
        let roots = [missing_root()];
        assert!(all_roots_deleted(&roots));
    }

    #[test]
    fn all_roots_deleted_mixed_returns_false() {
        let live = tempfile::tempdir().unwrap();
        let roots = [live.path().to_path_buf(), missing_root()];
        assert!(!all_roots_deleted(&roots));
    }

    // Dropping the `TempDir` removes the directory, flipping the result.
    #[test]
    fn all_roots_deleted_after_tempdir_drop() {
        let live = tempfile::tempdir().unwrap();
        let roots = [live.path().to_path_buf()];
        assert!(!all_roots_deleted(&roots));

        drop(live);
        assert!(all_roots_deleted(&roots));
    }
}