Skip to main content

sqry_daemon/ipc/
server.rs

1//! IPC accept loop.
2//!
3//! Binds a UDS (Unix) or named pipe (Windows), accepts incoming
4//! connections, and spawns a per-connection handler task. Graceful
5//! shutdown is driven by a [`tokio_util::sync::CancellationToken`];
6//! after cancellation, the loop drains active connections bounded by
7//! [`crate::config::DaemonConfig::ipc_shutdown_drain_secs`].
8//!
9//! The two Unix bind branches (`RuntimeDir` vs `Configured`) implement
10//! the Phase 8a iter-1 B2 fix: runtime-dir paths are auto-managed
11//! (parent created 0700, stale socket removed after a liveness probe).
12//! Configured paths also auto-unlink stale sockets after a liveness
13//! probe confirms no process is listening — this is required for
14//! auto-start to work after a daemon stop. Live sockets are never
15//! touched: the daemon refuses to bind if a live daemon is already
16//! listening. Non-socket files at the configured path are always
17//! rejected.
18
19use std::io;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::time::{Duration, Instant};
24
25#[cfg(unix)]
26use anyhow::anyhow;
27use sqry_core::query::executor::QueryExecutor;
28use tokio_util::sync::CancellationToken;
29
30use crate::config::DaemonConfig;
31#[cfg(unix)]
32use crate::config::ENV_SOCKET_PATH;
33#[cfg(unix)]
34use crate::error::DaemonError;
35use crate::error::DaemonResult;
36use crate::rebuild::RebuildDispatcher;
37use crate::workspace::{WorkspaceBuilder, WorkspaceManager};
38
39use super::methods::HandlerContext;
40use super::router::run_connection;
41use super::shim_registry::ShimRegistry;
42
43/// Top-level IPC server handle. Construct with [`Self::bind`] then
44/// drive with [`Self::run`].
45pub struct IpcServer {
46    listener: Listener,
47    socket_path: PathBuf,
48    manager: Arc<WorkspaceManager>,
49    dispatcher: Arc<RebuildDispatcher>,
50    workspace_builder: Arc<dyn WorkspaceBuilder>,
51    tool_executor: Arc<QueryExecutor>,
52    shim_registry: Arc<ShimRegistry>,
53    shutdown: CancellationToken,
54    active_connections: Arc<AtomicU64>,
55    config: Arc<DaemonConfig>,
56    daemon_version: &'static str,
57}
58
59impl std::fmt::Debug for IpcServer {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("IpcServer")
62            .field("socket_path", &self.socket_path)
63            .field("daemon_version", &self.daemon_version)
64            .finish_non_exhaustive()
65    }
66}
67
68impl IpcServer {
69    /// Bind the server. Unix: UnixListener with the two-branch policy;
70    /// Windows: NamedPipeServer with explicit options.
71    pub async fn bind(
72        config: Arc<DaemonConfig>,
73        manager: Arc<WorkspaceManager>,
74        dispatcher: Arc<RebuildDispatcher>,
75        workspace_builder: Arc<dyn WorkspaceBuilder>,
76        tool_executor: Arc<QueryExecutor>,
77        shutdown: CancellationToken,
78    ) -> DaemonResult<Self> {
79        let socket_path = config.socket_path();
80        // Cluster-G §5.2 — pre-flight the socket parent directory so a
81        // missing or unwritable parent surfaces as a typed
82        // `DaemonError::SocketSetup` (-32007) with copy-paste recovery
83        // text rather than a generic `EACCES` from the bind syscall.
84        #[cfg(unix)]
85        ensure_socket_parent_writable(&socket_path)?;
86        let listener = Listener::bind(&config, &socket_path).await?;
87        Ok(Self {
88            listener,
89            socket_path,
90            manager,
91            dispatcher,
92            workspace_builder,
93            tool_executor,
94            shim_registry: ShimRegistry::new(),
95            shutdown,
96            active_connections: Arc::new(AtomicU64::new(0)),
97            config,
98            daemon_version: env!("CARGO_PKG_VERSION"),
99        })
100    }
101
102    /// Returns the bound socket path (Unix) or named-pipe name
103    /// (Windows).
104    #[must_use]
105    pub fn socket_path(&self) -> &Path {
106        &self.socket_path
107    }
108
109    /// Return a shared handle to the shim-connection registry.
110    ///
111    /// Task 9's bootstrap path surfaces the count via `daemon/status`,
112    /// and the Phase 8c router / MCP host register shim connections
113    /// through this `Arc`. The registry's internal state is guarded by
114    /// a `parking_lot::Mutex`, so callers must not hold the returned
115    /// `Arc` "actively" (i.e., inside a `.lock()` scope) across
116    /// long-running awaits — see [`ShimRegistry::len`] and
117    /// [`ShimRegistry::is_empty`] for the snapshot-under-lock
118    /// accessors.
119    #[must_use]
120    pub fn shim_registry(&self) -> Arc<ShimRegistry> {
121        Arc::clone(&self.shim_registry)
122    }
123
124    /// Accept loop. Returns when the shutdown token fires.
125    pub async fn run(self) -> DaemonResult<()> {
126        let Self {
127            mut listener,
128            manager,
129            dispatcher,
130            workspace_builder,
131            tool_executor,
132            shim_registry,
133            shutdown,
134            active_connections,
135            config,
136            daemon_version,
137            ..
138        } = self;
139
140        loop {
141            tokio::select! {
142                biased;
143                () = shutdown.cancelled() => {
144                    tracing::info!(
145                        "ipc_server: shutdown requested; draining active connections"
146                    );
147                    break;
148                }
149                res = listener.accept() => match res {
150                    Ok(stream) => {
151                        let ctx = HandlerContext {
152                            manager: Arc::clone(&manager),
153                            dispatcher: Arc::clone(&dispatcher),
154                            workspace_builder: Arc::clone(&workspace_builder),
155                            tool_executor: Arc::clone(&tool_executor),
156                            shim_registry: Arc::clone(&shim_registry),
157                            shutdown: shutdown.clone(),
158                            config: Arc::clone(&config),
159                            daemon_version,
160                        };
161                        active_connections.fetch_add(1, Ordering::AcqRel);
162                        let tracker = Arc::clone(&active_connections);
163                        tokio::spawn(async move {
164                            let conn_result = match stream {
165                                #[cfg(unix)]
166                                AcceptedStream::Unix(s) => run_connection(s, ctx).await,
167                                #[cfg(windows)]
168                                AcceptedStream::Pipe(s) => run_connection(s, ctx).await,
169                            };
170                            if let Err(e) = conn_result {
171                                tracing::debug!(error = %e,
172                                    "ipc_server: connection terminated with error");
173                            }
174                            tracker.fetch_sub(1, Ordering::AcqRel);
175                        });
176                    }
177                    Err(e) => {
178                        tracing::warn!(error = %e,
179                            "ipc_server: accept failed; continuing");
180                        tokio::time::sleep(Duration::from_millis(100)).await;
181                    }
182                }
183            }
184        }
185
186        // Drain phase.
187        let deadline = Instant::now() + Duration::from_secs(config.ipc_shutdown_drain_secs);
188        while Instant::now() < deadline && active_connections.load(Ordering::Acquire) > 0 {
189            tokio::time::sleep(Duration::from_millis(50)).await;
190        }
191        let lingering = active_connections.load(Ordering::Acquire);
192        if lingering > 0 {
193            tracing::warn!(
194                lingering,
195                "ipc_server: {} connections still active at drain deadline",
196                lingering
197            );
198        }
199        Ok(())
200    }
201}
202
203// ---------------------------------------------------------------------------
204// Socket parent directory pre-flight (cluster-G §5.2).
205// ---------------------------------------------------------------------------
206
207/// Ensure the socket path's parent directory exists and is writable
208/// before the daemon attempts to bind. Surfaces a typed
209/// [`DaemonError::SocketSetup`] (`-32007`) with copy-paste recovery
210/// text instead of letting the bind syscall return a generic `EACCES`.
211///
212/// Called from [`IpcServer::bind`] on Unix only. Windows named-pipe
213/// paths (`\\.\pipe\<name>`) have no filesystem parent to validate;
214/// they go through the existing pipe-creation error path.
215#[cfg(unix)]
216fn ensure_socket_parent_writable(socket_path: &Path) -> DaemonResult<()> {
217    let parent = socket_path
218        .parent()
219        .ok_or_else(|| DaemonError::SocketSetup {
220            path: socket_path.to_path_buf(),
221            reason: "socket path has no parent directory".to_string(),
222        })?;
223    if let Err(e) = std::fs::create_dir_all(parent) {
224        return Err(DaemonError::SocketSetup {
225            path: socket_path.to_path_buf(),
226            reason: format!(
227                "cannot create socket parent {}: {e}. \
228                 Hint: set SQRY_DAEMON_SOCKET to a user-writable path \
229                 (e.g. $XDG_RUNTIME_DIR/sqry/sqryd.sock or $TMPDIR/sqryd.sock).",
230                parent.display(),
231            ),
232        });
233    }
234    // Probe writability with a `create_new(true)` open so the call
235    // refuses to follow a pre-existing symlink under the probe path
236    // (cluster-G iter-2 — codex iter-1 review flagged that
237    // `fs::write` follows symlinks and truncates an existing file,
238    // which lets a writable socket parent leak the probe write to an
239    // unrelated location). The probe filename includes pid + nanos
240    // so two daemon-start attempts can't race on the same name.
241    use std::fs::OpenOptions;
242    use std::time::{SystemTime, UNIX_EPOCH};
243    let nanos = SystemTime::now()
244        .duration_since(UNIX_EPOCH)
245        .map(|d| d.subsec_nanos())
246        .unwrap_or(0);
247    let probe = parent.join(format!(".sqryd-probe-{}-{nanos:09}", std::process::id()));
248    let probe_outcome = OpenOptions::new().write(true).create_new(true).open(&probe);
249    match probe_outcome {
250        Ok(_file) => {
251            // RAII: drop closes; we then delete. Best-effort delete —
252            // an EACCES here would be unusual after a successful
253            // create, but if it happens, we leave the empty probe
254            // file rather than abort startup.
255            let _ = std::fs::remove_file(&probe);
256            Ok(())
257        }
258        Err(e) => {
259            // SAFETY: getuid() is always safe to call on Unix.
260            let uid: u32 = unsafe { libc::getuid() };
261            Err(DaemonError::SocketSetup {
262                path: socket_path.to_path_buf(),
263                reason: format!(
264                    "socket parent {} is not writable by uid {}: {e}. \
265                     Either change ownership, or set SQRY_DAEMON_SOCKET \
266                     to a directory you own.",
267                    parent.display(),
268                    uid,
269                ),
270            })
271        }
272    }
273}
274
275// ---------------------------------------------------------------------------
276// Accepted-stream enum + Listener.
277// ---------------------------------------------------------------------------
278
279enum AcceptedStream {
280    #[cfg(unix)]
281    Unix(tokio::net::UnixStream),
282    #[cfg(windows)]
283    Pipe(tokio::net::windows::named_pipe::NamedPipeServer),
284}
285
286#[cfg(unix)]
287enum Listener {
288    Unix(tokio::net::UnixListener),
289}
290
291#[cfg(windows)]
292enum Listener {
293    Pipe(WindowsPipeAcceptor),
294}
295
296impl Listener {
297    async fn bind(cfg: &DaemonConfig, path: &Path) -> DaemonResult<Self> {
298        #[cfg(unix)]
299        {
300            let l = bind_unix(cfg, path).await?;
301            Ok(Listener::Unix(l))
302        }
303        #[cfg(windows)]
304        {
305            let _ = cfg; // consumed here once for the Windows branch
306            let name = path.to_string_lossy().into_owned();
307            let acceptor = WindowsPipeAcceptor::new(name)?;
308            Ok(Listener::Pipe(acceptor))
309        }
310    }
311
312    async fn accept(&mut self) -> io::Result<AcceptedStream> {
313        match self {
314            #[cfg(unix)]
315            Self::Unix(l) => {
316                let (s, _addr) = l.accept().await?;
317                Ok(AcceptedStream::Unix(s))
318            }
319            #[cfg(windows)]
320            Self::Pipe(a) => {
321                let s = a.accept().await?;
322                Ok(AcceptedStream::Pipe(s))
323            }
324        }
325    }
326}
327
328// ---------------------------------------------------------------------------
329// Unix bind (two-branch policy).
330// ---------------------------------------------------------------------------
331
332#[cfg(unix)]
333enum UnixBindMode {
334    RuntimeDir,
335    Configured,
336}
337
338#[cfg(unix)]
339fn classify_bind_mode(cfg: &DaemonConfig) -> UnixBindMode {
340    if cfg.socket.path.is_some() || std::env::var_os(ENV_SOCKET_PATH).is_some() {
341        UnixBindMode::Configured
342    } else {
343        UnixBindMode::RuntimeDir
344    }
345}
346
347#[cfg(unix)]
348async fn bind_unix(cfg: &DaemonConfig, path: &Path) -> DaemonResult<tokio::net::UnixListener> {
349    match classify_bind_mode(cfg) {
350        UnixBindMode::RuntimeDir => bind_unix_runtime(path).await,
351        UnixBindMode::Configured => bind_unix_configured(path).await,
352    }
353}
354
355#[cfg(unix)]
356async fn bind_unix_runtime(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
357    use std::os::unix::fs::PermissionsExt;
358    if let Some(parent) = path.parent() {
359        std::fs::create_dir_all(parent)?;
360        std::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700))?;
361    }
362    remove_stale_socket_if_dead(path).await?;
363    let listener = tokio::net::UnixListener::bind(path)?;
364    std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
365    Ok(listener)
366}
367
368#[cfg(unix)]
369async fn bind_unix_configured(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
370    use std::os::unix::fs::{FileTypeExt, PermissionsExt};
371    match std::fs::symlink_metadata(path) {
372        Ok(meta) if meta.file_type().is_socket() => {
373            if probe_socket_alive(path).await {
374                return Err(DaemonError::Config {
375                    path: path.to_path_buf(),
376                    source: anyhow!("socket path already in use by a live daemon"),
377                });
378            }
379            // Stale socket: liveness probe confirmed no process is listening.
380            // Safe to unlink and rebind regardless of how the path was
381            // configured — the prior daemon is gone.
382            tracing::warn!(
383                path = %path.display(),
384                "stale socket detected at configured path; unlinking and rebinding"
385            );
386            std::fs::remove_file(path)?;
387        }
388        Ok(_) => {
389            return Err(DaemonError::Config {
390                path: path.to_path_buf(),
391                source: anyhow!("configured socket path exists and is not a socket"),
392            });
393        }
394        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
395        Err(e) => return Err(DaemonError::Io(e)),
396    }
397    let listener = tokio::net::UnixListener::bind(path)?;
398    std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
399    Ok(listener)
400}
401
402#[cfg(unix)]
403async fn remove_stale_socket_if_dead(path: &Path) -> DaemonResult<()> {
404    use std::os::unix::fs::FileTypeExt;
405    match std::fs::symlink_metadata(path) {
406        Ok(meta) if meta.file_type().is_socket() => {
407            if probe_socket_alive(path).await {
408                return Err(DaemonError::Config {
409                    path: path.to_path_buf(),
410                    source: anyhow!("socket path already in use by a live daemon"),
411                });
412            }
413            std::fs::remove_file(path)?;
414        }
415        Ok(_) => {
416            return Err(DaemonError::Config {
417                path: path.to_path_buf(),
418                source: anyhow!("runtime path exists and is not a socket"),
419            });
420        }
421        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
422        Err(e) => return Err(DaemonError::Io(e)),
423    }
424    Ok(())
425}
426
427/// Hard deadline for the async UDS liveness probe. Loopback UDS
428/// handshakes complete in sub-millisecond-to-~1 ms under normal load;
429/// 100 ms is comfortably above that budget while still short enough
430/// that a wedged kernel path (ptrace target, frozen filesystem,
431/// signal-paused peer) does not stall daemon startup. Kernel-level
432/// unresponsiveness classifies the path as "not a live daemon" and
433/// yields to the refuse/unlink fallback.
434#[cfg(unix)]
435const PROBE_TIMEOUT: Duration = Duration::from_millis(100);
436
437/// Async liveness probe for a UDS path.
438///
439/// Returns `true` if a process accepts a UDS connection at `path`
440/// within [`PROBE_TIMEOUT`]; `false` otherwise (stale-socket,
441/// `ENOENT`, or kernel stall past the deadline). Uses tokio's async
442/// UDS connect so the probe never blocks the Tokio reactor — the
443/// future yields to the runtime while the kernel drives the connect
444/// handshake.
445///
446/// On a successful probe the returned `UnixStream` is dropped
447/// immediately: closing the connection is the correct signal to the
448/// peer that this was a liveness ping, not a real client. Remote-peer
449/// RST logs on a healthy daemon are a benign consequence.
450#[cfg(unix)]
451async fn probe_socket_alive(path: &Path) -> bool {
452    match tokio::time::timeout(PROBE_TIMEOUT, tokio::net::UnixStream::connect(path)).await {
453        Ok(Ok(stream)) => {
454            // Explicit drop: the close is the probe's "hang up"
455            // signal to the peer. Keep the drop inline for clarity —
456            // relying on end-of-arm drop works, but an explicit drop
457            // documents the intent.
458            drop(stream);
459            true
460        }
461        Ok(Err(_)) => false,    // ECONNREFUSED / ENOENT / other
462        Err(_elapsed) => false, // kernel stall past deadline
463    }
464}
465
466// ---------------------------------------------------------------------------
467// Windows named-pipe acceptor.
468// ---------------------------------------------------------------------------
469
470#[cfg(windows)]
471struct WindowsPipeAcceptor {
472    name: String,
473    next: Option<tokio::net::windows::named_pipe::NamedPipeServer>,
474}
475
476#[cfg(windows)]
477impl WindowsPipeAcceptor {
478    fn new(name: String) -> io::Result<Self> {
479        let full = pipe_fullname(&name);
480        let next = Some(create_pipe_instance(&full, true)?);
481        Ok(Self { name: full, next })
482    }
483
484    async fn accept(&mut self) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
485        let server = self.next.take().ok_or_else(|| {
486            io::Error::other("pipe acceptor in invalid state: no pending instance")
487        })?;
488        server.connect().await?;
489        self.next = Some(create_pipe_instance(&self.name, false)?);
490        Ok(server)
491    }
492}
493
494#[cfg(windows)]
495fn pipe_fullname(name: &str) -> String {
496    if name.starts_with(r"\\.\pipe\") {
497        name.to_owned()
498    } else {
499        format!(r"\\.\pipe\{name}")
500    }
501}
502
503#[cfg(windows)]
504fn create_pipe_instance(
505    full_name: &str,
506    first: bool,
507) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
508    use tokio::net::windows::named_pipe::{PipeMode, ServerOptions};
509    ServerOptions::new()
510        .first_pipe_instance(first)
511        .reject_remote_clients(true)
512        .pipe_mode(PipeMode::Byte)
513        .max_instances(255)
514        .access_inbound(true)
515        .access_outbound(true)
516        .create(full_name)
517}