1use std::io::{BufRead, Read, Seek, SeekFrom, Write};
25use std::path::{Path, PathBuf};
26use std::time::{Duration, Instant};
27
28use anyhow::{Context, Result};
29
30use crate::args::{Cli, DaemonAction};
31
/// Name of the environment variable consulted by `try_auto_start_daemon`;
/// auto-start only proceeds when it is set to exactly `1`.
#[allow(dead_code)]
const ENV_DAEMON_AUTO_START: &str = "SQRY_DAEMON_AUTO_START";

/// Pause, in milliseconds, between successive socket probes while waiting for
/// the daemon to stop (`run_daemon_stop`) or become reachable
/// (`poll_until_reachable`).
const STOP_POLL_INTERVAL_MS: u64 = 100;

/// Longest time, in milliseconds, that `tail_follow` waits for a file-watcher
/// event before it re-reads the log file anyway.
const FOLLOW_EVENT_TIMEOUT_MS: u64 = 250;
49
50pub fn run(_cli: &Cli, action: &DaemonAction) -> Result<()> {
60 match action {
61 DaemonAction::Start {
62 sqryd_path,
63 timeout,
64 } => run_daemon_start(sqryd_path.as_deref(), *timeout),
65 DaemonAction::Stop { timeout } => run_daemon_stop(*timeout),
66 DaemonAction::Status { json } => run_daemon_status(*json),
67 DaemonAction::Logs { lines, follow } => run_daemon_logs(*lines, *follow),
68 DaemonAction::Load { path } => run_daemon_load(path),
69 DaemonAction::Rebuild {
70 path,
71 force,
72 timeout,
73 json,
74 } => run_daemon_rebuild(path, *force, *timeout, *json),
75 DaemonAction::Reset { path, force } => run_daemon_reset(path, *force),
76 }
77}
78
79fn run_daemon_reset(path: &Path, force: bool) -> Result<()> {
89 let config = load_daemon_config()?;
90 let socket_path = config.socket_path();
91
92 let canonical = std::fs::canonicalize(path)
93 .with_context(|| format!("failed to canonicalize {}", path.display()))?;
94
95 if !try_connect_sync(&socket_path)? {
96 anyhow::bail!(
97 "daemon is not running (socket {}). Start it with `sqry daemon start`.",
98 socket_path.display()
99 );
100 }
101
102 let rt = tokio::runtime::Builder::new_current_thread()
103 .enable_all()
104 .build()
105 .context("failed to build tokio runtime for daemon reset")?;
106
107 rt.block_on(async {
108 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
109 .await
110 .with_context(|| format!("failed to connect to daemon at {}", socket_path.display()))?;
111 let result = client
112 .reset(&canonical, force)
113 .await
114 .with_context(|| format!("daemon/reset for {}", canonical.display()))?;
115 let was_reset = result
116 .get("result")
117 .and_then(|r| r.get("reset"))
118 .and_then(serde_json::Value::as_bool)
119 .or_else(|| result.get("reset").and_then(serde_json::Value::as_bool))
120 .unwrap_or(false);
121 if was_reset {
122 eprintln!("sqry: workspace {} reset", canonical.display());
123 } else {
124 eprintln!(
125 "sqry: workspace {} was not loaded; nothing to reset",
126 canonical.display()
127 );
128 }
129 Ok::<(), anyhow::Error>(())
130 })?;
131
132 Ok(())
133}
134
135fn run_daemon_rebuild(path: &Path, force: bool, timeout: u64, json: bool) -> Result<()> {
140 let config = load_daemon_config()?;
141 let socket_path = config.socket_path();
142
143 let canonical_path = std::fs::canonicalize(path)
144 .with_context(|| format!("failed to canonicalize path {}", path.display()))?;
145
146 if !try_connect_sync(&socket_path)? {
147 anyhow::bail!(
148 "daemon is not running (socket {}). Start it with `sqry daemon start`.",
149 socket_path.display()
150 );
151 }
152
153 let rt = tokio::runtime::Builder::new_current_thread()
154 .enable_all()
155 .build()
156 .context("failed to build tokio runtime for daemon rebuild")?;
157
158 rt.block_on(async {
159 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
160 .await
161 .with_context(|| {
162 format!("failed to connect to daemon at {}", socket_path.display())
163 })?;
164
165 let started = Instant::now();
166 let deadline = started + Duration::from_secs(timeout);
167
168 let poll_socket = socket_path.clone();
170 let poll_path = canonical_path.clone();
171 let poll_done = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
172 let poll_flag = std::sync::Arc::clone(&poll_done);
173 let poll_handle = tokio::spawn(async move {
174 loop {
175 tokio::time::sleep(Duration::from_secs(5)).await;
176 if poll_flag.load(std::sync::atomic::Ordering::Relaxed) {
177 break;
178 }
179 let Ok(mut poll_client) =
181 sqry_daemon_client::DaemonClient::connect(&poll_socket).await
182 else {
183 continue;
184 };
185 if let Ok(status) = poll_client.status().await {
186 let elapsed = started.elapsed().as_secs();
187 if let Some(ws_state) = extract_workspace_state(&status, &poll_path) {
188 eprint!("\rsqry: {ws_state} ({elapsed}s elapsed)");
189 let _ = std::io::stderr().flush();
190 }
191 }
192 }
193 });
194
195 let result = tokio::select! {
197 res = client.rebuild(&canonical_path, force) => res,
198 () = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => {
199 poll_done.store(true, std::sync::atomic::Ordering::Relaxed);
200 let _ = poll_handle.await;
201 let elapsed_ms = started.elapsed().as_millis() as u64;
202 if json {
203 let out = serde_json::json!({
204 "status": "timeout",
205 "elapsed_ms": elapsed_ms,
206 "message": "rebuild still in progress on daemon"
207 });
208 println!("{}", serde_json::to_string_pretty(&out)?);
209 } else {
210 eprintln!("\nsqry: rebuild timed out after {timeout}s (daemon continues in background)");
211 }
212 std::process::exit(2);
213 }
214 };
215
216 poll_done.store(true, std::sync::atomic::Ordering::Relaxed);
217 let _ = poll_handle.await;
218 eprint!("\r\x1b[K");
220
221 match result {
222 Ok(value) => {
223 if json {
224 let mut out = serde_json::Map::new();
225 out.insert(
226 "status".to_owned(),
227 serde_json::Value::String("completed".to_owned()),
228 );
229 if let Some(r) = value.get("result") {
231 if let Some(d) = r.get("duration_ms") {
232 out.insert("duration_ms".to_owned(), d.clone());
233 }
234 if let Some(n) = r.get("nodes") {
235 out.insert("nodes".to_owned(), n.clone());
236 }
237 if let Some(e) = r.get("edges") {
238 out.insert("edges".to_owned(), e.clone());
239 }
240 if let Some(f) = r.get("files_indexed") {
241 out.insert("files_indexed".to_owned(), f.clone());
242 }
243 }
244 println!(
245 "{}",
246 serde_json::to_string_pretty(&serde_json::Value::Object(out))?
247 );
248 } else {
249 render_rebuild_human(&value, &canonical_path);
250 }
251 }
252 Err(sqry_daemon_client::ClientError::RpcError {
253 code: -32004,
254 message,
255 ..
256 }) => {
257 anyhow::bail!(
258 "workspace {} is not loaded on the daemon. \
259 Load it first with `sqry daemon load {}`.\n (daemon said: {message})",
260 canonical_path.display(),
261 canonical_path.display()
262 );
263 }
264 Err(e) => {
265 return Err(anyhow::anyhow!("daemon/rebuild failed: {e}"));
266 }
267 }
268 anyhow::Ok(())
269 })?;
270
271 Ok(())
272}
273
274fn extract_workspace_state(status: &serde_json::Value, path: &Path) -> Option<String> {
275 let workspaces = status.get("result")?.get("workspaces")?.as_array()?;
276 let path_str = path.to_string_lossy();
277 for ws in workspaces {
278 if let Some(root) = ws.get("index_root").and_then(|r| r.as_str())
279 && root == path_str.as_ref()
280 {
281 return ws
282 .get("state")
283 .and_then(|s| s.as_str())
284 .map(|s| s.to_owned());
285 }
286 }
287 None
288}
289
290fn render_rebuild_human(value: &serde_json::Value, path: &Path) {
291 if let Some(r) = value.get("result") {
292 let duration = r.get("duration_ms").and_then(|d| d.as_u64()).unwrap_or(0);
293 let nodes = r.get("nodes").and_then(|n| n.as_u64()).unwrap_or(0);
294 let edges = r.get("edges").and_then(|e| e.as_u64()).unwrap_or(0);
295 let files = r.get("files_indexed").and_then(|f| f.as_u64()).unwrap_or(0);
296 let was_full = r.get("was_full").and_then(|w| w.as_bool()).unwrap_or(false);
297 let mode = if was_full { "full" } else { "incremental" };
298 eprintln!(
299 "sqry: {mode} rebuild of {} completed in {:.1}s ({nodes} nodes, {edges} edges, {files} files)",
300 path.display(),
301 duration as f64 / 1000.0
302 );
303 } else {
304 eprintln!("sqry: rebuild completed for {}", path.display());
305 }
306}
307
308fn run_daemon_start(sqryd_path: Option<&Path>, timeout: u64) -> Result<()> {
319 let binary = resolve_sqryd_binary(sqryd_path)?;
320
321 let socket_path = load_config_socket_path();
323
324 if socket_path
326 .as_ref()
327 .is_some_and(|sp| try_connect_sync(sp).unwrap_or(false))
328 {
329 let sp = socket_path.as_ref().unwrap();
330 eprintln!("sqry: daemon is already running (socket {})", sp.display());
331 return Ok(());
332 }
333
334 let status = std::process::Command::new(&binary)
335 .args(["start", "--detach"])
336 .stdin(std::process::Stdio::null())
337 .stdout(std::process::Stdio::inherit())
338 .stderr(std::process::Stdio::inherit())
339 .status()
340 .with_context(|| format!("failed to exec sqryd at {}", binary.display()))?;
341
342 if !status.success() {
343 let code = status.code().unwrap_or(1);
344 if code == 75 {
346 eprintln!("sqry: daemon is already running");
347 return Ok(());
348 }
349 anyhow::bail!("sqryd start --detach exited with code {code}");
350 }
351
352 if let Some(ref sp) = socket_path {
354 poll_until_reachable(sp, timeout)?;
355 eprintln!("sqry: daemon started (socket {})", sp.display());
356 } else {
357 eprintln!("sqry: daemon started");
358 }
359 Ok(())
360}
361
362fn run_daemon_stop(timeout: u64) -> Result<()> {
369 let config = load_daemon_config()?;
370 let socket_path = config.socket_path();
371
372 if !try_connect_sync(&socket_path)? {
373 eprintln!("sqry: daemon is not running");
374 return Ok(());
375 }
376
377 let rt = tokio::runtime::Builder::new_current_thread()
378 .enable_all()
379 .build()
380 .context("failed to build tokio runtime for daemon stop")?;
381
382 rt.block_on(async {
383 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
384 .await
385 .with_context(|| format!("failed to connect to daemon at {}", socket_path.display()))?;
386
387 let _ = client.stop().await;
390
391 let deadline = Instant::now() + Duration::from_secs(timeout);
392 loop {
393 tokio::time::sleep(Duration::from_millis(STOP_POLL_INTERVAL_MS)).await;
395
396 if !try_connect_async(&socket_path).await {
397 break;
398 }
399 if Instant::now() >= deadline {
400 anyhow::bail!(
401 "daemon did not exit within {timeout} seconds; \
402 check the daemon log for errors"
403 );
404 }
405 }
406 anyhow::Ok(())
407 })?;
408
409 eprintln!("sqry: daemon stopped");
410 Ok(())
411}
412
413fn run_daemon_status(json: bool) -> Result<()> {
419 let config = load_daemon_config()?;
420 let socket_path = config.socket_path();
421
422 if !try_connect_sync(&socket_path)? {
424 if json {
425 println!(
429 "{}",
430 serde_json::json!({
431 "error": "daemon_unreachable",
432 "socket": socket_path.display().to_string(),
433 })
434 );
435 } else {
436 eprintln!(
437 "sqry: daemon is not running (socket {})",
438 socket_path.display()
439 );
440 }
441 std::process::exit(1);
443 }
444
445 let rt = tokio::runtime::Builder::new_current_thread()
446 .enable_all()
447 .build()
448 .context("failed to build tokio runtime for daemon status")?;
449
450 rt.block_on(async {
451 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
452 .await
453 .with_context(|| format!("failed to connect to daemon at {}", socket_path.display()))?;
454
455 let result = client
456 .status()
457 .await
458 .context("daemon/status request failed")?;
459
460 if json {
461 let out = serde_json::to_string_pretty(&result)
462 .context("failed to serialize daemon status as JSON")?;
463 println!("{out}");
464 } else {
465 render_status_human(&result);
466 }
467 anyhow::Ok(())
468 })?;
469
470 Ok(())
471}
472
473fn run_daemon_logs(lines: usize, follow: bool) -> Result<()> {
486 let config = load_daemon_config()?;
487 let log_path = match resolve_log_path(&config) {
488 Ok(p) if p.exists() => p,
489 Ok(p) => {
490 eprintln!(
493 "sqry: configured log file {} does not exist yet. \
494 The daemon may have just started, or is logging to stderr.",
495 p.display()
496 );
497 return print_log_fallback_hint(&config);
498 }
499 Err(_) => return print_log_fallback_hint(&config),
500 };
501
502 if follow {
503 tail_follow(&log_path, lines)?;
504 } else {
505 tail_last_n(&log_path, lines)?;
506 }
507 Ok(())
508}
509
510fn print_log_fallback_hint(config: &sqry_daemon::config::DaemonConfig) -> Result<()> {
518 eprintln!();
519 eprintln!(
520 "Default log location: {}",
521 config.runtime_dir().join("sqryd.log").display()
522 );
523 eprintln!("To configure a custom path, set in $XDG_CONFIG_HOME/sqry/daemon.toml:");
524 eprintln!(
525 " log_file = \"{}\"",
526 config.runtime_dir().join("sqryd.log").display()
527 );
528 eprintln!("Or via env: SQRY_DAEMON_LOG_FILE=<path>");
529 eprintln!();
530 if cfg!(target_os = "linux") && std::env::var_os("XDG_RUNTIME_DIR").is_some() {
531 eprintln!("If running under systemd --user:");
532 eprintln!(" journalctl --user -u sqryd.service -f");
533 } else if cfg!(target_os = "linux") {
534 eprintln!("If running under systemd:");
535 eprintln!(" journalctl -u sqryd.service -f");
536 } else if cfg!(target_os = "macos") {
537 eprintln!("If running under launchd:");
538 eprintln!(" log stream --predicate 'process == \"sqryd\"'");
539 } else if cfg!(target_os = "windows") {
540 eprintln!("On Windows, use Event Viewer or `Get-WinEvent`.");
541 }
542 Ok(())
543}
544
545fn run_daemon_load(path: &Path) -> Result<()> {
555 let config = load_daemon_config()?;
556 let socket_path = config.socket_path();
557
558 let canonical_path = std::fs::canonicalize(path)
562 .with_context(|| format!("failed to canonicalize path {}", path.display()))?;
563
564 if !try_connect_sync(&socket_path)? {
565 anyhow::bail!(
566 "daemon is not running (socket {}). Start it with `sqry daemon start`.",
567 socket_path.display()
568 );
569 }
570
571 let rt = tokio::runtime::Builder::new_current_thread()
572 .enable_all()
573 .build()
574 .context("failed to build tokio runtime for daemon load")?;
575
576 rt.block_on(async {
577 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
578 .await
579 .with_context(|| format!("failed to connect to daemon at {}", socket_path.display()))?;
580
581 let envelope = client.load(&canonical_path).await.with_context(|| {
587 format!(
588 "daemon/load request failed for {}",
589 canonical_path.display()
590 )
591 })?;
592
593 let load_result = envelope.result;
594 eprintln!(
595 "sqry: workspace loaded at {} ({:?}, {})",
596 canonical_path.display(),
597 load_result.state,
598 human_bytes(load_result.current_bytes)
599 );
600 anyhow::Ok(())
601 })?;
602
603 Ok(())
604}
605
606fn resolve_sqryd_binary(explicit: Option<&Path>) -> Result<PathBuf> {
622 if let Some(path) = explicit {
624 if path.exists() {
625 return Ok(path.to_path_buf());
626 }
627 anyhow::bail!("explicit --sqryd-path {} does not exist", path.display());
628 }
629
630 if let Ok(exe) = std::env::current_exe() {
632 let canonical = std::fs::canonicalize(&exe).unwrap_or(exe);
634 if let Some(dir) = canonical.parent() {
635 let sibling = dir.join("sqryd");
636 if sibling.exists() {
637 return Ok(sibling);
638 }
639 let sibling_exe = dir.join("sqryd.exe");
641 if sibling_exe.exists() {
642 return Ok(sibling_exe);
643 }
644 }
645 }
646
647 if let Some(val) = std::env::var_os("SQRYD_PATH") {
649 let path = PathBuf::from(val);
650 if path.exists() {
651 return Ok(path);
652 }
653 anyhow::bail!("SQRYD_PATH={} does not exist", path.display());
654 }
655
656 which::which("sqryd").with_context(|| {
658 "sqryd binary not found. \
659 Install sqryd alongside sqry, set SQRYD_PATH, or use --sqryd-path."
660 .to_owned()
661 })
662}
663
664#[allow(dead_code)]
686pub fn try_auto_start_daemon() -> Result<bool> {
687 if std::env::var_os(ENV_DAEMON_AUTO_START).as_deref() != Some(std::ffi::OsStr::new("1")) {
688 return Ok(false);
689 }
690
691 let binary = resolve_sqryd_binary(None)?;
693
694 let socket_path = load_config_socket_path();
696 if socket_path
697 .as_ref()
698 .is_some_and(|sp| try_connect_sync(sp).unwrap_or(false))
699 {
700 return Ok(true);
701 }
702
703 let status = std::process::Command::new(&binary)
705 .args(["start", "--detach"])
706 .stdin(std::process::Stdio::null())
707 .stdout(std::process::Stdio::null())
708 .stderr(std::process::Stdio::inherit())
709 .status()
710 .with_context(|| format!("auto-start: failed to exec sqryd at {}", binary.display()))?;
711
712 if !status.success() {
713 let code = status.code().unwrap_or(1);
714 if code != 75 {
715 eprintln!(
717 "sqry: Warning: daemon auto-start failed (sqryd exited {code}); \
718 falling back to local mode"
719 );
720 return Ok(false);
721 }
722 }
723
724 Ok(true)
725}
726
/// Formats a byte count using binary (1024-based) units from `B` to `TB`.
///
/// Values below 1024 are printed as whole bytes. Larger values are rounded to
/// one decimal place; a value that rounds to a whole number is printed
/// without decimals (`"2 KB"`, never `"2.0 KB"`), and a value that rounds up
/// to 1024 of its unit is promoted to the next unit (`"1 MB"`, never
/// `"1024 KB"`). `TB` is the largest unit, so very large counts simply grow
/// in `TB`.
#[must_use]
pub fn human_bytes(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
    const DIVISOR: u64 = 1024;

    if bytes < DIVISOR {
        return format!("{bytes} B");
    }

    let step = DIVISOR as f64;
    let mut value = bytes as f64;
    let mut unit_index = 0usize;
    while value >= step && unit_index + 1 < UNITS.len() {
        value /= step;
        unit_index += 1;
    }

    // Round first, then decide on the precision: deciding on the unrounded
    // fraction prints values such as 1.999 as "2.0".
    let mut rounded = (value * 10.0).round() / 10.0;
    if rounded >= step && unit_index + 1 < UNITS.len() {
        // Rounding carried the value up to exactly 1024 of the current unit.
        rounded /= step;
        unit_index += 1;
    }

    if rounded.fract() == 0.0 {
        format!("{:.0} {}", rounded, UNITS[unit_index])
    } else {
        format!("{:.1} {}", rounded, UNITS[unit_index])
    }
}
756
/// Renders a duration in seconds as a compact `d` / `h` / `m` string.
///
/// Seconds are truncated to whole minutes. Under one hour only minutes are
/// shown (`"0m"` for anything below a minute). Trailing zero components are
/// omitted (`"1h"`, `"2d"`, `"1d 1h"`), but an hour component of zero between
/// days and minutes is kept (`"1d 0h 5m"`).
#[must_use]
pub fn format_uptime(seconds: u64) -> String {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 3_600;
    const SECS_PER_DAY: u64 = 86_400;

    let days = seconds / SECS_PER_DAY;
    let within_day = seconds % SECS_PER_DAY;
    let hours = within_day / SECS_PER_HOUR;
    let mins = (within_day % SECS_PER_HOUR) / SECS_PER_MINUTE;

    if days == 0 {
        if hours == 0 {
            return format!("{mins}m");
        }
        if mins == 0 {
            return format!("{hours}h");
        }
        return format!("{hours}h {mins}m");
    }

    if mins != 0 {
        // Hours are shown even when zero so the minutes keep their position.
        return format!("{days}d {hours}h {mins}m");
    }
    if hours != 0 {
        format!("{days}d {hours}h")
    } else {
        format!("{days}d")
    }
}
782
783fn render_status_human(envelope: &serde_json::Value) {
793 let stdout = std::io::stdout();
794 let mut handle = stdout.lock();
795 let _ = render_status_human_into(envelope, &mut handle);
797}
798
799fn render_status_human_into(
848 envelope: &serde_json::Value,
849 out: &mut dyn Write,
850) -> std::io::Result<()> {
851 let inner = envelope.get("result").unwrap_or(envelope);
855
856 let version = inner
860 .get("daemon_version")
861 .or_else(|| envelope.get("meta").and_then(|m| m.get("daemon_version")))
862 .and_then(serde_json::Value::as_str)
863 .unwrap_or("unknown");
864
865 let uptime_str = inner
867 .get("uptime_seconds")
868 .and_then(serde_json::Value::as_u64)
869 .map(format_uptime);
870
871 match uptime_str {
872 Some(uptime) => writeln!(out, "sqryd v{version} -- uptime {uptime}")?,
873 None => writeln!(out, "sqryd v{version}")?,
874 }
875
876 let memory = inner.get("memory");
879 let mem_current = memory
880 .and_then(|m| m.get("current_bytes"))
881 .and_then(serde_json::Value::as_u64);
882 let mem_limit = memory
883 .and_then(|m| m.get("limit_bytes"))
884 .and_then(serde_json::Value::as_u64);
885 let mem_peak = memory
886 .and_then(|m| m.get("high_water_bytes"))
887 .and_then(serde_json::Value::as_u64);
888
889 if mem_current.is_some() || mem_limit.is_some() {
890 writeln!(out)?;
891 let used_str = mem_current.map_or_else(|| "?".to_owned(), human_bytes);
892 let limit_str = mem_limit.map_or_else(|| "?".to_owned(), human_bytes);
893 match mem_peak {
894 Some(peak) => writeln!(
895 out,
896 "Memory: {used_str} / {limit_str} (peak: {})",
897 human_bytes(peak)
898 )?,
899 None => writeln!(out, "Memory: {used_str} / {limit_str}")?,
900 }
901 }
902
903 let workspaces = inner
905 .get("workspaces")
906 .and_then(serde_json::Value::as_array);
907
908 if let Some(wss) = workspaces {
909 writeln!(out)?;
910 writeln!(out, "Workspaces ({} loaded):", wss.len())?;
911 for ws in wss {
912 render_workspace_line_into(ws, out)?;
913 }
914 }
915
916 Ok(())
917}
918
919#[allow(dead_code)]
928fn render_workspace_line(ws: &serde_json::Value) {
929 let stdout = std::io::stdout();
930 let mut handle = stdout.lock();
931 let _ = render_workspace_line_into(ws, &mut handle);
932}
933
934fn render_workspace_line_into(ws: &serde_json::Value, out: &mut dyn Write) -> std::io::Result<()> {
944 let path = ws
947 .get("index_root")
948 .or_else(|| ws.get("path"))
949 .and_then(serde_json::Value::as_str)
950 .unwrap_or("<unknown>");
951
952 let display_path = tilde_shorten(path);
954
955 let ws_mem = ws
957 .get("current_bytes")
958 .and_then(serde_json::Value::as_u64)
959 .map(human_bytes);
960 let ws_peak = ws
961 .get("high_water_bytes")
962 .and_then(serde_json::Value::as_u64)
963 .map(human_bytes);
964
965 let mut tags: Vec<&str> = Vec::new();
967 if ws
968 .get("pinned")
969 .and_then(serde_json::Value::as_bool)
970 .unwrap_or(false)
971 {
972 tags.push("pinned");
973 }
974 let state = ws
975 .get("state")
976 .and_then(serde_json::Value::as_str)
977 .unwrap_or("Loaded");
978 tags.push(state);
979
980 if let Some(err_msg) = ws.get("last_error").and_then(serde_json::Value::as_str) {
982 let tag = format!("error: {err_msg}");
984 match (ws_mem, ws_peak) {
985 (Some(mem), Some(peak)) => {
986 writeln!(
987 out,
988 " {display_path:<30} {mem:<8} (peak: {peak:<8}) [{tags}, {tag}]",
989 tags = tags.join(", ")
990 )?;
991 }
992 (Some(mem), None) => {
993 writeln!(
994 out,
995 " {display_path:<30} {mem:<8} [{tags}, {tag}]",
996 tags = tags.join(", ")
997 )?;
998 }
999 _ => {
1000 writeln!(
1001 out,
1002 " {display_path} [{tags}, {tag}]",
1003 tags = tags.join(", ")
1004 )?;
1005 }
1006 }
1007 return Ok(());
1008 }
1009
1010 let tag_str = format!("[{}]", tags.join(", "));
1011 match (ws_mem, ws_peak) {
1012 (Some(mem), Some(peak)) => {
1013 writeln!(
1014 out,
1015 " {display_path:<30} {mem:<8} (peak: {peak:<8}) {tag_str}"
1016 )?;
1017 }
1018 (Some(mem), None) => {
1019 writeln!(out, " {display_path:<30} {mem:<8} {tag_str}")?;
1020 }
1021 _ => {
1022 writeln!(out, " {display_path} {tag_str}")?;
1023 }
1024 }
1025 Ok(())
1026}
1027
1028fn tilde_shorten(path: &str) -> String {
1030 if let Some(home) = dirs::home_dir() {
1031 let home_str = home.to_string_lossy();
1032 if let Some(stripped) = path.strip_prefix(home_str.as_ref()) {
1033 return format!("~{stripped}");
1034 }
1035 }
1036 path.to_owned()
1037}
1038
1039pub fn tail_last_n(path: &Path, n: usize) -> Result<()> {
1054 let file = std::fs::File::open(path)
1055 .with_context(|| format!("failed to open log file {}", path.display()))?;
1056 let buf_reader = std::io::BufReader::new(file);
1057
1058 let mut lines: Vec<String> = Vec::new();
1060 for line in buf_reader.lines() {
1061 let l = line.with_context(|| format!("error reading log file {}", path.display()))?;
1062 lines.push(l);
1063 }
1064
1065 let start = lines.len().saturating_sub(n);
1067 let stdout = std::io::stdout();
1068 let mut out = stdout.lock();
1069 for line in &lines[start..] {
1070 writeln!(out, "{line}").context("failed to write to stdout")?;
1071 }
1072 Ok(())
1073}
1074
1075pub fn tail_follow(path: &Path, initial_lines: usize) -> Result<()> {
1088 use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
1089 use std::sync::mpsc;
1090
1091 tail_last_n(path, initial_lines)?;
1093
1094 let mut file = std::fs::File::open(path)
1096 .with_context(|| format!("failed to open log file for follow: {}", path.display()))?;
1097 let mut pos = file
1098 .seek(SeekFrom::End(0))
1099 .context("failed to seek to end of log file")?;
1100
1101 let (tx, rx) = mpsc::channel::<notify::Result<notify::Event>>();
1103 let mut watcher = RecommendedWatcher::new(tx, notify::Config::default())
1104 .context("failed to create file watcher for log follow")?;
1105
1106 let parent = path.parent().unwrap_or(Path::new("."));
1108 watcher
1109 .watch(parent, RecursiveMode::NonRecursive)
1110 .with_context(|| format!("failed to watch log directory {}", parent.display()))?;
1111
1112 let stdout = std::io::stdout();
1113 let mut out = stdout.lock();
1114
1115 loop {
1117 match rx.recv_timeout(Duration::from_millis(FOLLOW_EVENT_TIMEOUT_MS)) {
1118 Ok(Ok(event)) => {
1119 let is_rotate = matches!(event.kind, EventKind::Remove(_) | EventKind::Create(_))
1120 && event.paths.iter().any(|p| p == path);
1121
1122 if is_rotate {
1123 if path.exists() {
1125 match std::fs::File::open(path) {
1126 Ok(f) => {
1127 file = f;
1128 pos = 0;
1129 }
1130 Err(e) => {
1131 eprintln!("sqry: log rotation detected but reopen failed: {e}");
1132 }
1133 }
1134 }
1135 }
1136
1137 pos = drain_new_bytes(&mut file, pos, path, &mut out)?;
1139 }
1140 Ok(Err(e)) => {
1141 eprintln!("sqry: file watcher error: {e}");
1142 }
1143 Err(mpsc::RecvTimeoutError::Timeout) => {
1144 pos = drain_new_bytes(&mut file, pos, path, &mut out)?;
1146 }
1147 Err(mpsc::RecvTimeoutError::Disconnected) => {
1148 break;
1149 }
1150 }
1151 }
1152
1153 Ok(())
1154}
1155
1156fn drain_new_bytes(
1159 file: &mut std::fs::File,
1160 current_pos: u64,
1161 path: &Path,
1162 out: &mut impl Write,
1163) -> Result<u64> {
1164 file.seek(SeekFrom::Start(current_pos))
1165 .with_context(|| format!("seek error in log file {}", path.display()))?;
1166
1167 let mut buf = Vec::new();
1168 file.read_to_end(&mut buf)
1169 .with_context(|| format!("read error in log file {}", path.display()))?;
1170
1171 if !buf.is_empty() {
1172 out.write_all(&buf)
1173 .context("failed to write log output to stdout")?;
1174 out.flush().context("failed to flush stdout")?;
1175 }
1176
1177 let new_pos = current_pos + buf.len() as u64;
1178 Ok(new_pos)
1179}
1180
1181fn load_daemon_config() -> Result<sqry_daemon::config::DaemonConfig> {
1187 sqry_daemon::config::DaemonConfig::load().context(
1188 "failed to load daemon config; ensure daemon.toml is well-formed or \
1189 remove it to use defaults",
1190 )
1191}
1192
1193fn load_config_socket_path() -> Option<PathBuf> {
1197 sqry_daemon::config::DaemonConfig::load()
1198 .ok()
1199 .map(|c| c.socket_path())
1200}
1201
1202fn resolve_log_path(config: &sqry_daemon::config::DaemonConfig) -> Result<PathBuf> {
1213 match config.log_file.resolve() {
1214 Some(p) => Ok(p),
1215 None => {
1216 anyhow::bail!(
1217 "log_file = \"stderr\" / \"-\" — the daemon writes only to stderr.\n\
1218 Tail systemd / journald instead (see `sqry daemon logs --help`),\n\
1219 or remove the opt-out so the daemon logs to <runtime_dir>/sqryd.log."
1220 );
1221 }
1222 }
1223}
1224
1225fn poll_until_reachable(socket_path: &Path, timeout: u64) -> Result<()> {
1234 let deadline = Instant::now() + Duration::from_secs(timeout);
1235 loop {
1236 if try_connect_sync(socket_path).unwrap_or(false) {
1237 return Ok(());
1238 }
1239 if Instant::now() >= deadline {
1240 anyhow::bail!(
1241 "daemon process started but did not become reachable within {timeout} \
1242 seconds (socket {}). Check the daemon log for startup errors.",
1243 socket_path.display()
1244 );
1245 }
1246 std::thread::sleep(Duration::from_millis(STOP_POLL_INTERVAL_MS));
1247 }
1248}
1249
1250pub fn try_connect_sync(socket_path: &Path) -> Result<bool> {
1267 #[cfg(unix)]
1268 {
1269 use std::os::unix::net::UnixStream;
1270 match UnixStream::connect(socket_path) {
1271 Ok(_) => Ok(true),
1272 Err(e) => match e.kind() {
1273 std::io::ErrorKind::ConnectionRefused | std::io::ErrorKind::NotFound => Ok(false),
1274 _ => Err(anyhow::Error::from(e).context(format!(
1275 "unexpected error probing socket {}",
1276 socket_path.display()
1277 ))),
1278 },
1279 }
1280 }
1281 #[cfg(windows)]
1282 {
1283 Ok(socket_path.exists())
1285 }
1286 #[cfg(not(any(unix, windows)))]
1287 {
1288 let _ = socket_path;
1289 Ok(false)
1290 }
1291}
1292
1293async fn try_connect_async(socket_path: &Path) -> bool {
1297 #[cfg(unix)]
1298 {
1299 tokio::net::UnixStream::connect(socket_path).await.is_ok()
1300 }
1301 #[cfg(windows)]
1302 {
1303 socket_path.exists()
1304 }
1305 #[cfg(not(any(unix, windows)))]
1306 {
1307 let _ = socket_path;
1308 false
1309 }
1310}
1311
1312#[cfg(test)]
1317mod tests {
1318 use super::*;
1319
1320 #[test]
1327 #[serial_test::serial]
1328 fn resolve_sqryd_binary_finds_sibling() {
1329 let dir = tempfile::tempdir().expect("tempdir");
1330 let sqryd_path = dir.path().join("sqryd");
1331 std::fs::write(&sqryd_path, b"#!/bin/sh\n").expect("write fake sqryd");
1333 #[cfg(unix)]
1334 {
1335 use std::os::unix::fs::PermissionsExt;
1336 std::fs::set_permissions(&sqryd_path, std::fs::Permissions::from_mode(0o755))
1337 .expect("chmod");
1338 }
1339
1340 unsafe {
1343 std::env::set_var("SQRYD_PATH", &sqryd_path);
1344 }
1345 let result = resolve_sqryd_binary(None);
1346 unsafe {
1347 std::env::remove_var("SQRYD_PATH");
1348 }
1349
1350 assert!(result.is_ok(), "expected Ok, got {:?}", result);
1351 assert_eq!(result.unwrap(), sqryd_path);
1352 }
1353
1354 #[test]
1358 #[serial_test::serial]
1359 fn resolve_sqryd_binary_falls_back_to_path() {
1360 unsafe {
1362 std::env::remove_var("SQRYD_PATH");
1363 }
1364
1365 let result = resolve_sqryd_binary(None);
1366 let _ = result;
1369 }
1370
1371 #[test]
1373 #[serial_test::serial]
1374 fn resolve_sqryd_binary_respects_env_var() {
1375 let dir = tempfile::tempdir().expect("tempdir");
1376 let sqryd_path = dir.path().join("sqryd");
1377 std::fs::write(&sqryd_path, b"#!/bin/sh\n").expect("write fake sqryd");
1378
1379 unsafe {
1380 std::env::set_var("SQRYD_PATH", &sqryd_path);
1381 }
1382 let result = resolve_sqryd_binary(None);
1383 unsafe {
1384 std::env::remove_var("SQRYD_PATH");
1385 }
1386
1387 assert!(result.is_ok(), "expected Ok, got {:?}", result);
1388 assert_eq!(result.unwrap(), sqryd_path);
1389 }
1390
1391 #[test]
1396 fn human_bytes_formats_correctly() {
1397 assert_eq!(human_bytes(0), "0 B");
1398 assert_eq!(human_bytes(512), "512 B");
1399 assert_eq!(human_bytes(1023), "1023 B");
1400 assert_eq!(human_bytes(1024), "1 KB");
1401 assert_eq!(human_bytes(1536), "1.5 KB");
1402 assert_eq!(human_bytes(1_048_576), "1 MB");
1403 assert_eq!(human_bytes(1_073_741_824), "1 GB");
1404 assert_eq!(human_bytes(1_099_511_627_776), "1 TB");
1405 assert_eq!(human_bytes(1_572_864), "1.5 MB");
1407 }
1408
1409 #[test]
1414 fn format_uptime_renders_hours_minutes() {
1415 assert_eq!(format_uptime(0), "0m");
1416 assert_eq!(format_uptime(59), "0m");
1417 assert_eq!(format_uptime(60), "1m");
1418 assert_eq!(format_uptime(3600), "1h");
1419 assert_eq!(format_uptime(3660), "1h 1m");
1420 assert_eq!(format_uptime(7380), "2h 3m");
1421 assert_eq!(format_uptime(86400), "1d");
1422 assert_eq!(format_uptime(90061), "1d 1h 1m");
1423 assert_eq!(format_uptime(172800), "2d");
1424 }
1425
1426 #[test]
1437 fn daemon_status_human_renders_minimal_response() {
1438 let envelope = serde_json::json!({
1441 "result": {},
1442 "meta": { "stale": false, "daemon_version": "8.0.6" }
1443 });
1444 let mut buf: Vec<u8> = Vec::new();
1445 render_status_human_into(&envelope, &mut buf)
1446 .expect("render_status_human_into must not fail");
1447 let output = String::from_utf8(buf).expect("rendered output must be valid UTF-8");
1448 assert!(
1450 output.contains("v8.0.6"),
1451 "graceful degradation must fall back to meta.daemon_version '8.0.6'; got:\n{output}"
1452 );
1453 }
1454
#[test]
/// Renders a fully populated status envelope (version, uptime, memory block,
/// and two workspaces) through `render_status_human_into` and checks that the
/// human-readable text mentions each section.
///
/// The checks are substring matches only; exact layout is not pinned here.
fn daemon_status_human_renders_full_response() {
    // Build the fixture bottom-up so each nested object has a name.
    let memory = serde_json::json!({
        "limit_bytes": 2_147_483_648_u64,
        "current_bytes": 471_859_200_u64,
        "reserved_bytes": 0_u64,
        "high_water_bytes": 1_288_490_188_u64
    });
    let main_project = serde_json::json!({
        "index_root": "/home/user/repos/main-project",
        "state": "Loaded",
        "pinned": true,
        "current_bytes": 335_544_320_u64,
        "high_water_bytes": 933_232_896_u64,
        "last_good_at": null,
        "last_error": null,
        "retry_count": 0
    });
    let auth_service = serde_json::json!({
        "index_root": "/home/user/repos/auth-service",
        "state": "Loaded",
        "pinned": false,
        "current_bytes": 83_886_080_u64,
        "high_water_bytes": 324_534_016_u64,
        "last_good_at": null,
        "last_error": null,
        "retry_count": 0
    });
    let envelope = serde_json::json!({
        "result": {
            "daemon_version": "8.0.6",
            "uptime_seconds": 8040_u64,
            "memory": memory,
            "workspaces": [main_project, auth_service]
        },
        "meta": {
            "stale": false,
            "daemon_version": "8.0.6"
        }
    });

    let mut rendered: Vec<u8> = Vec::new();
    render_status_human_into(&envelope, &mut rendered)
        .expect("render_status_human_into must not fail");
    let output = String::from_utf8(rendered).expect("rendered output must be valid UTF-8");

    // (expected substring, failure description), checked in this order so
    // the first failing expectation is the one reported.
    let expectations = [
        ("v8.0.6", "must contain version"),
        ("2h", "must contain uptime hours"),
        ("Memory:", "must contain memory section"),
        ("2 GB", "must contain limit '2 GB'"),
        ("Workspaces", "must contain workspaces section"),
        ("main-project", "must contain workspace path"),
        ("auth-service", "must contain workspace path"),
    ];
    for (needle, requirement) in expectations {
        assert!(output.contains(needle), "{requirement}; got:\n{output}");
    }
}
1536
#[test]
/// Checks which envelope fields `render_status_human_into` reads for the
/// version, uptime, and memory figures.
///
/// `result.daemon_version` ("8.1.2") and `meta.daemon_version` ("7.9.0")
/// deliberately differ so the test can tell which one was rendered: the
/// former must appear and the latter must not.
fn daemon_status_human_extracts_version_and_uptime() {
    let memory = serde_json::json!({
        "limit_bytes": 1_073_741_824_u64,
        "current_bytes": 104_857_600_u64,
        "reserved_bytes": 0_u64,
        "high_water_bytes": 209_715_200_u64
    });
    let envelope = serde_json::json!({
        "result": {
            "daemon_version": "8.1.2",
            "uptime_seconds": 3661_u64,
            "memory": memory,
            "workspaces": []
        },
        "meta": { "stale": false, "daemon_version": "7.9.0" }
    });

    let mut rendered: Vec<u8> = Vec::new();
    render_status_human_into(&envelope, &mut rendered)
        .expect("render_status_human_into must not fail writing to Vec");
    let output = String::from_utf8(rendered).expect("rendered output must be valid UTF-8");

    // Version: the `result` value wins, the `meta` value must be absent.
    let shows_result_version = output.contains("v8.1.2");
    assert!(
        shows_result_version,
        "rendered output must contain result.daemon_version '8.1.2'; got:\n{output}"
    );
    let shows_meta_version = output.contains("v7.9.0");
    assert!(
        !shows_meta_version,
        "rendered output must NOT contain meta.daemon_version '7.9.0'; got:\n{output}"
    );

    // Remaining positive expectations: (substring, what it represents).
    let expectations = [
        ("1h 1m", "uptime '1h 1m'"),
        ("100 MB", "memory current '100 MB'"),
        ("1 GB", "memory limit '1 GB'"),
        ("200 MB", "memory peak '200 MB'"),
    ];
    for (needle, what) in expectations {
        assert!(
            output.contains(needle),
            "rendered output must contain {what}; got:\n{output}"
        );
    }
}
1604
#[test]
/// Renders a single workspace object through `render_workspace_line_into`
/// and checks that the line reflects `index_root`, `current_bytes`,
/// `high_water_bytes`, and the `pinned` flag.
fn daemon_status_human_renders_workspace_canonical_fields() {
    let workspace = serde_json::json!({
        "index_root": "/home/user/repos/sqry",
        "state": "Loaded",
        "pinned": true,
        "current_bytes": 335_544_320_u64,
        "high_water_bytes": 671_088_640_u64,
        "last_good_at": null,
        "last_error": null,
        "retry_count": 0
    });

    let mut rendered: Vec<u8> = Vec::new();
    render_workspace_line_into(&workspace, &mut rendered)
        .expect("render_workspace_line_into must not fail writing to Vec");
    let output = String::from_utf8(rendered).expect("rendered output must be valid UTF-8");

    // (expected substring, description used in the failure message).
    let expectations = [
        ("sqry", "the workspace path component 'sqry'"),
        ("320 MB", "workspace size '320 MB' from current_bytes"),
        ("640 MB", "workspace peak '640 MB' from high_water_bytes"),
        ("pinned", "'pinned' tag"),
    ];
    for (needle, what) in expectations {
        assert!(
            output.contains(needle),
            "rendered output must contain {what}; got:\n{output}"
        );
    }
}
1652
#[test]
/// With a default `DaemonConfig`, `resolve_log_path` must succeed and yield a
/// path whose final component is `sqryd.log`.
fn resolve_log_path_returns_runtime_dir_default_when_unconfigured() {
    let default_config = sqry_daemon::config::DaemonConfig::default();
    let resolved =
        resolve_log_path(&default_config).expect("default config must resolve to a path");

    let has_default_file_name = resolved.ends_with("sqryd.log");
    assert!(
        has_default_file_name,
        "default log path should end with sqryd.log, got: {}",
        resolved.display()
    );
}
1673
#[test]
/// When `log_file` is `LogFileSetting::Special("stderr")`, `resolve_log_path`
/// must return an error whose message mentions `stderr`.
fn resolve_log_path_errors_when_log_file_opted_out() {
    let mut opted_out = sqry_daemon::config::DaemonConfig::default();
    opted_out.log_file = sqry_daemon::config::LogFileSetting::Special(String::from("stderr"));

    let outcome = resolve_log_path(&opted_out);
    assert!(
        outcome.is_err(),
        "resolve_log_path must return Err when log_file is Special"
    );

    // Only the top-level Display text of the error is inspected.
    let err_msg = outcome.unwrap_err().to_string();
    let mentions_stderr = err_msg.contains("stderr");
    assert!(
        mentions_stderr,
        "error must mention 'stderr' to explain the opt-out; got:\n{err_msg}"
    );
}
1695
#[test]
/// Calls `poll_until_reachable` with a socket path that does not exist and a
/// second argument of `0` (the timeout, per the assertion text below), then
/// checks that it fails with a message naming both the condition and the path.
fn poll_until_reachable_times_out_for_unreachable_socket() {
    // A fresh temp dir guarantees nothing is listening at the joined path.
    let scratch = tempfile::tempdir().expect("tempdir");
    let missing_socket = scratch.path().join("nonexistent.sock");

    let outcome = poll_until_reachable(&missing_socket, 0);
    assert!(
        outcome.is_err(),
        "poll_until_reachable must return Err when socket is unreachable and timeout = 0"
    );

    let err_msg = outcome.unwrap_err().to_string();
    // (expected substring, description used in the failure message).
    let expectations = [
        ("did not become reachable", "'did not become reachable'"),
        ("nonexistent.sock", "the socket path"),
    ];
    for (needle, what) in expectations {
        assert!(
            err_msg.contains(needle),
            "error must contain {what}; got:\n{err_msg}"
        );
    }
}
1730
#[test]
#[serial_test::serial]
/// Checks that `try_auto_start_daemon` returns `Ok(false)` both when
/// `SQRY_DAEMON_AUTO_START` is unset and when it is set to `"0"`.
///
/// The variable's value from before the test is captured up front and put
/// back by a drop guard. This keeps a value set in the surrounding
/// environment from being lost, and keeps the `"0"` written here from
/// lingering if an assertion or `expect` panics part-way through.
fn try_auto_start_daemon_returns_false_when_disabled() {
    /// Restores `SQRY_DAEMON_AUTO_START` to the captured value when dropped
    /// (`None` means the variable was not set).
    struct RestoreAutoStart(Option<std::ffi::OsString>);

    impl Drop for RestoreAutoStart {
        fn drop(&mut self) {
            // SAFETY: runs inside the same `#[serial_test::serial]` test as
            // the mutations below, so the same reasoning applies.
            match self.0.take() {
                Some(previous) => unsafe {
                    std::env::set_var(ENV_DAEMON_AUTO_START, previous);
                },
                None => unsafe {
                    std::env::remove_var(ENV_DAEMON_AUTO_START);
                },
            }
        }
    }

    // Declared first so it is dropped last, including during unwinding.
    let _restore = RestoreAutoStart(std::env::var_os(ENV_DAEMON_AUTO_START));

    // Case 1: variable absent.
    // SAFETY: `#[serial_test::serial]` keeps other `serial` tests from running
    // while this one mutates the process environment.
    // NOTE(review): confirm no non-serial test touches the environment.
    unsafe {
        std::env::remove_var(ENV_DAEMON_AUTO_START);
    }
    let result = try_auto_start_daemon().expect("try_auto_start_daemon must not error");
    assert!(
        !result,
        "expected false when SQRY_DAEMON_AUTO_START is unset"
    );

    // Case 2: variable explicitly set to "0".
    // SAFETY: same reasoning as above.
    unsafe {
        std::env::set_var(ENV_DAEMON_AUTO_START, "0");
    }
    let result = try_auto_start_daemon().expect("try_auto_start_daemon must not error");
    assert!(!result, "expected false when SQRY_DAEMON_AUTO_START=0");
}
1759}