// sqry_daemon/ipc/server.rs
//! IPC accept loop.
//!
//! Binds a UDS (Unix) or named pipe (Windows), accepts incoming
//! connections, and spawns a per-connection handler task. Graceful
//! shutdown is driven by a [`tokio_util::sync::CancellationToken`];
//! after cancellation, the loop drains active connections bounded by
//! [`crate::config::DaemonConfig::ipc_shutdown_drain_secs`].
//!
//! The two Unix bind branches (`RuntimeDir` vs `Configured`) implement
//! the Phase 8a iter-1 B2 fix: runtime-dir paths are auto-managed
//! (parent created 0700, stale socket removed after a liveness probe).
//! Configured paths also auto-unlink stale sockets after a liveness
//! probe confirms no process is listening — this is required for
//! auto-start to work after a daemon stop. Live sockets are never
//! touched: the daemon refuses to bind if a live daemon is already
//! listening. Non-socket files at the configured path are always
//! rejected.

19use std::io;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::time::{Duration, Instant};
24
25#[cfg(unix)]
26use anyhow::anyhow;
27use sqry_core::query::executor::QueryExecutor;
28use tokio_util::sync::CancellationToken;
29
30use crate::config::DaemonConfig;
31#[cfg(unix)]
32use crate::config::ENV_SOCKET_PATH;
33#[cfg(unix)]
34use crate::error::DaemonError;
35use crate::error::DaemonResult;
36use crate::rebuild::RebuildDispatcher;
37use crate::workspace::{WorkspaceBuilder, WorkspaceManager};
38
39use super::methods::HandlerContext;
40use super::router::run_connection;
41use super::shim_registry::ShimRegistry;
42
/// Top-level IPC server handle. Construct with [`Self::bind`] then
/// drive with [`Self::run`].
pub struct IpcServer {
    /// Platform accept source: UDS listener (Unix) or named-pipe
    /// acceptor (Windows).
    listener: Listener,
    /// Bound socket path (Unix) / pipe name (Windows); exposed via
    /// [`Self::socket_path`].
    socket_path: PathBuf,
    /// Shared state cloned into every connection's `HandlerContext`.
    manager: Arc<WorkspaceManager>,
    dispatcher: Arc<RebuildDispatcher>,
    workspace_builder: Arc<dyn WorkspaceBuilder>,
    tool_executor: Arc<QueryExecutor>,
    /// Shim-connection registry; shared out via [`Self::shim_registry`].
    shim_registry: Arc<ShimRegistry>,
    /// Cooperative shutdown signal; cancellation ends the accept loop.
    shutdown: CancellationToken,
    /// Number of in-flight connection tasks; read by the drain phase.
    active_connections: Arc<AtomicU64>,
    config: Arc<DaemonConfig>,
    /// Compile-time crate version (`CARGO_PKG_VERSION`).
    daemon_version: &'static str,
}

59impl std::fmt::Debug for IpcServer {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct("IpcServer")
62            .field("socket_path", &self.socket_path)
63            .field("daemon_version", &self.daemon_version)
64            .finish_non_exhaustive()
65    }
66}
67
impl IpcServer {
    /// Bind the server. Unix: UnixListener with the two-branch policy;
    /// Windows: NamedPipeServer with explicit options.
    ///
    /// Fails if the socket path is unusable (live daemon already bound,
    /// non-socket file in the way, or an I/O error during setup).
    pub async fn bind(
        config: Arc<DaemonConfig>,
        manager: Arc<WorkspaceManager>,
        dispatcher: Arc<RebuildDispatcher>,
        workspace_builder: Arc<dyn WorkspaceBuilder>,
        tool_executor: Arc<QueryExecutor>,
        shutdown: CancellationToken,
    ) -> DaemonResult<Self> {
        let socket_path = config.socket_path();
        let listener = Listener::bind(&config, &socket_path).await?;
        Ok(Self {
            listener,
            socket_path,
            manager,
            dispatcher,
            workspace_builder,
            tool_executor,
            shim_registry: ShimRegistry::new(),
            shutdown,
            // Starts at zero; incremented per accepted connection in `run`.
            active_connections: Arc::new(AtomicU64::new(0)),
            config,
            // Baked in at compile time; surfaced to clients by handlers.
            daemon_version: env!("CARGO_PKG_VERSION"),
        })
    }

