1use std::{path::PathBuf, process::ExitCode, sync::Arc, time::Duration};
87
88use clap::{Parser, Subcommand};
89use sqry_core::query::executor::QueryExecutor;
90use tokio_util::sync::CancellationToken;
91use tracing::{error, info, warn};
92
93use crate::{
94 DaemonConfig, DaemonError, DaemonResult, IpcServer, RealWorkspaceBuilder, RebuildDispatcher,
95 WorkspaceManager,
96 lifecycle::{
97 log_rotate::install_tracing,
98 notify::{is_under_systemd, notify_ready},
99 pidfile::acquire_pidfile_lock,
100 signals::install_signal_handlers,
101 units::InstallOptions,
102 },
103};
104
105#[cfg(unix)]
106use crate::lifecycle::pidfile::PidfileLock;
107
/// Environment variable through which the detaching parent passes the
/// write-end file descriptor number of the readiness pipe to the process it
/// spawns.
// NOTE(review): only referenced from `#[cfg(unix)]` code in this file, yet the
// constant itself is not cfg-gated — confirm this does not trigger a
// dead-code warning on non-Unix targets.
const ENV_READY_PIPE_FD: &str = "SQRYD_READY_PIPE_FD";

/// Environment variable carrying the raw file descriptor number of the
/// pidfile lock that the spawned process is expected to adopt.
#[cfg(unix)]
const ENV_LOCK_FD: &str = "SQRYD_LOCK_FD";

/// Environment variable carrying the pidfile path handed to
/// `PidfileLock::adopt` in the spawned process.
#[cfg(unix)]
const ENV_PIDFILE_PATH: &str = "SQRYD_PIDFILE_PATH";

/// Environment variable carrying the lockfile path handed to
/// `PidfileLock::adopt` in the spawned process.
#[cfg(unix)]
const ENV_LOCKFILE_PATH: &str = "SQRYD_LOCKFILE_PATH";
134
// Top-level command-line interface of the `sqryd` binary.
//
// Plain `//` comments are used on purpose: with clap's derive API, `///` doc
// comments become user-visible help text, so adding them would change the
// program's `--help` output.
#[derive(Debug, Parser)]
#[command(
    name = "sqryd",
    about = "sqry daemon — persistent semantic code-search graph service",
    version,
    author
)]
pub struct SqrydCli {
    // `--config FILE`: explicit configuration file. Can also be supplied via
    // the `SQRY_DAEMON_CONFIG` environment variable; `global = true` lets it
    // appear before or after the subcommand.
    #[arg(long, value_name = "FILE", env = "SQRY_DAEMON_CONFIG", global = true)]
    pub config: Option<PathBuf>,

    // `--log-level LEVEL`: optional log-level override, forwarded as-is to the
    // tracing setup helpers.
    #[arg(long, value_name = "LEVEL", global = true)]
    pub log_level: Option<String>,

    // Selected subcommand. When absent, `run` falls back to
    // `Command::Start(Start::default())`.
    #[command(subcommand)]
    pub command: Option<Command>,
}
170
// Subcommands understood by `sqryd`; `run` dispatches on these.
//
// Plain `//` comments are used on purpose: clap's derive API turns `///` doc
// comments into help text, which would change runtime output.
#[derive(Debug, Subcommand)]
pub enum Command {
    // Start the daemon; the flags in `Start` select foreground, detached, or
    // the internal spawned-child path.
    Start(Start),

    // Run the daemon in the foreground (dispatched as `Start::default()`).
    Foreground,

    // Send `daemon/stop` to a running daemon and wait for its socket to
    // become unreachable.
    Stop {
        // Maximum number of seconds to wait for the socket to disappear.
        #[arg(long, default_value_t = 15)]
        timeout_secs: u64,
    },

    // Query a running daemon with `daemon/status`.
    Status {
        // Print the raw response instead of the human-readable rendering.
        #[arg(long)]
        json: bool,
    },

    // Print a systemd user unit to stdout (Linux only).
    #[cfg(target_os = "linux")]
    InstallSystemdUser,

    // Print a systemd system unit to stdout (Linux only).
    #[cfg(target_os = "linux")]
    InstallSystemdSystem {
        // Optional account name; passed through
        // `resolve_system_unit_user` before the unit is generated.
        #[arg(long)]
        user: Option<String>,
    },

    // Print a launchd plist to stdout (macOS only).
    #[cfg(target_os = "macos")]
    InstallLaunchd,

    // Print an `sc.exe create` command and Task Scheduler XML (Windows only).
    #[cfg(target_os = "windows")]
    InstallWindows,

    // Print the loaded configuration as pretty TOML.
    PrintConfig,
}
248
// Flags of the `start` subcommand. Both default to `false`, which selects
// foreground operation in `run_start`.
//
// Plain `//` comments are used on purpose: clap's derive API turns `///` doc
// comments into help text, which would change runtime output.
#[derive(Debug, clap::Args, Default)]
pub struct Start {
    // `--detach`: spawn the daemon as a separate process and return once it
    // reports readiness (Unix); falls back to foreground elsewhere.
    #[arg(long)]
    pub detach: bool,

