use std::{path::PathBuf, process::ExitCode, sync::Arc, time::Duration};
use clap::{Parser, Subcommand};
use sqry_core::query::executor::QueryExecutor;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use crate::{
DaemonConfig, DaemonError, DaemonResult, IpcServer, RealWorkspaceBuilder, RebuildDispatcher,
WorkspaceManager,
lifecycle::{
log_rotate::install_tracing,
notify::{is_under_systemd, notify_ready},
pidfile::acquire_pidfile_lock,
signals::install_signal_handlers,
units::InstallOptions,
},
};
#[cfg(unix)]
use crate::lifecycle::pidfile::PidfileLock;
/// Environment variable the detaching parent sets to the raw fd number of the
/// readiness pipe's write end; the re-executed process reads it back.
const ENV_READY_PIPE_FD: &str = "SQRYD_READY_PIPE_FD";
/// Environment variable carrying the raw fd number of the pidfile lock that
/// the re-executed process adopts.
#[cfg(unix)]
const ENV_LOCK_FD: &str = "SQRYD_LOCK_FD";
/// Environment variable carrying the pidfile path the parent resolved from its config.
#[cfg(unix)]
const ENV_PIDFILE_PATH: &str = "SQRYD_PIDFILE_PATH";
/// Environment variable carrying the lockfile path the parent resolved from its config.
#[cfg(unix)]
const ENV_LOCKFILE_PATH: &str = "SQRYD_LOCKFILE_PATH";
// Top-level command line for the `sqryd` binary.
//
// NOTE: plain `//` comments are used on purpose here -- clap's derive macros
// turn `///` doc comments into user-visible help text.
#[derive(Debug, Parser)]
#[command(
name = "sqryd",
about = "sqry daemon — persistent semantic code-search graph service",
version,
author
)]
pub struct SqrydCli {
// Explicit configuration file. May also come from the SQRY_DAEMON_CONFIG
// environment variable; `global = true` lets it appear after a subcommand.
#[arg(long, value_name = "FILE", env = "SQRY_DAEMON_CONFIG", global = true)]
pub config: Option<PathBuf>,
// Log level override forwarded to the tracing setup helpers.
#[arg(long, value_name = "LEVEL", global = true)]
pub log_level: Option<String>,
// Subcommand to run; `run()` falls back to `Start` with default flags when absent.
#[command(subcommand)]
pub command: Option<Command>,
}
// Subcommands understood by `sqryd`.
//
// NOTE: plain `//` comments are used on purpose -- clap's derive macros turn
// `///` doc comments into user-visible help text.
#[derive(Debug, Subcommand)]
pub enum Command {
// Start the daemon; see `Start` for the flags.
Start(Start),
// Start the daemon in the foreground (dispatched as `Start` with default flags).
Foreground,
// Send `daemon/stop` and wait for the socket to become unreachable.
Stop {
// Seconds to wait for the socket to disappear before giving up.
#[arg(long, default_value_t = 15)]
timeout_secs: u64,
},
// Query `daemon/status` and print the result.
Status {
// Print the raw response instead of the human-readable summary.
#[arg(long)]
json: bool,
},
// Print a systemd user unit to stdout.
#[cfg(target_os = "linux")]
InstallSystemdUser,
// Print a systemd system unit to stdout.
#[cfg(target_os = "linux")]
InstallSystemdSystem {
// Account to pass to the unit generator; resolved by
// `resolve_system_unit_user` when omitted.
#[arg(long)]
user: Option<String>,
},
// Print a launchd plist to stdout.
#[cfg(target_os = "macos")]
InstallLaunchd,
// Print an `sc.exe create` command and a Task Scheduler XML document to stdout.
#[cfg(target_os = "windows")]
InstallWindows,
// Print the effective configuration as TOML.
PrintConfig,
}
// Flags of the `start` subcommand.
//
// NOTE: plain `//` comments are used on purpose -- clap's derive macros turn
// `///` doc comments into user-visible help text.
#[derive(Debug, clap::Args, Default)]
pub struct Start {
// Run the daemon in the background (Unix); on other platforms the
// foreground path is used instead.
#[arg(long)]
pub detach: bool,
// Internal flag (hidden from help) passed by the detaching parent to the
// process it re-executes.
#[arg(long, hide = true)]
pub spawned_by_client: bool,
}
pub fn run() -> DaemonResult<()> {
let cli = SqrydCli::parse();
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.map_err(DaemonError::Io)?;
let log_level_owned = cli.log_level.clone();
let log_level = log_level_owned.as_deref();
let config_path = cli.config.clone();
let command = cli.command.unwrap_or(Command::Start(Start::default()));
match command {
Command::Start(start) => rt.block_on(run_start(start, config_path, log_level)),
Command::Foreground => rt.block_on(run_start(Start::default(), config_path, log_level)),
Command::Stop { timeout_secs } => {
rt.block_on(run_stop(config_path, log_level, timeout_secs))
}
Command::Status { json } => rt.block_on(run_status(config_path, log_level, json)),
#[cfg(target_os = "linux")]
Command::InstallSystemdUser => run_install_systemd_user(config_path, log_level),
#[cfg(target_os = "linux")]
Command::InstallSystemdSystem { user } => {
run_install_systemd_system(config_path, log_level, user)
}
#[cfg(target_os = "macos")]
Command::InstallLaunchd => run_install_launchd(config_path, log_level),
#[cfg(target_os = "windows")]
Command::InstallWindows => run_install_windows(config_path, log_level),
Command::PrintConfig => run_print_config(config_path, log_level),
}
}
/// Dispatches `start` to the right startup path.
///
/// `--spawned-by-client` takes precedence over `--detach`; with neither flag
/// the daemon runs in the foreground.
async fn run_start(
    args: Start,
    config_path: Option<PathBuf>,
    log_level: Option<&str>,
) -> DaemonResult<()> {
    match (args.spawned_by_client, args.detach) {
        (true, _) => run_start_spawned_by_client(config_path, log_level).await,
        (false, true) => run_start_detach(config_path, log_level).await,
        (false, false) => run_start_foreground(config_path, log_level).await,
    }
}
/// Runs the daemon in the calling process until the IPC server returns.
///
/// Startup order: load config, install tracing, create the runtime directory,
/// take the pidfile lock, build the daemon components, install signal
/// handlers, pre-load pinned workspaces, bind the IPC server, announce
/// readiness, then serve.
///
/// # Errors
///
/// Propagates failures from config loading, runtime-dir creation, pidfile
/// locking, signal-handler installation, and binding or running the server.
async fn run_start_foreground(
    config_path: Option<PathBuf>,
    log_level: Option<&str>,
) -> DaemonResult<()> {
    let cfg = Arc::new(load_config(config_path)?);

    // A tracing failure is reported on stderr but never aborts startup.
    let _tracing_guard = install_tracing(&cfg, log_level).unwrap_or_else(|e| {
        eprintln!("sqryd: warning: tracing setup: {e:#}");
        None
    });

    info!(
        version = env!("CARGO_PKG_VERSION"),
        socket = %cfg.socket_path().display(),
        pid_file = %cfg.pid_path().display(),
        "sqryd starting"
    );

    create_runtime_dir(&cfg)?;
    let lock = acquire_pidfile_lock(&cfg)?;
    info!(pid_file = %cfg.pid_path().display(), "pidfile lock acquired");

    let (manager, dispatcher, builder, executor) = build_daemon_components(Arc::clone(&cfg));

    let shutdown = CancellationToken::new();
    let signal_guard = install_signal_handlers(shutdown.clone())?;
    info!("signal handlers installed");

    preload_pinned_workspaces(&cfg, &manager, &builder).await;

    let server = IpcServer::bind(
        Arc::clone(&cfg),
        Arc::clone(&manager),
        Arc::clone(&dispatcher),
        Arc::clone(&builder),
        Arc::clone(&executor),
        shutdown.clone(),
    )
    .await?;
    info!(socket = %server.socket_path().display(), "IPC server bound");

    signal_ready(&cfg, server.socket_path());
    server.run().await?;
    info!("sqryd shutdown complete");

    // Explicit teardown order on the clean path: signal handlers first, pidfile lock last.
    drop(signal_guard);
    drop(lock);
    Ok(())
}
/// Handles `start --detach`.
///
/// On Unix this delegates to `run_start_detach_unix`. Elsewhere detaching is
/// not implemented: a warning is logged and the foreground path runs instead.
async fn run_start_detach(
    config_path: Option<PathBuf>,
    log_level: Option<&str>,
) -> DaemonResult<()> {
    #[cfg(unix)]
    {
        run_start_detach_unix(config_path, log_level).await
    }
    #[cfg(not(unix))]
    {
        // The config is loaded only to pick a log level for the warning below;
        // it is released before the foreground path loads its own copy.
        {
            let bootstrap_cfg = load_config(config_path.clone())?;
            setup_stderr_tracing(log_level, &bootstrap_cfg);
        }
        warn!(
            "--detach is a no-op on Windows; running in the foreground instead. \
             Use Task Scheduler or sc.exe to run sqryd as a background service."
        );
        run_start_foreground(config_path, log_level).await
    }
}
/// Parent side of `start --detach` on Unix.
///
/// Takes the pidfile lock, re-executes the current binary with
/// `start --detach --spawned-by-client` in a new session, hands the lock fd and
/// the write end of a readiness pipe to it through inherited fds plus
/// environment variables, and then waits (up to
/// `auto_start_ready_timeout_secs`) for end-of-file on the pipe.
///
/// # Errors
///
/// Propagates config, runtime-dir, lock, pipe and spawn failures. Returns
/// `DaemonError::AutoStartTimeout` when the spawned process exits before
/// signalling readiness or does not signal within the timeout (in the latter
/// case it is killed and reaped).
#[cfg(unix)]
async fn run_start_detach_unix(
config_path: Option<PathBuf>,
log_level: Option<&str>,
) -> DaemonResult<()> {
let cfg = load_config(config_path.clone())?;
let cfg = Arc::new(cfg);
// Tracing failure is non-fatal: report on stderr and continue without a guard.
let _tracing_guard = match install_tracing(&cfg, log_level) {
Ok(g) => g,
Err(e) => {
eprintln!("sqryd: warning: tracing setup (parent): {e:#}");
None
}
};
create_runtime_dir(&cfg)?;
let mut pidfile_lock = acquire_pidfile_lock(&cfg)?;
info!(pid_file = %cfg.pid_path().display(), "parent: pidfile lock acquired (WriteOwner)");
// Readiness pipe: the spawned process gets the write end, the parent polls the read end.
let (read_fd, write_fd) = create_pipe()?;
let lock_fd = pidfile_lock.as_raw_fd();
let pidfile_path = cfg.pid_path();
let lockfile_path = cfg.lock_path();
let exe = std::env::current_exe()
.map_err(|e| DaemonError::Io(std::io::Error::other(format!("current_exe: {e}"))))?;
// Re-invoke this same executable; the hidden flag routes it to the adopting startup path.
let mut cmd = std::process::Command::new(&exe);
cmd.args(["start", "--detach", "--spawned-by-client"]);
if let Some(ref cp) = config_path {
cmd.arg("--config").arg(cp);
}
if let Some(ll) = log_level {
cmd.arg("--log-level").arg(ll);
}
// Fd numbers and paths travel through the environment; the fds themselves are inherited.
cmd.env(ENV_READY_PIPE_FD, write_fd.to_string());
cmd.env(ENV_LOCK_FD, lock_fd.to_string());
cmd.env(ENV_PIDFILE_PATH, pidfile_path.as_os_str());
cmd.env(ENV_LOCKFILE_PATH, lockfile_path.as_os_str());
cmd.stdin(std::process::Stdio::null());
cmd.stdout(std::process::Stdio::null());
cmd.stderr(std::process::Stdio::null());
let write_fd_copy = write_fd;
let lock_fd_copy = lock_fd;
// The closure runs in the forked child before exec: it starts a new session
// and clears FD_CLOEXEC on the two fds so they survive the exec.
unsafe {
use std::os::unix::process::CommandExt as _;
cmd.pre_exec(move || {
if libc::setsid() < 0 {
return Err(std::io::Error::last_os_error());
}
for fd in [write_fd_copy, lock_fd_copy] {
let flags = libc::fcntl(fd, libc::F_GETFD);
if flags < 0 {
return Err(std::io::Error::last_os_error());
}
let rc = libc::fcntl(fd, libc::F_SETFD, flags & !libc::FD_CLOEXEC);
if rc < 0 {
return Err(std::io::Error::last_os_error());
}
}
Ok(())
});
}
let mut child = cmd.spawn().map_err(|e| {
DaemonError::Io(std::io::Error::other(format!(
"failed to spawn grandchild sqryd process: {e}"
)))
})?;
let grandchild_pid = child.id();
info!(pid = grandchild_pid, "spawned grandchild");
// Close the parent's copy of the write end so the read end can reach EOF
// once the spawned process closes (or loses) its copy.
drop_raw_fd(write_fd);
let timeout_secs = cfg.auto_start_ready_timeout_secs;
let deadline = std::time::Instant::now() + Duration::from_secs(timeout_secs);
// NOTE(review): poll_ready_pipe sleeps on the calling thread; this function
// is driven via block_on from `run`.
let result = poll_ready_pipe(read_fd, deadline);
drop_raw_fd(read_fd);
match result {
Ok(()) => {
// EOF alone is ambiguous: it is also what a dead process produces,
// so check whether the spawned process is still running.
match child.try_wait() {
Ok(Some(status)) => {
warn!(
pid = grandchild_pid,
?status,
"grandchild exited before signalling ready (pipe EOF was process death)"
);
drop(pidfile_lock);
return Err(DaemonError::AutoStartTimeout {
timeout_secs,
socket: cfg.socket_path(),
});
}
Ok(None) => {
}
Err(e) => {
warn!(
pid = grandchild_pid,
err = %e,
"try_wait after pipe EOF failed -- assuming grandchild is alive"
);
}
}
// NOTE(review): presumably marks the lock so that dropping it here
// does not release what the spawned process adopted -- confirm in
// the pidfile module.
pidfile_lock.hand_off_to_adopter();
info!(
pid = grandchild_pid,
"grandchild signalled ready -- parent exiting 0 (Handoff)"
);
drop(pidfile_lock);
Ok(())
}
Err(()) => {
warn!(
pid = grandchild_pid,
timeout_secs, "grandchild did not signal ready within timeout -- killing"
);
if let Err(e) = child.kill() {
warn!(pid = grandchild_pid, err = %e, "kill(grandchild) failed");
}
// Reap the killed process; the wait result is deliberately ignored.
let _ = child.wait();
drop(pidfile_lock);
Err(DaemonError::AutoStartTimeout {
timeout_secs,
socket: cfg.socket_path(),
})
}
}
}
/// Entry point for `start --spawned-by-client`, the mode the detaching parent
/// re-executes the binary in.
///
/// On Unix this delegates to `run_start_spawned_by_client_unix`; on other
/// platforms it logs a warning and runs the ordinary foreground startup.
async fn run_start_spawned_by_client(
config_path: Option<PathBuf>,
log_level: Option<&str>,
) -> DaemonResult<()> {
#[cfg(unix)]
{
run_start_spawned_by_client_unix(config_path, log_level).await
}
#[cfg(not(unix))]
{
warn!("--spawned-by-client reached on non-Unix -- running foreground");
run_start_foreground(config_path, log_level).await
}
}
#[cfg(unix)]
async fn run_start_spawned_by_client_unix(
config_path: Option<PathBuf>,
log_level: Option<&str>,
) -> DaemonResult<()> {
use std::os::unix::io::RawFd;
let lock_fd: RawFd = read_env_fd(ENV_LOCK_FD).ok_or_else(|| {
DaemonError::Io(std::io::Error::other(
"grandchild: SQRYD_LOCK_FD not set (only valid via --detach parent spawn)",
))
})?;
let ready_pipe_fd: RawFd = read_env_fd(ENV_READY_PIPE_FD).ok_or_else(|| {
DaemonError::Io(std::io::Error::other(
"grandchild: SQRYD_READY_PIPE_FD not set",
))
})?;
let pidfile_path: PathBuf = std::env::var_os(ENV_PIDFILE_PATH)
.map(PathBuf::from)
.ok_or_else(|| {
DaemonError::Io(std::io::Error::other(
"grandchild: SQRYD_PIDFILE_PATH not set",
))
})?;
let lockfile_path: PathBuf = std::env::var_os(ENV_LOCKFILE_PATH)
.map(PathBuf::from)
.ok_or_else(|| {
DaemonError::Io(std::io::Error::other(
"grandchild: SQRYD_LOCKFILE_PATH not set",
))
})?;
let cfg = load_config(config_path)?;
let cfg = Arc::new(cfg);
write_pid_file_grandchild(&cfg.pid_path())?;
let _pidfile_lock = unsafe { PidfileLock::adopt(lock_fd, pidfile_path, lockfile_path) };
info!(
version = env!("CARGO_PKG_VERSION"),
pid = std::process::id(),
"sqryd grandchild: pidfile lock adopted -- beginning foreground startup"
);
run_start_foreground_inner(cfg, log_level, ready_pipe_fd).await
}
/// Foreground startup for a process that already holds the pidfile lock.
///
/// Mirrors `run_start_foreground` without the lock acquisition. On Unix, once
/// the server is bound and readiness has been announced, a non-negative
/// `ready_pipe_write_fd` is closed so the waiting parent sees end-of-file.
///
/// # Errors
///
/// Propagates failures from runtime-dir creation, signal-handler
/// installation, and binding or running the IPC server.
async fn run_start_foreground_inner(
    cfg: Arc<DaemonConfig>,
    log_level: Option<&str>,
    #[cfg(unix)] ready_pipe_write_fd: libc::c_int,
    #[cfg(not(unix))] _ready_pipe_write_fd: i32,
) -> DaemonResult<()> {
    // A tracing failure is reported on stderr but never aborts startup.
    let _tracing_guard = install_tracing(&cfg, log_level).unwrap_or_else(|e| {
        eprintln!("sqryd: warning: tracing setup: {e:#}");
        None
    });

    info!(
        version = env!("CARGO_PKG_VERSION"),
        socket = %cfg.socket_path().display(),
        "sqryd grandchild: tracing active"
    );

    create_runtime_dir(&cfg)?;

    let (manager, dispatcher, builder, executor) = build_daemon_components(Arc::clone(&cfg));

    let shutdown = CancellationToken::new();
    let _signal_guard = install_signal_handlers(shutdown.clone())?;

    preload_pinned_workspaces(&cfg, &manager, &builder).await;

    let server = IpcServer::bind(
        Arc::clone(&cfg),
        Arc::clone(&manager),
        Arc::clone(&dispatcher),
        Arc::clone(&builder),
        Arc::clone(&executor),
        shutdown.clone(),
    )
    .await?;
    info!(socket = %server.socket_path().display(), "IPC server bound");

    signal_ready(&cfg, server.socket_path());

    // Closing the write end is the readiness signal the parent polls for.
    #[cfg(unix)]
    if ready_pipe_write_fd >= 0 {
        close_ready_pipe_fd(ready_pipe_write_fd);
    }

    server.run().await?;
    info!("sqryd shutdown complete");
    Ok(())
}
/// Implements `sqryd stop`.
///
/// Sends a `daemon/stop` JSON-RPC request over the daemon socket, then probes
/// the socket every 100 ms until it can no longer be connected to.
///
/// # Errors
///
/// Propagates config and request failures, and returns
/// `DaemonError::AutoStartTimeout` if the socket is still connectable once
/// `timeout_secs` have elapsed.
async fn run_stop(
    config_path: Option<PathBuf>,
    log_level: Option<&str>,
    timeout_secs: u64,
) -> DaemonResult<()> {
    let cfg = load_config(config_path)?;
    setup_stderr_tracing(log_level, &cfg);

    let socket_path = cfg.socket_path();
    info!(socket = %socket_path.display(), "connecting to daemon to send daemon/stop");

    let request = serde_json::json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "daemon/stop",
        "params": {}
    });
    send_management_request(&socket_path, &request).await?;

    info!(
        timeout_secs,
        "waiting for daemon socket to become unreachable"
    );

    let give_up_at = std::time::Instant::now() + Duration::from_secs(timeout_secs);
    while crate::lifecycle::detach::try_connect_path(&socket_path).await {
        if std::time::Instant::now() >= give_up_at {
            return Err(DaemonError::AutoStartTimeout {
                timeout_secs,
                socket: socket_path,
            });
        }
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    info!("daemon socket gone -- stop complete");
    Ok(())
}
/// Implements `sqryd status`.
///
/// Probes the daemon socket, sends a `daemon/status` JSON-RPC request, and
/// prints either the raw response (`--json`) or a human-readable summary.
///
/// # Errors
///
/// Returns `DaemonError::Io` when the socket is not connectable, when the
/// response is not valid JSON (human mode only), or when the response carries
/// an `error` member (human mode only). Config and request failures are
/// propagated.
async fn run_status(
    config_path: Option<PathBuf>,
    log_level: Option<&str>,
    json_output: bool,
) -> DaemonResult<()> {
    let cfg = load_config(config_path)?;
    setup_stderr_tracing(log_level, &cfg);

    let socket_path = cfg.socket_path();
    let reachable = crate::lifecycle::detach::try_connect_path(&socket_path).await;
    if !reachable {
        eprintln!(
            "sqryd: daemon is not running (socket not connectable: {})",
            socket_path.display()
        );
        return Err(DaemonError::Io(std::io::Error::other(format!(
            "daemon socket not reachable: {}",
            socket_path.display()
        ))));
    }

    let request = serde_json::json!({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "daemon/status",
        "params": {}
    });
    let resp_buf = send_management_request(&socket_path, &request).await?;

    // Raw mode: echo the response bytes without interpreting them.
    if json_output {
        println!("{}", String::from_utf8_lossy(&resp_buf));
        return Ok(());
    }

    let parsed: serde_json::Value = serde_json::from_slice(&resp_buf).map_err(|e| {
        DaemonError::Io(std::io::Error::other(format!(
            "daemon/status response was not valid JSON: {e} (raw: {})",
            String::from_utf8_lossy(&resp_buf)
        )))
    })?;

    match (parsed.get("result"), parsed.get("error")) {
        (Some(result), _) => render_status_human(result),
        (None, Some(err_val)) => {
            eprintln!("sqryd status error: {err_val}");
            return Err(DaemonError::Io(std::io::Error::other(format!(
                "daemon/status error: {err_val}"
            ))));
        }
        // Neither member present: dump whatever came back.
        (None, None) => println!(
            "{}",
            serde_json::to_string_pretty(&parsed).unwrap_or_default()
        ),
    }
    Ok(())
}
async fn send_management_request(
socket_path: &std::path::Path,
req: &serde_json::Value,
) -> DaemonResult<Vec<u8>> {
use crate::{DaemonHello, DaemonHelloResponse};
use sqry_daemon_protocol::framing::{read_frame, write_frame_json};
#[cfg(unix)]
let mut stream = {
tokio::net::UnixStream::connect(socket_path)
.await
.map_err(|e| {
DaemonError::Io(std::io::Error::other(format!(
"connect to daemon socket {}: {e}",
socket_path.display()
)))
})?
};
#[cfg(windows)]
let mut stream = {
use tokio::net::windows::named_pipe::ClientOptions;
let pipe_path = socket_path.to_string_lossy();
ClientOptions::new().open(pipe_path.as_ref()).map_err(|e| {
DaemonError::Io(std::io::Error::other(format!(
"connect to daemon pipe {}: {e}",
pipe_path
)))
})?
};
let hello = DaemonHello {
client_version: env!("CARGO_PKG_VERSION").to_owned(),
protocol_version: 1,
logical_workspace: None,
};
write_frame_json(&mut stream, &hello)
.await
.map_err(|e| DaemonError::Io(std::io::Error::other(format!("send hello: {e}"))))?;
let hello_resp_bytes = read_frame(&mut stream)
.await
.map_err(|e| DaemonError::Io(std::io::Error::other(format!("read hello response: {e}"))))?
.ok_or_else(|| {
DaemonError::Io(std::io::Error::other(
"daemon closed connection before hello response",
))
})?;
let hello_resp: DaemonHelloResponse =
serde_json::from_slice(&hello_resp_bytes).map_err(|e| {
DaemonError::Io(std::io::Error::other(format!("parse hello response: {e}")))
})?;
if !hello_resp.compatible {
return Err(DaemonError::Io(std::io::Error::other(
"daemon is not compatible with this client version",
)));
}
write_frame_json(&mut stream, req)
.await
.map_err(|e| DaemonError::Io(std::io::Error::other(format!("send request: {e}"))))?;
let resp_bytes = read_frame(&mut stream)
.await
.map_err(|e| DaemonError::Io(std::io::Error::other(format!("read response: {e}"))))?
.ok_or_else(|| {
DaemonError::Io(std::io::Error::other(
"daemon closed connection before sending response",
))
})?;
Ok(resp_bytes)
}
fn render_status_human(result: &serde_json::Value) {
let payload = result.get("data").unwrap_or(result);
let version = payload
.get("daemon_version")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
let uptime = payload
.get("uptime_seconds")
.and_then(|v| v.as_u64())
.unwrap_or(0);
println!("sqryd version: {version}");
println!(" uptime: {uptime}s");
if let Some(memory) = payload.get("memory") {
let limit = memory
.get("limit_bytes")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let current = memory
.get("current_bytes")
.and_then(|v| v.as_u64())
.unwrap_or(0);
println!(
" memory: {} MiB used / {} MiB limit",
current / (1024 * 1024),
limit / (1024 * 1024)
);
}
if let Some(workspaces) = payload.get("workspaces").and_then(|v| v.as_array()) {
println!(" workspaces: {}", workspaces.len());
for ws in workspaces {
let path = ws.get("index_root").and_then(|v| v.as_str()).unwrap_or("?");
let state = ws
.get("state")
.and_then(|v| v.as_str())
.unwrap_or("Unknown");
println!(" {state:10} {path}");
}
}
}
/// Implements `install-systemd-user`: prints the generated systemd user unit
/// to stdout using default install options.
///
/// # Errors
///
/// Propagates config loading failures.
#[cfg(target_os = "linux")]
fn run_install_systemd_user(
    config_path: Option<PathBuf>,
    log_level: Option<&str>,
) -> DaemonResult<()> {
    let cfg = load_config(config_path)?;
    setup_stderr_tracing(log_level, &cfg);

    let unit_text =
        crate::lifecycle::units::systemd::generate_user_unit(&cfg, &InstallOptions::default());
    println!("{unit_text}");
    Ok(())
}
/// Implements `install-systemd-system`: resolves the account the unit should
/// run as and prints the generated systemd system unit to stdout.
///
/// # Errors
///
/// Propagates config loading failures. A failure to resolve the user is
/// reported as `DaemonError::Config`, tagged with the pidfile's parent
/// directory (or `.` when it has none).
#[cfg(target_os = "linux")]
fn run_install_systemd_system(
    config_path: Option<PathBuf>,
    log_level: Option<&str>,
    user: Option<String>,
) -> DaemonResult<()> {
    use crate::lifecycle::units::systemd;

    let cfg = load_config(config_path)?;
    setup_stderr_tracing(log_level, &cfg);

    let requested = InstallOptions {
        user,
        ..Default::default()
    };
    let resolved_user = match systemd::resolve_system_unit_user(&requested) {
        Ok(name) => name,
        Err(e) => {
            let pid_path = cfg.pid_path();
            let dir = pid_path
                .parent()
                .unwrap_or_else(|| std::path::Path::new("."))
                .to_owned();
            return Err(DaemonError::Config {
                path: dir,
                source: anyhow::anyhow!("{e}"),
            });
        }
    };

    // The unit is generated from fresh options carrying only the resolved user.
    let effective = InstallOptions {
        user: Some(resolved_user),
        ..Default::default()
    };
    let unit_text = systemd::generate_system_unit(&cfg, &effective);
    println!("{unit_text}");
    Ok(())
}
/// Implements `install-launchd`: prints the generated launchd plist to stdout
/// using default install options.
///
/// # Errors
///
/// Propagates config loading failures.
#[cfg(target_os = "macos")]
fn run_install_launchd(config_path: Option<PathBuf>, log_level: Option<&str>) -> DaemonResult<()> {
    let cfg = load_config(config_path)?;
    setup_stderr_tracing(log_level, &cfg);

    let plist_text =
        crate::lifecycle::units::launchd::generate_plist(&cfg, &InstallOptions::default());
    println!("{plist_text}");
    Ok(())
}
/// Implements `install-windows`: prints an `sc.exe create` command followed by
/// a Task Scheduler XML document, each under its own heading.
///
/// # Errors
///
/// Propagates config loading failures.
#[cfg(target_os = "windows")]
fn run_install_windows(config_path: Option<PathBuf>, log_level: Option<&str>) -> DaemonResult<()> {
    use crate::lifecycle::units::windows;

    let cfg = load_config(config_path)?;
    setup_stderr_tracing(log_level, &cfg);

    let install_opts = InstallOptions::default();
    let sc_command = windows::generate_sc_create(&cfg, &install_opts);
    let task_xml = windows::generate_task_xml(&cfg, &install_opts);

    println!("-- sc.exe create command --");
    println!("{sc_command}");
    println!();
    println!("-- Task Scheduler XML --");
    println!("{task_xml}");
    Ok(())
}
/// Implements `print-config`: prints the effective configuration as pretty TOML.
///
/// # Errors
///
/// Propagates config loading failures; a serialisation failure is reported as
/// `DaemonError::Config` with the placeholder path `<serialise>`.
fn run_print_config(config_path: Option<PathBuf>, log_level: Option<&str>) -> DaemonResult<()> {
    let cfg = load_config(config_path)?;
    setup_stderr_tracing(log_level, &cfg);

    match toml::to_string_pretty(&cfg) {
        Ok(rendered) => {
            println!("{rendered}");
            Ok(())
        }
        Err(e) => Err(DaemonError::Config {
            path: PathBuf::from("<serialise>"),
            source: anyhow::anyhow!("toml serialisation failed: {e}"),
        }),
    }
}
/// Process entry point: runs the CLI and maps the outcome to an exit code.
///
/// On failure the error is emitted through tracing and also printed to
/// stderr, and the exit code is taken from the error's `exit_code()`.
pub fn main_impl() -> ExitCode {
    let Err(err) = run() else {
        return ExitCode::SUCCESS;
    };
    // Both channels are used: the tracing event and a plain stderr line.
    error!("sqryd: fatal: {err:#}");
    eprintln!("sqryd: {err:#}");
    ExitCode::from(err.exit_code())
}
/// Loads the daemon configuration.
///
/// With an explicit path the file is loaded, environment overrides are
/// applied and the result is validated. Without one, `DaemonConfig::load`
/// decides where the configuration comes from.
///
/// # Errors
///
/// Propagates load, override and validation failures.
fn load_config(config_path: Option<PathBuf>) -> DaemonResult<DaemonConfig> {
    let Some(path) = config_path else {
        return DaemonConfig::load();
    };
    let mut cfg = DaemonConfig::load_from_path(&path)?;
    cfg.apply_env_overrides()?;
    cfg.validate()?;
    Ok(cfg)
}
/// Installs a compact tracing subscriber that writes to **stderr**, for the
/// short-lived management subcommands.
///
/// The level is taken, in order of precedence, from `log_level`, the
/// `SQRY_DAEMON_LOG_LEVEL` environment variable, and `cfg.log_level`; a filter
/// string that does not parse falls back to `info`. If a global subscriber is
/// already installed the call is a no-op.
fn setup_stderr_tracing(log_level: Option<&str>, cfg: &DaemonConfig) {
    let level = log_level
        .map(ToOwned::to_owned)
        .or_else(|| std::env::var("SQRY_DAEMON_LOG_LEVEL").ok())
        .unwrap_or_else(|| cfg.log_level.clone());
    let filter = tracing_subscriber::EnvFilter::try_new(&level)
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
    // The fmt subscriber writes to stdout unless told otherwise. The callers
    // print their actual payload (status JSON, TOML config, unit files) on
    // stdout, so log lines must go to stderr to keep that output clean.
    let _ = tracing_subscriber::fmt()
        .compact()
        .with_env_filter(filter)
        .with_writer(std::io::stderr)
        .try_init();
}
/// Ensures the configured runtime directory exists and, on Unix, sets its
/// mode to `0o700`.
///
/// # Errors
///
/// Returns `DaemonError::Io` if the directory cannot be created or its
/// permissions cannot be changed.
fn create_runtime_dir(cfg: &DaemonConfig) -> DaemonResult<()> {
    let runtime_dir = cfg.runtime_dir();
    std::fs::create_dir_all(&runtime_dir).map_err(DaemonError::Io)?;

    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt as _;
        let owner_only = std::fs::Permissions::from_mode(0o700);
        std::fs::set_permissions(&runtime_dir, owner_only).map_err(DaemonError::Io)?;
    }

    Ok(())
}
fn build_daemon_components(
cfg: Arc<DaemonConfig>,
) -> (
Arc<WorkspaceManager>,
Arc<RebuildDispatcher>,
Arc<dyn crate::workspace::WorkspaceBuilder>,
Arc<QueryExecutor>,
) {
let plugins = Arc::new(sqry_plugin_registry::create_plugin_manager());
let manager = WorkspaceManager::new(Arc::clone(&cfg));
let dispatcher =
RebuildDispatcher::new(Arc::clone(&manager), Arc::clone(&cfg), Arc::clone(&plugins));
let builder: Arc<dyn crate::workspace::WorkspaceBuilder> =
Arc::new(RealWorkspaceBuilder::new(Arc::clone(&plugins)));
let executor = Arc::new(QueryExecutor::new());
(manager, dispatcher, builder, executor)
}
/// Announces that the daemon is ready to accept connections.
///
/// Under systemd a `READY=1` notification is sent. In all cases an empty
/// `sqryd.ready` sentinel file is written into the runtime directory and a
/// log line is emitted. Failures of either mechanism are logged as warnings
/// and never abort startup.
fn signal_ready(cfg: &DaemonConfig, socket_path: &std::path::Path) {
    if is_under_systemd() {
        match notify_ready() {
            Ok(_) => info!("sd_notify: READY=1 sent"),
            Err(e) => warn!(err = %e, "sd_notify(READY=1) failed -- systemctl may time out"),
        }
    }

    let sentinel = cfg.runtime_dir().join("sqryd.ready");
    let touched = std::fs::write(&sentinel, b"");
    if let Err(e) = touched {
        warn!(
            path = %sentinel.display(),
            err = %e,
            "could not touch sqryd.ready sentinel (non-fatal)"
        );
    }

    info!(
        socket = %socket_path.display(),
        "sqryd ready -- accepting connections"
    );
}
async fn preload_pinned_workspaces(
cfg: &DaemonConfig,
manager: &Arc<WorkspaceManager>,
builder: &Arc<dyn crate::workspace::WorkspaceBuilder>,
) {
use sqry_core::project::ProjectRootMode;
for ws_cfg in &cfg.workspaces {
if ws_cfg.exclude || !ws_cfg.pinned {
continue;
}
let root = ws_cfg.path.clone();
let key =
crate::workspace::WorkspaceKey::new(root.clone(), ProjectRootMode::WorkspaceFolder, 0);
info!(path = %root.display(), "pre-loading pinned workspace");
let estimate =
crate::workspace::working_set_estimate(crate::workspace::WorkingSetInputs::default());
if let Err(e) = manager.get_or_load(&key, builder.as_ref(), estimate) {
warn!(
path = %root.display(),
err = %e,
"pinned workspace pre-load failed (log + continue per §C.3.1 step 13)"
);
}
}
}
#[cfg(all(unix, target_os = "linux"))]
fn create_pipe() -> DaemonResult<(libc::c_int, libc::c_int)> {
let mut fds = [0i32; 2];
let rc = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC) };
if rc < 0 {
return Err(DaemonError::Io(std::io::Error::last_os_error()));
}
Ok((fds[0], fds[1]))
}
/// Creates a pipe and then marks both ends close-on-exec (variant for Unix
/// targets other than Linux, which uses plain `pipe` plus `fcntl`).
///
/// Returns `(read_fd, write_fd)`.
///
/// # Errors
///
/// Returns `DaemonError::Io` if `pipe` fails, or if either end cannot be
/// marked close-on-exec -- in which case both ends are closed first.
#[cfg(all(unix, not(target_os = "linux")))]
fn create_pipe() -> DaemonResult<(libc::c_int, libc::c_int)> {
    let mut ends = [0i32; 2];
    // `ends` is a two-element array, which is what pipe writes into.
    if unsafe { libc::pipe(ends.as_mut_ptr()) } < 0 {
        return Err(DaemonError::Io(std::io::Error::last_os_error()));
    }
    let [read_end, write_end] = ends;

    let marked = set_close_on_exec(read_end).and_then(|()| set_close_on_exec(write_end));
    match marked {
        Ok(()) => Ok((read_end, write_end)),
        Err(err) => {
            drop_raw_fd(read_end);
            drop_raw_fd(write_end);
            Err(err)
        }
    }
}
/// Adds `FD_CLOEXEC` to the descriptor flags of `fd`, keeping the other flags.
///
/// # Errors
///
/// Returns `DaemonError::Io` carrying the OS error if either `fcntl` call fails.
#[cfg(all(unix, not(target_os = "linux")))]
fn set_close_on_exec(fd: libc::c_int) -> DaemonResult<()> {
    let os_err = || DaemonError::Io(std::io::Error::last_os_error());

    let current = unsafe { libc::fcntl(fd, libc::F_GETFD) };
    if current < 0 {
        return Err(os_err());
    }
    let updated = unsafe { libc::fcntl(fd, libc::F_SETFD, current | libc::FD_CLOEXEC) };
    if updated < 0 {
        return Err(os_err());
    }
    Ok(())
}
/// Closes a raw file descriptor, ignoring the result of `close`.
#[cfg(unix)]
fn drop_raw_fd(fd: libc::c_int) {
    // Nothing useful can be done if close fails, so the return code is discarded.
    let _ = unsafe { libc::close(fd) };
}
/// Waits for end-of-file on the read end of the readiness pipe.
///
/// The fd is switched to non-blocking mode (best effort) and polled roughly
/// every 50 ms. Any bytes that arrive are discarded; only EOF counts as the
/// signal. The fd is **not** closed here -- the caller keeps ownership.
///
/// Returns `Ok(())` on EOF and `Err(())` when `deadline` passes or the read
/// fails with an error other than "would block" / "interrupted".
#[cfg(unix)]
fn poll_ready_pipe(read_fd: libc::c_int, deadline: std::time::Instant) -> Result<(), ()> {
    use std::io::{ErrorKind, Read as _};
    use std::mem::ManuallyDrop;
    use std::os::unix::io::FromRawFd as _;

    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    // SAFETY: the `File` is only a borrowed view used for `read`; wrapping it
    // in `ManuallyDrop` guarantees that no exit path closes the caller's fd.
    // NOTE(review): relies on the caller passing an open descriptor it owns.
    let mut pipe = ManuallyDrop::new(unsafe { std::fs::File::from_raw_fd(read_fd) });

    // Best effort: if the flags cannot be read the fd stays in its current mode.
    // SAFETY: fcntl with F_GETFL/F_SETFL only touches the descriptor's status flags.
    unsafe {
        let flags = libc::fcntl(read_fd, libc::F_GETFL);
        if flags >= 0 {
            libc::fcntl(read_fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
        }
    }

    let mut scratch = [0u8; 1];
    loop {
        match pipe.read(&mut scratch) {
            // EOF: every write end has been closed.
            Ok(0) => return Ok(()),
            // Stray data is not the signal; keep waiting for EOF.
            Ok(_) => {}
            // Nothing to read yet, or the call was interrupted by a signal:
            // both are transient and must not be reported as a failure.
            Err(e)
                if matches!(e.kind(), ErrorKind::WouldBlock | ErrorKind::Interrupted)
                    || e.raw_os_error() == Some(libc::EAGAIN) => {}
            Err(_) => return Err(()),
        }

        let now = std::time::Instant::now();
        if now >= deadline {
            return Err(());
        }
        // Never sleep past the deadline.
        std::thread::sleep(POLL_INTERVAL.min(deadline.saturating_duration_since(now)));
    }
}
/// Closes the write end of the readiness pipe, ignoring the result of `close`.
#[cfg(unix)]
fn close_ready_pipe_fd(fd: libc::c_int) {
    // The return code is discarded; there is no recovery path for a failed close.
    let _ = unsafe { libc::close(fd) };
}
#[cfg(unix)]
fn read_env_fd(var: &str) -> Option<libc::c_int> {
std::env::var(var).ok()?.parse::<libc::c_int>().ok()
}
/// Writes this process's PID (followed by a newline) to `pidfile_path`.
///
/// The content is first written to a sibling temp file (extension replaced by
/// `tmp.gc`, mode `0o644`), synced, and then renamed over the pidfile, so a
/// reader never sees a partially written file. If any step fails, the temp
/// file is removed (best effort) instead of being left behind.
///
/// # Errors
///
/// Returns `DaemonError::Io` if the temp file cannot be created, written or
/// synced, or if the rename fails.
#[cfg(unix)]
fn write_pid_file_grandchild(pidfile_path: &std::path::Path) -> DaemonResult<()> {
    use std::io::Write as _;
    use std::os::unix::fs::OpenOptionsExt as _;

    let pid_line = format!("{}\n", std::process::id());
    let tmp_path = pidfile_path.with_extension("tmp.gc");

    let publish = || -> std::io::Result<()> {
        let mut tmp = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .mode(0o644)
            .open(&tmp_path)?;
        tmp.write_all(pid_line.as_bytes())?;
        tmp.sync_data()?;
        // Close the handle before the rename.
        drop(tmp);
        std::fs::rename(&tmp_path, pidfile_path)
    };

    publish().map_err(|e| {
        // Do not leave a stale temp file next to the pidfile; a failure to
        // remove it is ignored because the original error is what matters.
        let _ = std::fs::remove_file(&tmp_path);
        DaemonError::Io(e)
    })
}
// Unit tests for CLI parsing, config printing/loading and status rendering.
#[cfg(test)]
mod tests {
use super::*;
// The default config must serialise to TOML and parse back with the same
// values for the fields compared below.
#[test]
fn print_config_emits_canonical_toml() {
let cfg = DaemonConfig::default();
let toml_str = toml::to_string_pretty(&cfg)
.expect("DaemonConfig must serialise to TOML without error");
assert!(!toml_str.is_empty(), "serialised config must not be empty");
let reparsed: DaemonConfig =
toml::from_str(&toml_str).expect("serialised TOML must be parseable back");
assert_eq!(reparsed.memory_limit_mb, cfg.memory_limit_mb);
assert_eq!(
reparsed.auto_start_ready_timeout_secs,
cfg.auto_start_ready_timeout_secs
);
assert_eq!(reparsed.log_keep_rotations, cfg.log_keep_rotations);
}
// `run_print_config` must succeed when no config path is given.
#[test]
fn run_print_config_succeeds_with_defaults() {
// Clear the env var so an ambient value cannot influence the result.
unsafe { std::env::remove_var("SQRY_DAEMON_CONFIG") };
let result = run_print_config(None, None);
assert!(
result.is_ok(),
"run_print_config with no config file must succeed: {result:?}"
);
}
// The generated systemd user unit must be non-empty, use Type=notify and
// mention sqryd.
#[cfg(target_os = "linux")]
#[test]
fn install_systemd_user_prints_to_stdout() {
use crate::lifecycle::units::systemd::generate_user_unit;
let cfg = DaemonConfig::default();
let opts = InstallOptions::default();
let unit = generate_user_unit(&cfg, &opts);
assert!(!unit.is_empty(), "systemd user unit must be non-empty");
assert!(
unit.contains("Type=notify"),
"systemd user unit must contain 'Type=notify'"
);
assert!(
unit.contains("sqryd"),
"systemd user unit must reference sqryd"
);
}
// With no subcommand, parsing yields either no command or a plain `start`.
#[test]
fn default_command_is_start_foreground() {
let cli = SqrydCli::try_parse_from(["sqryd"]).expect("parse must succeed");
match cli.command {
None => {}
Some(Command::Start(Start {
detach: false,
spawned_by_client: false,
})) => {}
other => panic!("unexpected command: {other:?}"),
}
}
// `start` without flags leaves both booleans false.
#[test]
fn start_without_detach_is_foreground() {
let cli = SqrydCli::try_parse_from(["sqryd", "start"]).expect("parse");
assert!(matches!(
cli.command,
Some(Command::Start(Start {
detach: false,
spawned_by_client: false,
}))
));
}
// `start --detach` sets only `detach`.
#[test]
fn start_with_detach_flag_is_parsed() {
let cli = SqrydCli::try_parse_from(["sqryd", "start", "--detach"]).expect("parse");
assert!(matches!(
cli.command,
Some(Command::Start(Start {
detach: true,
spawned_by_client: false,
}))
));
}
// The hidden `--spawned-by-client` flag is still accepted by the parser.
#[test]
fn start_spawned_by_client_is_hidden_but_parseable() {
let cli = SqrydCli::try_parse_from(["sqryd", "start", "--detach", "--spawned-by-client"])
.expect("parse");
assert!(matches!(
cli.command,
Some(Command::Start(Start {
detach: true,
spawned_by_client: true,
}))
));
}
// `foreground` parses to its own variant.
#[test]
fn foreground_subcommand_parses() {
let cli = SqrydCli::try_parse_from(["sqryd", "foreground"]).expect("parse");
assert!(matches!(cli.command, Some(Command::Foreground)));
}
// `stop --timeout-secs N` carries N through.
#[test]
fn stop_with_timeout_parses() {
let cli =
SqrydCli::try_parse_from(["sqryd", "stop", "--timeout-secs", "30"]).expect("parse");
assert!(matches!(
cli.command,
Some(Command::Stop { timeout_secs: 30 })
));
}
// `status --json` sets the flag.
#[test]
fn status_with_json_flag_parses() {
let cli = SqrydCli::try_parse_from(["sqryd", "status", "--json"]).expect("parse");
assert!(matches!(cli.command, Some(Command::Status { json: true })));
}
// `print-config` parses to its own variant.
#[test]
fn print_config_subcommand_parses() {
let cli = SqrydCli::try_parse_from(["sqryd", "print-config"]).expect("parse");
assert!(matches!(cli.command, Some(Command::PrintConfig)));
}
// The global `--config` flag is captured alongside a subcommand.
#[test]
fn global_config_flag_is_parsed() {
let cli = SqrydCli::try_parse_from(["sqryd", "--config", "/tmp/test.toml", "print-config"])
.expect("parse");
assert_eq!(
cli.config,
Some(PathBuf::from("/tmp/test.toml")),
"--config flag must be captured"
);
assert!(matches!(cli.command, Some(Command::PrintConfig)));
}
// `status` without `--json` defaults the flag to false.
#[test]
fn status_without_json_flag_defaults_to_false() {
let cli = SqrydCli::try_parse_from(["sqryd", "status"]).expect("parse");
assert!(matches!(cli.command, Some(Command::Status { json: false })));
}
// `stop` without `--timeout-secs` uses the declared default of 15.
#[test]
fn stop_defaults_to_15_second_timeout() {
let cli = SqrydCli::try_parse_from(["sqryd", "stop"]).expect("parse");
assert!(matches!(
cli.command,
Some(Command::Stop { timeout_secs: 15 })
));
}
// Smoke test: rendering a result without memory/workspaces must not panic.
#[test]
fn render_status_human_handles_minimal_result() {
let result = serde_json::json!({
"daemon_version": "8.0.6",
"uptime_seconds": 42,
});
render_status_human(&result);
}
// Loading from an explicit path must succeed for a comment-only file and
// must leave SQRY_DAEMON_CONFIG unset.
#[test]
fn load_config_with_explicit_path_does_not_set_env_var() {
use std::io::Write as _;
use tempfile::NamedTempFile;
unsafe { std::env::remove_var("SQRY_DAEMON_CONFIG") };
let mut tmp = NamedTempFile::new().expect("NamedTempFile");
writeln!(tmp, "# minimal sqryd test config").expect("write");
let path = tmp.path().to_path_buf();
let result = load_config(Some(path.clone()));
assert!(
result.is_ok(),
"load_config with valid TOML path must succeed: {result:?}"
);
assert!(
std::env::var_os("SQRY_DAEMON_CONFIG").is_none(),
"load_config must NOT mutate SQRY_DAEMON_CONFIG (M-3 fix)"
);
}
}