1use std::io::{BufRead, Read, Seek, SeekFrom, Write};
25use std::path::{Path, PathBuf};
26use std::time::{Duration, Instant};
27
28use anyhow::{Context, Result};
29
30use crate::args::{Cli, DaemonAction};
31
/// Name of the environment variable consulted by `try_auto_start_daemon`;
/// auto-start is attempted only when it is set to exactly `1`.
// NOTE(review): `dead_code` is allowed here presumably because the only
// reader (`try_auto_start_daemon`) is itself not yet called — confirm.
#[allow(dead_code)]
const ENV_DAEMON_AUTO_START: &str = "SQRY_DAEMON_AUTO_START";

/// Sleep between reachability probes, in milliseconds. Used both while
/// waiting for the daemon to exit and while waiting for it to come up.
const STOP_POLL_INTERVAL_MS: u64 = 100;

/// Upper bound, in milliseconds, on how long log following waits for a
/// file-watcher event before draining the log file anyway.
const FOLLOW_EVENT_TIMEOUT_MS: u64 = 250;
49
50pub fn run(_cli: &Cli, action: &DaemonAction) -> Result<()> {
60 match action {
61 DaemonAction::Start {
62 sqryd_path,
63 timeout,
64 } => run_daemon_start(sqryd_path.as_deref(), *timeout),
65 DaemonAction::Stop { timeout } => run_daemon_stop(*timeout),
66 DaemonAction::Status { json } => run_daemon_status(*json),
67 DaemonAction::Logs { lines, follow } => run_daemon_logs(*lines, *follow),
68 DaemonAction::Load { path } => run_daemon_load(path),
69 DaemonAction::Rebuild {
70 path,
71 force,
72 timeout,
73 json,
74 } => run_daemon_rebuild(path, *force, *timeout, *json),
75 }
76}
77
78fn run_daemon_rebuild(path: &Path, force: bool, timeout: u64, json: bool) -> Result<()> {
83 let config = load_daemon_config()?;
84 let socket_path = config.socket_path();
85
86 let canonical_path = std::fs::canonicalize(path)
87 .with_context(|| format!("failed to canonicalize path {}", path.display()))?;
88
89 if !try_connect_sync(&socket_path)? {
90 anyhow::bail!(
91 "daemon is not running (socket {}). Start it with `sqry daemon start`.",
92 socket_path.display()
93 );
94 }
95
96 let rt = tokio::runtime::Builder::new_current_thread()
97 .enable_all()
98 .build()
99 .context("failed to build tokio runtime for daemon rebuild")?;
100
101 rt.block_on(async {
102 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
103 .await
104 .with_context(|| {
105 format!("failed to connect to daemon at {}", socket_path.display())
106 })?;
107
108 let started = Instant::now();
109 let deadline = started + Duration::from_secs(timeout);
110
111 let poll_socket = socket_path.clone();
113 let poll_path = canonical_path.clone();
114 let poll_done = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
115 let poll_flag = std::sync::Arc::clone(&poll_done);
116 let poll_handle = tokio::spawn(async move {
117 loop {
118 tokio::time::sleep(Duration::from_secs(5)).await;
119 if poll_flag.load(std::sync::atomic::Ordering::Relaxed) {
120 break;
121 }
122 let Ok(mut poll_client) =
124 sqry_daemon_client::DaemonClient::connect(&poll_socket).await
125 else {
126 continue;
127 };
128 if let Ok(status) = poll_client.status().await {
129 let elapsed = started.elapsed().as_secs();
130 if let Some(ws_state) = extract_workspace_state(&status, &poll_path) {
131 eprint!("\rsqry: {ws_state} ({elapsed}s elapsed)");
132 let _ = std::io::stderr().flush();
133 }
134 }
135 }
136 });
137
138 let result = tokio::select! {
140 res = client.rebuild(&canonical_path, force) => res,
141 () = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => {
142 poll_done.store(true, std::sync::atomic::Ordering::Relaxed);
143 let _ = poll_handle.await;
144 let elapsed_ms = started.elapsed().as_millis() as u64;
145 if json {
146 let out = serde_json::json!({
147 "status": "timeout",
148 "elapsed_ms": elapsed_ms,
149 "message": "rebuild still in progress on daemon"
150 });
151 println!("{}", serde_json::to_string_pretty(&out)?);
152 } else {
153 eprintln!("\nsqry: rebuild timed out after {timeout}s (daemon continues in background)");
154 }
155 std::process::exit(2);
156 }
157 };
158
159 poll_done.store(true, std::sync::atomic::Ordering::Relaxed);
160 let _ = poll_handle.await;
161 eprint!("\r\x1b[K");
163
164 match result {
165 Ok(value) => {
166 if json {
167 let mut out = serde_json::Map::new();
168 out.insert(
169 "status".to_owned(),
170 serde_json::Value::String("completed".to_owned()),
171 );
172 if let Some(r) = value.get("result") {
174 if let Some(d) = r.get("duration_ms") {
175 out.insert("duration_ms".to_owned(), d.clone());
176 }
177 if let Some(n) = r.get("nodes") {
178 out.insert("nodes".to_owned(), n.clone());
179 }
180 if let Some(e) = r.get("edges") {
181 out.insert("edges".to_owned(), e.clone());
182 }
183 if let Some(f) = r.get("files_indexed") {
184 out.insert("files_indexed".to_owned(), f.clone());
185 }
186 }
187 println!(
188 "{}",
189 serde_json::to_string_pretty(&serde_json::Value::Object(out))?
190 );
191 } else {
192 render_rebuild_human(&value, &canonical_path);
193 }
194 }
195 Err(sqry_daemon_client::ClientError::RpcError {
196 code: -32004,
197 message,
198 ..
199 }) => {
200 anyhow::bail!(
201 "workspace {} is not loaded on the daemon. \
202 Load it first with `sqry daemon load {}`.\n (daemon said: {message})",
203 canonical_path.display(),
204 canonical_path.display()
205 );
206 }
207 Err(e) => {
208 return Err(anyhow::anyhow!("daemon/rebuild failed: {e}"));
209 }
210 }
211 anyhow::Ok(())
212 })?;
213
214 Ok(())
215}
216
217fn extract_workspace_state(status: &serde_json::Value, path: &Path) -> Option<String> {
218 let workspaces = status.get("result")?.get("workspaces")?.as_array()?;
219 let path_str = path.to_string_lossy();
220 for ws in workspaces {
221 if let Some(root) = ws.get("index_root").and_then(|r| r.as_str())
222 && root == path_str.as_ref()
223 {
224 return ws
225 .get("state")
226 .and_then(|s| s.as_str())
227 .map(|s| s.to_owned());
228 }
229 }
230 None
231}
232
233fn render_rebuild_human(value: &serde_json::Value, path: &Path) {
234 if let Some(r) = value.get("result") {
235 let duration = r.get("duration_ms").and_then(|d| d.as_u64()).unwrap_or(0);
236 let nodes = r.get("nodes").and_then(|n| n.as_u64()).unwrap_or(0);
237 let edges = r.get("edges").and_then(|e| e.as_u64()).unwrap_or(0);
238 let files = r.get("files_indexed").and_then(|f| f.as_u64()).unwrap_or(0);
239 let was_full = r.get("was_full").and_then(|w| w.as_bool()).unwrap_or(false);
240 let mode = if was_full { "full" } else { "incremental" };
241 eprintln!(
242 "sqry: {mode} rebuild of {} completed in {:.1}s ({nodes} nodes, {edges} edges, {files} files)",
243 path.display(),
244 duration as f64 / 1000.0
245 );
246 } else {
247 eprintln!("sqry: rebuild completed for {}", path.display());
248 }
249}
250
251fn run_daemon_start(sqryd_path: Option<&Path>, timeout: u64) -> Result<()> {
262 let binary = resolve_sqryd_binary(sqryd_path)?;
263
264 let socket_path = load_config_socket_path();
266
267 if socket_path
269 .as_ref()
270 .is_some_and(|sp| try_connect_sync(sp).unwrap_or(false))
271 {
272 let sp = socket_path.as_ref().unwrap();
273 eprintln!("sqry: daemon is already running (socket {})", sp.display());
274 return Ok(());
275 }
276
277 let status = std::process::Command::new(&binary)
278 .args(["start", "--detach"])
279 .stdin(std::process::Stdio::null())
280 .stdout(std::process::Stdio::inherit())
281 .stderr(std::process::Stdio::inherit())
282 .status()
283 .with_context(|| format!("failed to exec sqryd at {}", binary.display()))?;
284
285 if !status.success() {
286 let code = status.code().unwrap_or(1);
287 if code == 75 {
289 eprintln!("sqry: daemon is already running");
290 return Ok(());
291 }
292 anyhow::bail!("sqryd start --detach exited with code {code}");
293 }
294
295 if let Some(ref sp) = socket_path {
297 poll_until_reachable(sp, timeout)?;
298 eprintln!("sqry: daemon started (socket {})", sp.display());
299 } else {
300 eprintln!("sqry: daemon started");
301 }
302 Ok(())
303}
304
305fn run_daemon_stop(timeout: u64) -> Result<()> {
312 let config = load_daemon_config()?;
313 let socket_path = config.socket_path();
314
315 if !try_connect_sync(&socket_path)? {
316 eprintln!("sqry: daemon is not running");
317 return Ok(());
318 }
319
320 let rt = tokio::runtime::Builder::new_current_thread()
321 .enable_all()
322 .build()
323 .context("failed to build tokio runtime for daemon stop")?;
324
325 rt.block_on(async {
326 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
327 .await
328 .with_context(|| format!("failed to connect to daemon at {}", socket_path.display()))?;
329
330 let _ = client.stop().await;
333
334 let deadline = Instant::now() + Duration::from_secs(timeout);
335 loop {
336 tokio::time::sleep(Duration::from_millis(STOP_POLL_INTERVAL_MS)).await;
338
339 if !try_connect_async(&socket_path).await {
340 break;
341 }
342 if Instant::now() >= deadline {
343 anyhow::bail!(
344 "daemon did not exit within {timeout} seconds; \
345 check the daemon log for errors"
346 );
347 }
348 }
349 anyhow::Ok(())
350 })?;
351
352 eprintln!("sqry: daemon stopped");
353 Ok(())
354}
355
356fn run_daemon_status(json: bool) -> Result<()> {
362 let config = load_daemon_config()?;
363 let socket_path = config.socket_path();
364
365 if !try_connect_sync(&socket_path)? {
367 if json {
368 println!("{{}}");
369 } else {
370 eprintln!(
371 "sqry: daemon is not running (socket {})",
372 socket_path.display()
373 );
374 }
375 std::process::exit(1);
377 }
378
379 let rt = tokio::runtime::Builder::new_current_thread()
380 .enable_all()
381 .build()
382 .context("failed to build tokio runtime for daemon status")?;
383
384 rt.block_on(async {
385 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
386 .await
387 .with_context(|| format!("failed to connect to daemon at {}", socket_path.display()))?;
388
389 let result = client
390 .status()
391 .await
392 .context("daemon/status request failed")?;
393
394 if json {
395 let out = serde_json::to_string_pretty(&result)
396 .context("failed to serialize daemon status as JSON")?;
397 println!("{out}");
398 } else {
399 render_status_human(&result);
400 }
401 anyhow::Ok(())
402 })?;
403
404 Ok(())
405}
406
407fn run_daemon_logs(lines: usize, follow: bool) -> Result<()> {
418 let config = load_daemon_config()?;
419 let log_path = resolve_log_path(&config)?;
420
421 if !log_path.exists() {
422 anyhow::bail!(
423 "daemon log file not found at {}. \
424 Is the daemon running? Has it written any log output?",
425 log_path.display()
426 );
427 }
428
429 if follow {
430 tail_follow(&log_path, lines)?;
431 } else {
432 tail_last_n(&log_path, lines)?;
433 }
434 Ok(())
435}
436
437fn run_daemon_load(path: &Path) -> Result<()> {
447 let config = load_daemon_config()?;
448 let socket_path = config.socket_path();
449
450 let canonical_path = std::fs::canonicalize(path)
454 .with_context(|| format!("failed to canonicalize path {}", path.display()))?;
455
456 if !try_connect_sync(&socket_path)? {
457 anyhow::bail!(
458 "daemon is not running (socket {}). Start it with `sqry daemon start`.",
459 socket_path.display()
460 );
461 }
462
463 let rt = tokio::runtime::Builder::new_current_thread()
464 .enable_all()
465 .build()
466 .context("failed to build tokio runtime for daemon load")?;
467
468 rt.block_on(async {
469 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
470 .await
471 .with_context(|| format!("failed to connect to daemon at {}", socket_path.display()))?;
472
473 let envelope = client.load(&canonical_path).await.with_context(|| {
479 format!(
480 "daemon/load request failed for {}",
481 canonical_path.display()
482 )
483 })?;
484
485 let load_result = envelope.result;
486 eprintln!(
487 "sqry: workspace loaded at {} ({:?}, {})",
488 canonical_path.display(),
489 load_result.state,
490 human_bytes(load_result.current_bytes)
491 );
492 anyhow::Ok(())
493 })?;
494
495 Ok(())
496}
497
498fn resolve_sqryd_binary(explicit: Option<&Path>) -> Result<PathBuf> {
514 if let Some(path) = explicit {
516 if path.exists() {
517 return Ok(path.to_path_buf());
518 }
519 anyhow::bail!("explicit --sqryd-path {} does not exist", path.display());
520 }
521
522 if let Ok(exe) = std::env::current_exe() {
524 let canonical = std::fs::canonicalize(&exe).unwrap_or(exe);
526 if let Some(dir) = canonical.parent() {
527 let sibling = dir.join("sqryd");
528 if sibling.exists() {
529 return Ok(sibling);
530 }
531 let sibling_exe = dir.join("sqryd.exe");
533 if sibling_exe.exists() {
534 return Ok(sibling_exe);
535 }
536 }
537 }
538
539 if let Some(val) = std::env::var_os("SQRYD_PATH") {
541 let path = PathBuf::from(val);
542 if path.exists() {
543 return Ok(path);
544 }
545 anyhow::bail!("SQRYD_PATH={} does not exist", path.display());
546 }
547
548 which::which("sqryd").with_context(|| {
550 "sqryd binary not found. \
551 Install sqryd alongside sqry, set SQRYD_PATH, or use --sqryd-path."
552 .to_owned()
553 })
554}
555
556#[allow(dead_code)]
578pub fn try_auto_start_daemon() -> Result<bool> {
579 if std::env::var_os(ENV_DAEMON_AUTO_START).as_deref() != Some(std::ffi::OsStr::new("1")) {
580 return Ok(false);
581 }
582
583 let binary = resolve_sqryd_binary(None)?;
585
586 let socket_path = load_config_socket_path();
588 if socket_path
589 .as_ref()
590 .is_some_and(|sp| try_connect_sync(sp).unwrap_or(false))
591 {
592 return Ok(true);
593 }
594
595 let status = std::process::Command::new(&binary)
597 .args(["start", "--detach"])
598 .stdin(std::process::Stdio::null())
599 .stdout(std::process::Stdio::null())
600 .stderr(std::process::Stdio::inherit())
601 .status()
602 .with_context(|| format!("auto-start: failed to exec sqryd at {}", binary.display()))?;
603
604 if !status.success() {
605 let code = status.code().unwrap_or(1);
606 if code != 75 {
607 eprintln!(
609 "sqry: Warning: daemon auto-start failed (sqryd exited {code}); \
610 falling back to local mode"
611 );
612 return Ok(false);
613 }
614 }
615
616 Ok(true)
617}
618
/// Formats a byte count using binary (1024-based) units from `B` to `TB`.
///
/// Values below 1024 are printed as whole bytes. Larger values are scaled
/// to the largest unit that keeps the number at or above 1 (capped at `TB`)
/// and printed with one decimal place; a trailing `.0` is dropped so whole
/// numbers read as `2 KB` rather than `2.0 KB`.
///
/// The trailing `.0` is stripped *after* rounding to one decimal. That keeps
/// values that round up to a whole number (e.g. 2047 bytes ≈ 1.999 KB)
/// consistent with exact multiples (2048 bytes), both rendering as `2 KB`.
#[must_use]
pub fn human_bytes(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
    const DIVISOR: u64 = 1024;

    if bytes < DIVISOR {
        return format!("{bytes} B");
    }

    let mut value = bytes as f64;
    let mut unit_index = 0usize;
    while value >= DIVISOR as f64 && unit_index + 1 < UNITS.len() {
        value /= DIVISOR as f64;
        unit_index += 1;
    }

    let rendered = format!("{value:.1}");
    let number = rendered.strip_suffix(".0").unwrap_or(&rendered);
    format!("{number} {}", UNITS[unit_index])
}
648
/// Formats a duration in seconds as a compact `d`/`h`/`m` string.
///
/// Seconds are truncated to whole minutes. The output shapes are:
/// * under one hour: `{m}m` (including `0m`);
/// * under one day: `{h}h`, or `{h}h {m}m` when minutes are non-zero;
/// * one day or more: `{d}d`, `{d}d {h}h`, or `{d}d {h}h {m}m`. When
///   minutes are non-zero the hours field is always shown, even if it is 0
///   (e.g. `1d 0h 5m`).
#[must_use]
pub fn format_uptime(seconds: u64) -> String {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 3_600;
    const SECS_PER_DAY: u64 = 86_400;

    let d = seconds / SECS_PER_DAY;
    let h = seconds % SECS_PER_DAY / SECS_PER_HOUR;
    let m = seconds % SECS_PER_HOUR / SECS_PER_MINUTE;

    if d == 0 {
        if h == 0 {
            return format!("{m}m");
        }
        return if m == 0 {
            format!("{h}h")
        } else {
            format!("{h}h {m}m")
        };
    }

    if m != 0 {
        format!("{d}d {h}h {m}m")
    } else if h != 0 {
        format!("{d}d {h}h")
    } else {
        format!("{d}d")
    }
}
674
675fn render_status_human(envelope: &serde_json::Value) {
685 let stdout = std::io::stdout();
686 let mut handle = stdout.lock();
687 let _ = render_status_human_into(envelope, &mut handle);
689}
690
691fn render_status_human_into(
740 envelope: &serde_json::Value,
741 out: &mut dyn Write,
742) -> std::io::Result<()> {
743 let inner = envelope.get("result").unwrap_or(envelope);
747
748 let version = inner
752 .get("daemon_version")
753 .or_else(|| envelope.get("meta").and_then(|m| m.get("daemon_version")))
754 .and_then(serde_json::Value::as_str)
755 .unwrap_or("unknown");
756
757 let uptime_str = inner
759 .get("uptime_seconds")
760 .and_then(serde_json::Value::as_u64)
761 .map(format_uptime);
762
763 match uptime_str {
764 Some(uptime) => writeln!(out, "sqryd v{version} -- uptime {uptime}")?,
765 None => writeln!(out, "sqryd v{version}")?,
766 }
767
768 let memory = inner.get("memory");
771 let mem_current = memory
772 .and_then(|m| m.get("current_bytes"))
773 .and_then(serde_json::Value::as_u64);
774 let mem_limit = memory
775 .and_then(|m| m.get("limit_bytes"))
776 .and_then(serde_json::Value::as_u64);
777 let mem_peak = memory
778 .and_then(|m| m.get("high_water_bytes"))
779 .and_then(serde_json::Value::as_u64);
780
781 if mem_current.is_some() || mem_limit.is_some() {
782 writeln!(out)?;
783 let used_str = mem_current.map_or_else(|| "?".to_owned(), human_bytes);
784 let limit_str = mem_limit.map_or_else(|| "?".to_owned(), human_bytes);
785 match mem_peak {
786 Some(peak) => writeln!(
787 out,
788 "Memory: {used_str} / {limit_str} (peak: {})",
789 human_bytes(peak)
790 )?,
791 None => writeln!(out, "Memory: {used_str} / {limit_str}")?,
792 }
793 }
794
795 let workspaces = inner
797 .get("workspaces")
798 .and_then(serde_json::Value::as_array);
799
800 if let Some(wss) = workspaces {
801 writeln!(out)?;
802 writeln!(out, "Workspaces ({} loaded):", wss.len())?;
803 for ws in wss {
804 render_workspace_line_into(ws, out)?;
805 }
806 }
807
808 Ok(())
809}
810
811#[allow(dead_code)]
820fn render_workspace_line(ws: &serde_json::Value) {
821 let stdout = std::io::stdout();
822 let mut handle = stdout.lock();
823 let _ = render_workspace_line_into(ws, &mut handle);
824}
825
826fn render_workspace_line_into(ws: &serde_json::Value, out: &mut dyn Write) -> std::io::Result<()> {
836 let path = ws
839 .get("index_root")
840 .or_else(|| ws.get("path"))
841 .and_then(serde_json::Value::as_str)
842 .unwrap_or("<unknown>");
843
844 let display_path = tilde_shorten(path);
846
847 let ws_mem = ws
849 .get("current_bytes")
850 .and_then(serde_json::Value::as_u64)
851 .map(human_bytes);
852 let ws_peak = ws
853 .get("high_water_bytes")
854 .and_then(serde_json::Value::as_u64)
855 .map(human_bytes);
856
857 let mut tags: Vec<&str> = Vec::new();
859 if ws
860 .get("pinned")
861 .and_then(serde_json::Value::as_bool)
862 .unwrap_or(false)
863 {
864 tags.push("pinned");
865 }
866 let state = ws
867 .get("state")
868 .and_then(serde_json::Value::as_str)
869 .unwrap_or("Loaded");
870 tags.push(state);
871
872 if let Some(err_msg) = ws.get("last_error").and_then(serde_json::Value::as_str) {
874 let tag = format!("error: {err_msg}");
876 match (ws_mem, ws_peak) {
877 (Some(mem), Some(peak)) => {
878 writeln!(
879 out,
880 " {display_path:<30} {mem:<8} (peak: {peak:<8}) [{tags}, {tag}]",
881 tags = tags.join(", ")
882 )?;
883 }
884 (Some(mem), None) => {
885 writeln!(
886 out,
887 " {display_path:<30} {mem:<8} [{tags}, {tag}]",
888 tags = tags.join(", ")
889 )?;
890 }
891 _ => {
892 writeln!(
893 out,
894 " {display_path} [{tags}, {tag}]",
895 tags = tags.join(", ")
896 )?;
897 }
898 }
899 return Ok(());
900 }
901
902 let tag_str = format!("[{}]", tags.join(", "));
903 match (ws_mem, ws_peak) {
904 (Some(mem), Some(peak)) => {
905 writeln!(
906 out,
907 " {display_path:<30} {mem:<8} (peak: {peak:<8}) {tag_str}"
908 )?;
909 }
910 (Some(mem), None) => {
911 writeln!(out, " {display_path:<30} {mem:<8} {tag_str}")?;
912 }
913 _ => {
914 writeln!(out, " {display_path} {tag_str}")?;
915 }
916 }
917 Ok(())
918}
919
920fn tilde_shorten(path: &str) -> String {
922 if let Some(home) = dirs::home_dir() {
923 let home_str = home.to_string_lossy();
924 if let Some(stripped) = path.strip_prefix(home_str.as_ref()) {
925 return format!("~{stripped}");
926 }
927 }
928 path.to_owned()
929}
930
931pub fn tail_last_n(path: &Path, n: usize) -> Result<()> {
946 let file = std::fs::File::open(path)
947 .with_context(|| format!("failed to open log file {}", path.display()))?;
948 let buf_reader = std::io::BufReader::new(file);
949
950 let mut lines: Vec<String> = Vec::new();
952 for line in buf_reader.lines() {
953 let l = line.with_context(|| format!("error reading log file {}", path.display()))?;
954 lines.push(l);
955 }
956
957 let start = lines.len().saturating_sub(n);
959 let stdout = std::io::stdout();
960 let mut out = stdout.lock();
961 for line in &lines[start..] {
962 writeln!(out, "{line}").context("failed to write to stdout")?;
963 }
964 Ok(())
965}
966
967pub fn tail_follow(path: &Path, initial_lines: usize) -> Result<()> {
980 use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
981 use std::sync::mpsc;
982
983 tail_last_n(path, initial_lines)?;
985
986 let mut file = std::fs::File::open(path)
988 .with_context(|| format!("failed to open log file for follow: {}", path.display()))?;
989 let mut pos = file
990 .seek(SeekFrom::End(0))
991 .context("failed to seek to end of log file")?;
992
993 let (tx, rx) = mpsc::channel::<notify::Result<notify::Event>>();
995 let mut watcher = RecommendedWatcher::new(tx, notify::Config::default())
996 .context("failed to create file watcher for log follow")?;
997
998 let parent = path.parent().unwrap_or(Path::new("."));
1000 watcher
1001 .watch(parent, RecursiveMode::NonRecursive)
1002 .with_context(|| format!("failed to watch log directory {}", parent.display()))?;
1003
1004 let stdout = std::io::stdout();
1005 let mut out = stdout.lock();
1006
1007 loop {
1009 match rx.recv_timeout(Duration::from_millis(FOLLOW_EVENT_TIMEOUT_MS)) {
1010 Ok(Ok(event)) => {
1011 let is_rotate = matches!(event.kind, EventKind::Remove(_) | EventKind::Create(_))
1012 && event.paths.iter().any(|p| p == path);
1013
1014 if is_rotate {
1015 if path.exists() {
1017 match std::fs::File::open(path) {
1018 Ok(f) => {
1019 file = f;
1020 pos = 0;
1021 }
1022 Err(e) => {
1023 eprintln!("sqry: log rotation detected but reopen failed: {e}");
1024 }
1025 }
1026 }
1027 }
1028
1029 pos = drain_new_bytes(&mut file, pos, path, &mut out)?;
1031 }
1032 Ok(Err(e)) => {
1033 eprintln!("sqry: file watcher error: {e}");
1034 }
1035 Err(mpsc::RecvTimeoutError::Timeout) => {
1036 pos = drain_new_bytes(&mut file, pos, path, &mut out)?;
1038 }
1039 Err(mpsc::RecvTimeoutError::Disconnected) => {
1040 break;
1041 }
1042 }
1043 }
1044
1045 Ok(())
1046}
1047
1048fn drain_new_bytes(
1051 file: &mut std::fs::File,
1052 current_pos: u64,
1053 path: &Path,
1054 out: &mut impl Write,
1055) -> Result<u64> {
1056 file.seek(SeekFrom::Start(current_pos))
1057 .with_context(|| format!("seek error in log file {}", path.display()))?;
1058
1059 let mut buf = Vec::new();
1060 file.read_to_end(&mut buf)
1061 .with_context(|| format!("read error in log file {}", path.display()))?;
1062
1063 if !buf.is_empty() {
1064 out.write_all(&buf)
1065 .context("failed to write log output to stdout")?;
1066 out.flush().context("failed to flush stdout")?;
1067 }
1068
1069 let new_pos = current_pos + buf.len() as u64;
1070 Ok(new_pos)
1071}
1072
1073fn load_daemon_config() -> Result<sqry_daemon::config::DaemonConfig> {
1079 sqry_daemon::config::DaemonConfig::load().context(
1080 "failed to load daemon config; ensure daemon.toml is well-formed or \
1081 remove it to use defaults",
1082 )
1083}
1084
1085fn load_config_socket_path() -> Option<PathBuf> {
1089 sqry_daemon::config::DaemonConfig::load()
1090 .ok()
1091 .map(|c| c.socket_path())
1092}
1093
1094fn resolve_log_path(config: &sqry_daemon::config::DaemonConfig) -> Result<PathBuf> {
1103 match config.log_file.as_ref() {
1104 Some(p) => Ok(p.clone()),
1105 None => {
1106 anyhow::bail!(
1107 "no log file is configured for the daemon.\n\
1108 The daemon writes to stderr by default (captured by systemd or your terminal).\n\
1109 To enable file logging, set `log_file = \"/path/to/sqryd.log\"` in daemon.toml."
1110 );
1111 }
1112 }
1113}
1114
1115fn poll_until_reachable(socket_path: &Path, timeout: u64) -> Result<()> {
1124 let deadline = Instant::now() + Duration::from_secs(timeout);
1125 loop {
1126 if try_connect_sync(socket_path).unwrap_or(false) {
1127 return Ok(());
1128 }
1129 if Instant::now() >= deadline {
1130 anyhow::bail!(
1131 "daemon process started but did not become reachable within {timeout} \
1132 seconds (socket {}). Check the daemon log for startup errors.",
1133 socket_path.display()
1134 );
1135 }
1136 std::thread::sleep(Duration::from_millis(STOP_POLL_INTERVAL_MS));
1137 }
1138}
1139
1140pub fn try_connect_sync(socket_path: &Path) -> Result<bool> {
1157 #[cfg(unix)]
1158 {
1159 use std::os::unix::net::UnixStream;
1160 match UnixStream::connect(socket_path) {
1161 Ok(_) => Ok(true),
1162 Err(e) => match e.kind() {
1163 std::io::ErrorKind::ConnectionRefused | std::io::ErrorKind::NotFound => Ok(false),
1164 _ => Err(anyhow::Error::from(e).context(format!(
1165 "unexpected error probing socket {}",
1166 socket_path.display()
1167 ))),
1168 },
1169 }
1170 }
1171 #[cfg(windows)]
1172 {
1173 Ok(socket_path.exists())
1175 }
1176 #[cfg(not(any(unix, windows)))]
1177 {
1178 let _ = socket_path;
1179 Ok(false)
1180 }
1181}
1182
1183async fn try_connect_async(socket_path: &Path) -> bool {
1187 #[cfg(unix)]
1188 {
1189 tokio::net::UnixStream::connect(socket_path).await.is_ok()
1190 }
1191 #[cfg(windows)]
1192 {
1193 socket_path.exists()
1194 }
1195 #[cfg(not(any(unix, windows)))]
1196 {
1197 let _ = socket_path;
1198 false
1199 }
1200}
1201
/// Unit tests for binary resolution, formatting helpers, status rendering,
/// log-path resolution, reachability polling and the auto-start gate.
///
/// Tests that mutate process environment variables are marked
/// `#[serial_test::serial]` so they do not run concurrently with each other.
#[cfg(test)]
mod tests {
    use super::*;