    // `--spawned-by-client`: internal flag (hidden from help) set by the
    // detaching parent on the process it spawns. Checked before `detach` in
    // `run_start`.
    #[arg(long, hide = true)]
    pub spawned_by_client: bool,
}
265
266pub fn run() -> DaemonResult<()> {
276 let cli = SqrydCli::parse();
277 let rt = tokio::runtime::Builder::new_multi_thread()
291 .enable_all()
292 .max_blocking_threads(64)
293 .build()
294 .map_err(DaemonError::Io)?;
295
296 let log_level_owned = cli.log_level.clone();
297 let log_level = log_level_owned.as_deref();
298 let config_path = cli.config.clone();
299
300 let command = cli.command.unwrap_or(Command::Start(Start::default()));
301
302 match command {
303 Command::Start(start) => rt.block_on(run_start(start, config_path, log_level)),
304 Command::Foreground => rt.block_on(run_start(Start::default(), config_path, log_level)),
305 Command::Stop { timeout_secs } => {
306 rt.block_on(run_stop(config_path, log_level, timeout_secs))
307 }
308 Command::Status { json } => rt.block_on(run_status(config_path, log_level, json)),
309 #[cfg(target_os = "linux")]
310 Command::InstallSystemdUser => run_install_systemd_user(config_path, log_level),
311 #[cfg(target_os = "linux")]
312 Command::InstallSystemdSystem { user } => {
313 run_install_systemd_system(config_path, log_level, user)
314 }
315 #[cfg(target_os = "macos")]
316 Command::InstallLaunchd => run_install_launchd(config_path, log_level),
317 #[cfg(target_os = "windows")]
318 Command::InstallWindows => run_install_windows(config_path, log_level),
319 Command::PrintConfig => run_print_config(config_path, log_level),
320 }
321}
322
323async fn run_start(
335 args: Start,
336 config_path: Option<PathBuf>,
337 log_level: Option<&str>,
338) -> DaemonResult<()> {
339 if args.spawned_by_client {
340 return run_start_spawned_by_client(config_path, log_level).await;
341 }
342 if args.detach {
343 return run_start_detach(config_path, log_level).await;
344 }
345 run_start_foreground(config_path, log_level).await
346}
347
348async fn run_start_foreground(
356 config_path: Option<PathBuf>,
357 log_level: Option<&str>,
358) -> DaemonResult<()> {
359 let cfg = load_config(config_path)?;
361 let cfg = Arc::new(cfg);
362
363 let _tracing_guard = match install_tracing(&cfg, log_level) {
365 Ok(g) => g,
366 Err(e) => {
367 eprintln!("sqryd: warning: tracing setup: {e:#}");
368 None
369 }
370 };
371
372 info!(
373 version = env!("CARGO_PKG_VERSION"),
374 socket = %cfg.socket_path().display(),
375 pid_file = %cfg.pid_path().display(),
376 "sqryd starting"
377 );
378
379 create_runtime_dir(&cfg)?;
381
382 let pidfile_lock = acquire_pidfile_lock(&cfg)?;
384 info!(pid_file = %cfg.pid_path().display(), "pidfile lock acquired");
385
386 let (manager, dispatcher, builder, executor) = build_daemon_components(Arc::clone(&cfg));
388
389 let shutdown = CancellationToken::new();
391
392 let _signal_guard = install_signal_handlers(shutdown.clone())?;
394 info!("signal handlers installed");
395
396 preload_pinned_workspaces(&cfg, &manager, &builder).await;
398
399 let server = IpcServer::bind(
401 Arc::clone(&cfg),
402 Arc::clone(&manager),
403 Arc::clone(&dispatcher),
404 Arc::clone(&builder),
405 Arc::clone(&executor),
406 shutdown.clone(),
407 )
408 .await?;
409 info!(socket = %server.socket_path().display(), "IPC server bound");
410
411 signal_ready(&cfg, server.socket_path());
413
414 server.run().await?;
416
417 info!("sqryd shutdown complete");
419 drop(_signal_guard);
420 drop(pidfile_lock);
421
422 Ok(())
423}
424
425async fn run_start_detach(
437 config_path: Option<PathBuf>,
438 log_level: Option<&str>,
439) -> DaemonResult<()> {
440 #[cfg(unix)]
441 {
442 run_start_detach_unix(config_path, log_level).await
443 }
444 #[cfg(not(unix))]
445 {
446 let cfg = load_config(config_path.clone())?;
447 setup_stderr_tracing(log_level, &cfg);
448 drop(cfg);
449 warn!(
450 "--detach is a no-op on Windows; running in the foreground instead. \
451 Use Task Scheduler or sc.exe to run sqryd as a background service."
452 );
453 run_start_foreground(config_path, log_level).await
454 }
455}
456
/// Parent side of `start --detach` on Unix.
///
/// Takes the pidfile lock, re-executes the current binary with
/// `start --detach --spawned-by-client` in a new session, and waits on a pipe
/// until the spawned process closes its write end (the readiness signal) or
/// the configured timeout passes.
///
/// The spawned process is called the "grandchild" in log messages, although
/// this function spawns it directly.
///
/// # Errors
///
/// * Propagates failures from config loading, runtime-dir creation, pidfile
///   locking and pipe creation.
/// * `DaemonError::Io` if the current executable cannot be determined or the
///   spawn fails.
/// * `DaemonError::AutoStartTimeout` if the spawned process exits before
///   signalling readiness, or does not signal within the timeout (it is
///   killed in that case).
// NOTE(review): `poll_ready_pipe` sleeps with `std::thread::sleep`, so this
// async fn blocks its executor thread while waiting.
// NOTE(review): on the early-return paths between `create_pipe` and the
// explicit `drop_raw_fd` calls (e.g. `current_exe` or `spawn` failing) the
// pipe fds are not closed here.
#[cfg(unix)]
async fn run_start_detach_unix(
    config_path: Option<PathBuf>,
    log_level: Option<&str>,
) -> DaemonResult<()> {
    let cfg = load_config(config_path.clone())?;
    let cfg = Arc::new(cfg);

    // Tracing failure is non-fatal: warn on stderr and continue without it.
    let _tracing_guard = match install_tracing(&cfg, log_level) {
        Ok(g) => g,
        Err(e) => {
            eprintln!("sqryd: warning: tracing setup (parent): {e:#}");
            None
        }
    };

    create_runtime_dir(&cfg)?;

    // The parent takes the lock first; its fd is later inherited by the
    // spawned process.
    let mut pidfile_lock = acquire_pidfile_lock(&cfg)?;
    info!(pid_file = %cfg.pid_path().display(), "parent: pidfile lock acquired (WriteOwner)");

    // Readiness pipe: the parent polls `read_fd`, the spawned process holds
    // `write_fd` and closes it once ready.
    let (read_fd, write_fd) = create_pipe()?;

    let lock_fd = pidfile_lock.as_raw_fd();
    let pidfile_path = cfg.pid_path();
    let lockfile_path = cfg.lock_path();

    let exe = std::env::current_exe()
        .map_err(|e| DaemonError::Io(std::io::Error::other(format!("current_exe: {e}"))))?;

    // `run_start` checks `--spawned-by-client` before `--detach`, so the
    // spawned process takes the spawned-by-client path.
    let mut cmd = std::process::Command::new(&exe);
    cmd.args(["start", "--detach", "--spawned-by-client"]);

    // Forward the same global options the parent was started with.
    if let Some(ref cp) = config_path {
        cmd.arg("--config").arg(cp);
    }
    if let Some(ll) = log_level {
        cmd.arg("--log-level").arg(ll);
    }

    // Tell the spawned process which inherited fds and paths to use.
    cmd.env(ENV_READY_PIPE_FD, write_fd.to_string());
    cmd.env(ENV_LOCK_FD, lock_fd.to_string());
    cmd.env(ENV_PIDFILE_PATH, pidfile_path.as_os_str());
    cmd.env(ENV_LOCKFILE_PATH, lockfile_path.as_os_str());

    // Detach the spawned process from the parent's standard streams.
    cmd.stdin(std::process::Stdio::null());
    cmd.stdout(std::process::Stdio::null());
    cmd.stderr(std::process::Stdio::null());

    let write_fd_copy = write_fd;
    let lock_fd_copy = lock_fd;
    // SAFETY: the `pre_exec` closure runs in the forked child before `exec`;
    // it only calls `setsid` and `fcntl` and reads `errno`.
    unsafe {
        use std::os::unix::process::CommandExt as _;
        cmd.pre_exec(move || {
            // Start a new session so the daemon is no longer tied to the
            // parent's session.
            if libc::setsid() < 0 {
                return Err(std::io::Error::last_os_error());
            }
            // Clear FD_CLOEXEC so both fds survive the upcoming `exec`
            // (`create_pipe` creates the pipe fds close-on-exec).
            for fd in [write_fd_copy, lock_fd_copy] {
                let flags = libc::fcntl(fd, libc::F_GETFD);
                if flags < 0 {
                    return Err(std::io::Error::last_os_error());
                }
                let rc = libc::fcntl(fd, libc::F_SETFD, flags & !libc::FD_CLOEXEC);
                if rc < 0 {
                    return Err(std::io::Error::last_os_error());
                }
            }
            Ok(())
        });
    }

    let mut child = cmd.spawn().map_err(|e| {
        DaemonError::Io(std::io::Error::other(format!(
            "failed to spawn grandchild sqryd process: {e}"
        )))
    })?;

    let grandchild_pid = child.id();
    info!(pid = grandchild_pid, "spawned grandchild");

    // Close the parent's copy of the write end; otherwise the read end would
    // never see EOF.
    drop_raw_fd(write_fd);

    // NOTE(review): `Instant + Duration` panics on overflow — presumably
    // config validation bounds this value; confirm.
    let timeout_secs = cfg.auto_start_ready_timeout_secs;
    let deadline = std::time::Instant::now() + Duration::from_secs(timeout_secs);

    // `Ok(())` means EOF on the pipe; `Err(())` means timeout or read error.
    let result = poll_ready_pipe(read_fd, deadline);

    drop_raw_fd(read_fd);

    match result {
        Ok(()) => {
            // EOF also occurs when the spawned process dies, so check that it
            // is still running before treating EOF as "ready".
            match child.try_wait() {
                Ok(Some(status)) => {
                    warn!(
                        pid = grandchild_pid,
                        ?status,
                        "grandchild exited before signalling ready (pipe EOF was process death)"
                    );
                    drop(pidfile_lock);
                    // Reported with the timeout error variant even though no
                    // timeout elapsed.
                    return Err(DaemonError::AutoStartTimeout {
                        timeout_secs,
                        socket: cfg.socket_path(),
                    });
                }
                Ok(None) => {
                    // Still running: the EOF was a genuine readiness signal.
                }
                Err(e) => {
                    warn!(
                        pid = grandchild_pid,
                        err = %e,
                        "try_wait after pipe EOF failed -- assuming grandchild is alive"
                    );
                }
            }

            // NOTE(review): presumably marks the lock as transferred so that
            // dropping it below does not release what the spawned process now
            // owns — confirm in the pidfile module.
            pidfile_lock.hand_off_to_adopter();
            info!(
                pid = grandchild_pid,
                "grandchild signalled ready -- parent exiting 0 (Handoff)"
            );
            drop(pidfile_lock);
            Ok(())
        }
        Err(()) => {
            warn!(
                pid = grandchild_pid,
                timeout_secs, "grandchild did not signal ready within timeout -- killing"
            );
            if let Err(e) = child.kill() {
                warn!(pid = grandchild_pid, err = %e, "kill(grandchild) failed");
            }
            // Reap the child; the result is intentionally ignored.
            let _ = child.wait();
            drop(pidfile_lock);
            Err(DaemonError::AutoStartTimeout {
                timeout_secs,
                socket: cfg.socket_path(),
            })
        }
    }
}
627
628async fn run_start_spawned_by_client(
639 config_path: Option<PathBuf>,
640 log_level: Option<&str>,
641) -> DaemonResult<()> {
642 #[cfg(unix)]
643 {
644 run_start_spawned_by_client_unix(config_path, log_level).await
645 }
646 #[cfg(not(unix))]
647 {
648 warn!("--spawned-by-client reached on non-Unix -- running foreground");
651 run_start_foreground(config_path, log_level).await
652 }
653}
654
655#[cfg(unix)]
656async fn run_start_spawned_by_client_unix(
657 config_path: Option<PathBuf>,
658 log_level: Option<&str>,
659) -> DaemonResult<()> {
660 use std::os::unix::io::RawFd;
661
662 let lock_fd: RawFd = read_env_fd(ENV_LOCK_FD).ok_or_else(|| {
664 DaemonError::Io(std::io::Error::other(
665 "grandchild: SQRYD_LOCK_FD not set (only valid via --detach parent spawn)",
666 ))
667 })?;
668 let ready_pipe_fd: RawFd = read_env_fd(ENV_READY_PIPE_FD).ok_or_else(|| {
669 DaemonError::Io(std::io::Error::other(
670 "grandchild: SQRYD_READY_PIPE_FD not set",
671 ))
672 })?;
673 let pidfile_path: PathBuf = std::env::var_os(ENV_PIDFILE_PATH)
674 .map(PathBuf::from)
675 .ok_or_else(|| {
676 DaemonError::Io(std::io::Error::other(
677 "grandchild: SQRYD_PIDFILE_PATH not set",
678 ))
679 })?;
680 let lockfile_path: PathBuf = std::env::var_os(ENV_LOCKFILE_PATH)
681 .map(PathBuf::from)
682 .ok_or_else(|| {
683 DaemonError::Io(std::io::Error::other(
684 "grandchild: SQRYD_LOCKFILE_PATH not set",
685 ))
686 })?;
687
688 let cfg = load_config(config_path)?;
690 let cfg = Arc::new(cfg);
691
692 write_pid_file_grandchild(&cfg.pid_path())?;
695
696 let _pidfile_lock = unsafe { PidfileLock::adopt(lock_fd, pidfile_path, lockfile_path) };
702
703 info!(
704 version = env!("CARGO_PKG_VERSION"),
705 pid = std::process::id(),
706 "sqryd grandchild: pidfile lock adopted -- beginning foreground startup"
707 );
708
709 run_start_foreground_inner(cfg, log_level, ready_pipe_fd).await
711}
712
713async fn run_start_foreground_inner(
722 cfg: Arc<DaemonConfig>,
723 log_level: Option<&str>,
724 #[cfg(unix)] ready_pipe_write_fd: libc::c_int,
725 #[cfg(not(unix))] _ready_pipe_write_fd: i32,
726) -> DaemonResult<()> {
727 let _tracing_guard = match install_tracing(&cfg, log_level) {
729 Ok(g) => g,
730 Err(e) => {
731 eprintln!("sqryd: warning: tracing setup: {e:#}");
732 None
733 }
734 };
735 info!(
736 version = env!("CARGO_PKG_VERSION"),
737 socket = %cfg.socket_path().display(),
738 "sqryd grandchild: tracing active"
739 );
740
741 create_runtime_dir(&cfg)?;
743
744 let (manager, dispatcher, builder, executor) = build_daemon_components(Arc::clone(&cfg));
746
747 let shutdown = CancellationToken::new();
749
750 let _signal_guard = install_signal_handlers(shutdown.clone())?;
752
753 preload_pinned_workspaces(&cfg, &manager, &builder).await;
755
756 let server = IpcServer::bind(
758 Arc::clone(&cfg),
759 Arc::clone(&manager),
760 Arc::clone(&dispatcher),
761 Arc::clone(&builder),
762 Arc::clone(&executor),
763 shutdown.clone(),
764 )
765 .await?;
766 info!(socket = %server.socket_path().display(), "IPC server bound");
767
768 signal_ready(&cfg, server.socket_path());
770
771 #[cfg(unix)]
773 if ready_pipe_write_fd >= 0 {
774 close_ready_pipe_fd(ready_pipe_write_fd);
775 }
776
777 server.run().await?;
779
780 info!("sqryd shutdown complete");
781 Ok(())
782}
783
784async fn run_stop(
791 config_path: Option<PathBuf>,
792 log_level: Option<&str>,
793 timeout_secs: u64,
794) -> DaemonResult<()> {
795 let cfg = load_config(config_path)?;
796 setup_stderr_tracing(log_level, &cfg);
797 let socket_path = cfg.socket_path();
798
799 info!(socket = %socket_path.display(), "connecting to daemon to send daemon/stop");
800
801 let stop_req = serde_json::json!({
803 "jsonrpc": "2.0",
804 "id": 1,
805 "method": "daemon/stop",
806 "params": {}
807 });
808 send_management_request(&socket_path, &stop_req).await?;
809
810 info!(
811 timeout_secs,
812 "waiting for daemon socket to become unreachable"
813 );
814
815 let deadline = std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs);
816 loop {
817 if !crate::lifecycle::detach::try_connect_path(&socket_path).await {
818 info!("daemon socket gone -- stop complete");
819 return Ok(());
820 }
821 if std::time::Instant::now() >= deadline {
822 return Err(DaemonError::AutoStartTimeout {
823 timeout_secs,
824 socket: socket_path,
825 });
826 }
827 tokio::time::sleep(Duration::from_millis(100)).await;
828 }
829}
830
831async fn run_status(
837 config_path: Option<PathBuf>,
838 log_level: Option<&str>,
839 json_output: bool,
840) -> DaemonResult<()> {
841 let cfg = load_config(config_path)?;
842 setup_stderr_tracing(log_level, &cfg);
843 let socket_path = cfg.socket_path();
844
845 if !crate::lifecycle::detach::try_connect_path(&socket_path).await {
847 eprintln!(
848 "sqryd: daemon is not running (socket not connectable: {})",
849 socket_path.display()
850 );
851 return Err(DaemonError::Io(std::io::Error::other(format!(
852 "daemon socket not reachable: {}",
853 socket_path.display()
854 ))));
855 }
856
857 let status_req = serde_json::json!({
858 "jsonrpc": "2.0",
859 "id": 1,
860 "method": "daemon/status",
861 "params": {}
862 });
863
864 let resp_buf = send_management_request(&socket_path, &status_req).await?;
865
866 if json_output {
867 println!("{}", String::from_utf8_lossy(&resp_buf));
868 } else {
869 let v = serde_json::from_slice::<serde_json::Value>(&resp_buf).map_err(|e| {
872 DaemonError::Io(std::io::Error::other(format!(
873 "daemon/status response was not valid JSON: {e} (raw: {})",
874 String::from_utf8_lossy(&resp_buf)
875 )))
876 })?;
877 if let Some(result) = v.get("result") {
878 render_status_human(result);
879 } else if let Some(err_val) = v.get("error") {
880 eprintln!("sqryd status error: {err_val}");
881 return Err(DaemonError::Io(std::io::Error::other(format!(
882 "daemon/status error: {err_val}"
883 ))));
884 } else {
885 println!("{}", serde_json::to_string_pretty(&v).unwrap_or_default());
886 }
887 }
888
889 Ok(())
890}
891
892async fn send_management_request(
906 socket_path: &std::path::Path,
907 req: &serde_json::Value,
908) -> DaemonResult<Vec<u8>> {
909 use crate::{DaemonHello, DaemonHelloResponse};
910 use sqry_daemon_protocol::framing::{read_frame, write_frame_json};
911
912 #[cfg(unix)]
913 let mut stream = {
914 tokio::net::UnixStream::connect(socket_path)
915 .await
916 .map_err(|e| {
917 DaemonError::Io(std::io::Error::other(format!(
918 "connect to daemon socket {}: {e}",
919 socket_path.display()
920 )))
921 })?
922 };
923
924 #[cfg(windows)]
925 let mut stream = {
926 use tokio::net::windows::named_pipe::ClientOptions;
927 let pipe_path = socket_path.to_string_lossy();
928 ClientOptions::new().open(pipe_path.as_ref()).map_err(|e| {
929 DaemonError::Io(std::io::Error::other(format!(
930 "connect to daemon pipe {}: {e}",
931 pipe_path
932 )))
933 })?
934 };
935
936 let hello = DaemonHello {
938 client_version: env!("CARGO_PKG_VERSION").to_owned(),
939 protocol_version: 1,
940 logical_workspace: None,
945 };
946 write_frame_json(&mut stream, &hello)
947 .await
948 .map_err(|e| DaemonError::Io(std::io::Error::other(format!("send hello: {e}"))))?;
949
950 let hello_resp_bytes = read_frame(&mut stream)
952 .await
953 .map_err(|e| DaemonError::Io(std::io::Error::other(format!("read hello response: {e}"))))?
954 .ok_or_else(|| {
955 DaemonError::Io(std::io::Error::other(
956 "daemon closed connection before hello response",
957 ))
958 })?;
959 let hello_resp: DaemonHelloResponse =
960 serde_json::from_slice(&hello_resp_bytes).map_err(|e| {
961 DaemonError::Io(std::io::Error::other(format!("parse hello response: {e}")))
962 })?;
963 if !hello_resp.compatible {
964 return Err(DaemonError::Io(std::io::Error::other(
965 "daemon is not compatible with this client version",
966 )));
967 }
968
969 write_frame_json(&mut stream, req)
971 .await
972 .map_err(|e| DaemonError::Io(std::io::Error::other(format!("send request: {e}"))))?;
973
974 let resp_bytes = read_frame(&mut stream)
976 .await
977 .map_err(|e| DaemonError::Io(std::io::Error::other(format!("read response: {e}"))))?
978 .ok_or_else(|| {
979 DaemonError::Io(std::io::Error::other(
980 "daemon closed connection before sending response",
981 ))
982 })?;
983
984 Ok(resp_bytes)
985}
986
987fn render_status_human(result: &serde_json::Value) {
990 let payload = result.get("data").unwrap_or(result);
991
992 let version = payload
993 .get("daemon_version")
994 .and_then(|v| v.as_str())
995 .unwrap_or("unknown");
996 let uptime = payload
997 .get("uptime_seconds")
998 .and_then(|v| v.as_u64())
999 .unwrap_or(0);
1000
1001 println!("sqryd version: {version}");
1002 println!(" uptime: {uptime}s");
1003
1004 if let Some(memory) = payload.get("memory") {
1005 let limit = memory
1006 .get("limit_bytes")
1007 .and_then(|v| v.as_u64())
1008 .unwrap_or(0);
1009 let current = memory
1010 .get("current_bytes")
1011 .and_then(|v| v.as_u64())
1012 .unwrap_or(0);
1013 println!(
1014 " memory: {} MiB used / {} MiB limit",
1015 current / (1024 * 1024),
1016 limit / (1024 * 1024)
1017 );
1018 }
1019
1020 if let Some(workspaces) = payload.get("workspaces").and_then(|v| v.as_array()) {
1021 println!(" workspaces: {}", workspaces.len());
1022 for ws in workspaces {
1023 let path = ws.get("index_root").and_then(|v| v.as_str()).unwrap_or("?");
1024 let state = ws
1025 .get("state")
1026 .and_then(|v| v.as_str())
1027 .unwrap_or("Unknown");
1028 println!(" {state:10} {path}");
1029 }
1030 }
1031}
1032
1033#[cfg(target_os = "linux")]
1038fn run_install_systemd_user(
1039 config_path: Option<PathBuf>,
1040 log_level: Option<&str>,
1041) -> DaemonResult<()> {
1042 let cfg = load_config(config_path)?;
1043 setup_stderr_tracing(log_level, &cfg);
1044 let opts = InstallOptions::default();
1045 let unit = crate::lifecycle::units::systemd::generate_user_unit(&cfg, &opts);
1046 println!("{unit}");
1047 Ok(())
1048}
1049
1050#[cfg(target_os = "linux")]
1051fn run_install_systemd_system(
1052 config_path: Option<PathBuf>,
1053 log_level: Option<&str>,
1054 user: Option<String>,
1055) -> DaemonResult<()> {
1056 let cfg = load_config(config_path)?;
1057 setup_stderr_tracing(log_level, &cfg);
1058 let opts = InstallOptions {
1059 user: user.clone(),
1060 ..Default::default()
1061 };
1062 let resolved_user =
1064 crate::lifecycle::units::systemd::resolve_system_unit_user(&opts).map_err(|e| {
1065 DaemonError::Config {
1066 path: cfg
1067 .pid_path()
1068 .parent()
1069 .unwrap_or_else(|| std::path::Path::new("."))
1070 .to_owned(),
1071 source: anyhow::anyhow!("{e}"),
1072 }
1073 })?;
1074 let opts_with_user = InstallOptions {
1075 user: Some(resolved_user),
1076 ..Default::default()
1077 };
1078 let unit = crate::lifecycle::units::systemd::generate_system_unit(&cfg, &opts_with_user);
1079 println!("{unit}");
1080 Ok(())
1081}
1082
/// Prints a launchd plist for the loaded configuration to stdout.
///
/// # Errors
///
/// Fails only if the configuration cannot be loaded.
#[cfg(target_os = "macos")]
fn run_install_launchd(config_path: Option<PathBuf>, log_level: Option<&str>) -> DaemonResult<()> {
    use crate::lifecycle::units::launchd::generate_plist;

    let cfg = load_config(config_path)?;
    setup_stderr_tracing(log_level, &cfg);

    let default_opts = InstallOptions::default();
    println!("{}", generate_plist(&cfg, &default_opts));
    Ok(())
}
1092
/// Prints the Windows service artefacts for the loaded configuration: first
/// an `sc.exe create` command, then a Task Scheduler XML document, each under
/// its own heading.
///
/// # Errors
///
/// Fails only if the configuration cannot be loaded.
#[cfg(target_os = "windows")]
fn run_install_windows(config_path: Option<PathBuf>, log_level: Option<&str>) -> DaemonResult<()> {
    use crate::lifecycle::units::windows::{generate_sc_create, generate_task_xml};

    let cfg = load_config(config_path)?;
    setup_stderr_tracing(log_level, &cfg);

    let default_opts = InstallOptions::default();
    // Generate both artefacts before printing anything.
    let sc_command = generate_sc_create(&cfg, &default_opts);
    let task_xml = generate_task_xml(&cfg, &default_opts);

    println!("-- sc.exe create command --");
    println!("{sc_command}");
    println!();
    println!("-- Task Scheduler XML --");
    println!("{task_xml}");
    Ok(())
}
1107
1108fn run_print_config(config_path: Option<PathBuf>, log_level: Option<&str>) -> DaemonResult<()> {
1109 let cfg = load_config(config_path)?;
1110 setup_stderr_tracing(log_level, &cfg);
1111 let toml_str = toml::to_string_pretty(&cfg).map_err(|e| DaemonError::Config {
1112 path: PathBuf::from("<serialise>"),
1113 source: anyhow::anyhow!("toml serialisation failed: {e}"),
1114 })?;
1115 println!("{toml_str}");
1116 Ok(())
1117}
1118
1119pub fn main_impl() -> ExitCode {
1126 match run() {
1127 Ok(()) => ExitCode::SUCCESS,
1128 Err(err) => {
1129 error!("sqryd: fatal: {err:#}");
1130 eprintln!("sqryd: {err:#}");
1131 ExitCode::from(err.exit_code())
1132 }
1133 }
1134}
1135
1136fn load_config(config_path: Option<PathBuf>) -> DaemonResult<DaemonConfig> {
1151 if let Some(ref p) = config_path {
1152 let mut cfg = DaemonConfig::load_from_path(p)?;
1153 cfg.apply_env_overrides()?;
1154 cfg.validate()?;
1155 Ok(cfg)
1156 } else {
1157 DaemonConfig::load()
1158 }
1159}
1160
1161fn setup_stderr_tracing(log_level: Option<&str>, cfg: &DaemonConfig) {
1166 let level = log_level
1167 .map(ToOwned::to_owned)
1168 .or_else(|| std::env::var("SQRY_DAEMON_LOG_LEVEL").ok())
1169 .unwrap_or_else(|| cfg.log_level.clone());
1170 let filter = tracing_subscriber::EnvFilter::try_new(&level)
1171 .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
1172 let _ = tracing_subscriber::fmt()
1173 .compact()
1174 .with_env_filter(filter)
1175 .try_init();
1176}
1177
1178fn create_runtime_dir(cfg: &DaemonConfig) -> DaemonResult<()> {
1180 let dir = cfg.runtime_dir();
1181 std::fs::create_dir_all(&dir).map_err(DaemonError::Io)?;
1182
1183 #[cfg(unix)]
1184 {
1185 use std::os::unix::fs::PermissionsExt as _;
1186 let perms = std::fs::Permissions::from_mode(0o700);
1187 std::fs::set_permissions(&dir, perms).map_err(DaemonError::Io)?;
1188 }
1189
1190 Ok(())
1191}
1192
1193fn build_daemon_components(
1210 cfg: Arc<DaemonConfig>,
1211) -> (
1212 Arc<WorkspaceManager>,
1213 Arc<RebuildDispatcher>,
1214 Arc<dyn crate::workspace::WorkspaceBuilder>,
1215 Arc<QueryExecutor>,
1216) {
1217 let plugins = Arc::new(sqry_plugin_registry::create_plugin_manager());
1218 let manager = WorkspaceManager::new(Arc::clone(&cfg));
1219
1220 let query_db_hook =
1224 crate::workspace::QueryDbHook::new(Duration::from_millis(cfg.rebuild_drain_timeout_ms));
1225 manager.set_hook(query_db_hook as crate::workspace::SharedHook);
1226 info!(
1227 timeout_ms = cfg.rebuild_drain_timeout_ms,
1228 "PF03B: production QueryDbHook installed (post-publish derived-cache writer)"
1229 );
1230
1231 let dispatcher =
1232 RebuildDispatcher::new(Arc::clone(&manager), Arc::clone(&cfg), Arc::clone(&plugins));
1233 let builder: Arc<dyn crate::workspace::WorkspaceBuilder> =
1234 Arc::new(RealWorkspaceBuilder::new(Arc::clone(&plugins)));
1235 let executor = Arc::new(QueryExecutor::new());
1236 (manager, dispatcher, builder, executor)
1237}
1238
1239fn signal_ready(cfg: &DaemonConfig, socket_path: &std::path::Path) {
1247 if is_under_systemd() {
1248 if let Err(e) = notify_ready() {
1249 warn!(err = %e, "sd_notify(READY=1) failed -- systemctl may time out");
1250 } else {
1251 info!("sd_notify: READY=1 sent");
1252 }
1253 }
1254
1255 let ready_path = cfg.runtime_dir().join("sqryd.ready");
1256 if let Err(e) = std::fs::write(&ready_path, b"") {
1257 warn!(
1258 path = %ready_path.display(),
1259 err = %e,
1260 "could not touch sqryd.ready sentinel (non-fatal)"
1261 );
1262 }
1263
1264 info!(
1265 socket = %socket_path.display(),
1266 "sqryd ready -- accepting connections"
1267 );
1268}
1269
1270async fn preload_pinned_workspaces(
1274 cfg: &DaemonConfig,
1275 manager: &Arc<WorkspaceManager>,
1276 builder: &Arc<dyn crate::workspace::WorkspaceBuilder>,
1277) {
1278 use sqry_core::project::ProjectRootMode;
1279
1280 for ws_cfg in &cfg.workspaces {
1281 if ws_cfg.exclude || !ws_cfg.pinned {
1282 continue;
1283 }
1284
1285 let root = ws_cfg.path.clone();
1286 let key =
1287 crate::workspace::WorkspaceKey::new(root.clone(), ProjectRootMode::WorkspaceFolder, 0);
1288
1289 info!(path = %root.display(), "pre-loading pinned workspace");
1290 let estimate =
1291 crate::workspace::working_set_estimate(crate::workspace::WorkingSetInputs::default());
1292
1293 if let Err(e) = manager.get_or_load(&key, builder.as_ref(), estimate) {
1294 warn!(
1295 path = %root.display(),
1296 err = %e,
1297 "pinned workspace pre-load failed (log + continue per §C.3.1 step 13)"
1298 );
1299 }
1300 }
1301}
1302
1303#[cfg(all(unix, target_os = "linux"))]
1309fn create_pipe() -> DaemonResult<(libc::c_int, libc::c_int)> {
1310 let mut fds = [0i32; 2];
1311 let rc = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC) };
1313 if rc < 0 {
1314 return Err(DaemonError::Io(std::io::Error::last_os_error()));
1315 }
1316 Ok((fds[0], fds[1]))
1317}
1318
/// Creates an anonymous pipe with both ends marked close-on-exec (non-Linux
/// Unix).
///
/// The pipe is created with `pipe` and the flag is set afterwards with
/// `fcntl`. Returns `(read_fd, write_fd)`; the caller owns both descriptors.
///
/// # Errors
///
/// `DaemonError::Io` if `pipe` fails or the flag cannot be set; in the latter
/// case both descriptors are closed before returning.
#[cfg(all(unix, not(target_os = "linux")))]
fn create_pipe() -> DaemonResult<(libc::c_int, libc::c_int)> {
    let mut ends: [libc::c_int; 2] = [0; 2];

    // SAFETY: `ends` is a valid, writable array of two `c_int`s, which is
    // what `pipe` requires.
    let status = unsafe { libc::pipe(ends.as_mut_ptr()) };
    if status < 0 {
        return Err(DaemonError::Io(std::io::Error::last_os_error()));
    }

    let [read_end, write_end] = ends;
    // Read end first, then write end; stop at the first failure.
    for fd in [read_end, write_end] {
        if let Err(err) = set_close_on_exec(fd) {
            drop_raw_fd(read_end);
            drop_raw_fd(write_end);
            return Err(err);
        }
    }

    Ok((read_end, write_end))
}
1337
/// Sets `FD_CLOEXEC` on `fd`, preserving its other descriptor flags.
///
/// # Errors
///
/// `DaemonError::Io` carrying the OS error if reading or writing the flags
/// fails.
#[cfg(all(unix, not(target_os = "linux")))]
fn set_close_on_exec(fd: libc::c_int) -> DaemonResult<()> {
    // SAFETY: `F_GETFD` only queries flags of the given descriptor number.
    let current = unsafe { libc::fcntl(fd, libc::F_GETFD) };
    if current < 0 {
        return Err(DaemonError::Io(std::io::Error::last_os_error()));
    }

    // SAFETY: `F_SETFD` only updates flags of the given descriptor number.
    let status = unsafe { libc::fcntl(fd, libc::F_SETFD, current | libc::FD_CLOEXEC) };
    if status < 0 {
        Err(DaemonError::Io(std::io::Error::last_os_error()))
    } else {
        Ok(())
    }
}
1354
1355#[cfg(unix)]
1357fn drop_raw_fd(fd: libc::c_int) {
1358 unsafe { libc::close(fd) };
1360}
1361
1362#[cfg(unix)]
1367fn poll_ready_pipe(read_fd: libc::c_int, deadline: std::time::Instant) -> Result<(), ()> {
1368 use std::io::Read as _;
1369 use std::os::unix::io::FromRawFd as _;
1370
1371 let mut file = unsafe { std::fs::File::from_raw_fd(read_fd) };
1374
1375 unsafe {
1378 let flags = libc::fcntl(read_fd, libc::F_GETFL);
1379 if flags >= 0 {
1380 libc::fcntl(read_fd, libc::F_SETFL, flags | libc::O_NONBLOCK);
1381 }
1382 }
1383
1384 loop {
1385 let mut buf = [0u8; 1];
1386 match file.read(&mut buf) {
1387 Ok(0) => {
1388 std::mem::forget(file);
1390 return Ok(());
1391 }
1392 Ok(_) => {
1393 }
1395 Err(e)
1396 if e.kind() == std::io::ErrorKind::WouldBlock
1397 || e.raw_os_error() == Some(libc::EAGAIN) =>
1398 {
1399 }
1401 Err(_) => {
1402 std::mem::forget(file);
1403 return Err(());
1404 }
1405 }
1406
1407 if std::time::Instant::now() >= deadline {
1408 std::mem::forget(file);
1409 return Err(());
1410 }
1411
1412 std::thread::sleep(Duration::from_millis(50));
1413 }
1414}
1415
1416#[cfg(unix)]
1418fn close_ready_pipe_fd(fd: libc::c_int) {
1419 unsafe { libc::close(fd) };
1421}
1422
1423#[cfg(unix)]
1425fn read_env_fd(var: &str) -> Option<libc::c_int> {
1426 std::env::var(var).ok()?.parse::<libc::c_int>().ok()
1427}
1428
1429#[cfg(unix)]
1434fn write_pid_file_grandchild(pidfile_path: &std::path::Path) -> DaemonResult<()> {
1435 use std::io::Write as _;
1436 use std::os::unix::fs::OpenOptionsExt as _;
1437
1438 let pid = std::process::id();
1439 let pid_str = format!("{pid}\n");
1440
1441 let tmp_path = pidfile_path.with_extension("tmp.gc");
1442 {
1443 let mut f = std::fs::OpenOptions::new()
1444 .write(true)
1445 .create(true)
1446 .truncate(true)
1447 .mode(0o644)
1448 .open(&tmp_path)
1449 .map_err(DaemonError::Io)?;
1450 f.write_all(pid_str.as_bytes()).map_err(DaemonError::Io)?;
1451 f.sync_data().map_err(DaemonError::Io)?;
1452 }
1453 std::fs::rename(&tmp_path, pidfile_path).map_err(DaemonError::Io)?;
1454 Ok(())
1455}
1456
/// Unit tests for CLI parsing and the helpers that can run without a daemon.
// NOTE(review): two tests remove `SQRY_DAEMON_CONFIG` from the process
// environment. The environment is process-global, so this can interact with
// other tests if the harness runs them on parallel threads — confirm.
#[cfg(test)]
mod tests {
    use super::*;

