1use std::io;
20use std::path::{Path, PathBuf};
21use std::sync::Arc;
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::time::{Duration, Instant};
24
25#[cfg(unix)]
26use anyhow::anyhow;
27use sqry_core::query::executor::QueryExecutor;
28use tokio_util::sync::CancellationToken;
29
30use crate::config::DaemonConfig;
31#[cfg(unix)]
32use crate::config::ENV_SOCKET_PATH;
33#[cfg(unix)]
34use crate::error::DaemonError;
35use crate::error::DaemonResult;
36use crate::rebuild::RebuildDispatcher;
37use crate::workspace::{WorkspaceBuilder, WorkspaceManager};
38
39use super::methods::HandlerContext;
40use super::router::run_connection;
41use super::shim_registry::ShimRegistry;
42
43pub struct IpcServer {
46 listener: Listener,
47 socket_path: PathBuf,
48 manager: Arc<WorkspaceManager>,
49 dispatcher: Arc<RebuildDispatcher>,
50 workspace_builder: Arc<dyn WorkspaceBuilder>,
51 tool_executor: Arc<QueryExecutor>,
52 shim_registry: Arc<ShimRegistry>,
53 shutdown: CancellationToken,
54 active_connections: Arc<AtomicU64>,
55 config: Arc<DaemonConfig>,
56 daemon_version: &'static str,
57}
58
59impl std::fmt::Debug for IpcServer {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct("IpcServer")
62 .field("socket_path", &self.socket_path)
63 .field("daemon_version", &self.daemon_version)
64 .finish_non_exhaustive()
65 }
66}
67
68impl IpcServer {
69 pub async fn bind(
72 config: Arc<DaemonConfig>,
73 manager: Arc<WorkspaceManager>,
74 dispatcher: Arc<RebuildDispatcher>,
75 workspace_builder: Arc<dyn WorkspaceBuilder>,
76 tool_executor: Arc<QueryExecutor>,
77 shutdown: CancellationToken,
78 ) -> DaemonResult<Self> {
79 let socket_path = config.socket_path();
80 let listener = Listener::bind(&config, &socket_path).await?;
81 Ok(Self {
82 listener,
83 socket_path,
84 manager,
85 dispatcher,
86 workspace_builder,
87 tool_executor,
88 shim_registry: ShimRegistry::new(),
89 shutdown,
90 active_connections: Arc::new(AtomicU64::new(0)),
91 config,
92 daemon_version: env!("CARGO_PKG_VERSION"),
93 })
94 }
95
96 #[must_use]
99 pub fn socket_path(&self) -> &Path {
100 &self.socket_path
101 }
102
103 #[must_use]
114 pub fn shim_registry(&self) -> Arc<ShimRegistry> {
115 Arc::clone(&self.shim_registry)
116 }
117
118 pub async fn run(self) -> DaemonResult<()> {
120 let Self {
121 mut listener,
122 manager,
123 dispatcher,
124 workspace_builder,
125 tool_executor,
126 shim_registry,
127 shutdown,
128 active_connections,
129 config,
130 daemon_version,
131 ..
132 } = self;
133
134 loop {
135 tokio::select! {
136 biased;
137 () = shutdown.cancelled() => {
138 tracing::info!(
139 "ipc_server: shutdown requested; draining active connections"
140 );
141 break;
142 }
143 res = listener.accept() => match res {
144 Ok(stream) => {
145 let ctx = HandlerContext {
146 manager: Arc::clone(&manager),
147 dispatcher: Arc::clone(&dispatcher),
148 workspace_builder: Arc::clone(&workspace_builder),
149 tool_executor: Arc::clone(&tool_executor),
150 shim_registry: Arc::clone(&shim_registry),
151 shutdown: shutdown.clone(),
152 config: Arc::clone(&config),
153 daemon_version,
154 };
155 active_connections.fetch_add(1, Ordering::AcqRel);
156 let tracker = Arc::clone(&active_connections);
157 tokio::spawn(async move {
158 let conn_result = match stream {
159 #[cfg(unix)]
160 AcceptedStream::Unix(s) => run_connection(s, ctx).await,
161 #[cfg(windows)]
162 AcceptedStream::Pipe(s) => run_connection(s, ctx).await,
163 };
164 if let Err(e) = conn_result {
165 tracing::debug!(error = %e,
166 "ipc_server: connection terminated with error");
167 }
168 tracker.fetch_sub(1, Ordering::AcqRel);
169 });
170 }
171 Err(e) => {
172 tracing::warn!(error = %e,
173 "ipc_server: accept failed; continuing");
174 tokio::time::sleep(Duration::from_millis(100)).await;
175 }
176 }
177 }
178 }
179
180 let deadline = Instant::now() + Duration::from_secs(config.ipc_shutdown_drain_secs);
182 while Instant::now() < deadline && active_connections.load(Ordering::Acquire) > 0 {
183 tokio::time::sleep(Duration::from_millis(50)).await;
184 }
185 let lingering = active_connections.load(Ordering::Acquire);
186 if lingering > 0 {
187 tracing::warn!(
188 lingering,
189 "ipc_server: {} connections still active at drain deadline",
190 lingering
191 );
192 }
193 Ok(())
194 }
195}
196
197enum AcceptedStream {
202 #[cfg(unix)]
203 Unix(tokio::net::UnixStream),
204 #[cfg(windows)]
205 Pipe(tokio::net::windows::named_pipe::NamedPipeServer),
206}
207
208#[cfg(unix)]
209enum Listener {
210 Unix(tokio::net::UnixListener),
211}
212
213#[cfg(windows)]
214enum Listener {
215 Pipe(WindowsPipeAcceptor),
216}
217
218impl Listener {
219 async fn bind(cfg: &DaemonConfig, path: &Path) -> DaemonResult<Self> {
220 #[cfg(unix)]
221 {
222 let l = bind_unix(cfg, path).await?;
223 Ok(Listener::Unix(l))
224 }
225 #[cfg(windows)]
226 {
227 let _ = cfg; let name = path.to_string_lossy().into_owned();
229 let acceptor = WindowsPipeAcceptor::new(name)?;
230 Ok(Listener::Pipe(acceptor))
231 }
232 }
233
234 async fn accept(&mut self) -> io::Result<AcceptedStream> {
235 match self {
236 #[cfg(unix)]
237 Self::Unix(l) => {
238 let (s, _addr) = l.accept().await?;
239 Ok(AcceptedStream::Unix(s))
240 }
241 #[cfg(windows)]
242 Self::Pipe(a) => {
243 let s = a.accept().await?;
244 Ok(AcceptedStream::Pipe(s))
245 }
246 }
247 }
248}
249
250#[cfg(unix)]
255enum UnixBindMode {
256 RuntimeDir,
257 Configured,
258}
259
260#[cfg(unix)]
261fn classify_bind_mode(cfg: &DaemonConfig) -> UnixBindMode {
262 if cfg.socket.path.is_some() || std::env::var_os(ENV_SOCKET_PATH).is_some() {
263 UnixBindMode::Configured
264 } else {
265 UnixBindMode::RuntimeDir
266 }
267}
268
269#[cfg(unix)]
270async fn bind_unix(cfg: &DaemonConfig, path: &Path) -> DaemonResult<tokio::net::UnixListener> {
271 match classify_bind_mode(cfg) {
272 UnixBindMode::RuntimeDir => bind_unix_runtime(path).await,
273 UnixBindMode::Configured => bind_unix_configured(path).await,
274 }
275}
276
277#[cfg(unix)]
278async fn bind_unix_runtime(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
279 use std::os::unix::fs::PermissionsExt;
280 if let Some(parent) = path.parent() {
281 std::fs::create_dir_all(parent)?;
282 std::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700))?;
283 }
284 remove_stale_socket_if_dead(path).await?;
285 let listener = tokio::net::UnixListener::bind(path)?;
286 std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
287 Ok(listener)
288}
289
290#[cfg(unix)]
291async fn bind_unix_configured(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
292 use std::os::unix::fs::{FileTypeExt, PermissionsExt};
293 match std::fs::symlink_metadata(path) {
294 Ok(meta) if meta.file_type().is_socket() => {
295 if probe_socket_alive(path).await {
296 return Err(DaemonError::Config {
297 path: path.to_path_buf(),
298 source: anyhow!("socket path already in use by a live daemon"),
299 });
300 }
301 tracing::warn!(
305 path = %path.display(),
306 "stale socket detected at configured path; unlinking and rebinding"
307 );
308 std::fs::remove_file(path)?;
309 }
310 Ok(_) => {
311 return Err(DaemonError::Config {
312 path: path.to_path_buf(),
313 source: anyhow!("configured socket path exists and is not a socket"),
314 });
315 }
316 Err(e) if e.kind() == io::ErrorKind::NotFound => {}
317 Err(e) => return Err(DaemonError::Io(e)),
318 }
319 let listener = tokio::net::UnixListener::bind(path)?;
320 std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
321 Ok(listener)
322}
323
324#[cfg(unix)]
325async fn remove_stale_socket_if_dead(path: &Path) -> DaemonResult<()> {
326 use std::os::unix::fs::FileTypeExt;
327 match std::fs::symlink_metadata(path) {
328 Ok(meta) if meta.file_type().is_socket() => {
329 if probe_socket_alive(path).await {
330 return Err(DaemonError::Config {
331 path: path.to_path_buf(),
332 source: anyhow!("socket path already in use by a live daemon"),
333 });
334 }
335 std::fs::remove_file(path)?;
336 }
337 Ok(_) => {
338 return Err(DaemonError::Config {
339 path: path.to_path_buf(),
340 source: anyhow!("runtime path exists and is not a socket"),
341 });
342 }
343 Err(e) if e.kind() == io::ErrorKind::NotFound => {}
344 Err(e) => return Err(DaemonError::Io(e)),
345 }
346 Ok(())
347}
348
349#[cfg(unix)]
357const PROBE_TIMEOUT: Duration = Duration::from_millis(100);
358
359#[cfg(unix)]
373async fn probe_socket_alive(path: &Path) -> bool {
374 match tokio::time::timeout(PROBE_TIMEOUT, tokio::net::UnixStream::connect(path)).await {
375 Ok(Ok(stream)) => {
376 drop(stream);
381 true
382 }
383 Ok(Err(_)) => false, Err(_elapsed) => false, }
386}
387
388#[cfg(windows)]
393struct WindowsPipeAcceptor {
394 name: String,
395 next: Option<tokio::net::windows::named_pipe::NamedPipeServer>,
396}
397
398#[cfg(windows)]
399impl WindowsPipeAcceptor {
400 fn new(name: String) -> io::Result<Self> {
401 let full = pipe_fullname(&name);
402 let next = Some(create_pipe_instance(&full, true)?);
403 Ok(Self { name: full, next })
404 }
405
406 async fn accept(&mut self) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
407 let server = self.next.take().ok_or_else(|| {
408 io::Error::other("pipe acceptor in invalid state: no pending instance")
409 })?;
410 server.connect().await?;
411 self.next = Some(create_pipe_instance(&self.name, false)?);
412 Ok(server)
413 }
414}
415
416#[cfg(windows)]
417fn pipe_fullname(name: &str) -> String {
418 if name.starts_with(r"\\.\pipe\") {
419 name.to_owned()
420 } else {
421 format!(r"\\.\pipe\{name}")
422 }
423}
424
425#[cfg(windows)]
426fn create_pipe_instance(
427 full_name: &str,
428 first: bool,
429) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
430 use tokio::net::windows::named_pipe::{PipeMode, ServerOptions};
431 ServerOptions::new()
432 .first_pipe_instance(first)
433 .reject_remote_clients(true)
434 .pipe_mode(PipeMode::Byte)
435 .max_instances(255)
436 .access_inbound(true)
437 .access_outbound(true)
438 .create(full_name)
439}