use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use anyhow::anyhow;
use sqry_core::query::executor::QueryExecutor;
use tokio_util::sync::CancellationToken;
use crate::config::{DaemonConfig, ENV_SOCKET_PATH};
use crate::error::{DaemonError, DaemonResult};
use crate::rebuild::RebuildDispatcher;
use crate::workspace::{WorkspaceBuilder, WorkspaceManager};
use super::methods::HandlerContext;
use super::router::run_connection;
use super::shim_registry::ShimRegistry;
pub struct IpcServer {
listener: Listener,
socket_path: PathBuf,
manager: Arc<WorkspaceManager>,
dispatcher: Arc<RebuildDispatcher>,
workspace_builder: Arc<dyn WorkspaceBuilder>,
tool_executor: Arc<QueryExecutor>,
shim_registry: Arc<ShimRegistry>,
shutdown: CancellationToken,
active_connections: Arc<AtomicU64>,
config: Arc<DaemonConfig>,
daemon_version: &'static str,
}
impl std::fmt::Debug for IpcServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IpcServer")
.field("socket_path", &self.socket_path)
.field("daemon_version", &self.daemon_version)
.finish_non_exhaustive()
}
}
impl IpcServer {
pub async fn bind(
config: Arc<DaemonConfig>,
manager: Arc<WorkspaceManager>,
dispatcher: Arc<RebuildDispatcher>,
workspace_builder: Arc<dyn WorkspaceBuilder>,
tool_executor: Arc<QueryExecutor>,
shutdown: CancellationToken,
) -> DaemonResult<Self> {
let socket_path = config.socket_path();
let listener = Listener::bind(&config, &socket_path).await?;
Ok(Self {
listener,
socket_path,
manager,
dispatcher,
workspace_builder,
tool_executor,
shim_registry: ShimRegistry::new(),
shutdown,
active_connections: Arc::new(AtomicU64::new(0)),
config,
daemon_version: env!("CARGO_PKG_VERSION"),
})
}
#[must_use]
pub fn socket_path(&self) -> &Path {
&self.socket_path
}
#[must_use]
pub fn shim_registry(&self) -> Arc<ShimRegistry> {
Arc::clone(&self.shim_registry)
}
pub async fn run(self) -> DaemonResult<()> {
let Self {
mut listener,
manager,
dispatcher,
workspace_builder,
tool_executor,
shim_registry,
shutdown,
active_connections,
config,
daemon_version,
..
} = self;
loop {
tokio::select! {
biased;
() = shutdown.cancelled() => {
tracing::info!(
"ipc_server: shutdown requested; draining active connections"
);
break;
}
res = listener.accept() => match res {
Ok(stream) => {
let ctx = HandlerContext {
manager: Arc::clone(&manager),
dispatcher: Arc::clone(&dispatcher),
workspace_builder: Arc::clone(&workspace_builder),
tool_executor: Arc::clone(&tool_executor),
shim_registry: Arc::clone(&shim_registry),
shutdown: shutdown.clone(),
config: Arc::clone(&config),
daemon_version,
};
active_connections.fetch_add(1, Ordering::AcqRel);
let tracker = Arc::clone(&active_connections);
tokio::spawn(async move {
let conn_result = match stream {
#[cfg(unix)]
AcceptedStream::Unix(s) => run_connection(s, ctx).await,
#[cfg(windows)]
AcceptedStream::Pipe(s) => run_connection(s, ctx).await,
};
if let Err(e) = conn_result {
tracing::debug!(error = %e,
"ipc_server: connection terminated with error");
}
tracker.fetch_sub(1, Ordering::AcqRel);
});
}
Err(e) => {
tracing::warn!(error = %e,
"ipc_server: accept failed; continuing");
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}
let deadline = Instant::now() + Duration::from_secs(config.ipc_shutdown_drain_secs);
while Instant::now() < deadline && active_connections.load(Ordering::Acquire) > 0 {
tokio::time::sleep(Duration::from_millis(50)).await;
}
let lingering = active_connections.load(Ordering::Acquire);
if lingering > 0 {
tracing::warn!(
lingering,
"ipc_server: {} connections still active at drain deadline",
lingering
);
}
Ok(())
}
}
enum AcceptedStream {
#[cfg(unix)]
Unix(tokio::net::UnixStream),
#[cfg(windows)]
Pipe(tokio::net::windows::named_pipe::NamedPipeServer),
}
#[cfg(unix)]
enum Listener {
Unix(tokio::net::UnixListener),
}
#[cfg(windows)]
enum Listener {
Pipe(WindowsPipeAcceptor),
}
impl Listener {
async fn bind(cfg: &DaemonConfig, path: &Path) -> DaemonResult<Self> {
#[cfg(unix)]
{
let l = bind_unix(cfg, path).await?;
Ok(Listener::Unix(l))
}
#[cfg(windows)]
{
let _ = cfg; let name = path.to_string_lossy().into_owned();
let acceptor = WindowsPipeAcceptor::new(name)?;
Ok(Listener::Pipe(acceptor))
}
}
async fn accept(&mut self) -> io::Result<AcceptedStream> {
match self {
#[cfg(unix)]
Self::Unix(l) => {
let (s, _addr) = l.accept().await?;
Ok(AcceptedStream::Unix(s))
}
#[cfg(windows)]
Self::Pipe(a) => {
let s = a.accept().await?;
Ok(AcceptedStream::Pipe(s))
}
}
}
}
#[cfg(unix)]
enum UnixBindMode {
RuntimeDir,
Configured,
}
#[cfg(unix)]
fn classify_bind_mode(cfg: &DaemonConfig) -> UnixBindMode {
if cfg.socket.path.is_some() || std::env::var_os(ENV_SOCKET_PATH).is_some() {
UnixBindMode::Configured
} else {
UnixBindMode::RuntimeDir
}
}
#[cfg(unix)]
async fn bind_unix(cfg: &DaemonConfig, path: &Path) -> DaemonResult<tokio::net::UnixListener> {
match classify_bind_mode(cfg) {
UnixBindMode::RuntimeDir => bind_unix_runtime(path).await,
UnixBindMode::Configured => bind_unix_configured(path).await,
}
}
#[cfg(unix)]
async fn bind_unix_runtime(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
use std::os::unix::fs::PermissionsExt;
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
std::fs::set_permissions(parent, std::fs::Permissions::from_mode(0o700))?;
}
remove_stale_socket_if_dead(path).await?;
let listener = tokio::net::UnixListener::bind(path)?;
std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
Ok(listener)
}
#[cfg(unix)]
async fn bind_unix_configured(path: &Path) -> DaemonResult<tokio::net::UnixListener> {
use std::os::unix::fs::{FileTypeExt, PermissionsExt};
match std::fs::symlink_metadata(path) {
Ok(meta) if meta.file_type().is_socket() => {
if probe_socket_alive(path).await {
return Err(DaemonError::Config {
path: path.to_path_buf(),
source: anyhow!("socket path already in use by a live daemon"),
});
}
tracing::warn!(
path = %path.display(),
"stale socket detected at configured path; unlinking and rebinding"
);
std::fs::remove_file(path)?;
}
Ok(_) => {
return Err(DaemonError::Config {
path: path.to_path_buf(),
source: anyhow!("configured socket path exists and is not a socket"),
});
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {}
Err(e) => return Err(DaemonError::Io(e)),
}
let listener = tokio::net::UnixListener::bind(path)?;
std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
Ok(listener)
}
#[cfg(unix)]
async fn remove_stale_socket_if_dead(path: &Path) -> DaemonResult<()> {
use std::os::unix::fs::FileTypeExt;
match std::fs::symlink_metadata(path) {
Ok(meta) if meta.file_type().is_socket() => {
if probe_socket_alive(path).await {
return Err(DaemonError::Config {
path: path.to_path_buf(),
source: anyhow!("socket path already in use by a live daemon"),
});
}
std::fs::remove_file(path)?;
}
Ok(_) => {
return Err(DaemonError::Config {
path: path.to_path_buf(),
source: anyhow!("runtime path exists and is not a socket"),
});
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {}
Err(e) => return Err(DaemonError::Io(e)),
}
Ok(())
}
#[cfg(unix)]
const PROBE_TIMEOUT: Duration = Duration::from_millis(100);
#[cfg(unix)]
async fn probe_socket_alive(path: &Path) -> bool {
match tokio::time::timeout(PROBE_TIMEOUT, tokio::net::UnixStream::connect(path)).await {
Ok(Ok(stream)) => {
drop(stream);
true
}
Ok(Err(_)) => false, Err(_elapsed) => false, }
}
#[cfg(windows)]
struct WindowsPipeAcceptor {
name: String,
next: Option<tokio::net::windows::named_pipe::NamedPipeServer>,
}
#[cfg(windows)]
impl WindowsPipeAcceptor {
fn new(name: String) -> io::Result<Self> {
let full = pipe_fullname(&name);
let next = Some(create_pipe_instance(&full, true)?);
Ok(Self { name: full, next })
}
async fn accept(&mut self) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
let server = self.next.take().ok_or_else(|| {
io::Error::other("pipe acceptor in invalid state: no pending instance")
})?;
server.connect().await?;
self.next = Some(create_pipe_instance(&self.name, false)?);
Ok(server)
}
}
#[cfg(windows)]
fn pipe_fullname(name: &str) -> String {
if name.starts_with(r"\\.\pipe\") {
name.to_owned()
} else {
format!(r"\\.\pipe\{name}")
}
}
#[cfg(windows)]
fn create_pipe_instance(
full_name: &str,
first: bool,
) -> io::Result<tokio::net::windows::named_pipe::NamedPipeServer> {
use tokio::net::windows::named_pipe::{PipeMode, ServerOptions};
ServerOptions::new()
.first_pipe_instance(first)
.reject_remote_clients(true)
.pipe_mode(PipeMode::Byte)
.max_instances(255)
.access_inbound(true)
.access_outbound(true)
.create(full_name)
}