    /// The default config serialises to non-empty TOML and parses back with
    /// the compared fields unchanged.
    #[test]
    fn print_config_emits_canonical_toml() {
        let cfg = DaemonConfig::default();
        let toml_str = toml::to_string_pretty(&cfg)
            .expect("DaemonConfig must serialise to TOML without error");

        assert!(!toml_str.is_empty(), "serialised config must not be empty");

        let reparsed: DaemonConfig =
            toml::from_str(&toml_str).expect("serialised TOML must be parseable back");

        // Spot-check a few fields rather than comparing the whole struct.
        assert_eq!(reparsed.memory_limit_mb, cfg.memory_limit_mb);
        assert_eq!(
            reparsed.auto_start_ready_timeout_secs,
            cfg.auto_start_ready_timeout_secs
        );
        assert_eq!(reparsed.log_keep_rotations, cfg.log_keep_rotations);
    }

    /// `run_print_config` succeeds when no config path is given.
    #[test]
    fn run_print_config_succeeds_with_defaults() {
        // Make sure no config path leaks in from the environment.
        // SAFETY: NOTE(review) — assumes no other thread reads or writes the
        // environment concurrently; confirm for the test harness.
        unsafe { std::env::remove_var("SQRY_DAEMON_CONFIG") };

        let result = run_print_config(None, None);
        assert!(
            result.is_ok(),
            "run_print_config with no config file must succeed: {result:?}"
        );
    }