    /// Returns the bound socket path (Unix) or named-pipe name
    /// (Windows).
    #[must_use]
    pub fn socket_path(&self) -> &Path {
        &self.socket_path
    }

    /// Return a shared handle to the shim-connection registry.
    ///
    /// Task 9's bootstrap path surfaces the count via `daemon/status`,
    /// and the Phase 8c router / MCP host register shim connections
    /// through this `Arc`. The registry's internal state is guarded by
    /// a `parking_lot::Mutex`, so callers must not hold the returned
    /// `Arc` "actively" (i.e., inside a `.lock()` scope) across
    /// long-running awaits — see [`ShimRegistry::len`] and
    /// [`ShimRegistry::is_empty`] for the snapshot-under-lock
    /// accessors.
    #[must_use]
    pub fn shim_registry(&self) -> Arc<ShimRegistry> {
        Arc::clone(&self.shim_registry)
    }

    /// Accept loop. Returns when the shutdown token fires.
    ///
    /// Each accepted connection runs in its own spawned task;
    /// `active_connections` counts in-flight handlers so the drain
    /// phase can wait for them (bounded by
    /// `config.ipc_shutdown_drain_secs`).
    pub async fn run(self) -> DaemonResult<()> {
        // Destructure once so the loop works with plain locals — the
        // listener needs `&mut` while the rest are cloned per connection.
        let Self {
            mut listener,
            manager,
            dispatcher,
            workspace_builder,
            tool_executor,
            shim_registry,
            shutdown,
            active_connections,
            config,
            daemon_version,
            ..
        } = self;

        loop {
            tokio::select! {
                // `biased` polls the shutdown branch first, so
                // cancellation wins even under a flood of connections.
                biased;
                () = shutdown.cancelled() => {
                    tracing::info!(
                        "ipc_server: shutdown requested; draining active connections"
                    );
                    break;
                }
                res = listener.accept() => match res {
                    Ok(stream) => {
                        // Per-connection context: cheap `Arc` bumps of
                        // the shared daemon state.
                        let ctx = HandlerContext {
                            manager: Arc::clone(&manager),
                            dispatcher: Arc::clone(&dispatcher),
                            workspace_builder: Arc::clone(&workspace_builder),
                            tool_executor: Arc::clone(&tool_executor),
                            shim_registry: Arc::clone(&shim_registry),
                            shutdown: shutdown.clone(),
                            config: Arc::clone(&config),
                            daemon_version,
                        };
                        // Increment before spawning so the drain phase
                        // can never observe a gap between accept and
                        // task start.
                        active_connections.fetch_add(1, Ordering::AcqRel);
                        let tracker = Arc::clone(&active_connections);
                        tokio::spawn(async move {
                            let conn_result = match stream {
                                #[cfg(unix)]
                                AcceptedStream::Unix(s) => run_connection(s, ctx).await,
                                #[cfg(windows)]
                                AcceptedStream::Pipe(s) => run_connection(s, ctx).await,
                            };
                            if let Err(e) = conn_result {
                                tracing::debug!(error = %e,
                                    "ipc_server: connection terminated with error");
                            }
                            // NOTE(review): if `run_connection` panics,
                            // this decrement never runs and the drain
                            // phase waits out its full deadline —
                            // confirm that is acceptable.
                            tracker.fetch_sub(1, Ordering::AcqRel);
                        });
                    }
                    Err(e) => {
                        // Transient accept failure: log and back off
                        // briefly instead of spinning.
                        tracing::warn!(error = %e,
                            "ipc_server: accept failed; continuing");
                        tokio::time::sleep(Duration::from_millis(100)).await;
                    }
                }
            }
        }

        // Drain phase: poll the in-flight counter until it reaches zero
        // or the configured deadline passes.
        let deadline = Instant::now() + Duration::from_secs(config.ipc_shutdown_drain_secs);
        while Instant::now() < deadline && active_connections.load(Ordering::Acquire) > 0 {
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        let lingering = active_connections.load(Ordering::Acquire);
        if lingering > 0 {
            tracing::warn!(
                lingering,
                "ipc_server: {} connections still active at drain deadline",
                lingering
            );
        }
        Ok(())
    }
}

// ---------------------------------------------------------------------------
// Accepted-stream enum + Listener.
// ---------------------------------------------------------------------------

/// One accepted client connection, wrapped per platform so the accept
/// loop can hand either stream type to `run_connection`.
enum AcceptedStream {
    #[cfg(unix)]
    Unix(tokio::net::UnixStream),
    #[cfg(windows)]
    Pipe(tokio::net::windows::named_pipe::NamedPipeServer),
}

/// Platform listener — Unix build: wraps a `UnixListener`.
#[cfg(unix)]
enum Listener {
    Unix(tokio::net::UnixListener),
}

/// Platform listener — Windows build: wraps the named-pipe acceptor.
#[cfg(windows)]
enum Listener {
    Pipe(WindowsPipeAcceptor),
}

218impl Listener {
219    async fn bind(cfg: &DaemonConfig, path: &Path) -> DaemonResult<Self> {
220        #[cfg(unix)]
221        {
222            let l = bind_unix(cfg, path).await?;
223            Ok(Listener::Unix(l))
224        }
225        #[cfg(windows)]
226        {
227            let _ = cfg; // consumed here once for the Windows branch
228            let name = path.to_string_lossy().into_owned();
229            let acceptor = WindowsPipeAcceptor::new(name)?;
230            Ok(Listener::Pipe(acceptor))
231        }
232    }
233
234    async fn accept(&mut self) -> io::Result<AcceptedStream> {
235        match self {
236            #[cfg(unix)]
237            Self::Unix(l) => {
238                let (s, _addr) = l.accept().await?;
239                Ok(AcceptedStream::Unix(s))
240            }
241            #[cfg(windows)]
242            Self::Pipe(a) => {
243                let s = a.accept().await?;
244                Ok(AcceptedStream::Pipe(s))
245            }
246        }
247    }
248}

// ---------------------------------------------------------------------------
// Unix bind (two-branch policy).
// ---------------------------------------------------------------------------

/// Which Unix bind policy applies to the socket path.
#[cfg(unix)]
enum UnixBindMode {
    /// Default runtime-dir path: parent auto-created (0700), stale
    /// socket removed after a liveness probe.
    RuntimeDir,
    /// Path set explicitly via config or the socket-path env var; stale
    /// sockets unlinked only after a probe, live sockets refused.
    Configured,
}

260#[cfg(unix)]
261fn classify_bind_mode(cfg: &DaemonConfig) -> UnixBindMode {
262    if cfg.socket.path.is_some() || std::env::var_os(ENV_SOCKET_PATH).is_some() {
263        UnixBindMode::Configured
264    } else {
265        UnixBindMode::RuntimeDir
266    }
267}
268
269#[cfg(unix)]
270async fn bind_unix(cfg: &DaemonConfig, path: &Path) -> DaemonResult<tokio::net::UnixListener> {
271    match classify_bind_mode(cfg) {
272        UnixBindMode::RuntimeDir => bind_unix_runtime(path).await,
273        UnixBindMode::Configured => bind_unix_configured(path).await,
274    }
275}
276
/// Bind at an auto-managed runtime-dir path.
///
/// Creates the parent directory and sets it to 0700, removes a stale
/// socket (erroring if a live daemon is listening), binds, then
/// tightens the socket itself to 0600.
#[cfg(unix)]
async fn bind_unix_runtime(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
    use std::os::unix::fs::PermissionsExt;
    if let Some(parent) = path.parent() {
        // Ensure the runtime dir exists and is private to this user.
        std::fs::create_dir_all(parent)?;
        std::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700))?;
    }
    // Errors out if a live daemon already listens at `path`.
    remove_stale_socket_if_dead(path).await?;
    let listener = tokio::net::UnixListener::bind(path)?;
    // NOTE(review): perms are tightened only after bind, so there is a
    // brief window where the socket exists with umask-default
    // permissions — confirm this is acceptable.
    std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
    Ok(listener)
}