    /// A fake `sqryd` advertised through `SQRYD_PATH` is returned as-is.
    // NOTE(review): despite its name this test exercises the `SQRYD_PATH`
    // branch, not the sibling lookup, and it would fail if a real `sqryd`
    // happened to sit next to the test executable.
    #[test]
    #[serial_test::serial]
    fn resolve_sqryd_binary_finds_sibling() {
        let dir = tempfile::tempdir().expect("tempdir");
        let sqryd_path = dir.path().join("sqryd");
        std::fs::write(&sqryd_path, b"#!/bin/sh\n").expect("write fake sqryd");
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            std::fs::set_permissions(&sqryd_path, std::fs::Permissions::from_mode(0o755))
                .expect("chmod");
        }

        // SAFETY: env mutation is confined to `#[serial]` tests, which do not
        // run concurrently with each other.
        // NOTE(review): non-serial tests reading the environment at the same
        // time would still race — confirm none do.
        unsafe {
            std::env::set_var("SQRYD_PATH", &sqryd_path);
        }
        let result = resolve_sqryd_binary(None);
        // SAFETY: see above.
        unsafe {
            std::env::remove_var("SQRYD_PATH");
        }

        assert!(result.is_ok(), "expected Ok, got {result:?}");
        assert_eq!(result.unwrap(), sqryd_path);
    }

    /// Without `SQRYD_PATH`, resolution must not panic; whether a binary is
    /// found depends on the machine, but any path returned must exist.
    #[test]
    #[serial_test::serial]
    fn resolve_sqryd_binary_falls_back_to_path() {
        // SAFETY: env mutation is confined to `#[serial]` tests, which do not
        // run concurrently with each other.
        unsafe {
            std::env::remove_var("SQRYD_PATH");
        }

        if let Ok(found) = resolve_sqryd_binary(None) {
            assert!(
                found.exists(),
                "resolved sqryd binary {} must exist on disk",
                found.display()
            );
        }
    }

    /// `SQRYD_PATH` pointing at an existing file is honoured.
    #[test]
    #[serial_test::serial]
    fn resolve_sqryd_binary_respects_env_var() {
        let dir = tempfile::tempdir().expect("tempdir");
        let sqryd_path = dir.path().join("sqryd");
        std::fs::write(&sqryd_path, b"#!/bin/sh\n").expect("write fake sqryd");

        // SAFETY: env mutation is confined to `#[serial]` tests, which do not
        // run concurrently with each other.
        unsafe {
            std::env::set_var("SQRYD_PATH", &sqryd_path);
        }
        let result = resolve_sqryd_binary(None);
        // SAFETY: see above.
        unsafe {
            std::env::remove_var("SQRYD_PATH");
        }

        assert!(result.is_ok(), "expected Ok, got {result:?}");
        assert_eq!(result.unwrap(), sqryd_path);
    }

    /// Byte counts are scaled to binary units with at most one decimal.
    #[test]
    fn human_bytes_formats_correctly() {
        assert_eq!(human_bytes(0), "0 B");
        assert_eq!(human_bytes(512), "512 B");
        assert_eq!(human_bytes(1023), "1023 B");
        assert_eq!(human_bytes(1024), "1 KB");
        assert_eq!(human_bytes(1536), "1.5 KB");
        assert_eq!(human_bytes(1_048_576), "1 MB");
        assert_eq!(human_bytes(1_073_741_824), "1 GB");
        assert_eq!(human_bytes(1_099_511_627_776), "1 TB");
        assert_eq!(human_bytes(1_572_864), "1.5 MB");
    }

    /// Uptime is rendered in days/hours/minutes, dropping zero components.
    #[test]
    fn format_uptime_renders_hours_minutes() {
        assert_eq!(format_uptime(0), "0m");
        assert_eq!(format_uptime(59), "0m");
        assert_eq!(format_uptime(60), "1m");
        assert_eq!(format_uptime(3600), "1h");
        assert_eq!(format_uptime(3660), "1h 1m");
        assert_eq!(format_uptime(7380), "2h 3m");
        assert_eq!(format_uptime(86400), "1d");
        assert_eq!(format_uptime(90061), "1d 1h 1m");
        assert_eq!(format_uptime(172800), "2d");
    }

    /// An empty `result` falls back to `meta.daemon_version` for the header.
    #[test]
    fn daemon_status_human_renders_minimal_response() {
        let envelope = serde_json::json!({
            "result": {},
            "meta": { "stale": false, "daemon_version": "8.0.6" }
        });
        let mut buf: Vec<u8> = Vec::new();
        render_status_human_into(&envelope, &mut buf)
            .expect("render_status_human_into must not fail");
        let output = String::from_utf8(buf).expect("rendered output must be valid UTF-8");
        assert!(
            output.contains("v8.0.6"),
            "graceful degradation must fall back to meta.daemon_version '8.0.6'; got:\n{output}"
        );
    }

    /// A complete response renders header, memory and workspace sections.
    #[test]
    fn daemon_status_human_renders_full_response() {
        let envelope = serde_json::json!({
            "result": {
                "daemon_version": "8.0.6",
                "uptime_seconds": 8040_u64,
                "memory": {
                    "limit_bytes": 2_147_483_648_u64,
                    "current_bytes": 471_859_200_u64,
                    "reserved_bytes": 0_u64,
                    "high_water_bytes": 1_288_490_188_u64
                },
                "workspaces": [
                    {
                        "index_root": "/home/user/repos/main-project",
                        "state": "Loaded",
                        "pinned": true,
                        "current_bytes": 335_544_320_u64,
                        "high_water_bytes": 933_232_896_u64,
                        "last_good_at": null,
                        "last_error": null,
                        "retry_count": 0
                    },
                    {
                        "index_root": "/home/user/repos/auth-service",
                        "state": "Loaded",
                        "pinned": false,
                        "current_bytes": 83_886_080_u64,
                        "high_water_bytes": 324_534_016_u64,
                        "last_good_at": null,
                        "last_error": null,
                        "retry_count": 0
                    }
                ]
            },
            "meta": {
                "stale": false,
                "daemon_version": "8.0.6"
            }
        });
        let mut buf: Vec<u8> = Vec::new();
        render_status_human_into(&envelope, &mut buf)
            .expect("render_status_human_into must not fail");
        let output = String::from_utf8(buf).expect("rendered output must be valid UTF-8");
        assert!(
            output.contains("v8.0.6"),
            "must contain version; got:\n{output}"
        );
        assert!(
            output.contains("2h"),
            "must contain uptime hours; got:\n{output}"
        );
        assert!(
            output.contains("Memory:"),
            "must contain memory section; got:\n{output}"
        );
        assert!(
            output.contains("2 GB"),
            "must contain limit '2 GB'; got:\n{output}"
        );
        assert!(
            output.contains("Workspaces"),
            "must contain workspaces section; got:\n{output}"
        );
        assert!(
            output.contains("main-project"),
            "must contain workspace path; got:\n{output}"
        );
        assert!(
            output.contains("auth-service"),
            "must contain workspace path; got:\n{output}"
        );
    }

    /// `result.daemon_version` wins over `meta.daemon_version`, and uptime
    /// and memory figures are taken from `result`.
    #[test]
    fn daemon_status_human_extracts_version_and_uptime() {
        let envelope = serde_json::json!({
            "result": {
                "daemon_version": "8.1.2",
                "uptime_seconds": 3661_u64,
                "memory": {
                    "limit_bytes": 1_073_741_824_u64,
                    "current_bytes": 104_857_600_u64,
                    "reserved_bytes": 0_u64,
                    "high_water_bytes": 209_715_200_u64
                },
                "workspaces": []
            },
            "meta": { "stale": false, "daemon_version": "7.9.0" }
        });

        let mut buf: Vec<u8> = Vec::new();
        render_status_human_into(&envelope, &mut buf)
            .expect("render_status_human_into must not fail writing to Vec");
        let output = String::from_utf8(buf).expect("rendered output must be valid UTF-8");

        assert!(
            output.contains("v8.1.2"),
            "rendered output must contain result.daemon_version '8.1.2'; got:\n{output}"
        );
        assert!(
            !output.contains("v7.9.0"),
            "rendered output must NOT contain meta.daemon_version '7.9.0'; got:\n{output}"
        );
        assert!(
            output.contains("1h 1m"),
            "rendered output must contain uptime '1h 1m'; got:\n{output}"
        );
        assert!(
            output.contains("100 MB"),
            "rendered output must contain memory current '100 MB'; got:\n{output}"
        );
        assert!(
            output.contains("1 GB"),
            "rendered output must contain memory limit '1 GB'; got:\n{output}"
        );
        assert!(
            output.contains("200 MB"),
            "rendered output must contain memory peak '200 MB'; got:\n{output}"
        );
    }

    /// A workspace line shows path, current size, peak size and tags.
    #[test]
    fn daemon_status_human_renders_workspace_canonical_fields() {
        let ws = serde_json::json!({
            "index_root": "/home/user/repos/sqry",
            "state": "Loaded",
            "pinned": true,
            "current_bytes": 335_544_320_u64,
            "high_water_bytes": 671_088_640_u64,
            "last_good_at": null,
            "last_error": null,
            "retry_count": 0
        });

        let mut buf: Vec<u8> = Vec::new();
        render_workspace_line_into(&ws, &mut buf)
            .expect("render_workspace_line_into must not fail writing to Vec");
        let output = String::from_utf8(buf).expect("rendered output must be valid UTF-8");

        assert!(
            output.contains("sqry"),
            "rendered output must contain the workspace path component 'sqry'; got:\n{output}"
        );
        assert!(
            output.contains("320 MB"),
            "rendered output must contain workspace size '320 MB' from current_bytes; got:\n{output}"
        );
        assert!(
            output.contains("640 MB"),
            "rendered output must contain workspace peak '640 MB' from high_water_bytes; got:\n{output}"
        );
        assert!(
            output.contains("pinned"),
            "rendered output must contain 'pinned' tag; got:\n{output}"
        );
    }

    /// With no `log_file` configured the error explains how to enable one.
    #[test]
    fn resolve_log_path_errors_when_log_file_not_configured() {
        let config = sqry_daemon::config::DaemonConfig::default();
        assert!(
            config.log_file.is_none(),
            "DaemonConfig::default() must have log_file = None"
        );

        let result = resolve_log_path(&config);
        assert!(
            result.is_err(),
            "resolve_log_path must return Err when log_file is None"
        );

        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("log_file"),
            "error must mention 'log_file' to guide the user; got:\n{err_msg}"
        );
        assert!(
            err_msg.contains("daemon.toml"),
            "error must mention 'daemon.toml' to guide the user; got:\n{err_msg}"
        );
        assert!(
            err_msg.contains("stderr"),
            "error must mention 'stderr' to explain the default behaviour; got:\n{err_msg}"
        );
    }

    /// A zero timeout against a missing socket fails after a single probe.
    #[test]
    fn poll_until_reachable_times_out_for_unreachable_socket() {
        let dir = tempfile::tempdir().expect("tempdir");
        let socket_path = dir.path().join("nonexistent.sock");

        let result = poll_until_reachable(&socket_path, 0);
        assert!(
            result.is_err(),
            "poll_until_reachable must return Err when socket is unreachable and timeout = 0"
        );

        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("did not become reachable"),
            "error must contain 'did not become reachable'; got:\n{err_msg}"
        );
        assert!(
            err_msg.contains("nonexistent.sock"),
            "error must contain the socket path; got:\n{err_msg}"
        );
    }

    /// Auto-start is a no-op unless the env var is exactly `1`.
    #[test]
    #[serial_test::serial]
    fn try_auto_start_daemon_returns_false_when_disabled() {
        // SAFETY: env mutation is confined to `#[serial]` tests, which do not
        // run concurrently with each other.
        unsafe {
            std::env::remove_var(ENV_DAEMON_AUTO_START);
        }
        let result = try_auto_start_daemon().expect("try_auto_start_daemon must not error");
        assert!(
            !result,
            "expected false when SQRY_DAEMON_AUTO_START is unset"
        );

        // SAFETY: see above.
        unsafe {
            std::env::set_var(ENV_DAEMON_AUTO_START, "0");
        }
        let result = try_auto_start_daemon().expect("try_auto_start_daemon must not error");
        // SAFETY: see above.
        unsafe {
            std::env::remove_var(ENV_DAEMON_AUTO_START);
        }
        assert!(!result, "expected false when SQRY_DAEMON_AUTO_START=0");
    }
}