// sqry_daemon/ipc/server.rs

1//! IPC accept loop.
2//!
3//! Binds a UDS (Unix) or named pipe (Windows), accepts incoming
4//! connections, and spawns a per-connection handler task. Graceful
5//! shutdown is driven by a [`tokio_util::sync::CancellationToken`];
6//! after cancellation, the loop drains active connections bounded by
7//! [`crate::config::DaemonConfig::ipc_shutdown_drain_secs`].
8//!
9//! The two Unix bind branches (`RuntimeDir` vs `Configured`) implement
10//! the Phase 8a iter-1 B2 fix: runtime-dir paths are auto-managed
11//! (parent created 0700, stale socket removed after a liveness probe).
12//! Configured paths also auto-unlink stale sockets after a liveness
13//! probe confirms no process is listening — this is required for
14//! auto-start to work after a daemon stop. Live sockets are never
15//! touched: the daemon refuses to bind if a live daemon is already
16//! listening. Non-socket files at the configured path are always
17//! rejected.
18
19use std::io;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::time::{Duration, Instant};
24
25use anyhow::anyhow;
26use sqry_core::query::executor::QueryExecutor;
27use tokio_util::sync::CancellationToken;
28
29use crate::config::{DaemonConfig, ENV_SOCKET_PATH};
30use crate::error::{DaemonError, DaemonResult};
31use crate::rebuild::RebuildDispatcher;
32use crate::workspace::{WorkspaceBuilder, WorkspaceManager};
33
34use super::methods::HandlerContext;
35use super::router::run_connection;
36use super::shim_registry::ShimRegistry;
37
/// Top-level IPC server handle. Construct with [`Self::bind`] then
/// drive with [`Self::run`].
pub struct IpcServer {
    // Platform listener: UDS on Unix, named-pipe acceptor on Windows.
    listener: Listener,
    // Bound path (Unix) or pipe name (Windows); exposed via `socket_path()`.
    socket_path: PathBuf,
    // Shared handles cloned into each connection's `HandlerContext`.
    manager: Arc<WorkspaceManager>,
    dispatcher: Arc<RebuildDispatcher>,
    workspace_builder: Arc<dyn WorkspaceBuilder>,
    tool_executor: Arc<QueryExecutor>,
    // Registry of shim connections; see `shim_registry()` for locking notes.
    shim_registry: Arc<ShimRegistry>,
    // Cancellation drives graceful shutdown of the accept loop.
    shutdown: CancellationToken,
    // Count of in-flight connection tasks; drained at shutdown.
    active_connections: Arc<AtomicU64>,
    config: Arc<DaemonConfig>,
    // Compile-time crate version, reported to clients.
    daemon_version: &'static str,
}
53
54impl std::fmt::Debug for IpcServer {
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("IpcServer")
57            .field("socket_path", &self.socket_path)
58            .field("daemon_version", &self.daemon_version)
59            .finish_non_exhaustive()
60    }
61}
62
63impl IpcServer {
64    /// Bind the server. Unix: UnixListener with the two-branch policy;
65    /// Windows: NamedPipeServer with explicit options.
66    pub async fn bind(
67        config: Arc<DaemonConfig>,
68        manager: Arc<WorkspaceManager>,
69        dispatcher: Arc<RebuildDispatcher>,
70        workspace_builder: Arc<dyn WorkspaceBuilder>,
71        tool_executor: Arc<QueryExecutor>,
72        shutdown: CancellationToken,
73    ) -> DaemonResult<Self> {
74        let socket_path = config.socket_path();
75        let listener = Listener::bind(&config, &socket_path).await?;
76        Ok(Self {
77            listener,
78            socket_path,
79            manager,
80            dispatcher,
81            workspace_builder,
82            tool_executor,
83            shim_registry: ShimRegistry::new(),
84            shutdown,
85            active_connections: Arc::new(AtomicU64::new(0)),
86            config,
87            daemon_version: env!("CARGO_PKG_VERSION"),
88        })
89    }
90
91    /// Returns the bound socket path (Unix) or named-pipe name
92    /// (Windows).
93    #[must_use]
94    pub fn socket_path(&self) -> &Path {
95        &self.socket_path
96    }
97
98    /// Return a shared handle to the shim-connection registry.
99    ///
100    /// Task 9's bootstrap path surfaces the count via `daemon/status`,
101    /// and the Phase 8c router / MCP host register shim connections
102    /// through this `Arc`. The registry's internal state is guarded by
103    /// a `parking_lot::Mutex`, so callers must not hold the returned
104    /// `Arc` "actively" (i.e., inside a `.lock()` scope) across
105    /// long-running awaits — see [`ShimRegistry::len`] and
106    /// [`ShimRegistry::is_empty`] for the snapshot-under-lock
107    /// accessors.
108    #[must_use]
109    pub fn shim_registry(&self) -> Arc<ShimRegistry> {
110        Arc::clone(&self.shim_registry)
111    }
112
113    /// Accept loop. Returns when the shutdown token fires.
114    pub async fn run(self) -> DaemonResult<()> {
115        let Self {
116            mut listener,
117            manager,
118            dispatcher,
119            workspace_builder,
120            tool_executor,
121            shim_registry,
122            shutdown,
123            active_connections,
124            config,
125            daemon_version,
126            ..
127        } = self;
128
129        loop {
130            tokio::select! {
131                biased;
132                () = shutdown.cancelled() => {
133                    tracing::info!(
134                        "ipc_server: shutdown requested; draining active connections"
135                    );
136                    break;
137                }
138                res = listener.accept() => match res {
139                    Ok(stream) => {
140                        let ctx = HandlerContext {
141                            manager: Arc::clone(&manager),
142                            dispatcher: Arc::clone(&dispatcher),
143                            workspace_builder: Arc::clone(&workspace_builder),
144                            tool_executor: Arc::clone(&tool_executor),
145                            shim_registry: Arc::clone(&shim_registry),
146                            shutdown: shutdown.clone(),
147                            config: Arc::clone(&config),
148                            daemon_version,
149                        };
150                        active_connections.fetch_add(1, Ordering::AcqRel);
151                        let tracker = Arc::clone(&active_connections);
152                        tokio::spawn(async move {
153                            let conn_result = match stream {
154                                #[cfg(unix)]
155                                AcceptedStream::Unix(s) => run_connection(s, ctx).await,
156                                #[cfg(windows)]
157                                AcceptedStream::Pipe(s) => run_connection(s, ctx).await,
158                            };
159                            if let Err(e) = conn_result {
160                                tracing::debug!(error = %e,
161                                    "ipc_server: connection terminated with error");
162                            }
163                            tracker.fetch_sub(1, Ordering::AcqRel);
164                        });
165                    }
166                    Err(e) => {
167                        tracing::warn!(error = %e,
168                            "ipc_server: accept failed; continuing");
169                        tokio::time::sleep(Duration::from_millis(100)).await;
170                    }
171                }
172            }
173        }
174
175        // Drain phase.
176        let deadline = Instant::now() + Duration::from_secs(config.ipc_shutdown_drain_secs);
177        while Instant::now() < deadline && active_connections.load(Ordering::Acquire) > 0 {
178            tokio::time::sleep(Duration::from_millis(50)).await;
179        }
180        let lingering = active_connections.load(Ordering::Acquire);
181        if lingering > 0 {
182            tracing::warn!(
183                lingering,
184                "ipc_server: {} connections still active at drain deadline",
185                lingering
186            );
187        }
188        Ok(())
189    }
190}
191
192// ---------------------------------------------------------------------------
193// Accepted-stream enum + Listener.
194// ---------------------------------------------------------------------------
195
/// A single accepted client connection in its platform-specific form.
/// Unwrapped inside the spawned per-connection task before being handed
/// to `run_connection`.
enum AcceptedStream {
    /// Unix-domain-socket stream.
    #[cfg(unix)]
    Unix(tokio::net::UnixStream),
    /// One connected named-pipe server instance.
    #[cfg(windows)]
    Pipe(tokio::net::windows::named_pipe::NamedPipeServer),
}
202
/// Platform listener (Unix build): single-variant enum so the
/// `Listener::bind` / `accept` call sites stay uniform across platforms.
#[cfg(unix)]
enum Listener {
    Unix(tokio::net::UnixListener),
}