/// Bind at an explicitly configured socket path.
///
/// Policy: a live socket (probe connects) is refused; a stale socket is
/// unlinked with a warning and rebound; a non-socket file is always
/// rejected; a missing path binds fresh. The bound socket is tightened
/// to 0600.
#[cfg(unix)]
async fn bind_unix_configured(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
    use std::os::unix::fs::{FileTypeExt, PermissionsExt};
    // `symlink_metadata` inspects the path entry itself rather than
    // following a symlink.
    match std::fs::symlink_metadata(path) {
        Ok(meta) if meta.file_type().is_socket() => {
            if probe_socket_alive(path).await {
                return Err(DaemonError::Config {
                    path: path.to_path_buf(),
                    source: anyhow!("socket path already in use by a live daemon"),
                });
            }
            // Stale socket: liveness probe confirmed no process is listening.
            // Safe to unlink and rebind regardless of how the path was
            // configured — the prior daemon is gone.
            tracing::warn!(
                path = %path.display(),
                "stale socket detected at configured path; unlinking and rebinding"
            );
            std::fs::remove_file(path)?;
        }
        // Path exists but is not a socket: never touch user files.
        Ok(_) => {
            return Err(DaemonError::Config {
                path: path.to_path_buf(),
                source: anyhow!("configured socket path exists and is not a socket"),
            });
        }
        // Nothing at the path: bind fresh below.
        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
        Err(e) => return Err(DaemonError::Io(e)),
    }
    let listener = tokio::net::UnixListener::bind(path)?;
    // Owner-only permissions on the socket inode.
    std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
    Ok(listener)
}

