1use std::io::{BufRead, Read, Seek, SeekFrom, Write};
25use std::path::{Path, PathBuf};
26use std::time::{Duration, Instant};
27
28use anyhow::{Context, Result};
29
30use crate::args::{Cli, DaemonAction};
31
32#[allow(dead_code)]
41const ENV_DAEMON_AUTO_START: &str = "SQRY_DAEMON_AUTO_START";
42
43const STOP_POLL_INTERVAL_MS: u64 = 100;
45
46const FOLLOW_EVENT_TIMEOUT_MS: u64 = 250;
49
50pub fn run(_cli: &Cli, action: &DaemonAction) -> Result<()> {
60 match action {
61 DaemonAction::Start {
62 sqryd_path,
63 timeout,
64 } => run_daemon_start(sqryd_path.as_deref(), *timeout),
65 DaemonAction::Stop { timeout } => run_daemon_stop(*timeout),
66 DaemonAction::Status { json } => run_daemon_status(*json),
67 DaemonAction::Logs { lines, follow } => run_daemon_logs(*lines, *follow),
68 DaemonAction::Load { path } => run_daemon_load(path),
69 DaemonAction::Rebuild {
70 path,
71 force,
72 timeout,
73 json,
74 } => run_daemon_rebuild(path, *force, *timeout, *json),
75 }
76}
77
78fn run_daemon_rebuild(path: &Path, force: bool, timeout: u64, json: bool) -> Result<()> {
83 let config = load_daemon_config()?;
84 let socket_path = config.socket_path();
85
86 let canonical_path = std::fs::canonicalize(path)
87 .with_context(|| format!("failed to canonicalize path {}", path.display()))?;
88
89 if !try_connect_sync(&socket_path)? {
90 anyhow::bail!(
91 "daemon is not running (socket {}). Start it with `sqry daemon start`.",
92 socket_path.display()
93 );
94 }
95
96 let rt = tokio::runtime::Builder::new_current_thread()
97 .enable_all()
98 .build()
99 .context("failed to build tokio runtime for daemon rebuild")?;
100
101 rt.block_on(async {
102 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
103 .await
104 .with_context(|| {
105 format!("failed to connect to daemon at {}", socket_path.display())
106 })?;
107
108 let started = Instant::now();
109 let deadline = started + Duration::from_secs(timeout);
110
111 let poll_socket = socket_path.clone();
113 let poll_path = canonical_path.clone();
114 let poll_done = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
115 let poll_flag = std::sync::Arc::clone(&poll_done);
116 let poll_handle = tokio::spawn(async move {
117 loop {
118 tokio::time::sleep(Duration::from_secs(5)).await;
119 if poll_flag.load(std::sync::atomic::Ordering::Relaxed) {
120 break;
121 }
122 let Ok(mut poll_client) =
124 sqry_daemon_client::DaemonClient::connect(&poll_socket).await
125 else {
126 continue;
127 };
128 if let Ok(status) = poll_client.status().await {
129 let elapsed = started.elapsed().as_secs();
130 if let Some(ws_state) = extract_workspace_state(&status, &poll_path) {
131 eprint!("\rsqry: {ws_state} ({elapsed}s elapsed)");
132 let _ = std::io::stderr().flush();
133 }
134 }
135 }
136 });
137
138 let result = tokio::select! {
140 res = client.rebuild(&canonical_path, force) => res,
141 () = tokio::time::sleep_until(tokio::time::Instant::from_std(deadline)) => {
142 poll_done.store(true, std::sync::atomic::Ordering::Relaxed);
143 let _ = poll_handle.await;
144 let elapsed_ms = started.elapsed().as_millis() as u64;
145 if json {
146 let out = serde_json::json!({
147 "status": "timeout",
148 "elapsed_ms": elapsed_ms,
149 "message": "rebuild still in progress on daemon"
150 });
151 println!("{}", serde_json::to_string_pretty(&out)?);
152 } else {
153 eprintln!("\nsqry: rebuild timed out after {timeout}s (daemon continues in background)");
154 }
155 std::process::exit(2);
156 }
157 };
158
159 poll_done.store(true, std::sync::atomic::Ordering::Relaxed);
160 let _ = poll_handle.await;
161 eprint!("\r\x1b[K");
163
164 match result {
165 Ok(value) => {
166 if json {
167 let mut out = serde_json::Map::new();
168 out.insert(
169 "status".to_owned(),
170 serde_json::Value::String("completed".to_owned()),
171 );
172 if let Some(r) = value.get("result") {
174 if let Some(d) = r.get("duration_ms") {
175 out.insert("duration_ms".to_owned(), d.clone());
176 }
177 if let Some(n) = r.get("nodes") {
178 out.insert("nodes".to_owned(), n.clone());
179 }
180 if let Some(e) = r.get("edges") {
181 out.insert("edges".to_owned(), e.clone());
182 }
183 if let Some(f) = r.get("files_indexed") {
184 out.insert("files_indexed".to_owned(), f.clone());
185 }
186 }
187 println!(
188 "{}",
189 serde_json::to_string_pretty(&serde_json::Value::Object(out))?
190 );
191 } else {
192 render_rebuild_human(&value, &canonical_path);
193 }
194 }
195 Err(sqry_daemon_client::ClientError::RpcError {
196 code: -32004,
197 message,
198 ..
199 }) => {
200 anyhow::bail!(
201 "workspace {} is not loaded on the daemon. \
202 Load it first with `sqry daemon load {}`.\n (daemon said: {message})",
203 canonical_path.display(),
204 canonical_path.display()
205 );
206 }
207 Err(e) => {
208 return Err(anyhow::anyhow!("daemon/rebuild failed: {e}"));
209 }
210 }
211 anyhow::Ok(())
212 })?;
213
214 Ok(())
215}
216
217fn extract_workspace_state(status: &serde_json::Value, path: &Path) -> Option<String> {
218 let workspaces = status.get("result")?.get("workspaces")?.as_array()?;
219 let path_str = path.to_string_lossy();
220 for ws in workspaces {
221 if let Some(root) = ws.get("index_root").and_then(|r| r.as_str())
222 && root == path_str.as_ref()
223 {
224 return ws
225 .get("state")
226 .and_then(|s| s.as_str())
227 .map(|s| s.to_owned());
228 }
229 }
230 None
231}
232
233fn render_rebuild_human(value: &serde_json::Value, path: &Path) {
234 if let Some(r) = value.get("result") {
235 let duration = r.get("duration_ms").and_then(|d| d.as_u64()).unwrap_or(0);
236 let nodes = r.get("nodes").and_then(|n| n.as_u64()).unwrap_or(0);
237 let edges = r.get("edges").and_then(|e| e.as_u64()).unwrap_or(0);
238 let files = r.get("files_indexed").and_then(|f| f.as_u64()).unwrap_or(0);
239 let was_full = r.get("was_full").and_then(|w| w.as_bool()).unwrap_or(false);
240 let mode = if was_full { "full" } else { "incremental" };
241 eprintln!(
242 "sqry: {mode} rebuild of {} completed in {:.1}s ({nodes} nodes, {edges} edges, {files} files)",
243 path.display(),
244 duration as f64 / 1000.0
245 );
246 } else {
247 eprintln!("sqry: rebuild completed for {}", path.display());
248 }
249}
250
251fn run_daemon_start(sqryd_path: Option<&Path>, timeout: u64) -> Result<()> {
262 let binary = resolve_sqryd_binary(sqryd_path)?;
263
264 let socket_path = load_config_socket_path();
266
267 if socket_path
269 .as_ref()
270 .is_some_and(|sp| try_connect_sync(sp).unwrap_or(false))
271 {
272 let sp = socket_path.as_ref().unwrap();
273 eprintln!("sqry: daemon is already running (socket {})", sp.display());
274 return Ok(());
275 }
276
277 let status = std::process::Command::new(&binary)
278 .args(["start", "--detach"])
279 .stdin(std::process::Stdio::null())
280 .stdout(std::process::Stdio::inherit())
281 .stderr(std::process::Stdio::inherit())
282 .status()
283 .with_context(|| format!("failed to exec sqryd at {}", binary.display()))?;
284
285 if !status.success() {
286 let code = status.code().unwrap_or(1);
287 if code == 75 {
289 eprintln!("sqry: daemon is already running");
290 return Ok(());
291 }
292 anyhow::bail!("sqryd start --detach exited with code {code}");
293 }
294
295 if let Some(ref sp) = socket_path {
297 poll_until_reachable(sp, timeout)?;
298 eprintln!("sqry: daemon started (socket {})", sp.display());
299 } else {
300 eprintln!("sqry: daemon started");
301 }
302 Ok(())
303}
304
305fn run_daemon_stop(timeout: u64) -> Result<()> {
312 let config = load_daemon_config()?;
313 let socket_path = config.socket_path();
314
315 if !try_connect_sync(&socket_path)? {
316 eprintln!("sqry: daemon is not running");
317 return Ok(());
318 }
319
320 let rt = tokio::runtime::Builder::new_current_thread()
321 .enable_all()
322 .build()
323 .context("failed to build tokio runtime for daemon stop")?;
324
325 rt.block_on(async {
326 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
327 .await
328 .with_context(|| format!("failed to connect to daemon at {}", socket_path.display()))?;
329
330 let _ = client.stop().await;
333
334 let deadline = Instant::now() + Duration::from_secs(timeout);
335 loop {
336 tokio::time::sleep(Duration::from_millis(STOP_POLL_INTERVAL_MS)).await;
338
339 if !try_connect_async(&socket_path).await {
340 break;
341 }
342 if Instant::now() >= deadline {
343 anyhow::bail!(
344 "daemon did not exit within {timeout} seconds; \
345 check the daemon log for errors"
346 );
347 }
348 }
349 anyhow::Ok(())
350 })?;
351
352 eprintln!("sqry: daemon stopped");
353 Ok(())
354}
355
356fn run_daemon_status(json: bool) -> Result<()> {
362 let config = load_daemon_config()?;
363 let socket_path = config.socket_path();
364
365 if !try_connect_sync(&socket_path)? {
367 if json {
368 println!(
372 "{}",
373 serde_json::json!({
374 "error": "daemon_unreachable",
375 "socket": socket_path.display().to_string(),
376 })
377 );
378 } else {
379 eprintln!(
380 "sqry: daemon is not running (socket {})",
381 socket_path.display()
382 );
383 }
384 std::process::exit(1);
386 }
387
388 let rt = tokio::runtime::Builder::new_current_thread()
389 .enable_all()
390 .build()
391 .context("failed to build tokio runtime for daemon status")?;
392
393 rt.block_on(async {
394 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
395 .await
396 .with_context(|| format!("failed to connect to daemon at {}", socket_path.display()))?;
397
398 let result = client
399 .status()
400 .await
401 .context("daemon/status request failed")?;
402
403 if json {
404 let out = serde_json::to_string_pretty(&result)
405 .context("failed to serialize daemon status as JSON")?;
406 println!("{out}");
407 } else {
408 render_status_human(&result);
409 }
410 anyhow::Ok(())
411 })?;
412
413 Ok(())
414}
415
416fn run_daemon_logs(lines: usize, follow: bool) -> Result<()> {
427 let config = load_daemon_config()?;
428 let log_path = resolve_log_path(&config)?;
429
430 if !log_path.exists() {
431 anyhow::bail!(
432 "daemon log file not found at {}. \
433 Is the daemon running? Has it written any log output?",
434 log_path.display()
435 );
436 }
437
438 if follow {
439 tail_follow(&log_path, lines)?;
440 } else {
441 tail_last_n(&log_path, lines)?;
442 }
443 Ok(())
444}
445
446fn run_daemon_load(path: &Path) -> Result<()> {
456 let config = load_daemon_config()?;
457 let socket_path = config.socket_path();
458
459 let canonical_path = std::fs::canonicalize(path)
463 .with_context(|| format!("failed to canonicalize path {}", path.display()))?;
464
465 if !try_connect_sync(&socket_path)? {
466 anyhow::bail!(
467 "daemon is not running (socket {}). Start it with `sqry daemon start`.",
468 socket_path.display()
469 );
470 }
471
472 let rt = tokio::runtime::Builder::new_current_thread()
473 .enable_all()
474 .build()
475 .context("failed to build tokio runtime for daemon load")?;
476
477 rt.block_on(async {
478 let mut client = sqry_daemon_client::DaemonClient::connect(&socket_path)
479 .await
480 .with_context(|| format!("failed to connect to daemon at {}", socket_path.display()))?;
481
482 let envelope = client.load(&canonical_path).await.with_context(|| {
488 format!(
489 "daemon/load request failed for {}",
490 canonical_path.display()
491 )
492 })?;
493
494 let load_result = envelope.result;
495 eprintln!(
496 "sqry: workspace loaded at {} ({:?}, {})",
497 canonical_path.display(),
498 load_result.state,
499 human_bytes(load_result.current_bytes)
500 );
501 anyhow::Ok(())
502 })?;
503
504 Ok(())
505}
506
507fn resolve_sqryd_binary(explicit: Option<&Path>) -> Result<PathBuf> {
523 if let Some(path) = explicit {
525 if path.exists() {
526 return Ok(path.to_path_buf());
527 }
528 anyhow::bail!("explicit --sqryd-path {} does not exist", path.display());
529 }
530
531 if let Ok(exe) = std::env::current_exe() {
533 let canonical = std::fs::canonicalize(&exe).unwrap_or(exe);
535 if let Some(dir) = canonical.parent() {
536 let sibling = dir.join("sqryd");
537 if sibling.exists() {
538 return Ok(sibling);
539 }
540 let sibling_exe = dir.join("sqryd.exe");
542 if sibling_exe.exists() {
543 return Ok(sibling_exe);
544 }
545 }
546 }
547
548 if let Some(val) = std::env::var_os("SQRYD_PATH") {
550 let path = PathBuf::from(val);
551 if path.exists() {
552 return Ok(path);
553 }
554 anyhow::bail!("SQRYD_PATH={} does not exist", path.display());
555 }
556
557 which::which("sqryd").with_context(|| {
559 "sqryd binary not found. \
560 Install sqryd alongside sqry, set SQRYD_PATH, or use --sqryd-path."
561 .to_owned()
562 })
563}
564
565#[allow(dead_code)]
587pub fn try_auto_start_daemon() -> Result<bool> {
588 if std::env::var_os(ENV_DAEMON_AUTO_START).as_deref() != Some(std::ffi::OsStr::new("1")) {
589 return Ok(false);
590 }
591
592 let binary = resolve_sqryd_binary(None)?;
594
595 let socket_path = load_config_socket_path();
597 if socket_path
598 .as_ref()
599 .is_some_and(|sp| try_connect_sync(sp).unwrap_or(false))
600 {
601 return Ok(true);
602 }
603
604 let status = std::process::Command::new(&binary)
606 .args(["start", "--detach"])
607 .stdin(std::process::Stdio::null())
608 .stdout(std::process::Stdio::null())
609 .stderr(std::process::Stdio::inherit())
610 .status()
611 .with_context(|| format!("auto-start: failed to exec sqryd at {}", binary.display()))?;
612
613 if !status.success() {
614 let code = status.code().unwrap_or(1);
615 if code != 75 {
616 eprintln!(
618 "sqry: Warning: daemon auto-start failed (sqryd exited {code}); \
619 falling back to local mode"
620 );
621 return Ok(false);
622 }
623 }
624
625 Ok(true)
626}
627
/// Render a byte count using binary (1024-based) units, e.g. "1.5 MB".
/// Values within 0.05 of a whole number drop the decimal ("1 KB", not "1.0 KB").
#[must_use]
pub fn human_bytes(bytes: u64) -> String {
    const UNITS: &[&str] = &["B", "KB", "MB", "GB", "TB"];
    const DIVISOR: u64 = 1024;

    if bytes < DIVISOR {
        return format!("{bytes} B");
    }

    // Walk up the unit ladder until the value drops below one divisor
    // or we run out of units.
    let mut value = bytes as f64;
    let mut unit = UNITS[0];
    for &next_unit in &UNITS[1..] {
        if value < DIVISOR as f64 {
            break;
        }
        value /= DIVISOR as f64;
        unit = next_unit;
    }

    let fraction = value - value.floor();
    if fraction < 0.05 {
        format!("{value:.0} {unit}")
    } else {
        format!("{value:.1} {unit}")
    }
}
657
/// Render an uptime in seconds as a compact "Nd Nh Nm" string, omitting
/// zero components (sub-minute uptimes render as "0m").
///
/// Fix: the match previously had no `(d, 0, m)` arm, so a duration with
/// days and minutes but zero hours (e.g. 86_700 s) fell through to the
/// catch-all and rendered "1d 0h 5m"; it now elides the zero hours and
/// renders "1d 5m", consistent with the other arms.
#[must_use]
pub fn format_uptime(seconds: u64) -> String {
    let days = seconds / 86_400;
    let hours = (seconds % 86_400) / 3_600;
    let mins = (seconds % 3_600) / 60;

    match (days, hours, mins) {
        (0, 0, _) => format!("{mins}m"),
        (0, h, 0) => format!("{h}h"),
        (0, h, m) => format!("{h}h {m}m"),
        (d, 0, 0) => format!("{d}d"),
        (d, 0, m) => format!("{d}d {m}m"),
        (d, h, 0) => format!("{d}d {h}h"),
        (d, h, m) => format!("{d}d {h}h {m}m"),
    }
}
683
684fn render_status_human(envelope: &serde_json::Value) {
694 let stdout = std::io::stdout();
695 let mut handle = stdout.lock();
696 let _ = render_status_human_into(envelope, &mut handle);
698}
699
700fn render_status_human_into(
749 envelope: &serde_json::Value,
750 out: &mut dyn Write,
751) -> std::io::Result<()> {
752 let inner = envelope.get("result").unwrap_or(envelope);
756
757 let version = inner
761 .get("daemon_version")
762 .or_else(|| envelope.get("meta").and_then(|m| m.get("daemon_version")))
763 .and_then(serde_json::Value::as_str)
764 .unwrap_or("unknown");
765
766 let uptime_str = inner
768 .get("uptime_seconds")
769 .and_then(serde_json::Value::as_u64)
770 .map(format_uptime);
771
772 match uptime_str {
773 Some(uptime) => writeln!(out, "sqryd v{version} -- uptime {uptime}")?,
774 None => writeln!(out, "sqryd v{version}")?,
775 }
776
777 let memory = inner.get("memory");
780 let mem_current = memory
781 .and_then(|m| m.get("current_bytes"))
782 .and_then(serde_json::Value::as_u64);
783 let mem_limit = memory
784 .and_then(|m| m.get("limit_bytes"))
785 .and_then(serde_json::Value::as_u64);
786 let mem_peak = memory
787 .and_then(|m| m.get("high_water_bytes"))
788 .and_then(serde_json::Value::as_u64);
789
790 if mem_current.is_some() || mem_limit.is_some() {
791 writeln!(out)?;
792 let used_str = mem_current.map_or_else(|| "?".to_owned(), human_bytes);
793 let limit_str = mem_limit.map_or_else(|| "?".to_owned(), human_bytes);
794 match mem_peak {
795 Some(peak) => writeln!(
796 out,
797 "Memory: {used_str} / {limit_str} (peak: {})",
798 human_bytes(peak)
799 )?,
800 None => writeln!(out, "Memory: {used_str} / {limit_str}")?,
801 }
802 }
803
804 let workspaces = inner
806 .get("workspaces")
807 .and_then(serde_json::Value::as_array);
808
809 if let Some(wss) = workspaces {
810 writeln!(out)?;
811 writeln!(out, "Workspaces ({} loaded):", wss.len())?;
812 for ws in wss {
813 render_workspace_line_into(ws, out)?;
814 }
815 }
816
817 Ok(())
818}
819
820#[allow(dead_code)]
829fn render_workspace_line(ws: &serde_json::Value) {
830 let stdout = std::io::stdout();
831 let mut handle = stdout.lock();
832 let _ = render_workspace_line_into(ws, &mut handle);
833}
834
/// Write one indented workspace summary line to `out`.
///
/// Layout: `  <path padded to 30> <mem> (peak: <peak>) [tags…]`, dropping
/// the memory columns that are absent. Tags always include the workspace
/// state (plus "pinned" when set); a `last_error`, if present, is appended
/// as an extra `error: …` tag.
fn render_workspace_line_into(ws: &serde_json::Value, out: &mut dyn Write) -> std::io::Result<()> {
    // Accept the canonical `index_root` key with `path` as a fallback.
    let path = ws
        .get("index_root")
        .or_else(|| ws.get("path"))
        .and_then(serde_json::Value::as_str)
        .unwrap_or("<unknown>");

    let display_path = tilde_shorten(path);

    let ws_mem = ws
        .get("current_bytes")
        .and_then(serde_json::Value::as_u64)
        .map(human_bytes);
    let ws_peak = ws
        .get("high_water_bytes")
        .and_then(serde_json::Value::as_u64)
        .map(human_bytes);

    let mut tags: Vec<&str> = Vec::new();
    if ws
        .get("pinned")
        .and_then(serde_json::Value::as_bool)
        .unwrap_or(false)
    {
        tags.push("pinned");
    }
    // NOTE(review): a missing `state` field displays as "Loaded" — confirm
    // that optimistic default is intended rather than e.g. "unknown".
    let state = ws
        .get("state")
        .and_then(serde_json::Value::as_str)
        .unwrap_or("Loaded");
    tags.push(state);

    // Error path: same column layout as below, with the error appended to
    // the tag list. Note a (None, Some(peak)) combination falls into the
    // bare `_` arm and drops the peak value.
    if let Some(err_msg) = ws.get("last_error").and_then(serde_json::Value::as_str) {
        let tag = format!("error: {err_msg}");
        match (ws_mem, ws_peak) {
            (Some(mem), Some(peak)) => {
                writeln!(
                    out,
                    "  {display_path:<30} {mem:<8} (peak: {peak:<8}) [{tags}, {tag}]",
                    tags = tags.join(", ")
                )?;
            }
            (Some(mem), None) => {
                writeln!(
                    out,
                    "  {display_path:<30} {mem:<8} [{tags}, {tag}]",
                    tags = tags.join(", ")
                )?;
            }
            _ => {
                writeln!(
                    out,
                    "  {display_path} [{tags}, {tag}]",
                    tags = tags.join(", ")
                )?;
            }
        }
        return Ok(());
    }

    // Healthy path: identical layout without the error tag.
    let tag_str = format!("[{}]", tags.join(", "));
    match (ws_mem, ws_peak) {
        (Some(mem), Some(peak)) => {
            writeln!(
                out,
                "  {display_path:<30} {mem:<8} (peak: {peak:<8}) {tag_str}"
            )?;
        }
        (Some(mem), None) => {
            writeln!(out, "  {display_path:<30} {mem:<8} {tag_str}")?;
        }
        _ => {
            writeln!(out, "  {display_path} {tag_str}")?;
        }
    }
    Ok(())
}
928
929fn tilde_shorten(path: &str) -> String {
931 if let Some(home) = dirs::home_dir() {
932 let home_str = home.to_string_lossy();
933 if let Some(stripped) = path.strip_prefix(home_str.as_ref()) {
934 return format!("~{stripped}");
935 }
936 }
937 path.to_owned()
938}
939
940pub fn tail_last_n(path: &Path, n: usize) -> Result<()> {
955 let file = std::fs::File::open(path)
956 .with_context(|| format!("failed to open log file {}", path.display()))?;
957 let buf_reader = std::io::BufReader::new(file);
958
959 let mut lines: Vec<String> = Vec::new();
961 for line in buf_reader.lines() {
962 let l = line.with_context(|| format!("error reading log file {}", path.display()))?;
963 lines.push(l);
964 }
965
966 let start = lines.len().saturating_sub(n);
968 let stdout = std::io::stdout();
969 let mut out = stdout.lock();
970 for line in &lines[start..] {
971 writeln!(out, "{line}").context("failed to write to stdout")?;
972 }
973 Ok(())
974}
975
/// Stream the daemon log to stdout, `tail -f`-style.
///
/// Prints the last `initial_lines` lines first, then watches the log's
/// parent directory for filesystem events and drains any newly appended
/// bytes. A Remove/Create event on the log path itself is treated as log
/// rotation: the file is reopened and reading restarts at offset 0.
/// The watcher channel is also polled on a short timeout so appends whose
/// events were missed or coalesced still get flushed promptly.
///
/// Loops until the watcher channel disconnects (or the user interrupts).
pub fn tail_follow(path: &Path, initial_lines: usize) -> Result<()> {
    use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher};
    use std::sync::mpsc;

    // Show existing context before switching to follow mode.
    tail_last_n(path, initial_lines)?;

    // Start following from the current end of file.
    let mut file = std::fs::File::open(path)
        .with_context(|| format!("failed to open log file for follow: {}", path.display()))?;
    let mut pos = file
        .seek(SeekFrom::End(0))
        .context("failed to seek to end of log file")?;

    let (tx, rx) = mpsc::channel::<notify::Result<notify::Event>>();
    let mut watcher = RecommendedWatcher::new(tx, notify::Config::default())
        .context("failed to create file watcher for log follow")?;

    // Watch the parent directory, not the file itself, so rotation (where
    // the file is replaced) is still observed.
    let parent = path.parent().unwrap_or(Path::new("."));
    watcher
        .watch(parent, RecursiveMode::NonRecursive)
        .with_context(|| format!("failed to watch log directory {}", parent.display()))?;

    let stdout = std::io::stdout();
    let mut out = stdout.lock();

    loop {
        match rx.recv_timeout(Duration::from_millis(FOLLOW_EVENT_TIMEOUT_MS)) {
            Ok(Ok(event)) => {
                let is_rotate = matches!(event.kind, EventKind::Remove(_) | EventKind::Create(_))
                    && event.paths.iter().any(|p| p == path);

                if is_rotate {
                    if path.exists() {
                        match std::fs::File::open(path) {
                            Ok(f) => {
                                // Rotated: reopen and read the new file from the start.
                                file = f;
                                pos = 0;
                            }
                            Err(e) => {
                                eprintln!("sqry: log rotation detected but reopen failed: {e}");
                            }
                        }
                    }
                }

                pos = drain_new_bytes(&mut file, pos, path, &mut out)?;
            }
            Ok(Err(e)) => {
                eprintln!("sqry: file watcher error: {e}");
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                // No event — poll anyway in case appends were not reported.
                pos = drain_new_bytes(&mut file, pos, path, &mut out)?;
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                break;
            }
        }
    }

    Ok(())
}
1056
1057fn drain_new_bytes(
1060 file: &mut std::fs::File,
1061 current_pos: u64,
1062 path: &Path,
1063 out: &mut impl Write,
1064) -> Result<u64> {
1065 file.seek(SeekFrom::Start(current_pos))
1066 .with_context(|| format!("seek error in log file {}", path.display()))?;
1067
1068 let mut buf = Vec::new();
1069 file.read_to_end(&mut buf)
1070 .with_context(|| format!("read error in log file {}", path.display()))?;
1071
1072 if !buf.is_empty() {
1073 out.write_all(&buf)
1074 .context("failed to write log output to stdout")?;
1075 out.flush().context("failed to flush stdout")?;
1076 }
1077
1078 let new_pos = current_pos + buf.len() as u64;
1079 Ok(new_pos)
1080}
1081
1082fn load_daemon_config() -> Result<sqry_daemon::config::DaemonConfig> {
1088 sqry_daemon::config::DaemonConfig::load().context(
1089 "failed to load daemon config; ensure daemon.toml is well-formed or \
1090 remove it to use defaults",
1091 )
1092}
1093
1094fn load_config_socket_path() -> Option<PathBuf> {
1098 sqry_daemon::config::DaemonConfig::load()
1099 .ok()
1100 .map(|c| c.socket_path())
1101}
1102
1103fn resolve_log_path(config: &sqry_daemon::config::DaemonConfig) -> Result<PathBuf> {
1112 match config.log_file.as_ref() {
1113 Some(p) => Ok(p.clone()),
1114 None => {
1115 anyhow::bail!(
1116 "no log file is configured for the daemon.\n\
1117 The daemon writes to stderr by default (captured by systemd or your terminal).\n\
1118 To enable file logging, set `log_file = \"/path/to/sqryd.log\"` in daemon.toml."
1119 );
1120 }
1121 }
1122}
1123
1124fn poll_until_reachable(socket_path: &Path, timeout: u64) -> Result<()> {
1133 let deadline = Instant::now() + Duration::from_secs(timeout);
1134 loop {
1135 if try_connect_sync(socket_path).unwrap_or(false) {
1136 return Ok(());
1137 }
1138 if Instant::now() >= deadline {
1139 anyhow::bail!(
1140 "daemon process started but did not become reachable within {timeout} \
1141 seconds (socket {}). Check the daemon log for startup errors.",
1142 socket_path.display()
1143 );
1144 }
1145 std::thread::sleep(Duration::from_millis(STOP_POLL_INTERVAL_MS));
1146 }
1147}
1148
1149pub fn try_connect_sync(socket_path: &Path) -> Result<bool> {
1166 #[cfg(unix)]
1167 {
1168 use std::os::unix::net::UnixStream;
1169 match UnixStream::connect(socket_path) {
1170 Ok(_) => Ok(true),
1171 Err(e) => match e.kind() {
1172 std::io::ErrorKind::ConnectionRefused | std::io::ErrorKind::NotFound => Ok(false),
1173 _ => Err(anyhow::Error::from(e).context(format!(
1174 "unexpected error probing socket {}",
1175 socket_path.display()
1176 ))),
1177 },
1178 }
1179 }
1180 #[cfg(windows)]
1181 {
1182 Ok(socket_path.exists())
1184 }
1185 #[cfg(not(any(unix, windows)))]
1186 {
1187 let _ = socket_path;
1188 Ok(false)
1189 }
1190}
1191
1192async fn try_connect_async(socket_path: &Path) -> bool {
1196 #[cfg(unix)]
1197 {
1198 tokio::net::UnixStream::connect(socket_path).await.is_ok()
1199 }
1200 #[cfg(windows)]
1201 {
1202 socket_path.exists()
1203 }
1204 #[cfg(not(any(unix, windows)))]
1205 {
1206 let _ = socket_path;
1207 false
1208 }
1209}
1210
#[cfg(test)]
mod tests {
    use super::*;

