1use std::io;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::time::{Duration, Instant};
24
25use anyhow::anyhow;
26use sqry_core::query::executor::QueryExecutor;
27use tokio_util::sync::CancellationToken;
28
29use crate::config::{DaemonConfig, ENV_SOCKET_PATH};
30use crate::error::{DaemonError, DaemonResult};
31use crate::rebuild::RebuildDispatcher;
32use crate::workspace::{WorkspaceBuilder, WorkspaceManager};
33
34use super::methods::HandlerContext;
35use super::router::run_connection;
36use super::shim_registry::ShimRegistry;
37
38pub struct IpcServer {
41 listener: Listener,
42 socket_path: PathBuf,
43 manager: Arc<WorkspaceManager>,
44 dispatcher: Arc<RebuildDispatcher>,
45 workspace_builder: Arc<dyn WorkspaceBuilder>,
46 tool_executor: Arc<QueryExecutor>,
47 shim_registry: Arc<ShimRegistry>,
48 shutdown: CancellationToken,
49 active_connections: Arc<AtomicU64>,
50 config: Arc<DaemonConfig>,
51 daemon_version: &'static str,
52}
53
54impl std::fmt::Debug for IpcServer {
55 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56 f.debug_struct("IpcServer")
57 .field("socket_path", &self.socket_path)
58 .field("daemon_version", &self.daemon_version)
59 .finish_non_exhaustive()
60 }
61}
62
63impl IpcServer {
64 pub async fn bind(
67 config: Arc<DaemonConfig>,
68 manager: Arc<WorkspaceManager>,
69 dispatcher: Arc<RebuildDispatcher>,
70 workspace_builder: Arc<dyn WorkspaceBuilder>,
71 tool_executor: Arc<QueryExecutor>,
72 shutdown: CancellationToken,
73 ) -> DaemonResult<Self> {
74 let socket_path = config.socket_path();
75 let listener = Listener::bind(&config, &socket_path).await?;
76 Ok(Self {
77 listener,
78 socket_path,
79 manager,
80 dispatcher,
81 workspace_builder,
82 tool_executor,
83 shim_registry: ShimRegistry::new(),
84 shutdown,
85 active_connections: Arc::new(AtomicU64::new(0)),
86 config,
87 daemon_version: env!("CARGO_PKG_VERSION"),
88 })
89 }
90
91 #[must_use]
94 pub fn socket_path(&self) -> &Path {
95 &self.socket_path
96 }
97
98 #[must_use]
109 pub fn shim_registry(&self) -> Arc<ShimRegistry> {
110 Arc::clone(&self.shim_registry)
111 }
112
113 pub async fn run(self) -> DaemonResult<()> {
115 let Self {
116 mut listener,
117 manager,
118 dispatcher,
119 workspace_builder,
120 tool_executor,
121 shim_registry,
122 shutdown,
123 active_connections,
124 config,
125 daemon_version,
126 ..
127 } = self;
128
129 loop {
130 tokio::select! {
131 biased;
132 () = shutdown.cancelled() => {
133 tracing::info!(
134 "ipc_server: shutdown requested; draining active connections"
135 );
136 break;
137 }
138 res = listener.accept() => match res {
139 Ok(stream) => {
140 let ctx = HandlerContext {
141 manager: Arc::clone(&manager),
142 dispatcher: Arc::clone(&dispatcher),
143 workspace_builder: Arc::clone(&workspace_builder),
144 tool_executor: Arc::clone(&tool_executor),
145 shim_registry: Arc::clone(&shim_registry),
146 shutdown: shutdown.clone(),
147 config: Arc::clone(&config),
148 daemon_version,
149 };
150 active_connections.fetch_add(1, Ordering::AcqRel);
151 let tracker = Arc::clone(&active_connections);
152 tokio::spawn(async move {
153 let conn_result = match stream {
154 #[cfg(unix)]
155 AcceptedStream::Unix(s) => run_connection(s, ctx).await,
156 #[cfg(windows)]
157 AcceptedStream::Pipe(s) => run_connection(s, ctx).await,
158 };
159 if let Err(e) = conn_result {
160 tracing::debug!(error = %e,
161 "ipc_server: connection terminated with error");
162 }
163 tracker.fetch_sub(1, Ordering::AcqRel);
164 });
165 }
166 Err(e) => {
167 tracing::warn!(error = %e,
168 "ipc_server: accept failed; continuing");
169 tokio::time::sleep(Duration::from_millis(100)).await;
170 }
171 }
172 }
173 }
174
175 let deadline = Instant::now() + Duration::from_secs(config.ipc_shutdown_drain_secs);
177 while Instant::now() < deadline && active_connections.load(Ordering::Acquire) > 0 {
178 tokio::time::sleep(Duration::from_millis(50)).await;
179 }
180 let lingering = active_connections.load(Ordering::Acquire);
181 if lingering > 0 {
182 tracing::warn!(
183 lingering,
184 "ipc_server: {} connections still active at drain deadline",
185 lingering
186 );
187 }
188 Ok(())
189 }
190}
191
192enum AcceptedStream {
197 #[cfg(unix)]
198 Unix(tokio::net::UnixStream),
199 #[cfg(windows)]
200 Pipe(tokio::net::windows::named_pipe::NamedPipeServer),
201}
202
203#[cfg(unix)]
204enum Listener {
205 Unix(tokio::net::UnixListener),
206}
207
/// Listening endpoint on Windows targets.
#[cfg(windows)]
enum Listener {
    /// Named-pipe acceptor that keeps one pending server instance ready.
    Pipe(WindowsPipeAcceptor),
}
212
213impl Listener {
214 async fn bind(cfg: &DaemonConfig, path: &Path) -> DaemonResult<Self> {
215 #[cfg(unix)]
216 {
217 let l = bind_unix(cfg, path).await?;
218 Ok(Listener::Unix(l))
219 }
220 #[cfg(windows)]
221 {
222 let _ = cfg; let name = path.to_string_lossy().into_owned();
224 let acceptor = WindowsPipeAcceptor::new(name)?;
225 Ok(Listener::Pipe(acceptor))
226 }
227 }
228
229 async fn accept(&mut self) -> io::Result<AcceptedStream> {
230 match self {
231 #[cfg(unix)]
232 Self::Unix(l) => {
233 let (s, _addr) = l.accept().await?;
234 Ok(AcceptedStream::Unix(s))
235 }
236 #[cfg(windows)]
237 Self::Pipe(a) => {
238 let s = a.accept().await?;
239 Ok(AcceptedStream::Pipe(s))
240 }
241 }
242 }
243}
244
/// How the Unix socket path was chosen, which selects the bind strategy.
#[cfg(unix)]
enum UnixBindMode {
    /// No explicit path was given; the socket lives at the default location.
    RuntimeDir,
    /// The path came from the configuration or from the environment.
    Configured,
}
254
255#[cfg(unix)]
256fn classify_bind_mode(cfg: &DaemonConfig) -> UnixBindMode {
257 if cfg.socket.path.is_some() || std::env::var_os(ENV_SOCKET_PATH).is_some() {
258 UnixBindMode::Configured
259 } else {
260 UnixBindMode::RuntimeDir
261 }
262}
263
264#[cfg(unix)]
265async fn bind_unix(cfg: &DaemonConfig, path: &Path) -> DaemonResult<tokio::net::UnixListener> {
266 match classify_bind_mode(cfg) {
267 UnixBindMode::RuntimeDir => bind_unix_runtime(path).await,
268 UnixBindMode::Configured => bind_unix_configured(path).await,
269 }
270}
271
272#[cfg(unix)]
273async fn bind_unix_runtime(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
274 use std::os::unix::fs::PermissionsExt;
275 if let Some(parent) = path.parent() {
276 std::fs::create_dir_all(parent)?;
277 std::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700))?;
278 }
279 remove_stale_socket_if_dead(path).await?;
280 let listener = tokio::net::UnixListener::bind(path)?;
281 std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
282 Ok(listener)
283}
284
285#[cfg(unix)]
286async fn bind_unix_configured(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
287 use std::os::unix::fs::{FileTypeExt, PermissionsExt};
288 match std::fs::symlink_metadata(path) {
289 Ok(meta) if meta.file_type().is_socket() => {
290 if probe_socket_alive(path).await {
291 return Err(DaemonError::Config {
292 path: path.to_path_buf(),
293 source: anyhow!("socket path already in use by a live daemon"),
294 });
295 }
296 tracing::warn!(
300 path = %path.display(),
301 "stale socket detected at configured path; unlinking and rebinding"
302 );
303 std::fs::remove_file(path)?;
304 }
305 Ok(_) => {
306 return Err(DaemonError::Config {
307 path: path.to_path_buf(),
308 source: anyhow!("configured socket path exists and is not a socket"),
309 });
310 }
311 Err(e) if e.kind() == io::ErrorKind::NotFound => {}
312 Err(e) => return Err(DaemonError::Io(e)),
313 }
314 let listener = tokio::net::UnixListener::bind(path)?;
315 std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
316 Ok(listener)
317}
318
319#[cfg(unix)]
320async fn remove_stale_socket_if_dead(path: &Path) -> DaemonResult<()> {
321 use std::os::unix::fs::FileTypeExt;
322 match std::fs::symlink_metadata(path) {
323 Ok(meta) if meta.file_type().is_socket() => {
324 if probe_socket_alive(path).await {
325 return Err(DaemonError::Config {
326 path: path.to_path_buf(),
327 source: anyhow!("socket path already in use by a live daemon"),
328 });
329 }
330 std::fs::remove_file(path)?;
331 }
332 Ok(_) => {
333 return Err(DaemonError::Config {
334 path: path.to_path_buf(),
335 source: anyhow!("runtime path exists and is not a socket"),
336 });
337 }
338 Err(e) if e.kind() == io::ErrorKind::NotFound => {}
339 Err(e) => return Err(DaemonError::Io(e)),
340 }
341 Ok(())
342}
343
/// Longest time `probe_socket_alive` waits for its connection attempt before
/// treating the socket as dead.
#[cfg(unix)]
const PROBE_TIMEOUT: Duration = Duration::from_millis(100);
353
354#[cfg(unix)]
368async fn probe_socket_alive(path: &Path) -> bool {
369 match tokio::time::timeout(PROBE_TIMEOUT, tokio::net::UnixStream::connect(path)).await {
370 Ok(Ok(stream)) => {
371 drop(stream);
376 true
377 }
378 Ok(Err(_)) => false, Err(_elapsed) => false, }
381}
382
/// Accept loop state for a Windows named pipe.
#[cfg(windows)]
struct WindowsPipeAcceptor {
    /// Full pipe name (including the `\\.\pipe\` prefix) used for every instance.
    name: String,
    /// Server instance waiting for the next client, if one is currently held.
    next: Option<tokio::net::windows::named_pipe::NamedPipeServer>,
}
392
#[cfg(windows)]
impl WindowsPipeAcceptor {
    /// Creates the acceptor and its first pipe instance.
    ///
    /// `name` is normalised with `pipe_fullname`. The initial instance is
    /// created with `first = true`.
    ///
    /// # Errors
    ///
    /// Returns the error from creating the first pipe instance.
    fn new(name: String) -> io::Result<Self> {
        let full = pipe_fullname(&name);
        let next = Some(create_pipe_instance(&full, true)?);
        Ok(Self { name: full, next })
    }