    /// The generated systemd user unit is non-empty, uses `Type=notify` and
    /// mentions `sqryd`.
    // Despite the name, this checks the generator's output string, not what
    // is printed to stdout.
    #[cfg(target_os = "linux")]
    #[test]
    fn install_systemd_user_prints_to_stdout() {
        use crate::lifecycle::units::systemd::generate_user_unit;
        let cfg = DaemonConfig::default();
        let opts = InstallOptions::default();
        let unit = generate_user_unit(&cfg, &opts);
        assert!(!unit.is_empty(), "systemd user unit must be non-empty");
        assert!(
            unit.contains("Type=notify"),
            "systemd user unit must contain 'Type=notify'"
        );
        assert!(
            unit.contains("sqryd"),
            "systemd user unit must reference sqryd"
        );
    }

    /// With no arguments the parsed command is either absent or a `Start`
    /// with both flags off.
    #[test]
    fn default_command_is_start_foreground() {
        let cli = SqrydCli::try_parse_from(["sqryd"]).expect("parse must succeed");
        match cli.command {
            // `run` maps `None` to `Start::default()`.
            None => {}
            Some(Command::Start(Start {
                detach: false,
                spawned_by_client: false,
            })) => {}
            other => panic!("unexpected command: {other:?}"),
        }
    }