/// Platform listener (Windows build): wraps the pipe acceptor that
/// emulates an accept queue for named pipes.
#[cfg(windows)]
enum Listener {
    Pipe(WindowsPipeAcceptor),
}
212
impl Listener {
    /// Bind the platform listener. Exactly one of the two `cfg` blocks
    /// is compiled in, and that block is the function's tail expression.
    async fn bind(cfg: &DaemonConfig, path: &Path) -> DaemonResult<Self> {
        #[cfg(unix)]
        {
            // Unix: two-branch bind policy (see module docs).
            let l = bind_unix(cfg, path).await?;
            Ok(Listener::Unix(l))
        }
        #[cfg(windows)]
        {
            let _ = cfg; // consumed here once for the Windows branch
            // Windows: `path` is really a pipe name; normalize it and
            // pre-create the first pipe instance.
            let name = path.to_string_lossy().into_owned();
            let acceptor = WindowsPipeAcceptor::new(name)?;
            Ok(Listener::Pipe(acceptor))
        }
    }

    /// Accept one connection, wrapped in [`AcceptedStream`].
    async fn accept(&mut self) -> io::Result<AcceptedStream> {
        match self {
            #[cfg(unix)]
            Self::Unix(l) => {
                // The UDS peer address carries no useful info; discard it.
                let (s, _addr) = l.accept().await?;
                Ok(AcceptedStream::Unix(s))
            }
            #[cfg(windows)]
            Self::Pipe(a) => {
                let s = a.accept().await?;
                Ok(AcceptedStream::Pipe(s))
            }
        }
    }
}
244
245// ---------------------------------------------------------------------------
246// Unix bind (two-branch policy).
247// ---------------------------------------------------------------------------
248
249#[cfg(unix)]
250enum UnixBindMode {
251    RuntimeDir,
252    Configured,
253}
254
255#[cfg(unix)]
256fn classify_bind_mode(cfg: &DaemonConfig) -> UnixBindMode {
257    if cfg.socket.path.is_some() || std::env::var_os(ENV_SOCKET_PATH).is_some() {
258        UnixBindMode::Configured
259    } else {
260        UnixBindMode::RuntimeDir
261    }
262}
263
264#[cfg(unix)]
265async fn bind_unix(cfg: &DaemonConfig, path: &Path) -> DaemonResult<tokio::net::UnixListener> {
266    match classify_bind_mode(cfg) {
267        UnixBindMode::RuntimeDir => bind_unix_runtime(path).await,
268        UnixBindMode::Configured => bind_unix_configured(path).await,
269    }
270}
271
272#[cfg(unix)]
273async fn bind_unix_runtime(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
274    use std::os::unix::fs::PermissionsExt;
275    if let Some(parent) = path.parent() {
276        std::fs::create_dir_all(parent)?;
277        std::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700))?;
278    }
279    remove_stale_socket_if_dead(path).await?;
280    let listener = tokio::net::UnixListener::bind(path)?;
281    std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
282    Ok(listener)
283}
284
285#[cfg(unix)]
286async fn bind_unix_configured(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
287    use std::os::unix::fs::{FileTypeExt, PermissionsExt};
288    match std::fs::symlink_metadata(path) {
289        Ok(meta) if meta.file_type().is_socket() => {
290            if probe_socket_alive(path).await {
291                return Err(DaemonError::Config {
292                    path: path.to_path_buf(),
293                    source: anyhow!("socket path already in use by a live daemon"),
294                });
295            }
296            // Stale socket: liveness probe confirmed no process is listening.
297            // Safe to unlink and rebind regardless of how the path was
298            // configured — the prior daemon is gone.
299            tracing::warn!(
300                path = %path.display(),
301                "stale socket detected at configured path; unlinking and rebinding"
302            );
303            std::fs::remove_file(path)?;
304        }
305        Ok(_) => {
306            return Err(DaemonError::Config {
307                path: path.to_path_buf(),
308                source: anyhow!("configured socket path exists and is not a socket"),
309            });
310        }
311        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
312        Err(e) => return Err(DaemonError::Io(e)),
313    }
314    let listener = tokio::net::UnixListener::bind(path)?;
315    std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
316    Ok(listener)
317}
318
319#[cfg(unix)]
320async fn remove_stale_socket_if_dead(path: &Path) -> DaemonResult<()> {
321    use std::os::unix::fs::FileTypeExt;
322    match std::fs::symlink_metadata(path) {
323        Ok(meta) if meta.file_type().is_socket() => {
324            if probe_socket_alive(path).await {
325                return Err(DaemonError::Config {
326                    path: path.to_path_buf(),
327                    source: anyhow!("socket path already in use by a live daemon"),
328                });
329            }
330            std::fs::remove_file(path)?;
331        }
332        Ok(_) => {
333            return Err(DaemonError::Config {
334                path: path.to_path_buf(),
335                source: anyhow!("runtime path exists and is not a socket"),
336            });
337        }
338        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
339        Err(e) => return Err(DaemonError::Io(e)),
340    }
341    Ok(())
342}
343
344/// Hard deadline for the async UDS liveness probe. Loopback UDS
345/// handshakes complete in sub-millisecond-to-~1 ms under normal load;
346/// 100 ms is comfortably above that budget while still short enough
347/// that a wedged kernel path (ptrace target, frozen filesystem,
348/// signal-paused peer) does not stall daemon startup. Kernel-level
349/// unresponsiveness classifies the path as "not a live daemon" and
350/// yields to the refuse/unlink fallback.
351#[cfg(unix)]
352const PROBE_TIMEOUT: Duration = Duration::from_millis(100);
353
354/// Async liveness probe for a UDS path.
355///
356/// Returns `true` if a process accepts a UDS connection at `path`
357/// within [`PROBE_TIMEOUT`]; `false` otherwise (stale-socket,
358/// `ENOENT`, or kernel stall past the deadline). Uses tokio's async
359/// UDS connect so the probe never blocks the Tokio reactor — the
360/// future yields to the runtime while the kernel drives the connect
361/// handshake.
362///
363/// On a successful probe the returned `UnixStream` is dropped
364/// immediately: closing the connection is the correct signal to the
365/// peer that this was a liveness ping, not a real client. Remote-peer
366/// RST logs on a healthy daemon are a benign consequence.
367#[cfg(unix)]
368async fn probe_socket_alive(path: &Path) -> bool {
369    match tokio::time::timeout(PROBE_TIMEOUT, tokio::net::UnixStream::connect(path)).await {
370        Ok(Ok(stream)) => {
371            // Explicit drop: the close is the probe's "hang up"
372            // signal to the peer. Keep the drop inline for clarity —
373            // relying on end-of-arm drop works, but an explicit drop
374            // documents the intent.
375            drop(stream);
376            true
377        }
378        Ok(Err(_)) => false,    // ECONNREFUSED / ENOENT / other
379        Err(_elapsed) => false, // kernel stall past deadline
380    }
381}
382
383// ---------------------------------------------------------------------------
384// Windows named-pipe acceptor.
385// ---------------------------------------------------------------------------
386
/// Emulates an accept queue for Windows named pipes: pipes have no
/// listener concept, so a server instance must be pre-created before a
/// client can connect to it.
#[cfg(windows)]
struct WindowsPipeAcceptor {
    // Fully-qualified pipe name (`\\.\pipe\…`), normalized in `new`.
    name: String,
    // Pre-created instance awaiting the next client. `None` only after
    // an error path in `accept` consumed it — TODO confirm callers
    // tolerate the resulting error on the following `accept`.
    next: Option<tokio::net::windows::named_pipe::NamedPipeServer>,
}
392
#[cfg(windows)]
impl WindowsPipeAcceptor {
    /// Create the acceptor and eagerly open the first pipe instance.
    ///
    /// `first_pipe_instance = true` makes the initial create fail fast
    /// if another process already owns the pipe name.
    fn new(name: String) -> io::Result<Self> {
        let full = pipe_fullname(&name);
        let next = Some(create_pipe_instance(&full, true)?);
        Ok(Self { name: full, next })
    }

