1use std::io;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::time::{Duration, Instant};
24
25#[cfg(unix)]
26use anyhow::anyhow;
27use sqry_core::query::executor::QueryExecutor;
28use tokio_util::sync::CancellationToken;
29
30use crate::config::DaemonConfig;
31#[cfg(unix)]
32use crate::config::ENV_SOCKET_PATH;
33#[cfg(unix)]
34use crate::error::DaemonError;
35use crate::error::DaemonResult;
36use crate::rebuild::RebuildDispatcher;
37use crate::workspace::{WorkspaceBuilder, WorkspaceManager};
38
39use super::methods::HandlerContext;
40use super::router::run_connection;
41use super::shim_registry::ShimRegistry;
42
43pub struct IpcServer {
46 listener: Listener,
47 socket_path: PathBuf,
48 manager: Arc<WorkspaceManager>,
49 dispatcher: Arc<RebuildDispatcher>,
50 workspace_builder: Arc<dyn WorkspaceBuilder>,
51 tool_executor: Arc<QueryExecutor>,
52 shim_registry: Arc<ShimRegistry>,
53 shutdown: CancellationToken,
54 active_connections: Arc<AtomicU64>,
55 config: Arc<DaemonConfig>,
56 daemon_version: &'static str,
57}
58
59impl std::fmt::Debug for IpcServer {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("IpcServer")
62 .field("socket_path", &self.socket_path)
63 .field("daemon_version", &self.daemon_version)
64 .finish_non_exhaustive()
65 }
66}
67
68impl IpcServer {
69 pub async fn bind(
72 config: Arc<DaemonConfig>,
73 manager: Arc<WorkspaceManager>,
74 dispatcher: Arc<RebuildDispatcher>,
75 workspace_builder: Arc<dyn WorkspaceBuilder>,
76 tool_executor: Arc<QueryExecutor>,
77 shutdown: CancellationToken,
78 ) -> DaemonResult<Self> {
79 let socket_path = config.socket_path();
80 #[cfg(unix)]
85 ensure_socket_parent_writable(&socket_path)?;
86 let listener = Listener::bind(&config, &socket_path).await?;
87 Ok(Self {
88 listener,
89 socket_path,
90 manager,
91 dispatcher,
92 workspace_builder,
93 tool_executor,
94 shim_registry: ShimRegistry::new(),
95 shutdown,
96 active_connections: Arc::new(AtomicU64::new(0)),
97 config,
98 daemon_version: env!("CARGO_PKG_VERSION"),
99 })
100 }
101
102 #[must_use]
105 pub fn socket_path(&self) -> &Path {
106 &self.socket_path
107 }
108
109 #[must_use]
120 pub fn shim_registry(&self) -> Arc<ShimRegistry> {
121 Arc::clone(&self.shim_registry)
122 }
123
124 pub async fn run(self) -> DaemonResult<()> {
126 let Self {
127 mut listener,
128 manager,
129 dispatcher,
130 workspace_builder,
131 tool_executor,
132 shim_registry,
133 shutdown,
134 active_connections,
135 config,
136 daemon_version,
137 ..
138 } = self;
139
140 loop {
141 tokio::select! {
142 biased;
143 () = shutdown.cancelled() => {
144 tracing::info!(
145 "ipc_server: shutdown requested; draining active connections"
146 );
147 break;
148 }
149 res = listener.accept() => match res {
150 Ok(stream) => {
151 let ctx = HandlerContext {
152 manager: Arc::clone(&manager),
153 dispatcher: Arc::clone(&dispatcher),
154 workspace_builder: Arc::clone(&workspace_builder),
155 tool_executor: Arc::clone(&tool_executor),
156 shim_registry: Arc::clone(&shim_registry),
157 shutdown: shutdown.clone(),
158 config: Arc::clone(&config),
159 daemon_version,
160 };
161 active_connections.fetch_add(1, Ordering::AcqRel);
162 let tracker = Arc::clone(&active_connections);
163 tokio::spawn(async move {
164 let conn_result = match stream {
165 #[cfg(unix)]
166 AcceptedStream::Unix(s) => run_connection(s, ctx).await,
167 #[cfg(windows)]
168 AcceptedStream::Pipe(s) => run_connection(s, ctx).await,
169 };
170 if let Err(e) = conn_result {
171 tracing::debug!(error = %e,
172 "ipc_server: connection terminated with error");
173 }
174 tracker.fetch_sub(1, Ordering::AcqRel);
175 });
176 }
177 Err(e) => {
178 tracing::warn!(error = %e,
179 "ipc_server: accept failed; continuing");
180 tokio::time::sleep(Duration::from_millis(100)).await;
181 }
182 }
183 }
184 }
185
186 let deadline = Instant::now() + Duration::from_secs(config.ipc_shutdown_drain_secs);
188 while Instant::now() < deadline && active_connections.load(Ordering::Acquire) > 0 {
189 tokio::time::sleep(Duration::from_millis(50)).await;
190 }
191 let lingering = active_connections.load(Ordering::Acquire);
192 if lingering > 0 {
193 tracing::warn!(
194 lingering,
195 "ipc_server: {} connections still active at drain deadline",
196 lingering
197 );
198 }
199 Ok(())
200 }
201}
202
203#[cfg(unix)]
216fn ensure_socket_parent_writable(socket_path: &Path) -> DaemonResult<()> {
217 let parent = socket_path
218 .parent()
219 .ok_or_else(|| DaemonError::SocketSetup {
220 path: socket_path.to_path_buf(),
221 reason: "socket path has no parent directory".to_string(),
222 })?;
223 if let Err(e) = std::fs::create_dir_all(parent) {
224 return Err(DaemonError::SocketSetup {
225 path: socket_path.to_path_buf(),
226 reason: format!(
227 "cannot create socket parent {}: {e}. \
228 Hint: set SQRY_DAEMON_SOCKET to a user-writable path \
229 (e.g. $XDG_RUNTIME_DIR/sqry/sqryd.sock or $TMPDIR/sqryd.sock).",
230 parent.display(),
231 ),
232 });
233 }
234 use std::fs::OpenOptions;
242 use std::time::{SystemTime, UNIX_EPOCH};
243 let nanos = SystemTime::now()
244 .duration_since(UNIX_EPOCH)
245 .map(|d| d.subsec_nanos())
246 .unwrap_or(0);
247 let probe = parent.join(format!(".sqryd-probe-{}-{nanos:09}", std::process::id()));
248 let probe_outcome = OpenOptions::new().write(true).create_new(true).open(&probe);
249 match probe_outcome {
250 Ok(_file) => {
251 let _ = std::fs::remove_file(&probe);
256 Ok(())
257 }
258 Err(e) => {
259 let uid: u32 = unsafe { libc::getuid() };
261 Err(DaemonError::SocketSetup {
262 path: socket_path.to_path_buf(),
263 reason: format!(
264 "socket parent {} is not writable by uid {}: {e}. \
265 Either change ownership, or set SQRY_DAEMON_SOCKET \
266 to a directory you own.",
267 parent.display(),
268 uid,
269 ),
270 })
271 }
272 }
273}
274
275enum AcceptedStream {
280 #[cfg(unix)]
281 Unix(tokio::net::UnixStream),
282 #[cfg(windows)]
283 Pipe(tokio::net::windows::named_pipe::NamedPipeServer),
284}
285
286#[cfg(unix)]
287enum Listener {
288 Unix(tokio::net::UnixListener),
289}
290
291#[cfg(windows)]
292enum Listener {
293 Pipe(WindowsPipeAcceptor),
294}
295
296impl Listener {
297 async fn bind(cfg: &DaemonConfig, path: &Path) -> DaemonResult<Self> {
298 #[cfg(unix)]
299 {
300 let l = bind_unix(cfg, path).await?;
301 Ok(Listener::Unix(l))
302 }
303 #[cfg(windows)]
304 {
305 let _ = cfg; let name = path.to_string_lossy().into_owned();
307 let acceptor = WindowsPipeAcceptor::new(name)?;
308 Ok(Listener::Pipe(acceptor))
309 }
310 }
311
312 async fn accept(&mut self) -> io::Result<AcceptedStream> {
313 match self {
314 #[cfg(unix)]
315 Self::Unix(l) => {
316 let (s, _addr) = l.accept().await?;
317 Ok(AcceptedStream::Unix(s))
318 }
319 #[cfg(windows)]
320 Self::Pipe(a) => {
321 let s = a.accept().await?;
322 Ok(AcceptedStream::Pipe(s))
323 }
324 }
325 }
326}
327
328#[cfg(unix)]
333enum UnixBindMode {
334 RuntimeDir,
335 Configured,
336}
337
338#[cfg(unix)]
339fn classify_bind_mode(cfg: &DaemonConfig) -> UnixBindMode {
340 if cfg.socket.path.is_some() || std::env::var_os(ENV_SOCKET_PATH).is_some() {
341 UnixBindMode::Configured
342 } else {
343 UnixBindMode::RuntimeDir
344 }
345}
346
347#[cfg(unix)]
348async fn bind_unix(cfg: &DaemonConfig, path: &Path) -> DaemonResult<tokio::net::UnixListener> {
349 match classify_bind_mode(cfg) {
350 UnixBindMode::RuntimeDir => bind_unix_runtime(path).await,
351 UnixBindMode::Configured => bind_unix_configured(path).await,
352 }
353}
354
355#[cfg(unix)]
356async fn bind_unix_runtime(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
357 use std::os::unix::fs::PermissionsExt;
358 if let Some(parent) = path.parent() {
359 std::fs::create_dir_all(parent)?;
360 std::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700))?;
361 }
362 remove_stale_socket_if_dead(path).await?;
363 let listener = tokio::net::UnixListener::bind(path)?;
364 std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
365 Ok(listener)
366}
367
368#[cfg(unix)]
369async fn bind_unix_configured(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
370 use std::os::unix::fs::{FileTypeExt, PermissionsExt};
371 match std::fs::symlink_metadata(path) {
372 Ok(meta) if meta.file_type().is_socket() => {
373 if probe_socket_alive(path).await {
374 return Err(DaemonError::Config {
375 path: path.to_path_buf(),
376 source: anyhow!("socket path already in use by a live daemon"),
377 });
378 }
379 tracing::warn!(
383 path = %path.display(),
384 "stale socket detected at configured path; unlinking and rebinding"
385 );
386 std::fs::remove_file(path)?;
387 }
388 Ok(_) => {
389 return Err(DaemonError::Config {
390 path: path.to_path_buf(),
391 source: anyhow!("configured socket path exists and is not a socket"),
392 });
393 }
394 Err(e) if e.kind() == io::ErrorKind::NotFound => {}
395 Err(e) => return Err(DaemonError::Io(e)),
396 }
397 let listener = tokio::net::UnixListener::bind(path)?;
398 std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
399 Ok(listener)
400}
401
402#[cfg(unix)]
403async fn remove_stale_socket_if_dead(path: &Path) -> DaemonResult<()> {
404 use std::os::unix::fs::FileTypeExt;
405 match std::fs::symlink_metadata(path) {
406 Ok(meta) if meta.file_type().is_socket() => {
407 if probe_socket_alive(path).await {
408 return Err(DaemonError::Config {
409 path: path.to_path_buf(),
410 source: anyhow!("socket path already in use by a live daemon"),
411 });
412 }
413 std::fs::remove_file(path)?;
414 }
415 Ok(_) => {
416 return Err(DaemonError::Config {
417 path: path.to_path_buf(),
418 source: anyhow!("runtime path exists and is not a socket"),
419 });
420 }
421 Err(e) if e.kind() == io::ErrorKind::NotFound => {}
422 Err(e) => return Err(DaemonError::Io(e)),
423 }
424 Ok(())
425}
426
427#[cfg(unix)]
435const PROBE_TIMEOUT: Duration = Duration::from_millis(100);
436
437#[cfg(unix)]
451async fn probe_socket_alive(path: &Path) -> bool {
452 match tokio::time::timeout(PROBE_TIMEOUT, tokio::net::UnixStream::connect(path)).await {
453 Ok(Ok(stream)) => {
454 drop(stream);
459 true
460 }
461 Ok(Err(_)) => false, Err(_elapsed) => false, }
464}
465
466#[cfg(windows)]
471struct WindowsPipeAcceptor {
472 name: String,
473 next: Option<tokio::net::windows::named_pipe::NamedPipeServer>,
474}
475
476#[cfg(windows)]
477impl WindowsPipeAcceptor {
478 fn new(name: String) -> io::Result<Self> {
479 let full = pipe_fullname(&name);
480 let next = Some(create_pipe_instance(&full, true)?);
481 Ok(Self { name: full, next })
482 }
483
484 async fn accept(&mut self) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
485 let server = self.next.take().ok_or_else(|| {
486 io::Error::other("pipe acceptor in invalid state: no pending instance")
487 })?;
488 server.connect().await?;
489 self.next = Some(create_pipe_instance(&self.name, false)?);
490 Ok(server)
491 }
492}
493
494#[cfg(windows)]
495fn pipe_fullname(name: &str) -> String {
496 if name.starts_with(r"\\.\pipe\") {
497 name.to_owned()
498 } else {
499 format!(r"\\.\pipe\{name}")
500 }
501}
502
503#[cfg(windows)]
504fn create_pipe_instance(
505 full_name: &str,
506 first: bool,
507) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
508 use tokio::net::windows::named_pipe::{PipeMode, ServerOptions};
509 ServerOptions::new()
510 .first_pipe_instance(first)
511 .reject_remote_clients(true)
512 .pipe_mode(PipeMode::Byte)
513 .max_instances(255)
514 .access_inbound(true)
515 .access_outbound(true)
516 .create(full_name)
517}