    /// `start` without flags parses with both flags off.
    #[test]
    fn start_without_detach_is_foreground() {
        let cli = SqrydCli::try_parse_from(["sqryd", "start"]).expect("parse");
        assert!(matches!(
            cli.command,
            Some(Command::Start(Start {
                detach: false,
                spawned_by_client: false,
            }))
        ));
    }

    /// `start --detach` sets only `detach`.
    #[test]
    fn start_with_detach_flag_is_parsed() {
        let cli = SqrydCli::try_parse_from(["sqryd", "start", "--detach"]).expect("parse");
        assert!(matches!(
            cli.command,
            Some(Command::Start(Start {
                detach: true,
                spawned_by_client: false,
            }))
        ));
    }

    /// The hidden `--spawned-by-client` flag is still accepted by the parser.
    #[test]
    fn start_spawned_by_client_is_hidden_but_parseable() {
        let cli = SqrydCli::try_parse_from(["sqryd", "start", "--detach", "--spawned-by-client"])
            .expect("parse");
        assert!(matches!(
            cli.command,
            Some(Command::Start(Start {
                detach: true,
                spawned_by_client: true,
            }))
        ));
    }

    /// `foreground` parses to `Command::Foreground`.
    #[test]
    fn foreground_subcommand_parses() {
        let cli = SqrydCli::try_parse_from(["sqryd", "foreground"]).expect("parse");
        assert!(matches!(cli.command, Some(Command::Foreground)));
    }