    /// Waits for a client on the pending instance and returns the connected
    /// instance, leaving a fresh pending instance behind for the next call.
    ///
    /// The pending slot can be empty when a previous call failed (or was
    /// cancelled) after taking the instance out. In that case a new instance
    /// is created on demand instead of failing forever, so a single transient
    /// error does not permanently disable the acceptor.
    ///
    /// # Errors
    ///
    /// Returns the error from creating a pipe instance or from waiting for
    /// the client connection.
    async fn accept(&mut self) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
        let server = match self.next.take() {
            Some(pending) => pending,
            // Self-heal after an earlier failure left the slot empty.
            None => create_pipe_instance(&self.name, false)?,
        };
        server.connect().await?;
        // Queue the replacement before handing the connected instance out, so
        // that an instance is available for the next client.
        self.next = Some(create_pipe_instance(&self.name, false)?);
        Ok(server)
    }
}
410
/// Returns `name` as a full local pipe path.
///
/// A name that already starts with `\\.\pipe\` is returned unchanged;
/// otherwise that prefix is prepended. The prefix check is case-sensitive.
#[cfg(windows)]
fn pipe_fullname(name: &str) -> String {
    const LOCAL_PIPE_PREFIX: &str = r"\\.\pipe\";
    match name.strip_prefix(LOCAL_PIPE_PREFIX) {
        Some(_) => name.to_owned(),
        None => [LOCAL_PIPE_PREFIX, name].concat(),
    }
}
419
/// Creates one server instance of the named pipe `full_name`.
///
/// The instance is a byte-mode, bidirectional pipe that rejects remote
/// clients. `first` is forwarded to `first_pipe_instance`, so passing `true`
/// makes creation fail when an instance of the pipe already exists.
///
/// # Errors
///
/// Returns the error reported by `ServerOptions::create`.
#[cfg(windows)]
fn create_pipe_instance(
    full_name: &str,
    first: bool,
) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
    use tokio::net::windows::named_pipe::{PipeMode, ServerOptions};

    // `ServerOptions::max_instances` panics for values above 254, so the
    // previous value of 255 aborted the very first call. 254 is the largest
    // explicit cap it accepts.
    const MAX_PIPE_INSTANCES: usize = 254;

    ServerOptions::new()
        .first_pipe_instance(first)
        .reject_remote_clients(true)
        .pipe_mode(PipeMode::Byte)
        .max_instances(MAX_PIPE_INSTANCES)
        .access_inbound(true)
        .access_outbound(true)
        .create(full_name)
}