1use crate::constants::{
7 DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
8 DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
9 DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, DAEMON_VERSION_RESTART_WAIT_MS,
10 SQLITE_GRAPHRAG_VERSION,
11};
12use crate::errors::AppError;
13use crate::{embedder, shutdown_requested};
14use fs4::fs_std::FileExt;
15use interprocess::local_socket::{
16 prelude::LocalSocketStream,
17 traits::{Listener as _, Stream as _},
18 GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
19 ToNsName,
20};
21use serde::{Deserialize, Serialize};
22use std::fs::{File, OpenOptions};
23use std::io::{BufRead, BufReader, Write};
24use std::path::{Path, PathBuf};
25use std::process::Stdio;
26use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
27use std::sync::Arc;
28use std::thread;
29use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
30
/// No caller in this process has claimed the daemon version check yet.
const VERSION_NOT_CHECKED: u8 = 0;
/// The version check has been claimed; this also remains the final state when
/// no restart turned out to be necessary.
const VERSION_COMPATIBLE: u8 = 1;
/// A version mismatch was detected and a daemon restart has been attempted.
const VERSION_RESTART_ATTEMPTED: u8 = 2;

/// Process-wide marker ensuring the daemon version check runs at most once.
static DAEMON_VERSION_STATE: AtomicU8 = AtomicU8::new(VERSION_NOT_CHECKED);
37
38#[derive(Debug, Serialize, Deserialize)]
39#[serde(tag = "request", rename_all = "snake_case")]
40pub enum DaemonRequest {
41 Ping,
42 Shutdown,
43 EmbedPassage {
44 text: String,
45 },
46 EmbedQuery {
47 text: String,
48 },
49 EmbedPassages {
50 texts: Vec<String>,
51 token_counts: Vec<usize>,
52 },
53}
54
55#[derive(Debug, Serialize, Deserialize)]
56#[serde(tag = "status", rename_all = "snake_case")]
57pub enum DaemonResponse {
58 Listening {
59 pid: u32,
60 socket: String,
61 idle_shutdown_secs: u64,
62 },
63 Ok {
64 pid: u32,
65 version: String,
66 handled_embed_requests: u64,
67 },
68 PassageEmbedding {
69 embedding: Vec<f32>,
70 handled_embed_requests: u64,
71 },
72 QueryEmbedding {
73 embedding: Vec<f32>,
74 handled_embed_requests: u64,
75 },
76 PassageEmbeddings {
77 embeddings: Vec<Vec<f32>>,
78 handled_embed_requests: u64,
79 },
80 ShuttingDown {
81 handled_embed_requests: u64,
82 },
83 Error {
84 message: String,
85 },
86}
87
88#[derive(Debug, Default, Serialize, Deserialize)]
89struct DaemonSpawnState {
90 consecutive_failures: u32,
91 not_before_epoch_ms: u64,
92 last_error: Option<String>,
93}
94
95pub fn daemon_label(models_dir: &Path) -> String {
96 let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
97 .to_hex()
98 .to_string();
99 format!("sqlite-graphrag-daemon-{}", &hash[..16])
100}
101
102pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
103 request_if_available(models_dir, &DaemonRequest::Ping)
104}
105
106pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
107 request_if_available(models_dir, &DaemonRequest::Shutdown)
108}
109
110pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
111 match request_or_autostart(
112 models_dir,
113 &DaemonRequest::EmbedPassage {
114 text: text.to_string(),
115 },
116 true,
117 )? {
118 Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
119 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
120 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
121 "unexpected daemon response for passage embedding: {other:?}"
122 ))),
123 None => {
124 let embedder = embedder::get_embedder(models_dir)?;
125 embedder::embed_passage(embedder, text)
126 }
127 }
128}
129
130pub fn embed_query_or_local(
131 models_dir: &Path,
132 text: &str,
133 cli_autostart: bool,
134) -> Result<Vec<f32>, AppError> {
135 match request_or_autostart(
136 models_dir,
137 &DaemonRequest::EmbedQuery {
138 text: text.to_string(),
139 },
140 cli_autostart,
141 )? {
142 Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
143 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
144 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
145 "unexpected daemon response for query embedding: {other:?}"
146 ))),
147 None => {
148 let embedder = embedder::get_embedder(models_dir)?;
149 embedder::embed_query(embedder, text)
150 }
151 }
152}
153
154pub fn embed_passages_controlled_or_local(
155 models_dir: &Path,
156 texts: &[&str],
157 token_counts: &[usize],
158) -> Result<Vec<Vec<f32>>, AppError> {
159 let request = DaemonRequest::EmbedPassages {
160 texts: texts.iter().map(|t| (*t).to_string()).collect(),
161 token_counts: token_counts.to_vec(),
162 };
163
164 match request_or_autostart(models_dir, &request, true)? {
165 Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
166 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
167 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
168 "unexpected daemon response for passage embedding batch: {other:?}"
169 ))),
170 None => {
171 let embedder = embedder::get_embedder(models_dir)?;
172 embedder::embed_passages_controlled(embedder, texts, token_counts)
173 }
174 }
175}
176
177struct DaemonSpawnGuard {
178 models_dir: PathBuf,
179}
180
181impl DaemonSpawnGuard {
182 fn new(models_dir: &Path) -> Self {
183 Self {
184 models_dir: models_dir.to_path_buf(),
185 }
186 }
187}
188
189impl Drop for DaemonSpawnGuard {
190 fn drop(&mut self) {
191 let lock_path = spawn_lock_path(&self.models_dir);
192 if lock_path.exists() {
193 match std::fs::remove_file(&lock_path) {
194 Ok(()) => {
195 tracing::debug!(
196 path = %lock_path.display(),
197 "spawn lock file removed during graceful daemon shutdown"
198 );
199 }
200 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
201 Err(err) => {
202 tracing::warn!(
203 error = %err,
204 path = %lock_path.display(),
205 "failed to remove spawn lock file while shutting down daemon"
206 );
207 }
208 }
209 }
210 tracing::info!(
211 "daemon shut down gracefully; socket will be cleaned up by OS or by the next daemon via try_overwrite"
212 );
213 }
214}
215
216pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
217 let permits = std::thread::available_parallelism()
220 .map(|n| n.get())
221 .unwrap_or(2)
222 .clamp(2, 8);
223 let rt = tokio::runtime::Builder::new_multi_thread()
224 .worker_threads(permits)
225 .thread_name("daemon-worker")
226 .enable_all()
227 .build()
228 .map_err(AppError::Io)?;
229
230 rt.block_on(run_async(models_dir, idle_shutdown_secs, permits))
231}
232
233async fn run_async(
234 models_dir: &Path,
235 idle_shutdown_secs: u64,
236 permits: usize,
237) -> Result<(), AppError> {
238 let socket = daemon_label(models_dir);
239 let name = to_local_socket_name(&socket)?;
240 let listener = ListenerOptions::new()
241 .name(name)
242 .nonblocking(ListenerNonblockingMode::Accept)
243 .try_overwrite(true)
244 .create_sync()
245 .map_err(AppError::Io)?;
246
247 let _spawn_guard = DaemonSpawnGuard::new(models_dir);
250
251 let models_dir_warm = models_dir.to_path_buf();
255 tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
256 .await
257 .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;
258
259 crate::output::emit_json(&DaemonResponse::Listening {
260 pid: std::process::id(),
261 socket,
262 idle_shutdown_secs,
263 })?;
264
265 let handled_embed_requests = Arc::new(AtomicU64::new(0));
266 let mut last_activity = Instant::now();
267 let models_dir = models_dir.to_path_buf();
268 let permit_pool = Arc::new(tokio::sync::Semaphore::new(permits));
270
271 loop {
272 if shutdown_requested() {
273 break;
274 }
275
276 if !daemon_control_dir(&models_dir).exists() {
277 tracing::info!("daemon control directory disappeared; shutting down");
278 break;
279 }
280
281 match listener.accept() {
282 Ok(stream) => {
283 last_activity = Instant::now();
284 let models_dir_clone = models_dir.clone();
285 let counter = Arc::clone(&handled_embed_requests);
286 let permit =
287 permit_pool.clone().acquire_owned().await.map_err(|e| {
288 AppError::Internal(anyhow::anyhow!("semaphore closed: {e}"))
289 })?;
290 let should_exit = tokio::task::spawn_blocking(move || {
291 let _permit = permit; handle_client(stream, &models_dir_clone, &counter)
293 })
294 .await
295 .map_err(|e| {
296 AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
297 })??;
298
299 if should_exit {
300 break;
301 }
302 }
303 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
304 if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
305 tracing::info!(
306 idle_shutdown_secs,
307 handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
308 "daemon idle timeout reached"
309 );
310 break;
311 }
312 tokio::time::sleep(Duration::from_millis(50)).await;
313 }
314 Err(err) => return Err(AppError::Io(err)),
315 }
316 }
317
318 Ok(())
319}
320
321fn handle_client(
322 stream: LocalSocketStream,
323 models_dir: &Path,
324 handled_embed_requests: &AtomicU64,
325) -> Result<bool, AppError> {
326 let mut reader = BufReader::new(stream);
327 let mut line = String::new();
328 reader.read_line(&mut line).map_err(AppError::Io)?;
329
330 if line.trim().is_empty() {
331 write_response(
332 reader.get_mut(),
333 &DaemonResponse::Error {
334 message: "empty request to daemon".to_string(),
335 },
336 )?;
337 return Ok(false);
338 }
339
340 let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
341 let (response, should_exit) = match request {
342 DaemonRequest::Ping => (
343 DaemonResponse::Ok {
344 pid: std::process::id(),
345 version: SQLITE_GRAPHRAG_VERSION.to_string(),
346 handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
347 },
348 false,
349 ),
350 DaemonRequest::Shutdown => (
351 DaemonResponse::ShuttingDown {
352 handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
353 },
354 true,
355 ),
356 DaemonRequest::EmbedPassage { text } => {
357 let embedder = embedder::get_embedder(models_dir)?;
358 let embedding = embedder::embed_passage(embedder, &text)?;
359 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
360 (
361 DaemonResponse::PassageEmbedding {
362 embedding,
363 handled_embed_requests: count,
364 },
365 false,
366 )
367 }
368 DaemonRequest::EmbedQuery { text } => {
369 let embedder = embedder::get_embedder(models_dir)?;
370 let embedding = embedder::embed_query(embedder, &text)?;
371 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
372 (
373 DaemonResponse::QueryEmbedding {
374 embedding,
375 handled_embed_requests: count,
376 },
377 false,
378 )
379 }
380 DaemonRequest::EmbedPassages {
381 texts,
382 token_counts,
383 } => {
384 let embedder = embedder::get_embedder(models_dir)?;
385 let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
386 let embeddings =
387 embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
388 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
389 (
390 DaemonResponse::PassageEmbeddings {
391 embeddings,
392 handled_embed_requests: count,
393 },
394 false,
395 )
396 }
397 };
398
399 write_response(reader.get_mut(), &response)?;
400 Ok(should_exit)
401}
402
403fn write_response(
404 stream: &mut LocalSocketStream,
405 response: &DaemonResponse,
406) -> Result<(), AppError> {
407 serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
408 stream.write_all(b"\n").map_err(AppError::Io)?;
409 stream.flush().map_err(AppError::Io)?;
410 Ok(())
411}
412
413fn request_if_available(
414 models_dir: &Path,
415 request: &DaemonRequest,
416) -> Result<Option<DaemonResponse>, AppError> {
417 let socket = daemon_label(models_dir);
418 let name = match to_local_socket_name(&socket) {
419 Ok(name) => name,
420 Err(err) => return Err(AppError::Io(err)),
421 };
422
423 let mut stream = match LocalSocketStream::connect(name) {
424 Ok(stream) => stream,
425 Err(err)
426 if matches!(
427 err.kind(),
428 std::io::ErrorKind::NotFound
429 | std::io::ErrorKind::ConnectionRefused
430 | std::io::ErrorKind::AddrNotAvailable
431 | std::io::ErrorKind::TimedOut
432 ) =>
433 {
434 return Ok(None);
435 }
436 Err(err) => return Err(AppError::Io(err)),
437 };
438
439 serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
440 stream.write_all(b"\n").map_err(AppError::Io)?;
441 stream.flush().map_err(AppError::Io)?;
442
443 let mut reader = BufReader::new(stream);
444 let mut line = String::new();
445 reader.read_line(&mut line).map_err(AppError::Io)?;
446 if line.trim().is_empty() {
447 return Err(AppError::Embedding(
448 "daemon returned an empty response".into(),
449 ));
450 }
451
452 let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
453 Ok(Some(response))
454}
455
456fn should_autostart(cli_flag: bool) -> bool {
457 if !cli_flag {
458 return false; }
460 !autostart_disabled_by_env()
461}
462
463fn maybe_restart_for_version_mismatch(models_dir: &Path) -> Result<(), AppError> {
468 if DAEMON_VERSION_STATE
469 .compare_exchange(
470 VERSION_NOT_CHECKED,
471 VERSION_COMPATIBLE,
472 Ordering::SeqCst,
473 Ordering::SeqCst,
474 )
475 .is_err()
476 {
477 return Ok(());
479 }
480
481 let response = match try_ping(models_dir)? {
482 Some(r) => r,
483 None => return Ok(()), };
485
486 let daemon_version = match &response {
487 DaemonResponse::Ok { version, .. } => version.as_str(),
488 _ => return Ok(()), };
490
491 if daemon_version == SQLITE_GRAPHRAG_VERSION {
492 return Ok(()); }
494
495 DAEMON_VERSION_STATE.store(VERSION_RESTART_ATTEMPTED, Ordering::SeqCst);
497
498 tracing::warn!(
499 daemon_version = %daemon_version,
500 cli_version = SQLITE_GRAPHRAG_VERSION,
501 "daemon version mismatch detected; auto-restarting daemon"
502 );
503
504 try_shutdown(models_dir)?;
506
507 wait_for_daemon_exit(models_dir)?;
509
510 ensure_daemon_running(models_dir)?;
512
513 Ok(())
514}
515
516fn wait_for_daemon_exit(models_dir: &Path) -> Result<(), AppError> {
520 let deadline = Instant::now() + Duration::from_millis(DAEMON_VERSION_RESTART_WAIT_MS);
521 let mut sleep_ms: u64 = 50;
522
523 while Instant::now() < deadline {
524 if try_ping(models_dir)?.is_none() {
525 tracing::debug!("stale daemon exited after version-mismatch shutdown");
526 return Ok(());
527 }
528 thread::sleep(Duration::from_millis(sleep_ms));
529 sleep_ms = (sleep_ms * 2).min(500);
530 }
531
532 tracing::warn!(
533 timeout_ms = DAEMON_VERSION_RESTART_WAIT_MS,
534 "timed out waiting for stale daemon to exit after version-mismatch shutdown"
535 );
536 Ok(())
537}
538
539fn request_or_autostart(
540 models_dir: &Path,
541 request: &DaemonRequest,
542 cli_autostart: bool,
543) -> Result<Option<DaemonResponse>, AppError> {
544 if DAEMON_VERSION_STATE.load(Ordering::SeqCst) == VERSION_NOT_CHECKED {
545 maybe_restart_for_version_mismatch(models_dir)?;
546 }
547
548 if let Some(response) = request_if_available(models_dir, request)? {
549 clear_spawn_backoff_state(models_dir).ok();
550 return Ok(Some(response));
551 }
552
553 if !should_autostart(cli_autostart) {
554 return Ok(None);
555 }
556
557 if !ensure_daemon_running(models_dir)? {
558 return Ok(None);
559 }
560
561 request_if_available(models_dir, request)
562}
563
564fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
565 if (try_ping(models_dir)?).is_some() {
566 clear_spawn_backoff_state(models_dir).ok();
567 return Ok(true);
568 }
569
570 if spawn_backoff_active(models_dir)? {
571 tracing::warn!("daemon autostart suppressed by backoff window");
572 return Ok(false);
573 }
574
575 let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
576 Some(lock) => lock,
577 None => return wait_for_daemon_ready(models_dir),
578 };
579
580 if (try_ping(models_dir)?).is_some() {
581 clear_spawn_backoff_state(models_dir).ok();
582 drop(spawn_lock);
583 return Ok(true);
584 }
585
586 let exe = match std::env::current_exe() {
587 Ok(path) => path,
588 Err(err) => {
589 record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
590 drop(spawn_lock);
591 return Ok(false);
592 }
593 };
594
595 let mut child = std::process::Command::new(exe);
596 child
597 .arg("daemon")
598 .arg("--idle-shutdown-secs")
599 .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
600 .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
601 .env_remove("LD_PRELOAD")
602 .env_remove("LD_LIBRARY_PATH")
603 .env_remove("LD_AUDIT")
604 .env_remove("DYLD_INSERT_LIBRARIES")
605 .env_remove("DYLD_LIBRARY_PATH")
606 .stdin(Stdio::null())
607 .stdout(Stdio::null())
608 .stderr(Stdio::null());
609
610 match child.spawn() {
611 Ok(child_handle) => {
612 let pid = child_handle.id();
624 drop(child_handle);
625 tracing::debug!(
626 pid,
627 "daemon detached; lifecycle managed via spawn lock + readiness file"
628 );
629 let ready = wait_for_daemon_ready(models_dir)?;
630 if ready {
631 clear_spawn_backoff_state(models_dir).ok();
632 } else {
633 record_spawn_failure(
634 models_dir,
635 "daemon did not become healthy after autostart".to_string(),
636 )?;
637 }
638 drop(spawn_lock);
639 Ok(ready)
640 }
641 Err(err) => {
642 record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
643 drop(spawn_lock);
644 Ok(false)
645 }
646 }
647}
648
649fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
650 let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
651 let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
652
653 while Instant::now() < deadline {
654 if (try_ping(models_dir)?).is_some() {
655 return Ok(true);
656 }
657 thread::sleep(Duration::from_millis(sleep_ms));
658 sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
659 }
660
661 Ok(false)
662}
663
/// Tells whether the environment forbids auto-starting the daemon.
///
/// Autostart is disabled when
/// * `SQLITE_GRAPHRAG_DAEMON_CHILD` is `1` (regardless of anything else), or
/// * `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART` is `1` and
///   `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART` is not `1`.
fn autostart_disabled_by_env() -> bool {
    let flag_is_set = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if flag_is_set("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }

    !flag_is_set("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART")
        && flag_is_set("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
}
669
/// Returns the directory holding the daemon's control files (spawn lock and
/// spawn state): the parent of `models_dir`.
///
/// * When `models_dir` has no parent (e.g. `/`), `models_dir` itself is used.
/// * When the parent is the empty path (a single-component relative path such
///   as `models`), `.` is returned instead. An empty path never `exists()`,
///   which would make the daemon believe its control directory disappeared.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) if parent.as_os_str().is_empty() => PathBuf::from("."),
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
676
677fn spawn_lock_path(models_dir: &Path) -> PathBuf {
678 daemon_control_dir(models_dir).join("daemon-spawn.lock")
679}
680
681fn spawn_state_path(models_dir: &Path) -> PathBuf {
682 daemon_control_dir(models_dir).join("daemon-spawn-state.json")
683}
684
685fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
686 let path = spawn_lock_path(models_dir);
687 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
688 let file = OpenOptions::new()
689 .read(true)
690 .write(true)
691 .create(true)
692 .truncate(false)
693 .open(path)
694 .map_err(AppError::Io)?;
695
696 let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
697 loop {
698 match file.try_lock_exclusive() {
699 Ok(()) => return Ok(Some(file)),
700 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
701 if Instant::now() >= deadline {
702 return Ok(None);
703 }
704 thread::sleep(Duration::from_millis(50));
705 }
706 Err(err) => return Err(AppError::Io(err)),
707 }
708 }
709}
710
711fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
712 let state = load_spawn_state(models_dir)?;
713 Ok(now_epoch_ms() < state.not_before_epoch_ms)
714}
715
716fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
717 let mut state = load_spawn_state(models_dir)?;
718 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
719 let exponent = state.consecutive_failures.saturating_sub(1).min(6);
720 let base_ms =
721 (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
722 let half = base_ms / 2;
725 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
726 let backoff_ms = half + jitter;
727 state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
728 state.last_error = Some(message);
729 save_spawn_state(models_dir, &state)
730}
731
732fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
733 let path = spawn_state_path(models_dir);
734 if path.exists() {
735 std::fs::remove_file(path).map_err(AppError::Io)?;
736 }
737 Ok(())
738}
739
740fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
741 let path = spawn_state_path(models_dir);
742 if !path.exists() {
743 return Ok(DaemonSpawnState::default());
744 }
745
746 let bytes = std::fs::read(path).map_err(AppError::Io)?;
747 serde_json::from_slice(&bytes).map_err(AppError::Json)
748}
749
750fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
751 let path = spawn_state_path(models_dir);
752 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
753 let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
754 std::fs::write(path, bytes).map_err(AppError::Io)
755}
756
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch.
fn now_epoch_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_millis() as u64)
}
763
764fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
765 if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
766 return Ok(ns_name);
767 }
768
769 let path = if cfg!(unix) {
774 let base = std::env::var_os("XDG_RUNTIME_DIR")
775 .or_else(|| std::env::var_os("SQLITE_GRAPHRAG_HOME"))
776 .map(std::path::PathBuf::from)
777 .unwrap_or_else(std::env::temp_dir);
778 base.join(format!("{name}.sock"))
779 .to_string_lossy()
780 .into_owned()
781 } else {
782 format!(r"\\.\pipe\{name}")
783 };
784 path.to_fs_name::<GenericFilePath>()
785}
786
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates `<tmp>/cache/models` and returns it together with the guard
    /// that keeps the temporary directory alive.
    fn scratch_models_dir() -> (tempfile::TempDir, PathBuf) {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();
        (tmp, models_dir)
    }

    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let (_tmp, models_dir) = scratch_models_dir();

        // Nothing recorded yet: no backoff.
        assert!(!spawn_backoff_active(&models_dir).unwrap());

        // A failure opens a backoff window and is persisted.
        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let persisted = load_spawn_state(&models_dir).unwrap();
        assert_eq!(persisted.consecutive_failures, 1);
        assert_eq!(persisted.last_error.as_deref(), Some("spawn failed"));

        // Clearing the state closes the window again.
        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    #[test]
    fn daemon_control_dir_uses_models_parent() {
        let base = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let control_dir = daemon_control_dir(&base.join("models"));
        assert_eq!(control_dir, base);
    }

    #[test]
    fn version_state_constants_are_distinct() {
        assert_ne!(VERSION_NOT_CHECKED, VERSION_COMPATIBLE);
        assert_ne!(VERSION_NOT_CHECKED, VERSION_RESTART_ATTEMPTED);
        assert_ne!(VERSION_COMPATIBLE, VERSION_RESTART_ATTEMPTED);
    }

    #[test]
    fn wait_for_daemon_exit_immediate_when_not_running() {
        let (_tmp, models_dir) = scratch_models_dir();

        // With no daemon listening the very first ping must end the wait.
        let started = Instant::now();
        wait_for_daemon_exit(&models_dir).unwrap();
        assert!(started.elapsed() < Duration::from_millis(500));
    }
}