    /// `stop --timeout-secs 30` carries the given timeout.
    #[test]
    fn stop_with_timeout_parses() {
        let cli =
            SqrydCli::try_parse_from(["sqryd", "stop", "--timeout-secs", "30"]).expect("parse");
        assert!(matches!(
            cli.command,
            Some(Command::Stop { timeout_secs: 30 })
        ));
    }

    /// `status --json` sets `json`.
    #[test]
    fn status_with_json_flag_parses() {
        let cli = SqrydCli::try_parse_from(["sqryd", "status", "--json"]).expect("parse");
        assert!(matches!(cli.command, Some(Command::Status { json: true })));
    }

    /// `print-config` parses to `Command::PrintConfig`.
    #[test]
    fn print_config_subcommand_parses() {
        let cli = SqrydCli::try_parse_from(["sqryd", "print-config"]).expect("parse");
        assert!(matches!(cli.command, Some(Command::PrintConfig)));
    }

    /// `--config` given before the subcommand is captured on the top-level
    /// struct.
    #[test]
    fn global_config_flag_is_parsed() {
        let cli = SqrydCli::try_parse_from(["sqryd", "--config", "/tmp/test.toml", "print-config"])
            .expect("parse");
        assert_eq!(
            cli.config,
            Some(PathBuf::from("/tmp/test.toml")),
            "--config flag must be captured"
        );
        assert!(matches!(cli.command, Some(Command::PrintConfig)));
    }