/// Remove a stale socket at a runtime-dir path, or error if it is live.
///
/// Mirrors the configured-path policy minus the warn log: live socket →
/// `DaemonError::Config`; stale socket → unlink; non-socket file →
/// error; missing path → no-op.
#[cfg(unix)]
async fn remove_stale_socket_if_dead(path: &Path) -> DaemonResult<()> {
    use std::os::unix::fs::FileTypeExt;
    match std::fs::symlink_metadata(path) {
        Ok(meta) if meta.file_type().is_socket() => {
            if probe_socket_alive(path).await {
                // A live daemon owns the path; refuse to disturb it.
                return Err(DaemonError::Config {
                    path: path.to_path_buf(),
                    source: anyhow!("socket path already in use by a live daemon"),
                });
            }
            // Probe got no answer: the prior daemon is gone, unlink.
            std::fs::remove_file(path)?;
        }
        // Something non-socket occupies the path: never touch it.
        Ok(_) => {
            return Err(DaemonError::Config {
                path: path.to_path_buf(),
                source: anyhow!("runtime path exists and is not a socket"),
            });
        }
        // Nothing there: caller may bind directly.
        Err(e) if e.kind() == io::ErrorKind::NotFound => {}
        Err(e) => return Err(DaemonError::Io(e)),
    }
    Ok(())
}

/// Hard deadline for the async UDS liveness probe. Loopback UDS
/// handshakes complete in sub-millisecond-to-~1 ms under normal load;
/// 100 ms is comfortably above that budget while still short enough
/// that a wedged kernel path (ptrace target, frozen filesystem,
/// signal-paused peer) does not stall daemon startup. Kernel-level
/// unresponsiveness classifies the path as "not a live daemon" and
/// yields to the refuse/unlink fallback.
#[cfg(unix)]
const PROBE_TIMEOUT: Duration = Duration::from_millis(100);