    // Tests that mutate SQRYD_PATH / SQRY_DAEMON_AUTO_START are marked
    // #[serial_test::serial] so parallel test threads cannot race on the
    // process-wide environment.

    // The sibling-binary lookup itself depends on where the test executable
    // lives, so this exercises resolution via SQRYD_PATH instead.
    #[test]
    #[serial_test::serial]
    fn resolve_sqryd_binary_finds_sibling() {
        let dir = tempfile::tempdir().expect("tempdir");
        let sqryd_path = dir.path().join("sqryd");
        std::fs::write(&sqryd_path, b"#!/bin/sh\n").expect("write fake sqryd");
        // Make the fake binary executable on Unix; existence alone is what
        // resolve_sqryd_binary checks, but this keeps the fixture realistic.
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            std::fs::set_permissions(&sqryd_path, std::fs::Permissions::from_mode(0o755))
                .expect("chmod");
        }

        // set_var/remove_var are unsafe in the 2024 edition (process-global).
        unsafe {
            std::env::set_var("SQRYD_PATH", &sqryd_path);
        }
        let result = resolve_sqryd_binary(None);
        unsafe {
            std::env::remove_var("SQRYD_PATH");
        }

        assert!(result.is_ok(), "expected Ok, got {:?}", result);
        assert_eq!(result.unwrap(), sqryd_path);
    }

    // Smoke test only: whether `which sqryd` succeeds depends on the host
    // machine, so the result is not asserted — just that the call returns.
    #[test]
    #[serial_test::serial]
    fn resolve_sqryd_binary_falls_back_to_path() {
        unsafe {
            std::env::remove_var("SQRYD_PATH");
        }

        let result = resolve_sqryd_binary(None);
        let _ = result;
    }

    // SQRYD_PATH pointing at an existing file must win over $PATH lookup.
    #[test]
    #[serial_test::serial]
    fn resolve_sqryd_binary_respects_env_var() {
        let dir = tempfile::tempdir().expect("tempdir");
        let sqryd_path = dir.path().join("sqryd");
        std::fs::write(&sqryd_path, b"#!/bin/sh\n").expect("write fake sqryd");

        unsafe {
            std::env::set_var("SQRYD_PATH", &sqryd_path);
        }
        let result = resolve_sqryd_binary(None);
        unsafe {
            std::env::remove_var("SQRYD_PATH");
        }

        assert!(result.is_ok(), "expected Ok, got {:?}", result);
        assert_eq!(result.unwrap(), sqryd_path);
    }

    // Covers the sub-1024 passthrough, each unit boundary, and the
    // one-decimal formatting for non-integral values.
    #[test]
    fn human_bytes_formats_correctly() {
        assert_eq!(human_bytes(0), "0 B");
        assert_eq!(human_bytes(512), "512 B");
        assert_eq!(human_bytes(1023), "1023 B");
        assert_eq!(human_bytes(1024), "1 KB");
        assert_eq!(human_bytes(1536), "1.5 KB");
        assert_eq!(human_bytes(1_048_576), "1 MB");
        assert_eq!(human_bytes(1_073_741_824), "1 GB");
        assert_eq!(human_bytes(1_099_511_627_776), "1 TB");
        assert_eq!(human_bytes(1_572_864), "1.5 MB");
    }

    // Exercises every day/hour/minute combination with at most one
    // component-elision per case.
    #[test]
    fn format_uptime_renders_hours_minutes() {
        assert_eq!(format_uptime(0), "0m");
        assert_eq!(format_uptime(59), "0m");
        assert_eq!(format_uptime(60), "1m");
        assert_eq!(format_uptime(3600), "1h");
        assert_eq!(format_uptime(3660), "1h 1m");
        assert_eq!(format_uptime(7380), "2h 3m");
        assert_eq!(format_uptime(86400), "1d");
        assert_eq!(format_uptime(90061), "1d 1h 1m");
        assert_eq!(format_uptime(172800), "2d");
    }

    // An empty `result` must still render, falling back to the version in
    // the `meta` section.
    #[test]
    fn daemon_status_human_renders_minimal_response() {
        let envelope = serde_json::json!({
            "result": {},
            "meta": { "stale": false, "daemon_version": "8.0.6" }
        });
        let mut buf: Vec<u8> = Vec::new();
        render_status_human_into(&envelope, &mut buf)
            .expect("render_status_human_into must not fail");
        let output = String::from_utf8(buf).expect("rendered output must be valid UTF-8");
        assert!(
            output.contains("v8.0.6"),
            "graceful degradation must fall back to meta.daemon_version '8.0.6'; got:\n{output}"
        );
    }

    // A fully-populated envelope must produce the header, memory section,
    // and one line per workspace.
    #[test]
    fn daemon_status_human_renders_full_response() {
        let envelope = serde_json::json!({
            "result": {
                "daemon_version": "8.0.6",
                "uptime_seconds": 8040_u64,
                "memory": {
                    "limit_bytes": 2_147_483_648_u64,
                    "current_bytes": 471_859_200_u64,
                    "reserved_bytes": 0_u64,
                    "high_water_bytes": 1_288_490_188_u64
                },
                "workspaces": [
                    {
                        "index_root": "/home/user/repos/main-project",
                        "state": "Loaded",
                        "pinned": true,
                        "current_bytes": 335_544_320_u64,
                        "high_water_bytes": 933_232_896_u64,
                        "last_good_at": null,
                        "last_error": null,
                        "retry_count": 0
                    },
                    {
                        "index_root": "/home/user/repos/auth-service",
                        "state": "Loaded",
                        "pinned": false,
                        "current_bytes": 83_886_080_u64,
                        "high_water_bytes": 324_534_016_u64,
                        "last_good_at": null,
                        "last_error": null,
                        "retry_count": 0
                    }
                ]
            },
            "meta": {
                "stale": false,
                "daemon_version": "8.0.6"
            }
        });
        let mut buf: Vec<u8> = Vec::new();
        render_status_human_into(&envelope, &mut buf)
            .expect("render_status_human_into must not fail");
        let output = String::from_utf8(buf).expect("rendered output must be valid UTF-8");
        assert!(
            output.contains("v8.0.6"),
            "must contain version; got:\n{output}"
        );
        assert!(
            output.contains("2h"),
            "must contain uptime hours; got:\n{output}"
        );
        assert!(
            output.contains("Memory:"),
            "must contain memory section; got:\n{output}"
        );
        assert!(
            output.contains("2 GB"),
            "must contain limit '2 GB'; got:\n{output}"
        );
        assert!(
            output.contains("Workspaces"),
            "must contain workspaces section; got:\n{output}"
        );
        assert!(
            output.contains("main-project"),
            "must contain workspace path; got:\n{output}"
        );
        assert!(
            output.contains("auth-service"),
            "must contain workspace path; got:\n{output}"
        );
    }

    // result.daemon_version must take priority over meta.daemon_version,
    // and every memory figure must come through human_bytes.
    #[test]
    fn daemon_status_human_extracts_version_and_uptime() {
        let envelope = serde_json::json!({
            "result": {
                "daemon_version": "8.1.2",
                "uptime_seconds": 3661_u64,
                "memory": {
                    "limit_bytes": 1_073_741_824_u64,
                    "current_bytes": 104_857_600_u64,
                    "reserved_bytes": 0_u64,
                    "high_water_bytes": 209_715_200_u64
                },
                "workspaces": []
            },
            "meta": { "stale": false, "daemon_version": "7.9.0" }
        });

        let mut buf: Vec<u8> = Vec::new();
        render_status_human_into(&envelope, &mut buf)
            .expect("render_status_human_into must not fail writing to Vec");
        let output = String::from_utf8(buf).expect("rendered output must be valid UTF-8");

        assert!(
            output.contains("v8.1.2"),
            "rendered output must contain result.daemon_version '8.1.2'; got:\n{output}"
        );
        assert!(
            !output.contains("v7.9.0"),
            "rendered output must NOT contain meta.daemon_version '7.9.0'; got:\n{output}"
        );
        assert!(
            output.contains("1h 1m"),
            "rendered output must contain uptime '1h 1m'; got:\n{output}"
        );
        assert!(
            output.contains("100 MB"),
            "rendered output must contain memory current '100 MB'; got:\n{output}"
        );
        assert!(
            output.contains("1 GB"),
            "rendered output must contain memory limit '1 GB'; got:\n{output}"
        );
        assert!(
            output.contains("200 MB"),
            "rendered output must contain memory peak '200 MB'; got:\n{output}"
        );
    }

    // One workspace line must surface the path, both memory figures, and
    // the pinned tag.
    #[test]
    fn daemon_status_human_renders_workspace_canonical_fields() {
        let ws = serde_json::json!({
            "index_root": "/home/user/repos/sqry",
            "state": "Loaded",
            "pinned": true,
            "current_bytes": 335_544_320_u64,
            "high_water_bytes": 671_088_640_u64,
            "last_good_at": null,
            "last_error": null,
            "retry_count": 0
        });

        let mut buf: Vec<u8> = Vec::new();
        render_workspace_line_into(&ws, &mut buf)
            .expect("render_workspace_line_into must not fail writing to Vec");
        let output = String::from_utf8(buf).expect("rendered output must be valid UTF-8");

        assert!(
            output.contains("sqry"),
            "rendered output must contain the workspace path component 'sqry'; got:\n{output}"
        );
        assert!(
            output.contains("320 MB"),
            "rendered output must contain workspace size '320 MB' from current_bytes; got:\n{output}"
        );
        assert!(
            output.contains("640 MB"),
            "rendered output must contain workspace peak '640 MB' from high_water_bytes; got:\n{output}"
        );
        assert!(
            output.contains("pinned"),
            "rendered output must contain 'pinned' tag; got:\n{output}"
        );
    }

    // With no log_file configured, the error must explain both the default
    // (stderr) and how to enable file logging.
    #[test]
    fn resolve_log_path_errors_when_log_file_not_configured() {
        let config = sqry_daemon::config::DaemonConfig::default();
        assert!(
            config.log_file.is_none(),
            "DaemonConfig::default() must have log_file = None"
        );

        let result = resolve_log_path(&config);
        assert!(
            result.is_err(),
            "resolve_log_path must return Err when log_file is None"
        );

        let err_msg = format!("{}", result.unwrap_err());
        assert!(
            err_msg.contains("log_file"),
            "error must mention 'log_file' to guide the user; got:\n{err_msg}"
        );
        assert!(
            err_msg.contains("daemon.toml"),
            "error must mention 'daemon.toml' to guide the user; got:\n{err_msg}"
        );
        assert!(
            err_msg.contains("stderr"),
            "error must mention 'stderr' to explain the default behaviour; got:\n{err_msg}"
        );
    }

    // timeout = 0 keeps this fast: the first deadline check fails.
    #[test]
    fn poll_until_reachable_times_out_for_unreachable_socket() {
        let dir = tempfile::tempdir().expect("tempdir");
        let socket_path = dir.path().join("nonexistent.sock");

        let result = poll_until_reachable(&socket_path, 0);
        assert!(
            result.is_err(),
            "poll_until_reachable must return Err when socket is unreachable and timeout = 0"
        );

        let err_msg = format!("{}", result.unwrap_err());
        assert!(
            err_msg.contains("did not become reachable"),
            "error must contain 'did not become reachable'; got:\n{err_msg}"
        );
        assert!(
            err_msg.contains("nonexistent.sock"),
            "error must contain the socket path; got:\n{err_msg}"
        );
    }

    // Auto-start is opt-in: both "unset" and "0" must short-circuit to
    // false without touching sqryd at all.
    #[test]
    #[serial_test::serial]
    fn try_auto_start_daemon_returns_false_when_disabled() {
        unsafe {
            std::env::remove_var(ENV_DAEMON_AUTO_START);
        }
        let result = try_auto_start_daemon().expect("try_auto_start_daemon must not error");
        assert!(
            !result,
            "expected false when SQRY_DAEMON_AUTO_START is unset"
        );

        unsafe {
            std::env::set_var(ENV_DAEMON_AUTO_START, "0");
        }
        let result = try_auto_start_daemon().expect("try_auto_start_daemon must not error");
        unsafe {
            std::env::remove_var(ENV_DAEMON_AUTO_START);
        }
        assert!(!result, "expected false when SQRY_DAEMON_AUTO_START=0");
    }
}