    /// `status` without `--json` leaves `json` off.
    #[test]
    fn status_without_json_flag_defaults_to_false() {
        let cli = SqrydCli::try_parse_from(["sqryd", "status"]).expect("parse");
        assert!(matches!(cli.command, Some(Command::Status { json: false })));
    }

    /// `stop` without `--timeout-secs` uses the declared default of 15.
    #[test]
    fn stop_defaults_to_15_second_timeout() {
        let cli = SqrydCli::try_parse_from(["sqryd", "stop"]).expect("parse");
        assert!(matches!(
            cli.command,
            Some(Command::Stop { timeout_secs: 15 })
        ));
    }

    /// Rendering a result that lacks the `memory` and `workspaces` sections
    /// must not panic.
    #[test]
    fn render_status_human_handles_minimal_result() {
        let result = serde_json::json!({
            "daemon_version": "8.0.6",
            "uptime_seconds": 42,
        });
        // Smoke test only: there is no assertion on the printed output.
        render_status_human(&result);
    }

    /// Loading from an explicit path succeeds for a comment-only TOML file
    /// and leaves `SQRY_DAEMON_CONFIG` unset.
    #[test]
    fn load_config_with_explicit_path_does_not_set_env_var() {
        use std::io::Write as _;
        use tempfile::NamedTempFile;

        // Start from a known state.
        // SAFETY: NOTE(review) — assumes no other thread reads or writes the
        // environment concurrently; confirm for the test harness.
        unsafe { std::env::remove_var("SQRY_DAEMON_CONFIG") };

        let mut tmp = NamedTempFile::new().expect("NamedTempFile");
        // A file containing only a comment is used as the minimal config.
        writeln!(tmp, "# minimal sqryd test config").expect("write");
        let path = tmp.path().to_path_buf();

        let result = load_config(Some(path.clone()));

        assert!(
            result.is_ok(),
            "load_config with valid TOML path must succeed: {result:?}"
        );
        assert!(
            std::env::var_os("SQRY_DAEMON_CONFIG").is_none(),
            "load_config must NOT mutate SQRY_DAEMON_CONFIG (M-3 fix)"
        );
    }
}