359/// Async liveness probe for a UDS path.
360///
361/// Returns `true` if a process accepts a UDS connection at `path`
362/// within [`PROBE_TIMEOUT`]; `false` otherwise (stale-socket,
363/// `ENOENT`, or kernel stall past the deadline). Uses tokio's async
364/// UDS connect so the probe never blocks the Tokio reactor — the
365/// future yields to the runtime while the kernel drives the connect
366/// handshake.
367///
368/// On a successful probe the returned `UnixStream` is dropped
369/// immediately: closing the connection is the correct signal to the
370/// peer that this was a liveness ping, not a real client. Remote-peer
371/// RST logs on a healthy daemon are a benign consequence.
372#[cfg(unix)]
373async fn probe_socket_alive(path: &Path) -> bool {
374    match tokio::time::timeout(PROBE_TIMEOUT, tokio::net::UnixStream::connect(path)).await {
375        Ok(Ok(stream)) => {
376            // Explicit drop: the close is the probe's "hang up"
377            // signal to the peer. Keep the drop inline for clarity —
378            // relying on end-of-arm drop works, but an explicit drop
379            // documents the intent.
380            drop(stream);
381            true
382        }
383        Ok(Err(_)) => false,    // ECONNREFUSED / ENOENT / other
384        Err(_elapsed) => false, // kernel stall past deadline
385    }
386}

// ---------------------------------------------------------------------------
// Windows named-pipe acceptor.
// ---------------------------------------------------------------------------

/// Windows accept source: holds the canonical pipe name plus the
/// pre-created pipe instance the next client will connect to.
#[cfg(windows)]
struct WindowsPipeAcceptor {
    /// Fully-qualified pipe name (`\\.\pipe\…`).
    name: String,
    /// Pending server instance awaiting the next client; `None` only
    /// transiently between take and re-create in `accept`.
    next: Option<tokio::net::windows::named_pipe::NamedPipeServer>,
}

#[cfg(windows)]
impl WindowsPipeAcceptor {
    /// Create the acceptor, eagerly opening the first pipe instance
    /// (`first_pipe_instance(true)` makes creation fail if another
    /// process already owns the name, giving bind-time exclusivity).
    fn new(name: String) -> io::Result<Self> {
        let full = pipe_fullname(&name);
        let next = Some(create_pipe_instance(&full, true)?);
        Ok(Self { name: full, next })
    }

    /// Wait for a client on the pending instance, then pre-create the
    /// next instance so the pipe name stays continuously available.
    ///
    /// Recovery fix: previously, any failure path (`connect` error, or
    /// pre-creation error) left `self.next` as `None`, and every later
    /// `accept` returned a permanent "invalid state" error — the accept
    /// loop would log/sleep forever with the server wedged. Now a
    /// missing pending instance is recreated on demand, and a
    /// successfully connected client is returned even if pre-creating
    /// the *next* instance fails.
    async fn accept(&mut self) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
        let server = match self.next.take() {
            Some(pending) => pending,
            // Recover from an earlier failed accept instead of
            // erroring forever.
            None => create_pipe_instance(&self.name, false)?,
        };
        server.connect().await?;
        // Pre-create the next instance; on failure, keep the connected
        // client and let the recovery arm above retry creation on the
        // following accept.
        match create_pipe_instance(&self.name, false) {
            Ok(instance) => self.next = Some(instance),
            Err(e) => tracing::warn!(error = %e,
                "ipc_server: failed to pre-create next pipe instance; will retry on next accept"),
        }
        Ok(server)
    }
}

/// Normalize a pipe name to the canonical `\\.\pipe\…` form, leaving
/// already-qualified names untouched.
#[cfg(windows)]
fn pipe_fullname(name: &str) -> String {
    const PIPE_PREFIX: &str = r"\\.\pipe\";
    match name.strip_prefix(PIPE_PREFIX) {
        Some(_) => name.to_owned(),
        None => format!("{PIPE_PREFIX}{name}"),
    }
}

425#[cfg(windows)]
426fn create_pipe_instance(
427    full_name: &str,
428    first: bool,
429) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
430    use tokio::net::windows::named_pipe::{PipeMode, ServerOptions};
431    ServerOptions::new()
432        .first_pipe_instance(first)
433        .reject_remote_clients(true)
434        .pipe_mode(PipeMode::Byte)
435        .max_instances(255)
436        .access_inbound(true)
437        .access_outbound(true)
438        .create(full_name)
439}