    /// Wait for a client on the pending instance, then pre-create the
    /// next instance so another client can connect immediately.
    ///
    /// Self-healing: previously, if `connect` or the replacement
    /// `create_pipe_instance` failed, `self.next` stayed `None` and
    /// every later call returned the "invalid state" error forever,
    /// leaving the accept loop spinning without ever accepting again.
    /// Now a missing pending instance is recreated on demand, and a
    /// successfully connected client is returned even when pre-creating
    /// its successor fails.
    async fn accept(&mut self) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
        let server = match self.next.take() {
            Some(pending) => pending,
            // The pending slot was lost to an earlier error; rebuild it
            // instead of reporting a permanent invalid state.
            None => create_pipe_instance(&self.name, false)?,
        };
        server.connect().await?;
        match create_pipe_instance(&self.name, false) {
            Ok(instance) => self.next = Some(instance),
            Err(e) => {
                // Don't drop a connected client just because its
                // successor couldn't be created; retry on next accept.
                tracing::warn!(error = %e,
                    "ipc_server: failed to pre-create next pipe instance; will retry");
            }
        }
        Ok(server)
    }
}
410
#[cfg(windows)]
/// Normalize a pipe name: accept either a bare name or one already
/// carrying the `\\.\pipe\` namespace prefix, and always return the
/// fully-qualified form.
fn pipe_fullname(name: &str) -> String {
    const PREFIX: &str = r"\\.\pipe\";
    match name.strip_prefix(PREFIX) {
        Some(_) => name.to_owned(),
        None => format!("{PREFIX}{name}"),
    }
}
419
#[cfg(windows)]
/// Build one server-side pipe instance with the daemon's fixed options:
/// byte mode, bidirectional, local clients only, up to 255 instances.
/// Setter order is irrelevant; `first` asserts exclusive ownership of
/// the name on the initial create.
fn create_pipe_instance(
    full_name: &str,
    first: bool,
) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
    use tokio::net::windows::named_pipe::{PipeMode, ServerOptions};
    let mut opts = ServerOptions::new();
    opts.pipe_mode(PipeMode::Byte)
        .reject_remote_clients(true)
        .access_inbound(true)
        .access_outbound(true)
        .max_instances(255)
        .first_pipe_instance(first);
    opts.create(full_name)
}